消息重复和丢失是kafka中很常见的问题,主要发生在如下三个阶段:数据库

  1. 生产者阶段
  2. broke阶段
  3. 消费者阶段

1、生产者阶段重复场景

一、根本缘由

生产发送的消息没有收到正确的broke响应,致使producer重试。producer发出一条消息,broke落盘之后由于网络等种种缘由发送端获得一个发送失败的响应或者网络中断,而后producer收到一个可恢复的Exception重试消息致使消息重复。

二、重试过程

 说明:
1. new KafkaProducer()后建立一个后台线程KafkaThread扫描RecordAccumulator中是否有消息;
2. 调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;
3. 后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
4. 若是发送成功,那么返回成功;
5. 若是发送失败,那么判断是否容许重试。若是不容许重试,那么返回失败的结果;若是容许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送; 

三、可恢复异常说明

异常是RetriableException类型或者TransactionManager容许重试;RetriableException类继承关系如下:

四、记录顺序问题

 若是设置max.in.flight.requests.per.connection大于1(默认5,单个链接上发送的未确认请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出了)。大于1可能会改变记录的顺序,由于若是将两个batch发送到单个分区,第一个batch处理失败并重试,可是第二个batch处理成功,那么第二个batch处理中的记录可能先出现被消费。spa

  设置max.in.flight.requests.per.connection为1,可能会影响吞吐量,能够解决单台producer发送顺序问题。若是多个producer,producer1先发送一个请求,producer2后发送请求,这是producer1返回可恢复异常,重试必定次数成功了。虽然时producer1先发送消息,可是producer2发送的消息会被先消费。

2、生产者发送重复解决方案

 一、启动kafka的幂等性

要启动kafka的幂等性,无需修改代码,默认为关闭,须要修改配置文件:enable.idempotence=true 同时要求 ack=all 且 retries>1。3d

  幂等原理:日志

  每一个producer有一个producer id,服务端会经过这个id关联记录每一个producer的状态,每一个producer的每条消息会带上一个递增的sequence,服务端会记录每一个producer对应的当前最大sequence,producerId + sequence ,若是新的消息带上的sequence不大于当前的最大sequence就拒绝这条消息,若是消息落盘会同时更新最大sequence,这个时候重发的消息会被服务端拒掉从而避免消息重复。该配置一样应用于kafka事务中。

二、ack=0,不重试。

可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。

3、生产者和broker阶段消息丢失场景

一、ack=0,不重试

producer发送消息完,无论结果了,若是发送失败也就丢失了。

二、ack=1,leader crash

producer发送消息完,只等待leader写入成功就返回了,leader crash了,这时follower没来及同步,消息丢失。

三、unclean.leader.election.enable 配置true

容许选举ISR之外的副本做为leader,会致使数据丢失,默认为false。producer发送异步消息完,只等待lead写入成功就返回了,leader crash了,这时ISR中没有follower,leader从OSR中选举,由于OSR中原本落后于Leader形成消息丢失。

4、生产者和broker阶段消息丢失解决方案

一、配置:ack=all / -1,tries > 1,unclean.leader.election.enable : false

producer发送消息完,等待ollower同步完再返回,若是异常则重试。这是副本的数量可能影响吞吐量,最大不超过5个,通常三个足够了。

不容许选举ISR之外的副本做为leader。

二、配置:min.insync.replicas > 1

当producer将acks设置为“all”(或“-1”)时,min.insync。副本指定必须确认写操做成功的最小副本数量。若是不能知足这个最小值,则生产者将引起一个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

当一块儿使用时,min.insync.replicas和ack容许执行更大的持久性保证。一个典型的场景是建立一个复制因子为3的主题,设置min.insync复制到2个,用“all”配置发送。将确保若是大多数副本没有收到写操做,则生产者将引起异常。

三、失败的offset单独记录

producer发送消息,会自动重试,遇到不可恢复异常会抛出,这时能够捕获异常记录到数据库或缓存,进行单独处理。

5、消费者数据重复场景及解决方案

一、根本缘由

数据消费完没有及时提交offset到broke。

二、场景

消息消费端在消费过程当中挂掉没有及时提交offset到broke,另外一个消费端启动拿以前记录的offset开始消费,因为offset的滞后性可能会致使新启动的客户端有少许重复消费。

三、解决方案

一、取消自动自动提交

每次消费完或者程序退出时手动提交。这可能也无法保证一条重复。

二、下游作幂等

通常的解决方案是让下游作幂等或者尽可能每消费一条消息都记录offset,对于少数严格的场景可能须要把offset或惟一ID,例如订单ID和下游状态更新放在同一个数据库里面作事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,而后更新下游数据的时候用消费位点作乐观锁拒绝掉旧位点的数据更新。

原文:kafka消息重复和丢失的场景及解决方案分析

相关:Kafka消息中间件到底会不会丢消息

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐