kafka的分区策略

kafka produce端的分区策略

第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去

第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区

第三种分区策略:既没有给定分区号,也没有给定key值,直接轮循进行分区

第四种分区策略:自定义分区, producer.send(new ProducerRecord<String, String>("test",i+""));

kafka 消费者的的分区策略

修改kafka分区分配策略:
partition.assignment.strategy= range+CooperativeStucky(默认值) 或 roundrobin,kafka可以同时使用多个分区分配策略。
   Kafka Consumer用来从Kafka集群拉取数据,通过consumer groups允许多个进程共同分担消费/处理数据的工作,这些进程可以运行在一台机器或者多台机器。而ConsumerPartitionAssignor接口用来定制consumer的分区分配策略。通过consumer配置项partition.assignment.strtegy指定分区分配策略类。消费者分区分配策略类的实现方法共有四种。 

有四种分配策略

第一种分区策略:RoundRobinAssignor

第二种分区策略:RangeAssignor详解

第三种分区策略:StickyAssignor详解

第四种分区策略:CooperativeStickyAssignor详解
第一种分区策略:RoundRobinAssignor

RoundRobin是针对所有topic分区。它是采用轮询分区策略,是把所有的partition和所有的consumer列举出来,然后按照hashcode进行排序,最后再通过轮询算法来分配partition给每个消费者。 。

例如:两个consumers(C0、C1),两个topics(T0、T1),分区数均为3。如果Consumers订阅信息为:

C0T0、T1
C1T0、T1

则Consumers分区分配方案为:

C0T0P0、T0P2、T1P1
C1T0P1、T1P0、T1P2

但是,如果consumers订阅Topics不相同,仍然按照轮询方式进行分配,将导致consumer之间分区分配不均衡。例如:三个consumers(C0、C1、C2),三个Topics(T0、T1、T2),T0分区数为1、T1分区数为2、T3分区数为3。如果Consumers订阅信息为:

C0T0
C1T0、T1
C2T0、T1、T2

则Consumers分区分配方案为:

C0T0P0
C1T1P0
C2T1P1、T2P0、T2P1、T2P2
第二种分区策略:RangeAssignor详解

与RoundRobinAssignor不同,RangeAssignor作用域为每个Topic。对于每一个Topic,将该Topic的所有可用partitions和订阅该Topic的所有consumers展开(字典排序),然后将partitions数量除以consumers数量,算数除的结果分别分配给订阅该Topic的consumers,算数除的余数分配给前一个或者前几个consumers。所以,如果该Topic的partitions数量与订阅该Topic的consumers数量不是整数倍关系,将造成前一个或者前几个consumer分配到较多的partitions,达不到consumer之间分区分配均衡的效果(不管是面向所有Topics还是单个Topic)。例如:两个consumers(C0、C1),两个Topics(T0、T1),分区数均为3。如果Consumers订阅信息为:

C0T0
C1T1

则Consumers分区分配方案为:

C0T0P0、T0P1、T1P0、T1p1
C1T0P2、T1P2
第三种分区策略:StickyAssignor详解

前两种分配策略(RoundRobinAssignor、RangeAssignor)都存在分区分配不均衡的情况,而StickyAssignor有两个目标:首先,尽可能保证分区分配均衡(即分配给consumers的分区数最大相差为1);其次,当发生分区重分配时,尽可能多的保留现有的分配结果。当然,第一个目标的优先级高于第二个目标。乍一看,这个描述跟RoundRobinAssignor相同,其实并非如此。例如:三个consumers(C0、C1、C2),四个Topics(T0、T1、T2、T3)。

如果Consumers订阅信息为:

C0T0、T1、T2、T3
C1T0、T1、T2、T3
C2T0、T1、T2、T3

则RoundRobinAssignor和StickyAssignor分区分配方案均为:

C0T0P0、T1P1、T3P0
C1T0P1、T2P0、T3P1
C2T1P0、T2P1

现在,假设C1被移除,将触发分区重分配。此时,RoundRobinAssignor和StickyAssignor的分区分配方案将有所差异。

RoundRobinAssignor分区分配方案将变为:

C0T0P0、T1P0、T2P0、T3P0
C2T0P1、T1P1、T2P1、T3P1

保留之前的分区分配方案的3个分区不变。

StickyAssignor分区分配方案将变为:

C0T0P0、T1P1、T3P0、T2P0
C2T1P0、T2P1、T0P1、T3P1

保留之前的分区分配方案的5个分区不变。由此可见,StickyAssignor尽量保存之前的分区分配方案,分区重分配变动更小。

以上为所有consumers均订阅所有topics的场景,接着介绍consumers订阅topics不相同的场景。例如:三个consumers(C0、C1、C2),三个Topics(T0、T1、T2),分区数分别为1、2、3。如果consumers订阅信息为:

C0T0
C1T0、T1
C2T0、T1、T2

RoundRobinAssignor分区分配方案为:

C0T0P0
C1T1P0
C2T1P1、T2P0、T2P1、T2P2

StickyAssignor分区分配方案为:

C0T0P0
C1T1P0、T1P1
C2T2P0、T2P1、T2P2

由此可见,分区均衡性来说,RoundRobinAssignor不如StickyAssignor均衡。

现在,假设C0被移除,将触发分区重分配。此时,RoundRobinAssignor和StickyAssignor的分区分配方案将有所差异。

RoundRobinAssignor分区分配方案将变为:

C1T0P0、T1P1
C2T1P0、T2P0、T2P1、T2P2

保留之前的分区分配方案的4个分区不变。

StickyAssignor分区分配方案将变为:

C1T1P0、T1P1、T0P0
C2T2P0、T2P1、T2P2

保留之前的分区分配方案的5个分区不变。

由此可见,StickyAssignor尽量保存之前的分区分配方案,分区重分配变动更小。

第四种分区策略:CooperativeStickyAssignor详解

上述三种分区分配策略均是基于eager协议,Kafka2.4.0开始引入CooperativeStickyAssignor策略。CooperativeStickyAssignor与之前的StickyAssignor虽然都是维持原来的分区分配方案,最大的区别是:StickyAssignor仍然是基于eager协议,分区重分配时候,都需要consumers先放弃当前持有的分区,重新加入consumer group;而CooperativeStickyAssignor基于cooperative协议,该协议将原来的一次全局分区重平衡,改成多次小规模分区重平衡。

例如:一个Topic(T0,三个分区),两个consumers(consumer1、consumer2)均订阅Topic(T0)。如果consumers订阅信息为:

consumer1T0P0、T0P2
consumer2T0P1

此时,新的consumer3加入消费者组,那么基于eager协议的分区重分配策略流程:

img

1.consumer1、 consumer2正常发送心跳信息到Group Coordinator。

2.随着consumer3加入,Group Coordinator收到对应的Join Group请求,Group Coordinator确认有新成员需要加入消费者组。

3.Group Coordinator 通知consumer1和consumer2,需要rebalance了。

4.consumer1和consumer2放弃(revoke)当前各自持有的已有分区,重新发送Join Group请求到Group Coordinator。

5.Group Coordinator依据指定的分区分配策略的处理逻辑,生成新的分区分配方案,然后通过Sync Group请求,将新的分区分配方案发送给consumer1、consumer2、consumer3。

6.所有consumers按照新的分区分配,重新开始消费数据。

而基于cooperative协议的分区分配策略的流程:

img

1.consumer1、 consumer2正常发送心跳信息到Group Coordinator。

2.随着consumer3加入,Group Coordinator收到对应的Join Group请求,Group Coordinator确认有新成员需要加入消费者组。

3.Group Coordinator 通知consumer1和consumer2,需要rebalance了。

4.consumer1、consumer2通过Join Group请求将已经持有的分区发送给Group Coordinator。注意:并没有放弃(revoke)已有分区。

5.Group Coordinator取消consumer1对分区p2的消费,然后发送sync group请求给consumer1、consumer2。

6.consumer1、consumer2接收到分区分配方案,重新开始消费。至此,一次Rebalance完成。

r2通过Join Group请求将已经持有的分区发送给Group Coordinator。注意:并没有放弃(revoke)已有分区。

5.Group Coordinator取消consumer1对分区p2的消费,然后发送sync group请求给consumer1、consumer2。

6.consumer1、consumer2接收到分区分配方案,重新开始消费。至此,一次Rebalance完成。

7.当前p2也没有被消费,再次触发下一轮rebalance,将p2分配给consumer3消费。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐