Kafka原理解析(二)-- Kafka创建主题Topic流程解析
本章主要从源码层次解剖Kafka的主题创建流程,但不会太细,主要是从环节和机制入手。
系列文章目录
第一章 Kafka原理解析(一)-- Kafka主要流程概览
第二章 Kafka原理解析(二)-- Kafka创建主题Topic流程解析
前言
Kafka是一个主流的大数据级别的分布式流式消息处理系统,其Cluster内部实现涉及的知识点繁多,每一个知识点都可以单独作为一个技术话题进行讨论和参考,本文只是将这些知识点整合到一起,用简单易懂的语言进行描述,重在概念、流程和机制,也便于统一查阅和备忘。
本章主要从源码层次解剖Kafka的主题创建流程,但不会太细,主要是从环节和机制入手。
一、Kafka中主题创建的元数据要求
Kafka中的主题Topic是Kafka中的基础概念,是一切消息处理的基础,主题属于Kafka元数据的一部分,会存储在Zookeeper中,因此创建主题的过程也就是往Zookeeper写入数据的过程。
创建主题Topic需要提交的信息主要包括以下5个:
- 主题名称(必填):
要求:不能是已存在的名称,长度不超过 249 ,最好不要有“.",只输入字母数字或下划线 - 该主题的分区数量numPartitions(可选):
不输入的话系统根据配置中的默认分区执行 - 每个分区的副本数replicationFactor(可选):
不输入的话系统根据配置中的默认分区执行,副本数不能大于broker的数量 - 分区和Brokers的分配方案assignments(可选):
可以输入分区下标对应的brokerId列表(对应副本所在),不输入的话系统根据默认分配方案进行分配(后面会介绍) - 执行操作的配置环境configs(可选):
默认会找到类路径下的配置,还可以直接指定一些配置,如:brokers,zookeeper等
二、Kafka中主题创建的方式
Kafka中有配置可以选择自动创建主题,通过auto.create.topics.enable=true来实现
一般使用中不会自动创建,都是手动创建,更好管理,我们着重讨论手工创建主题
1. 三种创建方式
主题一般需要提前手工创建,创建的方式有三种:
1)通过服务器端脚本连接Zookeeper进行创建(不推荐)
bin/kafka-topics.sh --create -zookeeper localhost:2181 --topic input_topic
2)通过服务器端脚本连接Broker进行创建
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic input_topic
3)通过JAVA代码创建
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092")
Admin adminClient = Admin.create(props);
final String INPUT_TOPIC = "input-topic";
NewTopic newTopic = new NewTopic(INPUT_TOPIC,Optional.empty(),Optional.empty());
final List<NewTopic> newTopics = Arrays.asList(newTopic);
adminClient.createTopics(newTopics).all().get();
2. 脚本kafka-topics.sh解析
脚本kafka-topics.sh中其实也是启动一个JAVA类TopicCommand
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
TopicCommand是用scala写的在源码核心core模块中,他接受bootstrap-server和zookeeper两种类型的参数,这两个参数互斥,只能选一种。
TopicCommand根据输入的两个参数来确定使用哪一种Client:
- bootstrap-server ----- KafkaAdminClient (连接Kafka Controller服务器进行主题创建操作)
- zookeeper ----- KafkaZkClient (直接本地创建好主题元数据后写入Zookeeper,不通过服务器)
现在zookeeper这种方式已经被DEPRECATED不推荐了
3. 创建时分区和副本数的定制
- 脚本方式:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic input_topic
- 代码方式:
final short replicationFactor = 3;
final int numPartitions = 3;
NewTopic newTopic = new NewTopic(INPUT_TOPIC,numPartitions, replicationFactor);
4. 创建时分区分配方案的定制
指定分区分配方案就是要指定某一个分区的所有副本与Kafka Broker Id之间的存储映射关系。如果制定了分区方案,则分区和副本数已定,单个的分区和副本数就不能输入了
- 脚本方式:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replica-assignment 0:1:2,3:4:5,6:7:8 --topic input_topic
规则:
某一个分区的副本所在broker,按顺序以冒号隔开,多个分区之间以逗号隔开,
如:0:1:2,3:4:5,6:7:8 代表第一个分区的副本放在0,1,2这三台broker中,第二个分区的副本放在3,4,5这三台broker中,第三个分区的副本放在6,7,8这三台broker中。
- 代码方式:
Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
replicasAssignments.put(0, Arrays.asList(0,1,2));
replicasAssignments.put(1, Arrays.asList(3,4,5));
replicasAssignments.put(2, Arrays.asList(6,7,8));
NewTopic newTopic = new NewTopic(INPUT_TOPIC,replicasAssignments);
5. 创建时分区分配方案的系统默认策略
若不输入分区分配方案,则系统使用默认分区策略来创建分区的分配方案 由AdminUtils.assignReplicasToBrokers方法负责计算。
该方法还判断目前Broker是否有机架感知配置,若有的话,会尽量让副本分布到不同机架上。
Kafka中默认的分区分配策略为:
- 递增移位值 increasingShift
由于分区的数量可能超过broker的数量,为了使副本分布更均匀,引进一个递增移位值increasingShift,初始化为0。在分配某个分区的副本的broker的时候,定好第一个副本broker后,正常第二个副本会按顺序取后面的broker,即与第一个副本的间隔为1,依次类推。若此时分区的数量超过超过broker的数量,这时就又从第一个broker开始轮起,则此时increasingShift递增到1,分区的第二个副本将和第一个副本的间隔变为2,后面副本间隔不变。在一个新的分区又要到第三次轮到第一个broker时,increasingShift递增到2,分区的第二个副本将和第一个副本的间隔变为3,后面副本间隔不变。 - 第i分区p(i)的副本分配算法 (伪代码)
//若又开始一轮broker分配,increasingShift递增1
if i == brokes.length then
increasingShift += 1
// 第i个分区的第一个副本放在第i个broker
firstReplicaIndex = i % brokes.length
//分配其他副本的brokers
foreach j in replicas: //第二个副本开始的broker为第一个副本的位置+1再加increasingShift 依次类推。
otherReplicaIndex = (firstReplicaIndex + 1 + increasingShift + j) % brokes.length ;
//得到第i个分区的分配方案
currentPartitionAssignment = (i -> [firstReplicaIndex ,otherReplicaIndex ... ])
- 第i分区p(i)的副本分配算法的随机性增强
Kafka中为了调节不同主题的分区均匀分布,开始第一个分区计算时随机从某个broker开始,不一定要每次都从第一个开始。
increasingShift递增因子也是随机生成初始值,再递增。 - 完整的分配流程
将所有的分配计算一遍即可完成分配
foreach i in numPartitions:
assignOnePartition(i);
最终形成的分配方案结构跟定制输入的一样。
三、Kafka中主题创建的主要流程
Kafka主题的创建均是依赖于KafkaAdminClient跟Controller服务器进行RPC通信完成主题创建流程。在此之前首先需要跟任意broker服务器通信得到Kafka的metaddata数据,来获得ControllerNode的信息,然后再跟ControllerNode进行通信。
主要流程:
- KafkaAdminClient启动后在后台启动一个线程,定时更新Metadata。更新Metadata时连接的Node是任意一台比较闲的broker结点。
- 更新Metadata时由AdminMetadataManager负责向broker发起请求,该broker在KafkaApis里面直接处理该请求返回当前缓存的Metadata数据。AdminMetadataManager获得返回后更新Metadata,KafkaAdminClient顺利获得Metadata中的ControllerNode的信息,若得不到它会一直循环等待中。
- 获得ControllerNode信息后,KafkaAdminClient可以连接上并发送createTopics请求,Controller获得请求后交给ZkAdminManager来处理,因为毕竟要跟Zookeeper打交道。
- ZkAdminManager首先对没有输入的参数补充默认值(如分区数和副本数),以及分配分区副本方案等。
- 调用AdminZkClient的validateTopicCreate进行验证主题输入参数的合法性。
- 调用AdminZkClient的createTopicWithAssignment将主题元数据写入Zookeeper。
有关元数据的结构和Zookeeper的存储结构及协调处理在下一章介绍。
总结
本文结合源码总结了Kafka创建主题的相关环节和流程。
更多推荐
所有评论(0)