消费者poll消息得过程(poll的意思是从broker拿消息,并不代表拿到就消费成功了)

  • 消费者建立了与broker之间的⻓连接,开始poll消息。
  • 默认一次poll 500条消息
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500 );

可以根据消费速度的快慢来设置,因为如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。

可以通过这个值进行设置:

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000 );

如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次⻓轮询结束。

ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis( 1000 ));

消费者发送心跳的时间间隔

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 );

kafka如果超过 10 秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000 );

自动提交offset

消费者poll到消息后默认情况下,会自动向broker的_consumer_offsets主题提交当前主题-分区消费的偏移量。

自动提交会丢消息: 因为如果消费者还没消费完poll下来的消息就自动提交了偏移量,那么此 时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息。之前未被消费的消息就丢失掉了。

手动提交offset

手动提交分为手动同步提交与手动异步提交

很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。开启手动提交功能的前提是消费者客户端参数enable.auto.commit 配置为false ,示例如下:

properties.put("enable.auto.commit", "false");

手动同步提交 

手动提交可以细分为同步提交和异步提交,对应于KafkaConsumer 中的commitSync()和commitAsync()两种类型的方法。我们这里先讲述同步提交的方式commitSync()方法
定义如下:

public void commitSync()

这个方法很简单,下面使用它演示同步提交的简单用法: 

while(true) {
        ConsumerRecords<String , String> records= consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
        //do some logical processing .
        }
        consumer.commitSync() ;
    }

可以看到示例中先对拉取到的每一条消息做相应的逻辑处理,然后对整个消息集做同步提交。参考KafkaConsumer 源码中提供的示例,针对上面的示例还可以修改为批量处理+批量提交的方式, 关键代码如下: 

final int minBatchSize=200;
List<ConsumerRecord> buffer=new ArrayList<>();
while(isRunning.get()){
    ConsumerRecord<String,String> records=consumer.poll(1000);
    for (ConsumerRecord<String,String> record:records){
        buffer.add(record);
    }
    if (buffer.size()>=minBatchSize){
        consumer.commitSync();
        buffer.clear();
    }
}

上面的示例中将拉取到的消息存入缓存buffer,等到积累到足够多的时候,也就是示例中大于等于200 个的时候,再做相应的批量处理,之后再做批量提交。这两个示例都有重复消费的问题,如果在业务逻辑处理完之后,并且在同步位移提交前,程序出现了崩渍, 那么待恢复之后又只能从上一次位移提交的地方拉取消息,由此在两次位移提交的窗口中出现了重复消费的现象。

commitSync ()方法会根据poll()方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误( Unrecoverable Eηor ),它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如CommitFailedException 、WakeupException 、InterruptException 、AuthenticationException 、AuthorizationException 等,我们可以将其捕获并做针对性的处理。对于采用commitSync()的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批
次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个含参方法,具体定义如下:

public void commitSync(final Map<TopicPartition , OffsetAndMetadata> offsets)

 该方法提供了一个offsets 参数, 用来提交指定分区的位移。无参的commitSync()方法只能提交当前批次对应的position 值。如果需要提交一个中间值,比如业务每消费一条消息就提交一次位移,那么就可以使用这种方式

while ((isRunning.get())){
    ConsumerRecords<String,String> records=consumer.poll(1000);
    for (ConsumerRecord<String,String> record:records){
        long offset=record.offset();
        TopicPartition partition=new TopicPartition(record.topic() ,record.partition());
        consumer.commitSync(Collections.singletonMap(partition,new offsetAndMetdata(offset+1)));
    }
}

在实际应用中,很少会有这种每消费一条消息就提交一次消费位移的必要场景。commitSync()方法本身是同步执行的,会耗费一定的性能,而示例中的这种提交方式会将性能拉到一个相当低的点。更多时候是按照分区的粒度划分提交位移的界限,这里我们就要用到了ConsumerRecords 类的partitions()方法和records(TopicPartition)方法,

异步提交的方式( commitAsync())
在执行的时候消费者线程不会被阻塞, 可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以便消费者的性能得到一定的增强。commitAsync 方法有三个不同的重载方法,具体定义如下:

public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition , OffsetAndMetadata> offsets , OffsetCommitCallback callback)

第一个无参的方法和第三个方法中的offsets 都很好理解,对照commitSync()方法即可。关键的是这里的第二个方法和第三个方法中的callback 参数,它提供了一个异步提交的回调方法,当位移提交完成后会回调OffsetCommitCallback 中的onComplete()方法。 

commitAsync()提交的时候同样会有失败的情况发生,那么我们应该怎么处理呢?读者有可能想到的是重试,问题的关键也就在这里了。如果某一次异步提交的消费位移为x , 但是提交失败了,然后下一次又异步提交了消费位移为x+y,这次成功了。如果这里引入了重试机制,前一次的异步提交的消费位移在重试的时候提交成功了,那么此时的消费位移又变为了x 。如果此时发生异常(或者再均衡) , 那么恢复之后的消费者(或者新的消费者)就会从x 处开始消费消息,这样就发生了重复消费的问题。
为此我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小,如果前者小于后者,则说明有更大的位移己经提交了,不需要再进行本次重试:如果两者相同,则说明可以进行重试提交。除非程序编码错误,否则不会出现前者大于后者的情况。如果位移提交失败的情况经常发生,那么说明系统肯定出现了故障,在-般情况下,位移提交失败的情况很少发生,不重试也没有关系,后面的提交也会有成功的。重试会增加代码逻辑的复杂度,不重试会增加重复消费的概率。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;如果消费者正常退出或发生再均衡的情况,那么可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。

try{
    while(isRunning.get()){
        consumer.commitAsync();
    }
}finally {
    try{
        consumer.commitSync();
    }finally {
        {
            consumer.close();
        }
    }
}

指定分区消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));

消息回溯消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0 )));

指定offset消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
consumer.seek(new TopicPartition(TOPIC_NAME, 0 ), 10 );

从指定时间点消费

List<PartitionInfo> topicPartitions =consumer.partitionsFor(TOPIC_NAME);
//从 1 小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60 ;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
    map.put(new TopicPartition(TOPIC_NAME, par.partition()),fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap =consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :parMap.entrySet()) {
    TopicPartition key = entry.getKey();
    OffsetAndTimestamp value = entry.getValue();
    if (key == null || value == null) continue;
    Long offset = value.offset();
    System.out.println("partition-" + key.partition() +"|offset-" + offset);
    System.out.println();
    //根据消费里的timestamp确定offset
    if (value != null) {
        consumer.assign(Arrays.asList(key));
        consumer.seek(key, offset);
    }
}

新消费组得消费偏移量

当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费?

  • latest(默认) :只消费自己启动之后发送到主题的消息
  • earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐