KafkaConsumer实现精确的latest(结尾)开始消费
需求调用某个接口,返回结果包含traceId,然后跟进该traceId去kafka查找debug日志。分析因为拿到traceId的时候,对方其实已经把debug信息发送到kafka了,所以我再在拿到traceId,再去kafka查找就要设置auto.offset.reset=earliest, 这就会导致我多查询很多的无效数据。假如我在调用接口前,先订阅该topic,使用默认auto_offset
代码
代码:https://gitee.com/ydfind/java-total/tree/master/debug-example
需求
调用某个接口,返回结果包含traceId和商品集合,然后根据该traceId去kafka查找debug日志,包含该traceId的日志,数量是商品集合的数量(每个商品都有对应的一条kafka日志,该日志即为商品相关的信息),对日志进行处理后,返回前端展示。
分析
因为拿到traceId的时候,对方其实已经把debug信息发送到kafka了,所以我在拿到traceId,再去kafka查找就要设置auto.offset.reset=earliest, 这就会导致我多查询很多的无效数据。
假如我在调用接口前,先订阅该topic,使用默认auto_offset_reset为latest,在拿到traceId再poll(),结果怎么样呢?不行,因为在poll时发现position为空才会根据auto_offset_reset策略设置,而poll前对方接口早就把信息发到kafka了,即消息被错过了
后面发现可以利用seek函数实现这个功能:
consumer.subscribe(Collections.singletonList("topic"));
Set<TopicPartition> topicPartitions = null;
while (CollectionUtils.isEmpty(topicPartitions)) {
consumer.poll(Duration.ofMillis(100));
topicPartitions = consumer.assignment();
}
Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitions);
for (TopicPartition partition: topicPartitions) {
consumer.seek(partition, topicPartitionLongMap.get(partition));
}
我们发现有seekToEnd函数,但其实并不能实现我们的需求,具体看源码:
@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
if (partitions == null)
throw new IllegalArgumentException("Partitions collection cannot be null");
acquireAndEnsureOpen();
try {
Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
for (TopicPartition tp : parts) {
log.debug("Seeking to end of partition {}", tp);
// 这里只是把对应分区的offset重置策略设为LATEST,但其实默认就是这个
subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
}
} finally {
release();
}
}
我们可以看到 只是把对应分区的offset重置策略设为LATEST,但其实默认就是这个。
优化
因为我是用户组里只有一个用户消费,即该KafkaConsumer默认分配所有的分区,可以改为下面实现:
List<PartitionInfo> topic = consumer.partitionsFor("topic");
List<TopicPartition> topicPartitions = topic.stream()
.map(item -> new TopicPartition(item.topic(), item.partition()))
.collect(Collectors.toList());
// 订阅所有分区
consumer.assign(topicPartitions);
Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitions);
for (TopicPartition partition: topicPartitions) {
consumer.seek(partition, topicPartitionLongMap.get(partition));
}
发现上面需要4s多的时间,而优化后1s不到!
总结
实现上面的需求的方法:
1.auto.offset.reset设置为earliest,但可能会多消费很多消息; // 具体多消费的消息不清楚
2.auto.offset.reset默认,利用seek函数来定位到最近的,若手动订阅所有的分区,会快很多!
其它:
1.其实还有offsetForTime函数,但具体没有尝试过
更多推荐
所有评论(0)