kafka消费组模型

  • 同一个消费组,一个partition只能被一个consumer订阅消费,但一个consumer可订阅多个分区,也即是每条消息只会被同一个消费组的某一个消费者消费,确保不会被重复消费;
  • 一个分区可被不同消费组订阅,这里有种特殊情况,加入每个消费组只有一个消费者,这样分区就会广播到所有消费者上,实现广播模式消费。

什么情况下会出发Rebalance

从图中可看出,Kafka 重平衡是外部触发导致的,触发 Kafka 重平衡的有以下几种情况

  1. 消费组成员发生变更,有新消费者加入或者离开,或者有消费者崩溃;
  2. 消费组订阅的主题数量发生变更;
  3. 消费组订阅的分区数发生变更。

每个消费者都会跟 Coordinator 保持心跳,当以上情况发生时,心跳响应就会包含 REBALANCE_IN_PROGRESS 命令,消费者停止消费,加入到重平衡事件当中。

rebalance所涉及的参数

在消费者启动时,某些参数会影响重平衡机制的发生,不当导致频繁重平衡,严重影响消费速度,下面跟大家说说这几个参数的一些要点:

  • session.timeout.ms

该参数是 Coordinator 检测消费者失败的时间,即在这段时间内客户端是否跟 Coordinator 保持心跳,如果该参数设置数值小,可以更早发现消费者崩溃的信息,从而更快地开启重平衡,避免消费滞后,但是这也会导致频繁重平衡,这要根据实际业务来衡量。

  • max.poll.interval.ms

消费者处理消息逻辑的最大时间,对于某些业务来说,处理消息可能需要很长时间,比如需要 1分钟,那么该参数就需要设置成大于 1分钟的值,否则就会被 Coordinator 剔除消息组然后重平衡。

  • heartbeat.interval.ms
    该参数跟 session.timeout.ms 紧密关联,前面也说过,只要在 session.timeout.ms 时间内与 Coordinator 保持心跳,就不会被 Coordinator 剔除,那么心跳间隔的时间就是 session.timeout.ms,因此,该参数值必须小于 session.timeout.ms,以保持 session.timeout.ms 时间内有心跳。

减少Rebalance

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

重平衡流程

在新版本中,消费组的协调管理已经依赖于 Broker 端某个节点,该节点即是该消费组的 Coordinator, 并且每个消费组有且只有一个 Coordinator,它负责消费组内所有的事务协调,其中包括分区分配,重平衡触发,消费者离开与剔除等等,整个消费组都会被 Coordinator 管控着,在每个过程中,消费组都有一个状态,Kafka 为消费组定义了 5 个状态,如下:

  • Empty:消费组没有一个活跃的消费者;
  • PreparingRebalance:消费组准备进行重平衡,此时的消费组可能已经接受了部分消费者加入组请求;
  • AwaitingSync:全部消费者都已经加入组并且正在进行重平衡,各个消费者等待 Broker 分配分区方案;
  • Stable:分区方案已经全部发送给消费者,消费者已经在正常消费;
  • Dead:该消费组被 Coordinator 彻底废弃。

可以看出,重平衡发生在 PreparingRebalance 和 AwaitingSync 状态机中,重平衡主要包括以下两个步骤:

  1. 加入组(JoinGroup):当消费者心跳包响应 REBALANCE_IN_PROGRESS 时,说明消费组正在重平衡,此时消费者会停止消费,并且发送请求加入消费组;
  2. 同步更新分配方案:当 Coordinator 收到所有组内成员的加入组请求后,会选出一个consumer Leader,然后让consumer Leader进行分配,分配完后会将分配方案放入SyncGroup请求中发送会Coordinator,Coordinator根据分配方案发送给每个消费者。

重平衡场景举例


Kafka 之 Group 状态变化分析及 Rebalance 过程

Group 状态机

在 0.9之后的 Kafka,出现了几个新变动:

  1.  Server 端增加了 GroupCoordinator 这个角色
  2. 将 topic 的 offset 信息由之前存储在 zookeeper 上改为存储到一个特殊的 topic 中(__consumer_offsets),主要原因是zk不适用频繁的写操作。

offset 那些事

其中,Last Committed Offset 和 Current Position 是与 Consumer Client 有关,High Watermark 和 Log End Offset 与 Producer Client 数据写入和 replica 之间的数据同步有关。

  • Last Committed Offset: group 最新一次 commit 的 offset,表示这个 group 已经把 Last Committed Offset 之前的数据都消费成功了;
  • Current Position:group 当前消费数据的 offset,也就是说,Last Committed Offset 到 Current Position 之间的数据已经拉取成功,可能正在处理,但是还未 commit;
  • Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset;
  • High Watermark:已经成功备份到其他 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,但是还未成功备份到其他的 replicas 中,这部分数据被认为是不安全的,是不允许 Consumer 消费的(这里说得不是很准确,可以参考:Kafka水位(high watermark)与leader epoch的讨论 这篇文章)Kafka水位(high watermark)与leader epoch的讨论

Topic __consumer_offsets

__consumer_offsets 是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 三副本,而具体 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions 来计算(其中,NumPartitions 是__consumer_offsets 的 partition 数,默认是50个)的。

GroupCoordinator

根据上面所述,一个具体的 group,是根据其 group 名进行 hash 并计算得到其具对应的 partition 值,该 partition leader 所在 Broker 即为该 Group 所对应的 GroupCoordinator,GroupCoordinator 会存储与该 group 相关的所有的 Meta 信息。

在 Broker 启动时,每个 Broker 都会启动一个 GroupCoordinator 服务,但只有 __consumer_offsets 的 partition 的 leader 才会直接与 Consumer Client 进行交互,也就是其 group 的 GroupCoordinator,其他的 GroupCoordinator 只是作为备份,一旦作为 leader 的 Broker 挂掉之后及时进行替代。

Consumer 初始化

Basic Poll Loop:基本的poll循环模型

Consumer需要支持并行地拉取数据,常见的情况就是从分布在不同broker上的多个topic的多个partition上拉取数据。为了实现这种情况,Kafka使用了一套类似于Unix中的poll或者select调用的API风格:一旦topic进行注册,未来所有的coordination、rebalance和数据拉取都是在一个event loop中通过一个单一的poll调用来触发的。这种实现方式是简单有效的,它可以处理来自单线程的所有IO。

在订阅了一个topic之后,你需要启动一个event loop来获得partition分配并开始开始拉取数据,这听起来很复杂,但是你需要做的就是在一个循环中调用poll方法,然后Consumer会自动处理其他的所有的事情。每一次对于poll方法的调用都会返回一个从其所分配的partition上拉取的message集合(集合可能会空)。下面的例子展示了在一个基本的poll循环模型中打印Consumer拉取的mmessage的offset和value。

这个pollAPI返回了根据Current Position拉取到的record。当group第一次创建时,这个位置是根据配置来进行设置的,可以被设置每个partition的最早或者最新的offset。但是一旦这个Consumer开始commit offset,之后的每次rebalance都会把position重置到Last Committed Offset位置。poll的这个参数是用来控制当Consumer在Current Position等待数据时block的最大时间,只要有任何record是可用的,Consumer就会立马返回,但是如果没有任何record是可用,Consumer将会等待一定的时长(被设置的时间)。

Consumer最初被设计时就是运行在它自己的线程上,在多线程情况下使用时如果没有额外的同步机制它并不是线程安全的,而且也不推荐去尝试。在这个例子中,我们使用了一个flag(runnning),当应用关掉时它用于从poll循环中中断。当这个flag被其他线程(例如:关闭进程的线程)设置为false时,当poll返回时循环就会结束,而且无论是否返回record应用都会结束进程。

当Consumer进程结束时,你应该显式地关闭Consumer进程,这样不仅可以清除使用的socket,而且可以确保Consumer会向Coordinator发送它离开group的信息。

思考:1.只要有数据,poll就立马返回吗?还是poll会等待一段时间或者一定消息量后返回?2.poll中设置的time参数在什么情况下起作用?如果拉取的消息为空,而时间又超出的话会出现什么情况?

Group中每一个Consumer都被安排它订阅topic的partitions的一个子集,group会使用一个group锁在这些partition上。只要这些锁还被持有,其他的Consumer成员就不能从这些partition上读取数据。如果这些Consumer运行正常,这种情况就是我们想要的结果,这也是避免重复读消费数据的唯一办法。但是如果由于节点或者程序故障造成Consumer异常退出时,你需要能够释放这些锁,以便这些partition可以被安排到其他健康的Consumer上。

Kafka的group coordination protocol通过心跳机制来解决这个问题(Consumer通过心跳机制来实现持有锁和释放锁),在每一次rebalance之后,当前group中的所有Consumer都会定期向group的coordinator发送心跳信息,如果可以收到这个Consumer的心跳信息,就证明这个Consumer是正常的。一旦收到心跳信息,这个coordinator会重新开始计时。如果定时到了而还没有收到心跳信息,coordinator将会把这个consumer标记为dead,并且会向group的其他成员发送信号,这样就会进行rebalance操作,从而重新对这些partition进行分配。定时的时长就是session 时长,它可以通过客户端的session.timeout.ms这个参数来设置

session时长机制可以确保如果遇到节点或者应用崩亏、或者网络把consumer从group中隔离的情况,锁会被释放。但是,通常应用失败的情况处理起来有点麻烦,因为即使Consumer仍然向coordinator发送心跳信息也不能证明应用是正常运行的。

Consumer的poll循环是被设置为解决这个问题,当你调用poll方法或者其他的阻塞的API时所有的网络IO就已经完成。而且Consumer并不会在后台调用任何其他线程,这就意味着心跳信息只是在调用poll方法时发送给coordinator的。如果因为处理代码的逻辑部分抛出异常或者下游系统崩溃而造成应用停止poll方法调用,那么也会造成没有任何心跳被发送,然后session定时就会超时,这个group就会进行rebalance操作。

如果一个consumer在给定的时间内没有发送心跳信息,这种机制就会被触发一个虚假的rebalance操作。当然可以通过将定时设置足够大来避免这种情况的发生,它默认的时长是30s,但是它没有必要的将时长设置高达几分钟。设置为更长时长的一个问题就是它需要花费更多的时间来发现失败的Consumer。

 

 

参考

https://www.jianshu.com/p/80721b0bdd1b

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐