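First, Kafka's consumer group mechanism has long been criticized.

It is over-designed and impractical, and it keeps performing unnecessary rebalances. In many cases the same job can be done at the architecture level or in the client.

Second, what to do if you are already using it?

When does a rebalance happen?

As mentioned earlier, a rebalance is simply a reassignment of partitions. It is triggered in the following three cases:

  • The number of partitions of a subscribed Topic changes.
  • The number of subscribed Topics changes.
  • The number of members in the consumer group changes, for example a new consumer instance joins the group or an existing one leaves it.

"The number of members in the consumer group changes" covers several cases:

    • A new member joins
    • A group member leaves voluntarily
    • A group member crashes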
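
The three trigger conditions above can be sketched as a comparison between two views of a group. This is only an illustration: GroupSnapshot and rebalanceReasons are hypothetical names, not part of Kafka or sarama, and the topic name and partition count in main are example values.

```go
package main

import (
	"fmt"
	"sort"
)

// GroupSnapshot is a hypothetical, simplified view of a consumer group.
type GroupSnapshot struct {
	TopicPartitions map[string]int // subscribed topic -> partition count
	Members         []string       // member IDs currently in the group
}

// sameMembers reports whether both slices hold the same IDs, ignoring order.
func sameMembers(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	x := append([]string(nil), a...)
	y := append([]string(nil), b...)
	sort.Strings(x)
	sort.Strings(y)
	for i := range x {
		if x[i] != y[i] {
			return false
		}
	}
	return true
}

// rebalanceReasons lists which of the three trigger conditions apply
// when the group moves from `before` to `after`.
func rebalanceReasons(before, after GroupSnapshot) []string {
	var reasons []string
	topicsChanged := len(before.TopicPartitions) != len(after.TopicPartitions)
	for topic, n := range before.TopicPartitions {
		m, ok := after.TopicPartitions[topic]
		if !ok {
			topicsChanged = true
		} else if m != n {
			reasons = append(reasons, "partition count changed for topic "+topic)
		}
	}
	if topicsChanged {
		reasons = append(reasons, "set of subscribed topics changed")
	}
	if !sameMembers(before.Members, after.Members) {
		reasons = append(reasons, "group membership changed")
	}
	sort.Strings(reasons)
	return reasons
}

func main() {
	before := GroupSnapshot{map[string]int{"vsdir": 24}, []string{"m1", "m2"}}
	after := GroupSnapshot{map[string]int{"vsdir": 24}, []string{"m1", "m3"}}
	fmt.Println(rebalanceReasons(before, after))
}
```

In the example, one member was replaced by another, so only the membership condition is reported.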

 --------------------------------------------------------------------

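20220421

A rebalance reproduction experiment: all consumer group members on one machine were restarted. It took about 4 minutes for the rebalancing to finally complete.

First alert: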

{"Level":"error","Time":"2022-04-21T20:38:02.9884226+08:00","LoggerName":"","Message":"Cleanup session MemberID :[vwin04_hunter-69c09b1d-a46b-40f8-8e55-e7c07580b46b] , GenerationID :[3980] , Claims :[map[vsdir:[12 13 14 15 16 17]]]","Caller":{"Defined":true,"PC":23615323,"File":"/groupconsumer.go","Line":67,"Function":"cloudscan/pubsub.(*GroupConsumer).Cleanup"},"Stack":"","Subject":"csscand告警 engine: [hunter] , version:[1.7.0]","IP":"","HostName":"vwin04"}

Last alert:

{"Level":"error","Time":"2022-04-21T20:42:08.2430919+08:00","LoggerName":"","Message":"Cleanup session MemberID :[vwin04_comodo-724c94cf-551f-4cd4-abb5-389c47f63a07] , GenerationID :[11674] , Claims :[map[vsdir:[16 17 18 19 20 21 22 23]]]","Caller":{"Defined":true,"PC":23615323,"File":"/groupconsumer.go","Line":67,"Function":"cloudscan/pubsub.(*GroupConsumer).Cleanup"},"Stack":"","Subject":"csscand告警 engine: [comodo] , version:[1.7.0]","IP":"","HostName":"vwin04"}
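
The "about 4 minutes" figure can be checked from the Time fields of the two alerts. A minimal sketch using only the Go standard library; the helper name elapsed is made up for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// elapsed parses two RFC 3339 timestamps and returns end minus start.
func elapsed(start, end string) (time.Duration, error) {
	s, err := time.Parse(time.RFC3339Nano, start)
	if err != nil {
		return 0, err
	}
	e, err := time.Parse(time.RFC3339Nano, end)
	if err != nil {
		return 0, err
	}
	return e.Sub(s), nil
}

func main() {
	// Time fields copied from the first and last alert.
	d, err := elapsed("2022-04-21T20:38:02.9884226+08:00", "2022-04-21T20:42:08.2430919+08:00")
	if err != nil {
		panic(err)
	}
	fmt.Println(d.Round(time.Second)) // 4m5s
}
```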

------------------------------------------------------------------------------

20220419

Adjusted:

config.Consumer.Group.Session.Timeout = time.Second * 120

Added this line:

config.Consumer.Group.Rebalance.Timeout = config.Consumer.Group.Session.Timeout

That is, config.Consumer.Group.Rebalance.Timeout (the rebalance timeout) is now 2 minutes, equal to Session.Timeout.

20220421

Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e) (kafka.coordinator.group.GroupCoordinator)

Reference: https://zhuanlan.zhihu.com/p/109574627

RPC

Two interfaces are related to Rebalance, JoinGroup and SyncGroup, plus the Heartbeat interface.

The JoinGroup interface has evolved to version 6:

JoinGroup Request (Version: 6) => group_id session_timeout_ms rebalance_timeout_ms member_id group_instance_id protocol_type [protocols] TAG_BUFFER 
  group_id => COMPACT_STRING
  session_timeout_ms => INT32
  rebalance_timeout_ms => INT32
  member_id => COMPACT_STRING
  group_instance_id => COMPACT_NULLABLE_STRING
  protocol_type => COMPACT_STRING
  protocols => name metadata TAG_BUFFER 
    name => COMPACT_STRING
    metadata => COMPACT_BYTES

JoinGroup Response (Version: 6) => throttle_time_ms error_code generation_id protocol_name leader member_id [members] TAG_BUFFER 
  throttle_time_ms => INT32
  error_code => INT16
  generation_id => INT32
  protocol_name => COMPACT_STRING
  leader => COMPACT_STRING
  member_id => COMPACT_STRING
  members => member_id group_instance_id metadata TAG_BUFFER 
    member_id => COMPACT_STRING
    group_instance_id => COMPACT_NULLABLE_STRING
    metadata => COMPACT_BYTES

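When any new Consumer sends JoinGroup, the Coordinator enters the PreparingRebalance state and waits for all active Consumers to rejoin the Group, for at most rebalance_timeout_ms (60s by default in sarama). The generation ID is incremented once this join phase completes: in the server logs in these notes, "old generation N" appears at the start of a rebalance and generation N+1 appears at "Stabilized".

Two timeouts are worth noting:

  • session_timeout_ms: if the Coordinator receives no heartbeat within this time, it considers the session expired;
  • rebalance_timeout_ms: the maximum time the Coordinator waits for all Consumers to rejoin.

Suppose I scale out by 100 Consumers: how long until it takes effect? Clearly it should not be 60s. So how does the Coordinator know how long to wait? It only needs to wait until every Consumer with an active session has sent JoinGroup. Each Consumer sends a heartbeat to the Coordinator every 3s, which means a rebalance needs a wait of roughly 3s. Reducing the heartbeat interval should shorten the time a Rebalance takes to come into effect accordingly.

In other words, every client's JoinGroup blocks until all Consumers with an active Session have sent JoinGroup. The ConsumerGroup then enters the CompletingRebalance state.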

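"haven't joined" means the member did not make it through the join phase. Why not?

To repeat: the Coordinator only needs to wait until every Consumer with an active session has sent JoinGroup. Each Consumer sends a heartbeat to the Coordinator every 3s, which means a rebalance needs a wait of roughly 3s. Reducing the heartbeat interval should shorten the time a Rebalance takes to come into effect accordingly.

Our configuration increased the interval: config.Consumer.Group.Heartbeat.Interval = time.Second * 20,

so it went from 3 seconds to 20 seconds. That means:

With the original defaults:

rebalance_timeout_ms = 60000 (60 seconds)
One heartbeat every 3 seconds allows 20 heartbeats within the timeout.
Now, with one heartbeat every 20 seconds, only 3 fit, far fewer. After raising
rebalance_timeout_ms to 120 seconds, that only becomes 6.
So Heartbeat.Interval should be reduced to 5 seconds, or simply left at 3 seconds.

As in the Raft protocol, the heartbeat carries logical information (the heartbeat response is how a member learns that a rebalance is in progress and that it must rejoin), so the heartbeat interval must not be too large.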
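
The arithmetic above, as a small sketch. heartbeatsWithin is a made-up helper name; the values are the ones from these notes.

```go
package main

import (
	"fmt"
	"time"
)

// heartbeatsWithin returns how many heartbeat intervals fit into the
// rebalance timeout, i.e. how many chances a member gets to notice the
// rebalance before the coordinator stops waiting for it.
func heartbeatsWithin(rebalanceTimeout, heartbeatInterval time.Duration) int {
	return int(rebalanceTimeout / heartbeatInterval)
}

func main() {
	fmt.Println(heartbeatsWithin(60*time.Second, 3*time.Second))   // 20: defaults
	fmt.Println(heartbeatsWithin(60*time.Second, 20*time.Second))  // 3: interval raised to 20s
	fmt.Println(heartbeatsWithin(120*time.Second, 20*time.Second)) // 6: timeout raised to 120s
	fmt.Println(heartbeatsWithin(120*time.Second, 3*time.Second))  // 40: interval back at 3s
}
```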


-------------------------------------------------------------------------------------------------------------

20220418:

generation 3258 is usable.

A normal rebalance log on the Kafka server side:

[2022-04-18 12:35:34,586] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id _arcabit-a988337d-c2e7-436f-a556-7e7f34576a83 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:34,586] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3257 (__consumer_offsets-11) (reason: Adding new member _arcabit-a988337d-c2e7-436f-a556-7e7f34576a83 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:35,047] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in PreparingRebalance state. Created a new member id _arcabit-dce0a743-6b08-4ab7-879c-7518c29f15c8 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:43,245] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in PreparingRebalance state. Created a new member id _arcabit-ae61f5ce-c88f-44e7-9475-95e150c39741 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:48,499] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in PreparingRebalance state. Created a new member id_arcabit-70411222-99d0-453c-aa53-e7c5262569e4 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:36:14,537] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3258 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:36:14,540] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3258 (kafka.coordinator.group.GroupCoordinator)

A normal client-side log:

Apr 18 12:36:14  csscand[24350]: {"level":"info","time":"2022-04-18T12:36:14.578+0800","caller":"pubsub/groupconsumer.go:57","msg":"Setup session MemberID :[_arcabit-1b56a50a-ec96-4a48-986b-d90afc70cb0d] , GenerationID :[3258] , Claims :[map[vsdir:[6 7 8]]]",,"version":"v1.6.4","author":"donghongchen","buildDate":"2022-04-18T10:51:05","engine":"arcabit","app":"csscand"}

-----------------------------------------------------------------------------------

So a log line such as "Group vsarcabit remove dynamic members who haven't joined" is abnormal.

20220418, a complete rebalance log for one consumer group on Kafka:

[2022-04-17 02:35:12,783] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:35:12,783] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3105 (__consumer_offsets-11) (reason: Adding new member 10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:36:12,783] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-28_arcabit-64bcc707-3180-46ac-ada5-06a1d117b268) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:36:12,783] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3106 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:36:12,789] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3106 (kafka.coordinator.group.GroupCoordinator)

There are time gaps here, but these are consecutive log entries for a single consumer group:

[2022-04-17 03:17:16,370] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-224_arcabit-ade8d685-c136-4edc-bbef-9ed297b9a8a0 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:16,370] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3106 (__consumer_offsets-11) (reason: Adding new member 10-52-6-224_arcabit-ade8d685-c136-4edc-bbef-9ed297b9a8a0 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:20,200] INFO [ProducerStateManager partition=tb_asset-9] Writing producer snapshot at offset 380969561 (kafka.log.ProducerStateManager)
[2022-04-17 03:17:20,201] INFO [Log partition=tb_asset-9, dir=/data/kafka-data] Rolled new log segment at offset 380969561 in 1 ms. (kafka.log.Log)
[2022-04-17 03:17:58,606] INFO [GroupCoordinator 1]: Group vstws remove dynamic members who haven't joined: Set(vwin01_tws-ad4fae20-babf-4897-90c3-7ccbb46b04af) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:58,606] INFO [GroupCoordinator 1]: Stabilized group vstws generation 10806 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:58,609] INFO [GroupCoordinator 1]: Assignment received from leader for group vstws for generation 10806 (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:18:16,371] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:18:16,371] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3107 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:18:16,375] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3107 (kafka.coordinator.group.GroupCoordinator)

[2022-04-17 03:24:20,903] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-249_arcabit-5b1cefdc-9755-44a0-9806-521eea353142 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:24:20,903] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3107 (__consumer_offsets-11) (reason: Adding new member 10-52-6-249_arcabit-5b1cefdc-9755-44a0-9806-521eea353142 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:25:20,902] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-224_arcabit-ade8d685-c136-4edc-bbef-9ed297b9a8a0) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:25:20,902] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3108 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:25:20,903] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3108 (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:55:30,435] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-131_arcabit-02916ed4-6ada-416c-8b41-a78829ee0488 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:55:30,436] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3108 (__consumer_offsets-11) (reason: Adding new member 10-52-6-131_arcabit-02916ed4-6ada-416c-8b41-a78829ee0488 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:56:30,436] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-249_arcabit-5b1cefdc-9755-44a0-9806-521eea353142) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:56:30,436] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3109 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:56:30,451] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3109 (kafka.coordinator.group.GroupCoordinator)
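
To see how long each rebalance in logs like the ones above took, the "Preparing to rebalance" and "Stabilized" lines can be paired per group. The sketch below assumes the log format shown in these notes; the type and function names are hypothetical, and the two sample lines are copied from the vsarcabit log above.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
	"time"
)

var (
	prepareRe = regexp.MustCompile(`^\[([^\]]+)\] INFO \[GroupCoordinator \d+\]: Preparing to rebalance group (\S+) `)
	stableRe  = regexp.MustCompile(`^\[([^\]]+)\] INFO \[GroupCoordinator \d+\]: Stabilized group (\S+) generation (\d+)`)
)

// Rebalance is one completed rebalance of a group.
type Rebalance struct {
	Group      string
	Generation string
	Duration   time.Duration
}

// parseTS parses timestamps like "2022-04-17 02:35:12,783".
func parseTS(s string) (time.Time, error) {
	return time.Parse("2006-01-02 15:04:05.000", strings.Replace(s, ",", ".", 1))
}

// rebalanceDurations pairs each group's first "Preparing to rebalance" line
// with its next "Stabilized" line and reports the time in between.
func rebalanceDurations(lines []string) ([]Rebalance, error) {
	started := map[string]time.Time{}
	var out []Rebalance
	for _, line := range lines {
		if m := prepareRe.FindStringSubmatch(line); m != nil {
			ts, err := parseTS(m[1])
			if err != nil {
				return nil, err
			}
			if _, ok := started[m[2]]; !ok {
				started[m[2]] = ts
			}
			continue
		}
		if m := stableRe.FindStringSubmatch(line); m != nil {
			ts, err := parseTS(m[1])
			if err != nil {
				return nil, err
			}
			if st, ok := started[m[2]]; ok {
				out = append(out, Rebalance{m[2], m[3], ts.Sub(st)})
				delete(started, m[2])
			}
		}
	}
	return out, nil
}

// sample holds two lines copied from the server log in these notes.
var sample = []string{
	`[2022-04-17 02:35:12,783] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3105 (__consumer_offsets-11) (reason: Adding new member 10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e with group instance id None) (kafka.coordinator.group.GroupCoordinator)`,
	`[2022-04-17 02:36:12,783] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3106 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)`,
}

func main() {
	rs, err := rebalanceDurations(sample)
	if err != nil {
		panic(err)
	}
	for _, r := range rs {
		fmt.Printf("%s generation %s took %s\n", r.Group, r.Generation, r.Duration)
	}
}
```

For the sample pair the result is exactly one minute, which matches a 60s rebalance timeout running out while the coordinator waits for a member that never rejoins.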

-------------------------------------------------------------------------------------------------------------------- 

Server-side log:

[2022-04-04 16:07:10,255] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in Stable state. Created a new member id sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:07:10,256] INFO [GroupCoordinator 2]: Preparing to rebalance group vskingsoft in state PreparingRebalance with old generation 7259 (__consumer_offsets-0) (reason: Adding new member sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:03,853] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-42fb1447-ff68-48b3-91d4-2d2a598e0755 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:04,114] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-cfdf21c8-ef99-4382-85be-d3518dd9b1ff for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:10,256] INFO [GroupCoordinator 2]: Group vskingsoft remove dynamic members who haven't joined: Set(sarama-b4c83a70-f6bb-430f-805f-176023f24d25, sarama-32d526c5-5766-49c1-9423-2bbec4a683a4, sarama-a0d6e6b6-4b8d-43e3-9be7-a51c03929f40, sarama-88995424-4fbe-4512-8c7f-1637998310f8, sarama-9d889287-5823-4e59-8b8d-ed016ef44180) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:10,256] INFO [GroupCoordinator 2]: Stabilized group vskingsoft generation 7260 (__consumer_offsets-0) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:10,260] INFO [GroupCoordinator 2]: Assignment received from leader for group vskingsoft for generation 7260 (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:19,166] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in Stable state. Created a new member id sarama-668f4500-b31b-49d7-b3a6-4194b9dbae89 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:19,166] INFO [GroupCoordinator 2]: Preparing to rebalance group vskingsoft in state PreparingRebalance with old generation 7260 (__consumer_offsets-0) (reason: Adding new member sarama-668f4500-b31b-49d7-b3a6-4194b9dbae89 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:28,431] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-92092c42-3cb3-4ccd-bae9-92ffc53348dd for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:34,467] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-7f49d3d4-1ad2-461b-b5ce-1a4cd12dca66 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:06,090] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-d7cf7efd-ba01-48b5-a6ff-ecee6ed161ba for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:19,165] INFO [GroupCoordinator 2]: Group vskingsoft remove dynamic members who haven't joined: Set(sarama-cfdf21c8-ef99-4382-85be-d3518dd9b1ff, sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1, sarama-42fb1447-ff68-48b3-91d4-2d2a598e0755) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:19,165] INFO [GroupCoordinator 2]: Stabilized group vskingsoft generation 7261 (__consumer_offsets-0) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:19,182] INFO [GroupCoordinator 2]: Assignment received from leader for group vskingsoft for generation 7261 (kafka.coordinator.group.GroupCoordinator)

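Symptom:

The group rebalances continuously, which slows down business consumption.

No useful information was found in the client logs.

Situations encountered before:

Kafka troubleshooting: "The provided member is not known in the current generation", sometimes accompanied by "i/o timeout" in the logs.

Possible cause 1, already fixed in the code: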

case <-sess.Context().Done()
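
The point of that case is that the consume loop must return as soon as the session ends, otherwise the member cannot rejoin in time. The sketch below shows the pattern with standard-library stand-ins only: in sarama the context would come from sess.Context() and the channel from claim.Messages(), while consumeUntilDone and runDemo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// consumeUntilDone handles messages until the channel is closed or the
// context is cancelled, and returns how many messages were handled.
func consumeUntilDone(ctx context.Context, msgs <-chan string, handle func(string)) int {
	n := 0
	for {
		select {
		case <-ctx.Done():
			// Session ended (for example a rebalance started): stop immediately.
			return n
		case m, ok := <-msgs:
			if !ok {
				return n
			}
			handle(m)
			n++
		}
	}
}

// runDemo feeds two messages, then cancels the context while the channel
// stays open, and returns the number of handled messages.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	msgs := make(chan string) // unbuffered and never closed
	go func() {
		msgs <- "a"
		msgs <- "b"
		cancel()
	}()
	return consumeUntilDone(ctx, msgs, func(m string) { fmt.Println("handled", m) })
}

func main() {
	fmt.Println("returned after", runDemo(), "messages")
}
```

Without the ctx.Done() case, the loop in this demo would block forever on the open channel.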



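So far it has not been determined why dynamic members keep joining the consumer group.

Possible cause 3:

A problem in the watermill library. That library is no longer used.

read tcp :49560->:9092: i/o timeout under sarama kafka golang panic

This was changed as well. config.Consumer.Group.Session.Timeout = time.Second * 120 is the heartbeat (session) timeout, but our network timeout was much smaller, 30 seconds by default. In our scenario, file scanning makes consumption slow, and 30 seconds may not be enough to finish processing the data. Final configuration:

// https://github.com/Shopify/sarama/issues/1422
config.Net.ReadTimeout = config.Consumer.Group.Session.Timeout + 30*time.Second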
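
The relationships between these timeouts can be written down as a small sanity check. This is a sketch with a plain struct rather than the sarama config type; Timeouts and Problems are hypothetical names. The one-third rule for heartbeats follows the usual Kafka guidance, and comparing the read timeout against the rebalance timeout as well as the session timeout is an assumption made here, not something stated in these notes.

```go
package main

import (
	"fmt"
	"time"
)

// Timeouts collects the four client-side timeouts discussed in these notes.
type Timeouts struct {
	Session, Heartbeat, Rebalance, NetRead time.Duration
}

// Problems returns a description of each relationship that looks unsafe.
func (t Timeouts) Problems() []string {
	var p []string
	if 3*t.Heartbeat > t.Session {
		p = append(p, "heartbeat interval exceeds one third of the session timeout")
	}
	longest := t.Session
	if t.Rebalance > longest {
		longest = t.Rebalance
	}
	if t.NetRead <= longest {
		p = append(p, "network read timeout is not longer than the session/rebalance timeout")
	}
	return p
}

func main() {
	// Final values from these notes: ReadTimeout = Session.Timeout + 30s.
	final := Timeouts{
		Session:   120 * time.Second,
		Heartbeat: 20 * time.Second,
		Rebalance: 120 * time.Second,
		NetRead:   150 * time.Second,
	}
	fmt.Println(len(final.Problems()), final.Problems())

	// Same settings but with the 30s default read timeout left in place.
	before := final
	before.NetRead = 30 * time.Second
	fmt.Println(len(before.Problems()), before.Problems())
}
```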



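Conclusion from the server-side logs:
The current conclusion is that we use dynamic member IDs, new members keep requesting to join the group, and each request triggers a rebalance. On top of that, these new members often fail to join.


Current handling:
If the problem keeps occurring, look into what triggers the dynamic new members to join. If it stops occurring, no further attention is needed.
If the trigger cannot be found, try running with static members instead.
---------------------------------
20220411, follow-up: after restarting the program hosting the Kafka client, the problem no longer occurred, which suggests the client had been stuck in a buggy state.

-------------------------------
20220415: one cause was found. The process was killed because of OOM and then reconnected to Kafka, which is why the log shows (reason: Adding new member sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1 with group instance id None) (kafka.coordinator.group.GroupCoordinator) and a rebalance followed.
----------------------------------
Material on static consumer group membership:

Kafka static consumer group members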


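Kafka's consumer group mechanism has long been criticized. The design looks good on paper, but in practice, because business consumption logic can be slow or because of how users use it, consumer groups often fall into endless rebalancing, and the stop-the-world (STW) nature of a rebalance also stalls consumption for the other consumers in the same group. The larger the production cluster, the more likely this is. To improve the situation, Kafka has provided static consumer group membership since version 2.3. (On the cloud, CKafka Pro edition 2.4 also supports this feature.)

Why it is needed

The consumer group mechanism lets the remaining consumers quickly take over the partitions of a consumer that exited on failure and continue consuming. But several problems make consumer groups perform poorly in practice, and under modern program architectures Kafka's own consumer group mechanism is not needed for failure recovery.

  1. The consumers are already at capacity. If they also take over the partitions of the exited consumer, they cannot keep up, which keeps triggering rebalances, and the whole consumer group can no longer consume.
  2. A consumer exits, but modern architectures commonly use a supervisor mechanism or run as pods on k8s, so the consumer may come back very soon. By then a rebalance has already been triggered, and its return triggers another one. Every exit-and-recover therefore causes two rebalances, and such unnecessary rebalances are hard to accept in a large consumer cluster.
  3. Fast rolling upgrades and normal release iterations: every release restarts the service and triggers a rebalance of the entire consumer group, which also looks unnecessary under modern architectures.
  4. The number of consumers should not exceed the number of partitions. On the surface, exceeding it only leaves some consumers without partitions, but in real production, during rebalances several consumers may intermittently own a few partitions, and when consumption capacity is insufficient and the consumption logic is slow, rebalances recur again and again.

Basic principle

Static consumer group membership tries to prevent the group state from moving from STABLE to PREPARE_REBALANCE when group membership changes in certain ways.

To achieve this, Kafka 2.3 modified several Group APIs and changed the exit logic of clients that enable static membership:

  1. A group.instance.id parameter was added to identify static members. Once it is set, the consumer is treated as a static consumer.
  2. A static consumer no longer sends a LeaveGroup request to the server when it exits; it is removed from the group only when its session times out.
  3. The server-side maximum session timeout was raised. With server support, the client's session timeout can be set as high as 30 minutes.

Rebalance logic and caveats with static consumers

  1. Adding a group member triggers a rebalance.
  2. A session timeout triggers a rebalance. (Configure the session timeout based on how long unavailability can be tolerated, and make it as long as possible to leave time for restarting programs and slow consumers.)
  3. max.poll.interval.ms must always be greater than session.timeout.ms. If the session timeout is 5 min, max.poll.interval.ms must also be greater than 5 min.
  4. The client program must itself ensure that group.instance.id is unique. A duplicate group.instance.id joining the same consumer group causes an error.
  5. Currently known: the official Java client (2.3 and later) and librdkafka (1.4.0 and later) support this feature. sarama did not support it when this was written (later sarama releases expose a Consumer.Group.InstanceId setting; check the version in use).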
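
Since the client has to guarantee unique group.instance.id values itself, one option is to derive the ID from things that stay the same across restarts. The sketch below is only an illustration: the host_engine-slot naming scheme is an assumption loosely modeled on the member IDs seen in the logs, and instanceID and duplicates are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sort"
)

// instanceID builds a group.instance.id that is stable across restarts:
// the same host, engine and slot always yield the same ID.
func instanceID(host, engine string, slot int) string {
	return fmt.Sprintf("%s_%s-%d", host, engine, slot)
}

// duplicates returns every ID that occurs more than once, sorted.
// A non-empty result means two consumers would collide in the group.
func duplicates(ids []string) []string {
	seen := map[string]int{}
	for _, id := range ids {
		seen[id]++
	}
	var dup []string
	for id, n := range seen {
		if n > 1 {
			dup = append(dup, id)
		}
	}
	sort.Strings(dup)
	return dup
}

func main() {
	ids := []string{
		instanceID("vwin04", "hunter", 0),
		instanceID("vwin04", "hunter", 1),
		instanceID("vwin04", "comodo", 0),
	}
	fmt.Println(ids)
	fmt.Println(len(duplicates(ids))) // 0
}
```

A random suffix (as in dynamic member IDs) would defeat the purpose, because the restarted process has to present the same ID to reclaim its partitions without a rebalance.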

Appendix code

#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

int main() {
  std::string err;
  std::vector<std::string> topics;

  auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  conf->set("bootstrap.servers", "vip:port", err);
  conf->set("enable.partition.eof", "false", err);
  conf->set("group.id", "markstatic", err);
  conf->set("group.instance.id", "consumer-450", err);  // instance.id must be unique
  conf->set("session.timeout.ms", "600000", err);       // session timeout: the longest tolerable partition unavailability
  conf->set("max.poll.interval.ms", "600500", err);     // max.poll.interval.ms must be greater than the session timeout

  auto consumer = RdKafka::KafkaConsumer::create(conf, err);
  if (!consumer) {
    std::cerr << "Failed to create consumer: " << err << std::endl;
    exit(1);
  }

  topics.push_back("mytest1");
  auto suberr = consumer->subscribe(topics);
  if (suberr) {
    std::cerr << "Failed to subscribe to " << topics.size()
              << " topics: " << RdKafka::err2str(suberr) << std::endl;
    exit(1);
  }

  while (true) {
    auto msg = consumer->consume(1000);
    // consume() also returns timeouts and errors as messages, so print only real ones
    if (msg->err() == RdKafka::ERR_NO_ERROR) {
      std::cout << " Message in " << msg->topic_name() << " [" << msg->partition()
                << "] at offset " << msg->offset() << std::endl;
    }
    delete msg;
  }
}



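1. About the group.instance.id parameter:

Setting this parameter marks the consumer as a static member.

A static member combined with a larger session timeout avoids rebalances caused by a member being temporarily unavailable (for example, during a restart).

2. About the session.timeout.ms parameter:

Used together with group.instance.id: if the consumer has not come back online within this time, a rebalance is triggered.

If this parameter is not set, the client default applies: 10000 ms in Kafka Java clients before 3.0 and 45000 ms from 3.0 onward. (6000 ms is the default of the broker-side lower bound group.min.session.timeout.ms, not the client default.)

In real production, restarting a program or container may take several minutes, so this value can be set larger.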





Reference links:
The same question was asked on StackOverflow, but it has no answer yet:
https://stackoverflow.com/questions/62878735/kafka-one-particular-consumers-group-is-rebalancing


https://cloud.tencent.com/developer/article/1786605
https://blog.csdn.net/weixin_33970380/article/details/113316660  (source code in the sarama library for adding a new member)
https://olnrao.wordpress.com/2015/05/15/apache-kafka-case-of-mysterious-rebalances/
https://www.cnblogs.com/chanshuyi/p/kafka_rebalance_quick_guide.html
https://www.cnblogs.com/zstiancai/p/15708612.html
https://www.icode9.com/content-1-956448.html