遇到kafka消息丢失和重复消费问题该如何处理
前段时间在工作中实际遇到了kafka消费的问题,通过查找定位并解决了问题,做些分享。提示以下是本篇文章正文内容,下面案例可供参考第一次写。。。写得不咋地。。。。
前言
前段时间在工作中实际遇到了kafka消费的问题,通过查找定位并解决了问题,做些分享。
一、生产者有没有丢失消息?
1、生产者发消息原理
producer向broker发送消息,一旦这条消息被commit,表示消息已经被写入到磁盘(segment文件)。就不会丢失。但如果producer发送消息时,由于网络原因,producer没有收到commit,无法判断broker的情况。虽然不知道这条消息的结果,但是producer可能为这消息生成一个唯一id,这样在发送失败后,重复发送消息,broker接受到消息后校验消息id是否处理过即可达到exactly once的效果。
2、如何保证消息不丢失?
设置了acks=all
,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
二、消费者有没有丢失消息?
1、消费者消费消息得原理
consumer从broker上读取消息后,可以选择commit,该操作会将consumer在partition上读取消息的offset,记录下来,下一次该consumer读取消息时,会从offset之后的消息开始读取。如果未提交commit,下次读取数据还是和上次一样。
当然可以设置autocommit,就是说consumer一旦读取到数据,立刻提交commit。如果是这样可以算是达到exactly once。但实际使用中,一般都是将数据处理完成之后,才会提交。处理数据的过程就会有很多问题。所以kafka默认保证at least once。所以重复消费的问题只能交给消费者自己实现。
2、如何解决重复消费问题?
1 提高消费端的处理性能,避免触发balance
采用多线程去处理数据,缩短单个消息消费的时长
调整消息处理的超时时间
减少一次性从broker拉取数据的条数
2 使用ConsumerRebalanceListener,再均衡监听器
可以设定在均衡机制触发前后的准备和收尾工作
3 使用消息幂等性
开启kafka的幂等性功能
将消息生成md5保存到mysql或者是redis中,在处理消息前检查下mysql或者是redis是否已经处理过了
总结
其实日常开发中kafka和rocketmq底层都一样,java的技术又很成熟,所以问题一般不会出现在中间件自身。
更多推荐
所有评论(0)