kafka消息重复消费解决方案
Kafka消费者以消费者组(Consumer Group)的形式消费一个topic,发布到topic中的每个记录将传递到每个订阅的消费者组中的一个消费者实例。Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。......
目录
1、consumer在消费过程中,应用进程被强制kill掉或发生异常退出
一、kafka消费者的特点
Kafka消费者以消费者组(Consumer Group)的形式消费一个topic,发布到topic中的每个记录将传递到每个订阅的消费者组中的一个消费者实例。Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。
kafka中跟消费者有关的几个重要配置参数:
enable.auto.commit 默认值true,表示消费者会周期性自动提交消费的offset
auto.commit.interval.ms 在enable.auto.commit 为true的情况下自动提交的间隔,默认值5000ms
max.poll.records 单次消费者拉取的最大数据条数,默认值500
max.poll.interval.ms 默认值5分钟,表示若5分钟之内消费者没有消费完上一次poll的消息,那么consumer会主动发起离开group的请求
消费者消费消息之后又是如何提交对应消息的位移(offset)的呢?
当在autocommit=true时(自动提交offset时),当上一次poll方法拉取的消息消费完时会进行下一次poll,在经过auto.commit.interval.ms间隔后,下一次调用poll时才会提交所有已消费消息的offset。
二、出现重复消费的情况
在配置自动提交enable.auto.commit 默认值true情况下,出现重复消费的场景有以下几种:
1、consumer在消费过程中,应用进程被强制kill掉或发生异常退出
例如:在一次poll500条消息后,消费到200条时,进程被强制kill消费导致offset 未提交,或出现异常退出导致消费到offset未提交。下次重启时,依然会重新拉取这500消息,这样就造成之前消费到200条消息重复消费了两次。因此在有消费者线程的应用中,应尽量避免使用kill -9这样强制杀进程的命令。
2、消费者消费时间过长
max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll方法返回的消息,那么Consumer 会主动发起“离开组”的请求。
在离开消费组后,开始rebalance,因此提交offset失败。之后重新rebalance,消费者再次分配partition后,再次poll拉取消息依然从之前消费过的消息处开始消费,这样就造成重复消费。而且若不解决消费单次消费时间过长的问题,这部分消息可能会一直重复消费。
三、kafka重复消费的解决方案
1、提高消费能力
1)提高单条消息的处理速度,例如对消息处理中比 较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。
2)在缩短单条消息消费时常的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不 必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根 据实际消息速率适当调小。
2、将消费的接口幂等处理,从而不用考虑重复消费的问题
更多推荐
所有评论(0)