1. 获取topic下所有的partion
  2. 计算每个partion的offset
  3. 将分区offset移动最新的位置
  4. 提交分区最新的位置

如下代码

public class KafkaConsumerOffsetManager {

    private KafkaConsumer consumer;

    private String topic;

    public KafkaConsumerRunnable(KafkaConsumer consumer, String topic) {
        this.consumer = consumer;
        this.topic = topic;
    }

    public void run() {

        Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
        
        //找到topic下所有的分区
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
                
        List<TopicPartition> tp =new ArrayList<TopicPartition>();

        if (null != partitionInfos && partitionInfos.size() > 0) {

            partitionInfos.forEach(p -> {

                tp.add(new TopicPartition(topic,p.partition()));

                //消费者分配到该分区
                consumer.assign(tp);

                //移动到最新offset
                consumer.seekToEnd(tp);


                //获取到该分区的last offset 
                long position = consumer.position(new TopicPartition(topic, p.partition()));

                offset.put(new TopicPartition(topic, p.partition()), new OffsetAndMetadata(position + 1));
            });
        }

        consumer.commitAsync(offset, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                /** 异常同步去提交 **/
                if (null != exception) {
                    consumer.commitSync(offsets);
                }
            }
        });

    }

}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐