一、背景
在项目中使用kafka消费的时候,会遇到主题已经建立,分区数不能修改的问题,这个时候无法通过横向扩展kafka消费者来增大消息消费吞吐量,这个时候需要对于单消费者增加其消费能力。

二、解决方法

使用kafka批量消费,拉取到应用程序里面再通过多线程消费,增加消费速度

三、步骤

1. pom引入spring-kafka的依赖​​​​​​​

2. 增加spring kafka的配置

 

listener.concurrency值不能超过主题分区数量

设置enable-auto-commit为true之后,kafka的offset会自动提交

也可以设置为false手动提交offset,但是批量方式不支持单笔记录的提交

3. 使用KafkaListener批量消费​​​​​​​

传参为List<ConsumerRecord<?, ?>> 类型

拉取到的数据根据设定的线程数量均分,然后交到每个线程单独处理

@KafkaListener(topics = "xxx-topic",groupId = "xxx-group")
public void consumeData(List<ConsumerRecord<?, ?>> records) {
    List<JSONObject> executorData = records.stream().map(record -> JSONObject.parseObject((String) record.value()))
            .collect(Collectors.toList());
    if (!executorData.isEmpty()) {
        //将本次拉取的数据均分到16个线程
        List<List<JSONObject>> tasks = ProcessUtil.averageAssign(executorData, 16);
        for (List<JSONObject> task : tasks) {
            if (!task.isEmpty()) {
                //线程池处理任务
                executor.execute(new ProcessExecutor(task));
            }
        }
    }
}
/**
 * 将一个list均分成n个list
 */
public static <T> List<List<T>> averageAssign(List<T> source, int n) {
    List<List<T>> result = new ArrayList<List<T>>();
    int remainder = source.size() % n;  //先计算出余数
    int number = source.size() / n;  //然后是商
    int offset = 0;//偏移量(用以标识加的余数)
    for (int i = 0; i < n; i++) {
        List<T> value;
        if (remainder > 0) {
            value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
            remainder--;
            offset++;
        } else {
            value = source.subList(i * number + offset, (i + 1) * number + offset);
        }
        result.add(value);
    }
    return result;
}

4. 处理完成批量存入es

这里是把每个线程处理的结果单独汇总,存入es,这样做的好处是可以利用上es集群的性能,最大化插入效率。这里也可以用线程安全的集合汇总各个线程处理的数据,一次性批量保存到es。

@Override
public void run() {
    String threadName = Thread.currentThread().getName();
    if (threadName.startsWith(StaticClass.PROCESS_DATA_THREAD_PREFIX)) {
        List<JSONObject> saveEsList = new ArrayList<>();
        for (JSONObject object : list) {
            //处理结果,获取返回
            JSONObject processResult = getResult();
            if (processResult != null) {
                saveEsList.add(processResult);
            }
        }
        if (!saveEsList.isEmpty()) {
            StaticClass.staticElasticsearchUtil.bulkAddIndex(saveEsList, "xxx_index");
        }
    }
}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐