前言

前段时间在工作中实际遇到了kafka消费的问题,通过查找定位并解决了问题,做些分享。

一、生产者有没有丢失消息?

        1、生产者发消息原理

           producer向broker发送消息,一旦这条消息被commit,表示消息已经被写入到磁盘(segment文件)。就不会丢失。但如果producer发送消息时,由于网络原因,producer没有收到commit,无法判断broker的情况。虽然不知道这条消息的结果,但是producer可能为这消息生成一个唯一id,这样在发送失败后,重复发送消息,broker接受到消息后校验消息id是否处理过即可达到exactly once的效果。

        2、如何保证消息不丢失?

        设置了acks=all,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

二、消费者有没有丢失消息?

        1、消费者消费消息得原理

        consumer从broker上读取消息后,可以选择commit,该操作会将consumer在partition上读取消息的offset,记录下来,下一次该consumer读取消息时,会从offset之后的消息开始读取。如果未提交commit,下次读取数据还是和上次一样。

当然可以设置autocommit,就是说consumer一旦读取到数据,立刻提交commit。如果是这样可以算是达到exactly once。但实际使用中,一般都是将数据处理完成之后,才会提交。处理数据的过程就会有很多问题。所以kafka默认保证at least once。所以重复消费的问题只能交给消费者自己实现。

        2、如何解决重复消费问题?        

1 提高消费端的处理性能,避免触发balance

采用多线程去处理数据,缩短单个消息消费的时长

调整消息处理的超时时间

减少一次性从broker拉取数据的条数

2 使用ConsumerRebalanceListener,再均衡监听器

可以设定在均衡机制触发前后的准备和收尾工作

3 使用消息幂等性

开启kafka的幂等性功能

将消息生成md5保存到mysql或者是redis中,在处理消息前检查下mysql或者是redis是否已经处理过了


总结

其实日常开发中kafka和rocketmq底层都一样,java的技术又很成熟,所以问题一般不会出现在中间件自身。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐