系列文章目录

第一章 Kafka原理解析(一)-- Kafka主要流程概览
第二章 Kafka原理解析(二)-- Kafka创建主题Topic流程解析


前言

Kafka是一个主流的大数据级别的分布式流式消息处理系统,其Cluster内部实现涉及的知识点繁多,每一个知识点都可以单独作为一个技术话题进行讨论和参考,本文只是将这些知识点整合到一起,用简单易懂的语言进行描述,重在概念、流程和机制,也便于统一查阅和备忘。


本章主要从源码层次解剖Kafka的主题创建流程,但不会太细,主要是从环节和机制入手。

一、Kafka中主题创建的元数据要求

Kafka中的主题Topic是Kafka中的基础概念,是一切消息处理的基础,主题属于Kafka元数据的一部分,会存储在Zookeeper中,因此创建主题的过程也就是往Zookeeper写入数据的过程
创建主题Topic需要提交的信息主要包括以下5个:

  • 主题名称(必填):
    要求:不能是已存在的名称,长度不超过 249 ,最好不要有“.",只输入字母数字或下划线
  • 该主题的分区数量numPartitions(可选):
    不输入的话系统根据配置中的默认分区执行
  • 每个分区的副本数replicationFactor(可选):
    不输入的话系统根据配置中的默认分区执行,副本数不能大于broker的数量
  • 分区和Brokers的分配方案assignments(可选):
    可以输入分区下标对应的brokerId列表(对应副本所在),不输入的话系统根据默认分配方案进行分配(后面会介绍)
  • 执行操作的配置环境configs(可选):
    默认会找到类路径下的配置,还可以直接指定一些配置,如:brokers,zookeeper等

二、Kafka中主题创建的方式

Kafka中有配置可以选择自动创建主题,通过auto.create.topics.enable=true来实现
一般使用中不会自动创建,都是手动创建,更好管理,我们着重讨论手工创建主题

1. 三种创建方式

主题一般需要提前手工创建,创建的方式有三种:
1)通过服务器端脚本连接Zookeeper进行创建(不推荐)

bin/kafka-topics.sh --create -zookeeper localhost:2181  --topic input_topic

2)通过服务器端脚本连接Broker进行创建

bin/kafka-topics.sh --create --bootstrap-server localhost:9092  --topic input_topic

3)通过JAVA代码创建

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092")
Admin adminClient = Admin.create(props);
final String INPUT_TOPIC = "input-topic";
NewTopic newTopic = new NewTopic(INPUT_TOPIC,Optional.empty(),Optional.empty());
final List<NewTopic> newTopics = Arrays.asList(newTopic);
adminClient.createTopics(newTopics).all().get();

2. 脚本kafka-topics.sh解析

脚本kafka-topics.sh中其实也是启动一个JAVA类TopicCommand

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

TopicCommand是用scala写的在源码核心core模块中,他接受bootstrap-server和zookeeper两种类型的参数,这两个参数互斥,只能选一种。

TopicCommand根据输入的两个参数来确定使用哪一种Client:

  • bootstrap-server ----- KafkaAdminClient (连接Kafka Controller服务器进行主题创建操作)
  • zookeeper ----- KafkaZkClient (直接本地创建好主题元数据后写入Zookeeper,不通过服务器)
    现在zookeeper这种方式已经被DEPRECATED不推荐了

3. 创建时分区和副本数的定制

  • 脚本方式:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic input_topic
  • 代码方式:
final short replicationFactor = 3;
final int numPartitions = 3;
NewTopic newTopic = new NewTopic(INPUT_TOPIC,numPartitions, replicationFactor);

4. 创建时分区分配方案的定制

指定分区分配方案就是要指定某一个分区的所有副本与Kafka Broker Id之间的存储映射关系。如果制定了分区方案,则分区和副本数已定,单个的分区和副本数就不能输入了

  • 脚本方式:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replica-assignment 0:1:2,3:4:5,6:7:8  --topic input_topic

规则:

某一个分区的副本所在broker,按顺序以冒号隔开,多个分区之间以逗号隔开,

如:0:1:2,3:4:5,6:7:8 代表第一个分区的副本放在0,1,2这三台broker中,第二个分区的副本放在3,4,5这三台broker中,第三个分区的副本放在6,7,8这三台broker中。

  • 代码方式:
Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
replicasAssignments.put(0, Arrays.asList(0,1,2));
replicasAssignments.put(1, Arrays.asList(3,4,5));
replicasAssignments.put(2, Arrays.asList(6,7,8));
NewTopic newTopic = new NewTopic(INPUT_TOPIC,replicasAssignments);

5. 创建时分区分配方案的系统默认策略

若不输入分区分配方案,则系统使用默认分区策略来创建分区的分配方案 由AdminUtils.assignReplicasToBrokers方法负责计算
该方法还判断目前Broker是否有机架感知配置,若有的话,会尽量让副本分布到不同机架上。

Kafka中默认的分区分配策略为:

  • 递增移位值 increasingShift
    由于分区的数量可能超过broker的数量,为了使副本分布更均匀,引进一个递增移位值increasingShift,初始化为0。在分配某个分区的副本的broker的时候,定好第一个副本broker后,正常第二个副本会按顺序取后面的broker,即与第一个副本的间隔为1,依次类推。若此时分区的数量超过超过broker的数量,这时就又从第一个broker开始轮起,则此时increasingShift递增到1,分区的第二个副本将和第一个副本的间隔变为2,后面副本间隔不变。在一个新的分区又要到第三次轮到第一个broker时,increasingShift递增到2,分区的第二个副本将和第一个副本的间隔变为3,后面副本间隔不变。
  • 第i分区p(i)的副本分配算法 (伪代码)
 //若又开始一轮broker分配,increasingShift递增1
 if i == brokes.length then
   increasingShift += 1 
   
 // 第i个分区的第一个副本放在第i个broker
 firstReplicaIndex = i % brokes.length    
 
 //分配其他副本的brokers
 foreach j in replicas:   //第二个副本开始的broker为第一个副本的位置+1再加increasingShift 依次类推。
     otherReplicaIndex = (firstReplicaIndex  + 1 + increasingShift + j) % brokes.length ; 
     
 //得到第i个分区的分配方案
 currentPartitionAssignment = (i -> [firstReplicaIndex ,otherReplicaIndex ... ])
  • 第i分区p(i)的副本分配算法的随机性增强
    Kafka中为了调节不同主题的分区均匀分布,开始第一个分区计算时随机从某个broker开始,不一定要每次都从第一个开始。
    increasingShift递增因子也是随机生成初始值,再递增。
  • 完整的分配流程
    将所有的分配计算一遍即可完成分配
 foreach i in numPartitions:  
    assignOnePartition(i);

最终形成的分配方案结构跟定制输入的一样。

三、Kafka中主题创建的主要流程

Kafka主题的创建均是依赖于KafkaAdminClient跟Controller服务器进行RPC通信完成主题创建流程。在此之前首先需要跟任意broker服务器通信得到Kafka的metaddata数据,来获得ControllerNode的信息,然后再跟ControllerNode进行通信。
在这里插入图片描述

主要流程:

  • KafkaAdminClient启动后在后台启动一个线程,定时更新Metadata。更新Metadata时连接的Node是任意一台比较闲的broker结点。
  • 更新Metadata时由AdminMetadataManager负责向broker发起请求,该broker在KafkaApis里面直接处理该请求返回当前缓存的Metadata数据。AdminMetadataManager获得返回后更新Metadata,KafkaAdminClient顺利获得Metadata中的ControllerNode的信息,若得不到它会一直循环等待中。
  • 获得ControllerNode信息后,KafkaAdminClient可以连接上并发送createTopics请求,Controller获得请求后交给ZkAdminManager来处理,因为毕竟要跟Zookeeper打交道。
  • ZkAdminManager首先对没有输入的参数补充默认值(如分区数和副本数),以及分配分区副本方案等。
  • 调用AdminZkClient的validateTopicCreate进行验证主题输入参数的合法性。
  • 调用AdminZkClient的createTopicWithAssignment将主题元数据写入Zookeeper。

有关元数据的结构和Zookeeper的存储结构及协调处理在下一章介绍。


总结

本文结合源码总结了Kafka创建主题的相关环节和流程。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐