解决kafka主题分区数量限制消费速度的问题
解决kafka主题分区数量限制消费速度的问题
·
一、背景
在项目中使用kafka消费的时候,会遇到主题已经建立,分区数不能修改的问题,这个时候无法通过横向扩展kafka消费者来增大消息消费吞吐量,这个时候需要对于单消费者增加其消费能力。
二、解决方法
使用kafka批量消费,拉取到应用程序里面再通过多线程消费,增加消费速度
三、步骤
1. pom引入spring-kafka的依赖
2. 增加spring kafka的配置
listener.concurrency值不能超过主题分区数量
设置enable-auto-commit为true之后,kafka的offset会自动提交
也可以设置为false手动提交offset,但是批量方式不支持单笔记录的提交
3. 使用KafkaListener批量消费
传参为List<ConsumerRecord<?, ?>> 类型
拉取到的数据根据设定的线程数量均分,然后交到每个线程单独处理
@KafkaListener(topics = "xxx-topic",groupId = "xxx-group")
public void consumeData(List<ConsumerRecord<?, ?>> records) {
List<JSONObject> executorData = records.stream().map(record -> JSONObject.parseObject((String) record.value()))
.collect(Collectors.toList());
if (!executorData.isEmpty()) {
//将本次拉取的数据均分到16个线程
List<List<JSONObject>> tasks = ProcessUtil.averageAssign(executorData, 16);
for (List<JSONObject> task : tasks) {
if (!task.isEmpty()) {
//线程池处理任务
executor.execute(new ProcessExecutor(task));
}
}
}
}
/**
* 将一个list均分成n个list
*/
public static <T> List<List<T>> averageAssign(List<T> source, int n) {
List<List<T>> result = new ArrayList<List<T>>();
int remainder = source.size() % n; //先计算出余数
int number = source.size() / n; //然后是商
int offset = 0;//偏移量(用以标识加的余数)
for (int i = 0; i < n; i++) {
List<T> value;
if (remainder > 0) {
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remainder--;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
}
return result;
}
4. 处理完成批量存入es
这里是把每个线程处理的结果单独汇总,存入es,这样做的好处是可以利用上es集群的性能,最大化插入效率。这里也可以用线程安全的集合汇总各个线程处理的数据,一次性批量保存到es。
@Override
public void run() {
String threadName = Thread.currentThread().getName();
if (threadName.startsWith(StaticClass.PROCESS_DATA_THREAD_PREFIX)) {
List<JSONObject> saveEsList = new ArrayList<>();
for (JSONObject object : list) {
//处理结果,获取返回
JSONObject processResult = getResult();
if (processResult != null) {
saveEsList.add(processResult);
}
}
if (!saveEsList.isEmpty()) {
StaticClass.staticElasticsearchUtil.bulkAddIndex(saveEsList, "xxx_index");
}
}
}
更多推荐
已为社区贡献4条内容
所有评论(0)