KafkaConsumer实现精确的latest开始消费

代码

代码:https://gitee.com/ydfind/java-total/tree/master/debug-example

需求

调用某个接口,返回结果包含traceId和商品集合,然后根据该traceId去kafka查找debug日志,包含该traceId的日志,数量是商品集合的数量(每个商品都有对应的一条kafka日志,该日志即为商品相关的信息),对日志进行处理后,返回前端展示。

分析

因为拿到traceId的时候,对方其实已经把debug信息发送到kafka了,所以我在拿到traceId,再去kafka查找就要设置auto.offset.reset=earliest, 这就会导致我多查询很多的无效数据。

假如我在调用接口前,先订阅该topic,使用默认auto_offset_reset为latest,在拿到traceId再poll(),结果怎么样呢?不行,因为在poll时发现position为空才会根据auto_offset_reset策略设置,而poll前对方接口早就把信息发到kafka了,即消息被错过了

后面发现可以利用seek函数实现这个功能:

consumer.subscribe(Collections.singletonList("topic"));
        Set<TopicPartition> topicPartitions = null;
        while (CollectionUtils.isEmpty(topicPartitions)) {
            consumer.poll(Duration.ofMillis(100));
            topicPartitions = consumer.assignment();
        }
        Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitions);
        for (TopicPartition partition: topicPartitions) {
            consumer.seek(partition, topicPartitionLongMap.get(partition));
        }

我们发现有seekToEnd函数,但其实并不能实现我们的需求,具体看源码:

@Override
    public void seekToEnd(Collection<TopicPartition> partitions) {
        if (partitions == null)
            throw new IllegalArgumentException("Partitions collection cannot be null");

        acquireAndEnsureOpen();
        try {
            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
            for (TopicPartition tp : parts) {
                log.debug("Seeking to end of partition {}", tp);
                // 这里只是把对应分区的offset重置策略设为LATEST,但其实默认就是这个
                subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
            }
        } finally {
            release();
        }
    }

我们可以看到 只是把对应分区的offset重置策略设为LATEST,但其实默认就是这个。

优化

因为我是用户组里只有一个用户消费,即该KafkaConsumer默认分配所有的分区,可以改为下面实现:

 List<PartitionInfo> topic = consumer.partitionsFor("topic");
        List<TopicPartition> topicPartitions = topic.stream()
                .map(item -> new TopicPartition(item.topic(), item.partition()))
                .collect(Collectors.toList());
        // 订阅所有分区
        consumer.assign(topicPartitions);
        Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitions);
        for (TopicPartition partition: topicPartitions) {
            consumer.seek(partition, topicPartitionLongMap.get(partition));
        }

发现上面需要4s多的时间,而优化后1s不到!

总结

实现上面的需求的方法:
1.auto.offset.reset设置为earliest,但可能会多消费很多消息; // 具体多消费的消息不清楚
2.auto.offset.reset默认,利用seek函数来定位到最近的,若手动订阅所有的分区,会快很多!

其它:
1.其实还有offsetForTime函数,但具体没有尝试过

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐