kafka学习(五):消费者分区策略(再平衡机制)
*** 返回序列化后的自定义数据*/}/*** 分区分配的计算逻辑*//*** 当组成员从领导者那里收到其分配时调用的回调*/}/*** 指明使用的再平衡协议* 默认使用RebalanceProtocol.EAGER协议, 另外一个可选项为 RebalanceProtocol.COOPERATIVE*/}/***/}/*** 分配器的名字。
kafka再平衡机制:指的是kafka consumer锁订阅的topic发生变化时 发生的一种分区重分配机制。
一般有三种情况会出发consumer的分区分配策略(再平衡机制):
1、consumer group 中新增或删除某个consumer,导致其所消费的分区需要分配到组内其他的consumer上。
2、consumer订阅的topic发生变化,比如订阅的topic采用的是正则表达式的形式。如 test-* 此时如果有新建了一个topic test-user,那么这个topic的所有分区也是会自动分配给当前的consumer的,此时就会发生再平衡。
3、consumer所订阅的topic发生了新增分区的行为,那么新增的分区就会分配给当前的consumer,此时就会触发再平衡。
Kafka Consumer用来从Kafka集群拉取数据,通过consumer groups允许多个进程共同分担消费/处理数据的工作,这些进程可以运行在一台机器或者多台机器。而ConsumerPartitionAssignor接口用来定制consumer的分区分配策略。通过consumer配置项partition.assignment.strtegy指定分区分配策略类。消费者分区分配策略类的实现方法共有四种。
第一种分区策略:RoundRobinAssignor
第二种分区策略:RangeAssignor
第三种分区策略:StickyAssignor
第四种分区策略:CooperativeStickyAssignor(kafka2.4.0版本引入的策略)
修改kafka分区分配策略:
partition.assignment.strategy= range+CooperativeStucky或 roundrobin,kafka可以同时使用多个分区分配策略。
1、RangeAssignor :范围分区分配策略
partition.assignment.strategy=]org.apache.kafka.clients.consumer.RangeAssignor
它是以单个Topic为一个维度来计算分配的, 只负责将每一个Topic的分区尽可能均衡的分配给消费者:
-
消费组里面所有消费者consumer按照字母排序,给Topic的分区按照分区号排序。
-
先计算每个分区最少平均分配多少个分区数, 然后余下的逐个分。
即:首先会计算每个consumer可以消费的分区个数,然后按照顺序将指定个数范围的分区分配给各个consumer。
示例:
如上图,有topic t1 和 消费组,t1 有四个分区,消费组有三个消费者。
分区依次为:p0、p1、p2、p3
假设 N = 分区数,M = 消费者数量
A = N/M = 4/3 = 1 -- 每个消费者最少的分区数
B = N%M = 4%3 = 1 -- 需要多消费分区的 消费者个数
分配:前B个消费者,分配A+1个分区,其余的消费者分配 A个分区。
所以,最终的分配结果就是:
c0:p0、p1
c1:p2
c2:p3
Range弊端:
Range针对单个Topic的情况下显得比较均衡, 但是假如Topic很多的话, consumer排序靠前的可能会比 consumer排序靠后的负载多很多。
RangeAssignor核心源码如下:
// partitionsPerTopic表示topic和分区关系,key是topic,value是分区数量
// subscriptions表示订阅关系,key是消费者,value是订阅的topic
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// 得到topic和订阅的消费者集合信息,例如{t0:[c0, c1], t1:[C0, C1]}
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
// 保存topic分区和订阅该topic的消费者关系结果map
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
// memberId就是消费者client.id+uuid(kafka在client.id上追加的)
assignment.put(memberId, new ArrayList<TopicPartition>());
// 遍历每个topic和消费者集合信息组成的map(由这个遍历可知,range策略分配结果在各个topic之间互不影响)
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
// topic名称
String topic = topicEntry.getKey();
// topic的消费者集合信息
List<String> consumersForTopic = topicEntry.getValue();
// 当前topic的分区数量
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
// 如果当天topic没有分区,那么继续遍历下一个topic
if (numPartitionsForTopic == null)
continue;
// 消费者集合根据字典排序
Collections.sort(consumersForTopic);
// 每个topic分区数量除以消费者数量,得出每个消费者分配到的分区数量
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 无法整除的剩余分区数量
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
// 根据topic名称和分区数量,得到分区集合信息
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
// 遍历订阅当前topic的消费者集合
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
// 分配到的分区的开始位置
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
// 分配到的分区数量(整除分配到的分区数量,加上1个无法整除分配到的分区--如果有资格分配到这个分区的话。判断是否有资格分配到这个分区:如果整除后余数为m,那么排序后的消费者集合中前m个消费者都能分配到一个额外的分区)
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
// 给消费者分配分区
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
2、RoundRobinAssignor :轮询分区策略
RoundRobin是针对所有topic分区。它是采用轮询分区策略,是把所有的partition、所有的consumer列举出来 进行排序,最后再通过轮询分配partition给每个消费者(如果该消费者没有订阅该主题则跳到下一个消费者)。
RoundRobinAssignor核心源码如下:
// partitionsPerTopic表示topic和分区关系,key是topic,value是分区数量
// subscriptions表示订阅关系,key是消费者,value是订阅的topic信息
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
// 将消费者集合先按照字典排序,再构造成一个环形迭代器
CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
// 以topic名称排序(SortedSet<String> topics = new TreeSet<>();TreeSet保存topic名称从而实现排序),遍历topic下的分区,得到全部分区(分区主要信息包括topic名称和分区编号)
for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
final String topic = partition.topic();
// assigner.peek()得到最后一次遍历的消费者。如果遍历的当前分区所属topic不在最后一次遍历的消费者订阅的topic范围内,那么从环形迭代器中轮询选择下一个消费者,直到选择的消费者订阅的topic集合包含当前topic。
while (!subscriptions.get(assigner.peek()).topics().contains(topic))
assigner.next();
// 给消费者分配分区,并轮询到下一个消费者
assignment.get(assigner.next()).add(partition);
}
return assignment;
}
如上图,有三个消费者consumer-0、consumer-1、consumer-2,三个主题topic-A、topic-B、topic-C,从而得到分区topic-A-p0、topic-A-1、topic-A-p2 ......topic-C-p2。
当然得到上图中结果的前提是Consumer Group中的三个消费者订阅信息是一致的,通过轮询分配,才得到上面的结果。
如果订阅信息不一致,得到的结果也不均匀,下面举个例子:
图:RoundRobin-2
如上图,Consumer0订阅Topic-A、Topic-B,Consumer1订阅Topic-B、Topic-C。
顺序注意图中的Seq,先分配TopicA。
第一轮 : Consumer-0: Topic-A-P0
由于Consumer-1没有订阅Topic-A,所以只能找到Topic-B给Consumer-1分配
于是 Consumer-1: Topic-B-Partition0
------------------------------------------------------------------------------------------------------
第二轮: Consumer-0: Topic-A-Partition0,Topic-A-Partition1
Consumer-1: Topic-B-Partition0,Topic-B-Partition1
------------------------------------------------------------------------------------------------------
第三轮: Consumer-0: Topic-A-Partition0,Topic-A-Partition1,Topic-A-Partition2
Consumer-1: Topic-B-Partition0,Topic-B-Partition1,Topic-B-Partition2
------------------------------------------------------------------------------------------------------
第四、五、六轮:
Consumer-0: Topic-A-Partition0,Topic-A-Partition1,Topic-A-Partition2
Consumer-1: Topic-B-Partition0,Topic-B-Partition1,Topic-B-Partition2,Topic-C-Partition-0,Topic-C-Partition-1,Topic-C-Partition-2
------------------------------------------------------------------------------------------------------
可以看到Consumer-1多消费了3个分区。所以在Consumer Group有订阅消息不一致的情况下,我们最好不要选用RoundRobin。
3、StickyAssignor :粘性分区策略
kafka在0.11版本引入了Sticky分区分配策略,它的两个主要目的是:
1、分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;
2、分区的分配尽可能的与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标。
例如:三个consumers(C0、C1、C2),四个Topics(T0、T1、T2、T3)。
如果Consumers订阅信息为:
C0 T0、T1、T2、T3
C1 T0、T1、T2、T3
C2 T0、T1、T2、T3
则RoundRobinAssignor和StickyAssignor分区分配方案均为:
C0 T0P0、T1P1、T3P0
C1 T0P1、T2P0、T3P1
C2 T1P0、T2P1
现在,假设C1被移除,将触发分区重分配。此时,RoundRobinAssignor和StickyAssignor的分区分配方案将有所差异。
RoundRobinAssignor分区分配方案将变为:
C0 | T0P0、T1P0、T2P0、T3P0 |
---|---|
C2 | T0P1、T1P1、T2P1、T3P1 |
保留之前的分区分配方案的3个分区不变。
StickyAssignor分区分配方案将变为:
C0 | T0P0、T1P1、T3P0、T2P0 |
---|---|
C2 | T1P0、T2P1、T0P1、T3P1 |
保留之前的分区分配方案的5个分区不变。由此可见,StickyAssignor尽量保存之前的分区分配方案,分区重分配变动更小。
以上为所有consumers均订阅所有topics的场景,接着介绍consumers订阅topics不相同的场景。例如:三个consumers(C0、C1、C2),三个Topics(T0、T1、T2),分区数分别为1、2、3。如果consumers订阅信息为:
C0 | T0 |
---|---|
C1 | T0、T1 |
C2 | T0、T1、T2 |
RoundRobinAssignor分区分配方案为:
C0 | T0P0 |
---|---|
C1 | T1P0 |
C2 | T1P1、T2P0、T2P1、T2P2 |
StickyAssignor分区分配方案为:
C0 | T0P0 |
---|---|
C1 | T1P0、T1P1 |
C2 | T2P0、T2P1、T2P2 |
由此可见,分区均衡性来说,RoundRobinAssignor不如StickyAssignor均衡。
现在,假设C0被移除,将触发分区重分配。此时,RoundRobinAssignor和StickyAssignor的分区分配方案将有所差异。
RoundRobinAssignor分区分配方案将变为:
C1 | T0P0、T1P1 |
---|---|
C2 | T1P0、T2P0、T2P1、T2P2 |
保留之前的分区分配方案的4个分区不变。
StickyAssignor分区分配方案将变为:
C1 | T1P0、T1P1、T0P0 |
---|---|
C2 | T2P0、T2P1、T2P2 |
保留之前的分区分配方案的5个分区不变。
由此可见,StickyAssignor尽量保存之前的分区分配方案,分区重分配变动更小。
4、CooperativeStickyAssignor策略
上述三种分区分配策略均是基于eager协议,Kafka2.4.0开始引入CooperativeStickyAssignor策略。
CooperativeStickyAssignor与之前的StickyAssignor虽然都是维持原来的分区分配方案,最大的区别是:
StickyAssignor仍然是基于eager协议,分区重分配时候,都需要consumers先放弃当前持有的分区,重新加入consumer group;
而CooperativeStickyAssignor基于cooperative协议,该协议将原来的一次全局分区重平衡,改成多次小规模分区重平衡。渐进式的重平衡。
例如:一个Topic(T0,三个分区),两个consumers(consumer1、consumer2)均订阅Topic(T0)。如果consumers订阅信息为:
consumer1 | T0P0、T0P2 |
---|---|
consumer2 | T0P1 |
此时,新的consumer3加入消费者组,那么:
基于eager协议的分区重分配策略流程:
1、consumer1、 consumer2正常发送心跳信息到Group Coordinator。
2、随着consumer3加入,Group Coordinator收到对应的Join Group请求,Group Coordinator确认有新成员需要加入消费者组。
3、Group Coordinator 通知consumer1和consumer2,需要rebalance(再平衡)了。
4、consumer1和consumer2放弃(revoke)当前各自持有的已有分区,重新发送Join Group请求到Group Coordinator。
5、Group Coordinator依据指定的分区分配策略的处理逻辑,生成新的分区分配方案,然后通过Sync Group请求,将新的分区分配方案发送给consumer1、consumer2、consumer3。
6、所有consumers按照新的分区分配,重新开始消费数据。
基于cooperative协议的分区分配策略的流程:
1、consumer1、 consumer2正常发送心跳信息到Group Coordinator。
2、随着consumer3加入,Group Coordinator收到对应的Join Group请求,Group Coordinator确认有新成员需要加入消费者组。
3、Group Coordinator 通知consumer1和consumer2,需要rebalance了。
4、consumer1、consumer2通过Join Group请求将已经持有的分区发送给Group Coordinator。注意:并没有放弃(revoke)已有分区。
5、Group Coordinator取消consumer1对分区p2的消费,然后发送sync group请求给consumer1、consumer2。
6、consumer1、consumer2接收到分区分配方案,重新开始消费。至此,一次Rebalance完成。
7、当前p2也没有被消费,再次触发下一轮rebalance,将p2分配给consumer3消费。
5、重平衡协议
分区策略本质上分为两大类
- RebalanceProtocol.EAGER
- RebalanceProtocol.COOPERATIVE 协作重平衡,kafak2.4出的功能。
这两个区别是:
EAGER :重新平衡协议要求消费者在参与重新平衡事件之前始终撤销其拥有的所有分区。因此,它允许完全改组分配。
COOPERATIVE:协议允许消费者在参与再平衡事件之前保留其当前拥有的分区。分配者不应该立即重新分配任何拥有的分区,而是可以指示消费者需要撤销分区,以便可以在下一次重新平衡事件中将被撤销的分区重新分配给其他消费者。
COOPERATIVE协议将一次全局重平衡,改成每次小规模重平衡,直至最终收敛平衡的过程。
COOPERATIVE有效的改进来在此之前EAGER协议重平衡而触发的stop-the-world(STW)
以下3种策略都是 RebalanceProtocol.EAGER 协议:
- RangeAssignor 范围分区分配策略
- RoundRobinAssignor 轮询分区策略
- StickyAssignor 粘性分区策略
而CooperativeStickyAssignor分配策略是使用的 RebalanceProtocol.COOPERATIVE协议
6、自定义分配策略
想要自定义分配策略,只需要实现接口:
public interface ConsumerPartitionAssignor {
/**
* 返回序列化后的自定义数据
*/
default ByteBuffer subscriptionUserData(Set<String> topics) {
return null;
}
/**
* 分区分配的计算逻辑
*/
GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
/**
* 当组成员从领导者那里收到其分配时调用的回调
*/
default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
}
/**
* 指明使用的再平衡协议
* 默认使用RebalanceProtocol.EAGER协议, 另外一个可选项为 RebalanceProtocol.COOPERATIVE
*/
default List<RebalanceProtocol> supportedProtocols() {
return Collections.singletonList(RebalanceProtocol.EAGER);
}
/**
* Return the version of the assignor which indicates how the user metadata encodings
* and the assignment algorithm gets evolved.
*/
default short version() {
return (short) 0;
}
/**
* 分配器的名字
* 例如 RangeAssignor、RoundRobinAssignor、StickyAssignor、CooperativeStickyAssignor
* 对应的名字为
* range、roundrobin、sticky、cooperative-sticky
*/
String name();
我们也可以根据自己的需求来实现其他的抽象类。
比如:AbstractStickyAssignor抽象类就是专门给粘性分区使用的抽象类。
/**
* 自定义实现的随机选择消费者分配器
*/
public class AfeiAssignor extends AbstractPartitionAssignor {
private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
Map<String, List<String>> res = new HashMap<>();
for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
String consumerId = subscriptionEntry.getKey();
for (String topic : subscriptionEntry.getValue().topics()) {
put(res, topic, consumerId);
}
}
return res;
}
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// 得到topic和订阅该topic的消费者集合(参考RangeAssignor中的consumersPerTopic()方法)
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet()) {
assignment.put(memberId, new ArrayList<>());
}
// 遍历每个topic
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
// 订阅当前topic的所有消费者集合
List<String> consumersForTopic = topicEntry.getValue();
int consumerSize = consumersForTopic.size();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null) {
continue;
}
// 当前topic下所有分区
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (TopicPartition partition:partitions){
// 随机选择一个消费者
int rand = new Random().nextInt(consumerSize);
// 得到随机选择的消费者
String selectedConsumer = consumersForTopic.get(rand);
// 给选择的消费者分配当前分区
assignment.get(selectedConsumer).add(partition);
}
}
System.out.println("分配结果: "+new Gson().toJson(assignment));
return assignment;
}
@Override
public String name() {
return "afei";
}
}
更多推荐
所有评论(0)