主题的管理

创建主题

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --partitions 4 --replication-factor 2

--topic 指定主题名称
--partitions 指定分区数
--replication-factor 指定副本个数

脚本执行完成后,log.dir目录下创建对应的主题分区。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
4个分区,两个副本总共八个副本,分配到三台机器(这里是用windows搭建的集群),8 = 3 + 3 + 2。如果是三个分区三个副本,9 = 3 + 3 + 3。

主题、分区、副本、log关系如下

在这里插入图片描述
这里我们还可以通过zk的客户端查看主题信息/brokers/topics/{topicName}

> get /brokers/topics/hello
{"removing_replicas":{},"partitions":{"2":[3,1],"1":[2,3],"0":[1,2],"3":[1,3]},"topic_id":"HPuimf3HToqKTYskSjPtJw","adding_replicas":{},"version":3}

还可以通过describe来查看分区的分配细节

> bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic hello

Topic: hello    TopicId: HPuimf3HToqKTYskSjPtJw PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: hello    Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: hello    Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
        Topic: hello    Partition: 2    Leader: 3       Replicas: 3,1   Isr: 3,1
        Topic: hello    Partition: 3    Leader: 1       Replicas: 1,3   Isr: 1,3

Topic表示主题名称,Partition分区号,Leader表示leader副本对应的brokerId,Isr表示ISR集合,Relicas表示所有分区的所有副本情况,即AR,数字表示的是brokerId。

创建分区的时候可以通过参数replica-assignment手动指定副本的分配方案。根据分区号的数值从小到到大的顺序排列。分区之间用,隔开,分区内多个副本用:隔开。

> bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic world --replica-assignment 1:2,1:3,2:3

Topic: world    TopicId: q9wPPkKhQEuh8-ENhQNDOg PartitionCount: 3       ReplicationFactor: 2    Configs:
        Topic: world    Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: world    Partition: 1    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: world    Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3

注意要点:主题的命名同样不推荐(虽然可以这样做)使用双下画线__开头,因为以双下画线开头的主题一般看作Kafka的内部主题,比如__consumer_offsets__transaction_state。主题的名称必须由大小写字母、数字、点号“.”、连接线“-”、下画线“_”组成,不能为空,不能只有点号“.”,也不能只有双点号“…”,且长度不能超过249。

查看主题

list查看所有主题

> bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

–describe 还可以指定多个主题

> bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic hello,world

Topic: hello    TopicId: HPuimf3HToqKTYskSjPtJw PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: hello    Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: hello    Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
        Topic: hello    Partition: 2    Leader: 3       Replicas: 3,1   Isr: 3,1
        Topic: hello    Partition: 3    Leader: 1       Replicas: 1,3   Isr: 1,3
Topic: world    TopicId: q9wPPkKhQEuh8-ENhQNDOg PartitionCount: 3       ReplicationFactor: 2    Configs:
        Topic: world    Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: world    Partition: 1    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: world    Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3

修改主题

增加分区

> bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic world --partitions 5

Topic: world    TopicId: q9wPPkKhQEuh8-ENhQNDOg PartitionCount: 5       ReplicationFactor: 2    Configs:
        Topic: world    Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: world    Partition: 1    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: world    Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3
        Topic: world    Partition: 3    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: world    Partition: 4    Leader: 2       Replicas: 2,1   Isr: 2,1

Kafka是不能减少分区的。

删除分区

> bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic hello_config

这里的删除只是将topic标记为已删除,并没有实际删除。

要真正的删除主题,需要执行下面三个步骤,步骤一和步骤二可以互换。

步骤一:删除zookeeper的/config/topics/{topic}结点

> delete /config/topics/demo

步骤二:删除zookeeper的/brokers/topics/{topic}及其子结点

deleteall /brokers/topics/demo

步骤三:删除kafka日志文件

初识KafkaAdminClient

基本使用

import org.apache.kafka.clients.admin.*;

import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class KafkaTopicDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String server = "127.0.0.1:9092";
        String topic = "demo";

        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);

        AdminClient adminClient = KafkaAdminClient.create(properties);

        // 创建主题
        NewTopic newTopic = new NewTopic(topic, 2, (short) 1);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
        createTopicsResult.values().entrySet().forEach(System.out::println);

        // 主题列表
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> names = listTopicsResult.names().get();
        System.out.println("topics: " + names);

        adminClient.close();

    }

}

创建主题的时候一样可以自定义分区副本方案。

Map<Integer, List<Integer>> replicasAssignments = new HashMap<>(3);         
replicasAssignments.put(0, Arrays.asList(1, 2));                            
replicasAssignments.put(1, Arrays.asList(1, 3));                            
replicasAssignments.put(2, Arrays.asList(2, 3));                            
NewTopic newTopic = new NewTopic(topic, replicasAssignments);               

其中replicasAssignments的key表示分区号,value表示副本存储的brokerId。

创建主题、主题列表、修改分区、删除分区都可以通过AdminClient提供的方法实现,比较简单,这里就不演示了。

分区管理

优先副本的选举

Kafka三个结点,brokerId分别是1、2、3,创建一个主题(hello),三个分区、三个备份。

Topic: hello    TopicId: uXMDcDCeS3uKZp-Fq7Q1xQ PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: hello    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: hello    Partition: 1    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: hello    Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

在创建主题的时候,副本尽可能的均匀分不到Kafka集群的各个节点,对应leader副本额分配也比较均匀。上面的输出结果可以看到,leader副本分别分布在1、2、3上。针对同一分区,同一个broker结点不可能出现它的多个副本,即一个结点最多一个副本。

随着时间的推移,kafka结点可能宕机或者崩溃,leader发生故障,其中一个follower称为为新leader,这样会导致集群的不均衡,从而影响整体的健壮性和稳定性。原来的leader恢复后加入集群只能作为follower而不再对外提供服务。比如这里将节点3重启,那么新的分区副本信息如下

Topic: hello    TopicId: uXMDcDCeS3uKZp-Fq7Q1xQ PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: hello    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,1,3
        Topic: hello    Partition: 1    Leader: 1       Replicas: 3,1,2 Isr: 1,2,3
        Topic: hello    Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

这里可以看到分区2的leader从节点3转到了节点1,这样一来,均衡变成了失衡,上面的输出结果可以看到,节点1的负载最高,节点2的负载最低。

为了有效治理负载失衡的情况,Kafka引入了优先副本(preferred replica)的概念。有限副本指在AR集合列表中的第一个副本。比如上面第一个分区的AR为[2,3,1],那么这个分区的优先副本为2。理想情况下,优先副本就是该分区的leader副本,这样就保证了leader均衡分布。leader分布过于集中,会造成集群的不均衡。

优先副本的选举指通过一定的方式促使优先副本选举为leader副本,来促进集群均衡,这一行为也称为"分区均衡"。

分区均衡并不意味着Kafka集群的负载均衡,还需要考虑集群分区分配是否均衡。更进一步,leader副本的负载可能也不相同。

Kafka提供自动平衡功能,broker端的参数auto.leader.rebalance.enable,默认为true,即默认情况是开启的。开启自动平衡功能,Kafka控制器会启动一个定时任务,计算每个broker结点分区不均衡率(broker中不平衡率 = 非优先副本的leader个数 / 分区总数)是否超过leader.imbalance.per.broker.percentage参数配置的比值,默认10%,超过则自动执行优先副本的选举动作以达求分区平衡。执行周期leader.imbalance.check.interval.seconds,默认300秒。

生产环境不建议自动平衡,因为这可能引起负面性能问题,可能会造成客户端一定时间的阻塞。因为执行时间无法掌控,如果流量高峰期执行优先副本自动选举的话,势必会造成业务阻塞、频繁超时之类的风险。分区副本的均衡也不能完全保证集群整体均衡,并且集群一定程度上的不均衡是可以忍受的,为了防止关键时刻“掉链子”,建议手动指定平衡操作。kafka-preferred-replica-election.sh脚本提供了这个操作。

> bin/kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181

This tool is deprecated. Please use kafka-leader-election tool. Tracking issue: KAFKA-8405
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Created preferred replica election path with hello-0,hello-1,hello-2
Successfully started preferred replica election for partitions Set(hello-0, hello-1, hello-2)

可以看到,脚本执行后,leader副本的分布和刚创建时一样了。执行分区数多,必然会对客户端造成一定的影响。

kafka-preferred-replica-election.sh还提供了path-to-json-file对部分分区执行优先副本的选举操作。

新建文件election.json

{
	"partitions": [
		{
			"partition": 0,
			"topic": "hello"
		},
		{
			"partition": 1,
			"topic": "hello"
		},
		{
			"partition": 2,
			"topic": "hello"
		}
	]
}

执行以下命令

> bin/kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181 --path-to-json-file election.json

注意:执行的时候避免流量高峰期。

分区重分配

当集群中的某个结点下线时,如果分区是单副本的,这些分区就不可用了,在结点恢复之前,响应的数据就丢失了;如果分区是多副本,那么这个结点上的leader转移到其它结点上。结点下线,Kafka是不会将这个结点上的分区副本自动的迁移到其它结点,如果放任不管的话,会影响集群的负载均衡,还会影响整体服务的可用性和可靠性。

当集群中的某个结点需要下线时,为了保证分区以副本的合理分配,希望通过某种方式将该结点上的分区副本迁移到其它可用结点上。当新增结点时,只有新建的主题分区才有可能配到到这个结点,之前创建的主题不会自动分配到这个新的结点中,这样新结点的负载和原结点负载之前严重不均衡。

为了让分区再次合理分配,也就是所谓的分区重分配。Kafka提供了kafka-reassign-partitions.sh脚本来执行重新分配的工作,它可以在线扩充、broker结点失效的场景下分区迁移。

kafka-reassign-partitions.sh脚本的使用分为3个步骤:首先创建需要一个包含主题清单的JSON文件,其次根据主题清单和broker结点清单生成一份重新分配方案,最后根据这份方案执行具体的重分配动作。

三个结点(broker1、broker2、broker3),新建主题topic-reassign,三个分区,两个副本。

Topic: topic-reassign   TopicId: puzCwIFzSsqFm6UEQ3eSiA PartitionCount: 3       ReplicationFactor: 2    Configs:
        Topic: topic-reassign   Partition: 0    Leader: 3       Replicas: 3,1   Isr: 3,1
        Topic: topic-reassign   Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic-reassign   Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3

现在由于某种原因需要下线broker3。第一步,新建reassign.json

{
	"topics": [
		{
			"topic": "topic-reassign"
		}
	],
	"version": 1
}

第二步,生成候选重分配方案。

> bin/kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 --generate --topics-to-move-json-file reassign.json --broker-list 1,2

Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic-reassign","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"topic-reassign","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"topic-reassign","partition":2,"replicas":[2,3],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic-reassign","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-reassign","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"topic-reassign","partition":2,"replicas":[2,1],"log_dirs":["any","any"]}]}

–broker-list指定要分配的结点列表,需要保留的结点。

上面有两个json,第一个Current partition replica assignment,执行分区重新分配的时候将这个内存保存下来,以便后续回滚。第二个Proposed partition reassignment configuration重分配候选方案,注意,这里只是一个方案并没有执行。

第三部,将第二个json保存为新文件project.json,执行一下命令

> bin/kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 --execute --reassignment-json-file project.json

Current partition replica assignment

{"version":1,"partitions":[{"topic":"topic-reassign","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"topic-reassign","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"topic-reassign","partition":2,"replicas":[2,3],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-reassign-0,topic-reassign-1,topic-reassign-2

再看分区分配

Topic: topic-reassign   TopicId: puzCwIFzSsqFm6UEQ3eSiA PartitionCount: 3       ReplicationFactor: 2    Configs:
        Topic: topic-reassign   Partition: 0    Leader: 2       Replicas: 2,1   Isr: 1,2
        Topic: topic-reassign   Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic-reassign   Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1

这里我们还可以查看重分配进度

> bin/kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 --verify --reassignment-json-file project.json

Reassignment of partition topic-reassign-0 is complete.
Reassignment of partition topic-reassign-1 is complete.
Reassignment of partition topic-reassign-2 is complete.
Clearing broker-level throttles on brokers 1,2
Clearing topic-level throttles on topic topic-reassign

复制限流

分区重分配本质在于数据复制,先增加新的副本,然后数据同步,最后删除就副本。复制会占用额外资源,分配量大会严重影响性能。减小重分配粒度,小批次的方式操作是一种可行思路。如果集群中某个主题的某个分区在某段时间特别大,那么减小粒度是不足以应对的,这是需要一个限流机制,对副本间的复制流量加以限制来保证重分配期间整体服务不会手太大影响。

复制限流有两种方式,kafka-topics.shkafka-reassign-partitions.sh

broker有两个复制限流相关参数:follower.replication.throttled.ratefollower.replication.throttled,前者用于设置follower副本的复制速度,后者用于设置leader副本的传输速度,单位是B/s。通常情况下,两者配置值相同。

主题级别也有两个相关的参数限制复制的速度:leader.replication.throttled.replicasfollower.replication.throttled.replicas,分别用来配置被限制速度的主题对应的leader副本列表和follower副本列表。

kafka-reassign-partitions.sh脚本本身提供了限流的功能,只需要一个throttle参数即可。

> bin/kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 --execute --reassignment-json-file project.json --throttle 10

修改副本因子

创建主题后能够修改分区个数,同样可以修改副本因子(副本数)。修改副本因子的场景也很多,比如创建副本时写错了副本因子需要修改,运行一段时间后增加副本提高容错性和可靠性。也是使用kafka-reassign-partitions.sh实现的。

project.json内容如下

{
  "version": 1,
  "partitions": [
    {
      "topic": "topic-reassign",
      "partition": 0,
      "replicas": [
        2,
        1
      ],
      "log_dirs": [
        "any",
        "any"
      ]
    },
    {
      "topic": "topic-reassign",
      "partition": 1,
      "replicas": [
        1,
        2
      ],
      "log_dirs": [
        "any",
        "any"
      ]
    },
    {
      "topic": "topic-reassign",
      "partition": 2,
      "replicas": [
        2,
        1
      ],
      "log_dirs": [
        "any",
        "any"
      ]
    }
  ]
}

这里可以看到replicas只有两个,副本增加到三个需要变动的地方①②,简化后的内容如下

{
  "partitions": [
    {
      "topic": "topic-reassign",
      "partition": 0,
      "replicas": [
        2,
        1,
        // ①
        3
      ],
      "log_dirs": [
        "any",
        "any",
        // ②
        "any"
      ]
    }
  ]
}

与修改分区不同的是,副本还可以减少。

如何选择合适的分区数

如何选择合适的分区数?对于这个问题,似乎没有非常权威的答案。还是需要根据实际的业务场景、软件条件、硬件条件、负载等情况来做具体考量。

性能测试工具

设定分区时,一般需要考虑性能因素,对不同的硬件而言,性能可能不太一样。

Kafka提供的性能测试工具,生产者性能测试kafka-producer-perf-test.sh和消费者kafka-consumer-perf-test.sh性能测试。

向某个主题发送1000000条消息,每条消息大小为1024B

> bin/kafka-producer-perf-test.sh --topic hello-world --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=127.0.0.1:9092 acks=1

23671 records sent, 4733.3 records/sec (4.62 MB/sec), 2231.4 ms avg latency, 3537.0 ms max latency.
65025 records sent, 13005.0 records/sec (12.70 MB/sec), 2868.0 ms avg latency, 4391.0 ms max latency.
83430 records sent, 16686.0 records/sec (16.29 MB/sec), 1966.2 ms avg latency, 2495.0 ms max latency.
......

throughput用来限流,值小于0表示不限流,值大于0,当吞吐量大于该值就会阻塞一段时间。

kafka-producer-perf-test.sh还有一个print-metrics,用这个参数在测试完成后打印更多之标的。

> bin/kafka-producer-perf-test.sh --topic hello-world --print-metrics --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=127.0.0.1:9092 acks=1

28681 records sent, 5735.1 records/sec (5.60 MB/sec), 1737.4 ms avg latency, 2989.0 ms max latency
......
Metric Name                                                                           Value
app-info:commit-id:{client-id=producer-1}                                           : 839b886f9b732b15
app-info:start-time-ms:{client-id=producer-1}                                       : 1658741621863
app-info:version:{client-id=producer-1}                                             : 2.8.1
kafka-metrics-count:count:{client-id=producer-1}                                    : 102.000
producer-metrics:batch-size-avg:{client-id=producer-1}                              : 15555.923
......

records/sec表示以每秒发送消息数来统计吞吐量,括号中的MB/sec表示以每秒发送的消息大小来统计吞吐量,注意这两者的维度;avg latency表示消息处理的平均耗时;max latency表示消息处理的最大耗时,50th、95th分别表示50%、95%的消息处理耗时。

kafka-consumer-perf-test.sh使用比较简单。

> bin/kafka-consumer-perf-test.sh --topic topic-1 --messages 1000000 --broker-list 127.0.0.1:9092

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-07-25 17:42:28:524, 2022-07-25 17:42:44:168, 976.5625, 62.4241, 1000000, 63922.2705, 3329, 12315, 79.2986, 81201.7864

输出结果中包含了多项信息,分别对应起始运行时间(start.time)、结束运行时间(end.time)、消费的消息总量(data.consumed.in.MB,单位为MB)、按字节大小计算的消费吞吐量(MB.sec,单位为MB/s)、消费的消息总数(data.consumed.in.nMsg)、按消息个数计算的吞吐量(nMsg.sec)、再平衡的时间(rebalance.time.ms,单位为ms)、拉取消息的持续时间(fetch.time.ms,单位为ms)、每秒拉取消息的字节大小(fetch.MB.sec,单位为 MB/s)、每秒拉取消息的个数
(fetch.nMsg.sec)。其中fetch.time.ms = end.time - start.time - rebalance.time.ms。

分区数越多吞吐量约高?

分区是Kafka最小的操作单元,对生产者而言,每一个分区的数据写入是完全可以并行的;对消费者而言,Kafka只允许单个分区被一个消费线程消费,消费组的消费并行度依赖于消费的分区数。这样看来,一个主题分区越多,理论上吞吐量越大,实时是这样吗?

消息吞吐量,刨开硬件资源,写入的吞吐量还会受消息大小、消息压缩方式、发送方式(同步/异步)、消息确认类型(acks)、副本因子等参数影响,消息消费吞吐量还会收到引用逻辑处理速度的影响。

随着分区数增加,吞吐量也在增加,但是到了某个阈值,吞吐量又在下降。

分区的上限

一味的增加分区数并不能使吞吐量一直得到提升,并且分区数也不能一直增加,如果超过默认的配置值,还会引起kafka进行崩溃。读者可以在一台普通的Linux上创建一个包含10000个分区的主题。

创建后,发现kafka崩溃了。此时可能会想到可能是内存不足导致的,其实不是。

查看kafka的日志文件logs/server.log

在这里插入图片描述

其中最关键的信息是"Too many open files",这是一种常见的Linux系统错误,意味着文件描述符不足,这里就不更深入的探讨了。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐