序言

新年快乐!

Apache的Kafka是一个分布式流平台(A Distributed Streaming Platform),也是一个分布式日志系统。在第一家公司里,它主要是用于采集大量的日志,并处理这些日志信息(利用正则表达式过滤出必要的日志信息并产生告警)。在目前这家公司,除了日志采集,它还用于异步接口的消息通知。事实上,Kafka作为常用的消息中间件最主要的作用就是解耦、异步、削峰。有过了解的人应该都知道。最近在开发过程中遇到一点问题记录一下,

问题描述:

Kafka在发送消息后,消费者一直没有commit确认已经收到消息,然后超过了消费者设置的最大拉取时间,然后会报下面的错误信息

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.This means that the time between subsequent calls to poll() was longer the group has already rebalanced and assigned the partitions to another member.

可能原因:

1、可能某个消费者节点正在调试,导致线程一直阻塞在那里,然后超过了最大拉取时间,kafka集群节点收到消费者的commit超时,导致kafka集群重新负载均衡(rebalance)。

2、也可能消费者在处理消息时间过长,没有及时commit,导致超过了最大拉取时间。

消费者配置中的MaxPollIntervalMs

Apache官网max.poll.interval.ms上的解释如下,消费者组中的一员在拉取消息时如果超过了设置的最大拉取时间,则会认为消费者消费消息失败,kafka会重新进行重新负载均衡,以便把消息分配给另一个消费组成员。

 消费者配置的最大拉取时间默认时间是5分钟,不宜设置过大或过小。

参考链接:

1、Apache Kafka

2、https://ifeve.com/kafka-1/https://ifeve.com/kafka-1/

3、 kafka的maxPollIntervalMs设置太小引发的惨案 (转) - 木西-Muxy - 博客园

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐