项目场景:

最近项目上遇到了kafka的listerner方法经常被重复执行的问题。

场景大概是这样的:一个kafka producer向kafka中发送数据,然后consumer消费这个数据,并发送一些邮件。


问题描述:

最近项目上的邮件经常会发送重复发送两份,本以为是偶发的事件,后来发现越来越频繁,甚至会重复发送三四风,故决定去查一查原因。

邮件发送的逻辑是从kafka中接收数据,然后整合之后再发送,因为kafka中数据量太大,消费次数很多不太好查,所以决定再本地先复现。

在本地的环境中,观察日志,发现在每次发送重复邮件的时候,kafka的日志中会加着这样一行:

Offset commit failed on partition xxx-report-0 at offset 47: The coordinator is not aware of this member.

意思是offset提交失败,协调器不知道这个成员。


原因分析:

经过分析,是offset提交失败后,offset没有变化,kafka仍然认为该条消息没有被消费,所以listerner还会接收到该条消息。


解决方案:

经过研读kafka的文档,发现问题出现在自动提交offset上了,如果配置文件中设置enable-auto-commit为true的话,sping-kafka会在后续自动执行consumer.commitSync();

故需要把offset的commit设置为手动执行,需要在配置文件中设置这两行:

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

将enable-auto-commit设置为false,表示不会自动提交offset

ackMode是个枚举类型:

  • RECORD
    每处理一条commit一次
  • BATCH(默认)
    每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME 
    每次间隔ackTime的时间去commit
  • COUNT 
    累积达到ackCount次的ack去commit
  • COUNT_TIME
    ackTime或ackCount哪个条件先满足,就commit
  • MANUAL
    listener负责ack,但是背后也是批量上去
  • MANUAL_IMMEDIATE
    listner负责ack,每调用一次,就立即commit

 

在代码中修改:

 @KafkaListener(
      topics = {"report"},
      errorHandler = "reportErrorHandler")
  public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    log.info("get a message from kafka: topic : {}, offset : {}", record.topic(), record.offset());
    // do something
    ack.acknowledge();
  }

最下面的ack.acknowledge()是表示手动提交这个offset,经此改动,kafka的数据就没有被消费多次了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐