配置

pom.xml依赖


        <!-- Alibaba Cloud Log Service (SLS) client and producer dependencies -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>log-loghub-producer</artifactId>
            <version>0.1.4</version>
            <exclusions>
                <exclusion>
                    <groupId>com.alibaba</groupId>
                    <artifactId>fastjson</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>aliyun-log-producer</artifactId>
            <version>0.3.4</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>aliyun-log</artifactId>
            <version>0.6.33</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>loghub-client-lib</artifactId>
            <version>0.6.16</version>
        </dependency>

配置AliLogConfig

package com.yhzy.doudoubookserver.global.alilog;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.producer.LogProducer;
import com.aliyun.openservices.log.producer.ProducerConfig;
import com.aliyun.openservices.log.producer.ProjectConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * Spring configuration that declares the Aliyun Log Service (SLS) beans used by
 * this project: a {@link LogProducer} for writing logs, the {@link ProducerConfig}
 * and {@link ProjectConfig} it is built from, and a {@link Client} for reading logs.
 *
 * <p>NOTE(review): {@code SLSEnvironment.BOOK_PROJECT} is referenced below but is not
 * among the constants shown in the accompanying {@code SLSEnvironment} listing —
 * confirm it exists in the real project.
 *
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/23
 * @since 1.0.0
 */
@Configuration
@Scope("singleton")
public class AliLogConfig {
    // Connection settings; left empty here and expected to be filled in by the deployer.
    public static String accessKeyId = "";
    public static String accessKeySecret = "";
    public static String endPoint = "";
    public static String projectName = SLSEnvironment.BOOK_PROJECT;

    /**
     * Builds the producer used for writing logs.
     *
     * @return a producer created from {@link #producerConfig()} with
     *         {@link #projectConfig()} registered on it
     */
    @Bean
    @ConditionalOnClass(LogProducer.class)
    public LogProducer getLogProducer() {
        ProducerConfig settings = producerConfig();
        LogProducer logProducer = new LogProducer(settings);
        ProjectConfig target = projectConfig();
        logProducer.setProjectConfig(target);
        return logProducer;
    }

    /**
     * Builds the tuning parameters for the producer.
     *
     * @return the populated producer configuration
     */
    @Bean
    @ConditionalOnClass(ProducerConfig.class)
    public ProducerConfig producerConfig() {
        ProducerConfig cfg = new ProducerConfig();
        // Send timeout for cached logs, in milliseconds; once a cached package times
        // out it is sent immediately.
        cfg.packageTimeoutInMS = 1000;
        // Upper bound on the size of one cached log package, in bytes (must not exceed 5 MB).
        cfg.logsBytesPerPackage = 5 * 1024 * 1024;
        // Maximum number of logs in one cached package (must not exceed 4096).
        cfg.logsCountPerPackage = 4096;
        // Upper bound on the memory a single producer instance may use, in bytes.
        cfg.memPoolSizeInByte = 1000 * 1024 * 1024;
        // Maximum number of threads in the IO pool that sends data to the log service.
        cfg.maxIOThreadSizeInPool = 50;
        // Only relevant when logs are sent with an explicit shard hash. The backend merge
        // thread merges data mapped to the same shard; each shard owns a hash range and the
        // producer maps the caller's hash to the minimum of that range. The producer pulls
        // the shard hash ranges from loghub periodically; this value is the interval between
        // pulls, in milliseconds.
        cfg.shardHashUpdateIntervalInMS = 10 * 60 * 1000;
        cfg.retryTimes = 3;
        return cfg;
    }

    /**
     * Builds the project/endpoint/credential tuple from this class's static fields.
     *
     * @return the project configuration
     */
    @Bean
    @ConditionalOnClass(ProjectConfig.class)
    public ProjectConfig projectConfig() {
        ProjectConfig target = new ProjectConfig(projectName, endPoint, accessKeyId, accessKeySecret);
        return target;
    }

    /**
     * Creates the SLS client object used for reading data.
     *
     * @return a client constructed with (host, accessId, accessKey), all empty strings here
     */
    @Bean
    public Client client(){
        // Argument order: host, access id, access key.
        return new Client("", "", "");
    }
}

配置AliLogUtil

package com.yhzy.doudoubookserver.common;

import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.producer.LogProducer;
import com.yhzy.doudoubookserver.global.alilog.AliLogConfig;
import com.yhzy.doudoubookserver.global.alilog.CallbackLogInfo;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Vector;

/**
 * Helper component that writes batches of {@link LogItem}s to Aliyun Log Service
 * through the {@link LogProducer} bean declared in {@link AliLogConfig}.
 *
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/23
 * @since 1.0.0
 */
@Component
public class AliLogUtil {

    @Resource
    private AliLogConfig aliLogConfig;

    /**
     * Sends one batch of log items, flushes the producer cache and then waits
     * {@code millis} milliseconds to give the sender threads time to finish.
     *
     * <p>The producer is the shared bean obtained from {@link AliLogConfig}, so it is
     * deliberately NOT closed here: closing it after one batch would leave later
     * callers with a producer that has already been shut down. Releasing the producer
     * is left to the owner of the bean (Spring invokes a bean's public {@code close()}
     * method on context shutdown by default).
     *
     * @param projectName SLS project to write to
     * @param logStore    logstore inside the project
     * @param logGroup    log items to send; a {@code null} or empty batch is a no-op
     * @param topic       log topic
     * @param source      log source
     * @param millis      time to wait after flushing, in milliseconds; {@code null} skips the wait
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void saveLog(String projectName,String logStore, Vector<LogItem> logGroup, String topic, String source,Long millis) throws InterruptedException {
        if (logGroup == null || logGroup.isEmpty()) {
            // Nothing to send.
            return;
        }
        final LogProducer logProducer = aliLogConfig.getLogProducer();
        // The callback keeps a copy of the request so it can re-send on failure.
        logProducer.send(projectName, logStore, topic, source, logGroup,
                new CallbackLogInfo(projectName, logStore, topic,null, source, logGroup, logProducer));
        // Actively flush logs that are cached but not yet sent.
        logProducer.flush();
        // Give the sender threads time to finish.
        if (millis != null) {
            Thread.sleep(millis);
        }
    }
}

配置CallbackLogInfo

package com.yhzy.doudoubookserver.global.alilog;

import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.producer.ILogCallback;
import com.aliyun.openservices.log.producer.LogProducer;
import com.aliyun.openservices.log.response.PutLogsResponse;

import java.util.Vector;

/**
 * Send callback that remembers the request it belongs to so the same batch can be
 * re-sent when the log service reports an error.
 *
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/23
 * @since 1.0.0
 */
public class CallbackLogInfo extends ILogCallback {
    /** Number of re-sends allowed after failures. */
    private static final int MAX_RETRIES = 3;

    // Copy of the data being sent, kept so it can be retried when an error occurs.
    public String project;
    public String logstore;
    public String topic;
    public String shardHash;
    public String source;
    public Vector<LogItem> items;
    public LogProducer producer;
    public int retryTimes = 0;

    /**
     * Captures every argument of the original send call.
     *
     * @param project   SLS project name
     * @param logstore  logstore name
     * @param topic     log topic
     * @param shardHash shard hash used for the send; may be {@code null}
     * @param source    log source
     * @param items     the log items that were sent
     * @param producer  producer to use for retries
     */
    public CallbackLogInfo(String project, String logstore, String topic, String shardHash, String source,
                           Vector<LogItem> items, LogProducer producer) {
        this.producer = producer;
        this.items = items;
        this.source = source;
        this.shardHash = shardHash;
        this.topic = topic;
        this.logstore = logstore;
        this.project = project;
    }

    /**
     * Prints the outcome of a send and, on failure, re-sends the batch until the
     * retry limit is reached.
     *
     * @param response the service response; read only when {@code e} is {@code null}
     * @param e        the failure, or {@code null} when the send succeeded
     */
    public void onCompletion(PutLogsResponse response, LogException e) {
        if (e == null) {
            // Success: print the request id.
            System.out.println("send success, request id: " + response.GetRequestId());
            return;
        }

        // Failure: print the error details.
        System.out.println(e.GetErrorCode() + ", " + e.GetErrorMessage() + ", " + e.GetRequestId());

        // Retry at most three times; the counter is bumped on every failure.
        boolean retryAllowed = retryTimes < MAX_RETRIES;
        retryTimes++;
        if (retryAllowed) {
            // NOTE(review): the constructor takes (topic, shardHash, source) but this call
            // passes (topic, source, shardHash) — confirm against the LogProducer.send
            // overload that the two values are not swapped.
            producer.send(project, logstore, topic, source, shardHash, items, this);
        }
    }
}

配置SLSEnvironment

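SLSEnvironment 用于集中记录 SLS 日志相关的 project 和 logStore 名称。注意:上文的 AliLogConfig 与下文的 AliLogTest 引用了 `SLSEnvironment.BOOK_PROJECT` / `SLSEnvironment.BOOK_LOGSTORE`,而本示例只给出了 `TEST_PROJECT` / `TEST_LOGSTORE`,使用时请自行补充对应的常量。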

package com.yhzy.doudoubookserver.global.alilog;


import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
 * Names of the SLS projects and logstores used by this application, plus helpers
 * that compute query time bounds as Unix timestamps in seconds.
 *
 * <p>All calendar-based bounds are evaluated in the fixed offset UTC+08:00,
 * independent of the JVM's default time zone.
 *
 * <p>NOTE(review): other classes shown alongside this one reference
 * {@code BOOK_PROJECT} and {@code BOOK_LOGSTORE}, which are not declared here —
 * confirm where they are defined.
 *
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/25
 * @since 1.0.0
 */
public interface SLSEnvironment {
    /** Test project. */
    String TEST_PROJECT = "test_project";

    /** Test logstore. */
    String TEST_LOGSTORE = "test_logstore";

    /**
     * Start time: 00:00:00 (UTC+08:00) three days before the current date.
     *
     * @return the start time as Unix seconds
     */
    public static Integer FROM() {
        ZoneOffset offset = ZoneOffset.of("+8");
        // Take "now" in the same offset that is used for the conversion below, so the
        // day boundary does not depend on the JVM's default time zone.
        return Math.toIntExact(LocalDateTime.now(offset).plusDays(-3)
                .withHour(0).withMinute(0).withSecond(0).withNano(0)
                .toEpochSecond(offset));
    }

    /**
     * End time: 23:59:59 (UTC+08:00) one day after the current date.
     *
     * @return the end time as Unix seconds
     */
    public static Integer TO() {
        ZoneOffset offset = ZoneOffset.of("+8");
        return Math.toIntExact(LocalDateTime.now(offset).plusDays(1)
                .withHour(23).withMinute(59).withSecond(59).withNano(0)
                .toEpochSecond(offset));
    }

    /**
     * Minimum start time: the Unix timestamp of 2020-12-01 00:00:00 (UTC+08:00).
     *
     * @return {@code 1606752000}
     */
    public static Integer MINFROM(){
        return 1606752000;
    }

    /**
     * Maximum end time: the current time plus one day (86400000 ms).
     *
     * @return the maximum end time as Unix seconds
     */
    public static Integer MAXTO() {
        return Math.toIntExact((System.currentTimeMillis()+86400000) / 1000);
    }

}

读取写入测试AliLogTest

该测试类包含两个用例:`read` 对 SLS 执行查询分析;`test1` 读取 ES(Elasticsearch)中的数据并写入到 SLS 中。

package com.yhzy;

/**
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/24
 * @since 1.0.0
 */
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.*;
import com.yhzy.doudoubookserver.common.AliLogUtil;
import com.yhzy.doudoubookserver.global.alilog.SLSEnvironment;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;

/**
 * Integration tests against Aliyun Log Service (SLS): {@link #read()} runs an
 * analytic query, {@link #test1()} copies Elasticsearch documents into SLS.
 *
 * @author zhangqinghe
 * @version 1.0.0
 * @email her550@dingtalk.com
 * @date 2020/12/23
 * @since 1.0.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DoudouSLSApplication.class)
public class AliLogTest {
    /** Value written to SLS when the Elasticsearch document has no value for a field. */
    private static final String MISSING_VALUE = "1";

    /** {SLS key, Elasticsearch field} pairs, in the order they are pushed into a log item. */
    private static final String[][] FIELD_MAPPING = {
            {"app_version", "app_version"},
            {"book_id", "book_id"},
            {"book_name", "book_name"},
            {"category_id_1", "category_id_1"},
            {"category_id_2", "category_id_2"},
            {"channel_id", "channel_id"},
            {"chapter_name", "chapter_name"},
            {"content_id", "content_id"},
            {"create_time", "create_time"},
            {"device_id", "device_id"},
            {"is_new", "is_new"},
            {"is_official", "is_official"},
            {"is_vip", "is_vip"},
            {"log_type", "log_type"},
            {"position", "position"},
            // NOTE(review): the SLS key is "sex" but the value is read from "site" —
            // kept as in the original; confirm this mapping is intended.
            {"sex", "site"},
            {"time", "time"},
            {"user_id", "user_id"},
    };

    @Resource
    private AliLogUtil aliLogUtil;
    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**
     * Query/analysis test: runs an aggregate query and prints each returned row as JSON.
     *
     * @throws LogException if the log service rejects the request
     */
    @Test
    public void read() throws LogException {
        String accessId = "";
        String accessKey = "";
        String host = "";
        String project = "zqhtest";
        String logStore = "test_logstore";

        Client client = new Client(host, accessId, accessKey);
        // Current time as Unix seconds.
        long time = System.currentTimeMillis() / 1000;
        /*
         * project : project name.
         * logStore : logstore name.
         * from : query start time, as a Unix timestamp (seconds since 1970-1-1 00:00:00 UTC).
         * to : query end time, as a Unix timestamp (seconds since 1970-1-1 00:00:00 UTC).
         * topic : log topic.
         * query : query/analysis statement.
               See https://help.aliyun.com/document_detail/53608.html?spm=a2c4g.11186623.2.17.56322e4acEkU4X#concept-nyf-cjq-zdb
         * line : maximum number of logs returned. Minimum 0, maximum 100, default 100.
               Not fully understood yet — is this a different thing from the LIMIT inside the query?
         * offset : first row to return. Default 0.
         * reverse : whether to return logs in reverse timestamp order, minute precision. Default false.
               true: reverse order.
               false: forward order.
         */
        // NOTE(review): 36000000 is subtracted from a value in seconds (about 416 days) —
        // confirm this window is intended.
        GetLogsResponse getLogsResponse = client.GetLogs(
                project,
                logStore,
                (int) (time - 36000000),
                (int) time,
                "",
                "* | select app_version,channel_id,content_id,COUNT(content_id) contentCount,COUNT(content_id) chapterCount, sum(time) timeSum GROUP BY app_version,channel_id,content_id LIMIT 100010",
                25,
                0,
                false);

        for (QueriedLog getLog : getLogsResponse.GetLogs()) {
            System.out.println(getLog.GetLogItem().ToJsonString());
        }
    }

    /**
     * Collection test: reads yesterday's 11:00-12:00 (UTC+08:00) book-reading records
     * from Elasticsearch and writes them to SLS.
     *
     * @throws IOException if the Elasticsearch search fails
     */
    @Test
    public void test1() throws IOException {
        // Evaluate "yesterday" in the same offset used for the epoch conversion, so the
        // window does not shift with the JVM's default time zone.
        ZoneOffset offset = ZoneOffset.of("+8");
        LocalDateTime windowStart = LocalDateTime.now(offset).plusDays(-1)
                .withHour(11).withMinute(0).withSecond(0).withNano(0);
        long start = windowStart.toInstant(offset).toEpochMilli();
        long end = windowStart.withHour(12).toInstant(offset).toEpochMilli();

        // Build the Elasticsearch request for the book-reading data.
        SearchRequest request = new SearchRequest("bookanalysis");
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        List<QueryBuilder> filter = boolQuery.filter();
        // Time-range condition; further dynamic conditions would be appended here.
        filter.add(rangeQuery(start, end, "create_time"));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQuery);
        // NOTE(review): size 15000 is kept from the original — confirm the index allows
        // a result window this large.
        request.source(searchSourceBuilder.trackTotalHits(true).size(15000));
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);

        Vector<LogItem> logItems = new Vector<>();
        for (SearchHit hit : response.getHits().getHits()) {
            logItems.add(toLogItem(hit.getSourceAsMap()));
        }

        try {
            aliLogUtil.saveLog(SLSEnvironment.BOOK_PROJECT,SLSEnvironment.BOOK_LOGSTORE,logItems,"read","source ip",1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail the test instead of hiding the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while writing logs to SLS", e);
        }
    }

    /**
     * Converts one Elasticsearch document into an SLS log item using {@link #FIELD_MAPPING};
     * fields that are absent or {@code null} are written as {@link #MISSING_VALUE}.
     */
    private static LogItem toLogItem(Map<String, Object> document) {
        LogItem logItem = new LogItem();
        for (String[] mapping : FIELD_MAPPING) {
            Object value = document.get(mapping[1]);
            logItem.PushBack(mapping[0], value == null ? MISSING_VALUE : value.toString());
        }
        return logItem;
    }

    /**
     * Builds an inclusive range query on the given field.
     *
     * @param startTime lower bound (gte); omitted when {@code null}
     * @param endTime   upper bound (lte); omitted when {@code null}
     * @param name      field name to query
     * @return the range query builder
     */
    public static RangeQueryBuilder rangeQuery(Long startTime, Long endTime, String name){
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(name);
        if (startTime != null) {
            rangeQueryBuilder.gte(startTime);
        }
        if (endTime != null) {
            rangeQueryBuilder.lte(endTime);
        }
        return rangeQueryBuilder;
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐