本文所有命令都是基于 KAFKA_HOME 目录下运行。

kafka-topics.sh 脚本主要负责 topic 相关的操作。它的具体实现是通过 kafka-run-class 来调用 TopicCommand 类,并根据参数执行指定的功能。

一、创建 Topic

 TopicCommand.createTopic() 方法负责创建 Topic,其核心逻辑是确定新建 Topic 中有多少个分区及每个分区中的副本如何分配,既支持使用 replica-assignment 参数手动分配,也支持使用 partitions 参数和 replication-factor 参数指定分区个数和副本个数进行自动分配。之后该方法会将副本分配结果写入到 ZooKeeper 中。
 

注意:Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。若读者依旧使用的是 2.1 及以下版本,请将下述的 --bootstrap-server 参数及其值手动替换为 --zookeeper zk1:2181,zk2:2181,zk:2181。一定要注意两者参数值所指向的集群地址是不同的。

形式一
 使用 replica-assignment 参数手动指定 Topic Partition Replica 与 Kafka Broker 之间的存储映射关系。
bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --replica-assignment 0:1,1:2,2:0
1
 注意:0:1,1:2,2:0 中的数字均为 broker.id;3个分区(逗号分隔);每个分区有两个副本(副本所在的 broker 以冒号分割)。
此形式在最新的 2.3 版本中会报 Aborted due to timeout 异常,建议使用形式二。

形式二
 使用 partitions 和 replication-factor 参数自动分配存储映射关系。
bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3 --replication-factor 2
1
 表示:创建一个 名为 topicName 的 Topic。其中指定分区个数为3,副本个数为2。
注意:Topic 名称中一定不要同时出现下划线 (’_’) 和小数点 (’.’)。
WARNING: Due to limitations in metric names, topics with a period (’.’) or underscore(’_’) could collide. To avoid issues ot os best to use either, but not both.

 创建 Topic 时也可指定参数:

bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3 --replication-factor 2 --config cleanup.policy=compact --config retention.ms=500
1
 创建topic过程的问题,replication-factor个数不能超过 broker 的个数,否则有如下错误信息:

ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
1
二、查看 Topic
查看 Topic 列表

bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
1
 查询出来的结果仅有 Topic 的名称信息。

查看指定 Topic 明细

bin/kafka-topics.sh --describe --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName
1
 查询的结果结构如下:

  Topic:topicName PartitionCount:3 ReplicationFactor:2 Configs:
      Topic: topicName Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
      Topic: topicName Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
      Topic: topicName Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
1
2
3
4
 PartitionCount:partition 个数。
 ReplicationFactor:副本个数。
 Partition:partition 编号,从 0 开始递增。
 Leader:当前 partition 起作用的 breaker.id。
 Replicas: 当前副本数据所在的 breaker.id,是一个列表,排在最前面的其作用。
 Isr:当前 kakfa 集群中可用的 breaker.id 列表。

三 、删除 Topic
bin/kafka-topics.sh --delete --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName
1
若 delete.topic.enable=true
 直接彻底删除该 Topic。
若 delete.topic.enable=false
 如果当前 Topic 没有使用过即没有传输过信息:可以彻底删除。
 如果当前 Topic 有使用过即有过传输过信息:并没有真正删除 Topic 只是把这个 Topic 标记为删除(marked for deletion),重启 Kafka Server 后删除。
 注:delete.topic.enable=true 配置信息位于配置文件 config/server.properties 中(较新的版本中无显式配置,默认为 true)。

四、修改 Topic
增加分区数

bin/kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3
1
 修改分区数时,仅能增加分区个数。若是用其减少 partition 个数,则会报如下错误信息:

  org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic hadoop currently has 3 partitions, 2 would not be an increase.
1
 不能用来修改副本个数。(请使用 kafka-reassign-partitions.sh 脚本增加副本数)

增加配置

bin/kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --config flush.messages=1
1
删除配置

bin/kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --delete-config flush.messages
1
五、Topic 级别配置属性
 当如下所示的属性配置到 Topic 上时,将会覆盖 server.properties 上对应的属性。

属性名值类型默认值有效值服务器默认属性描述
cleanup.policylistdeletedelete

compact
log.cleanup.policy过期或达到上限日志的清理策略。
delete:删除
compact:压缩
compression.typestringproduceruncompressed
snappy
lz4
gzip
producer
compression.type指定给该topic最终的压缩类型
delete.retention.mslong86400000[0,…]log.cleaner.delete.retention.ms压缩的日志保留的最长时间,也是客户端消费消息的最长时间。
与 log.retention.minutes 的区别在于:一个控制未压缩的数据,一个控制压缩后的数据。
file.delete.delay.mslong60000[0,…]log.segment.delete.delay.ms从文件系统中删除前所等待的时间
flush.messageslong9223372036854775807[0,…]log.flush.interval.messages在消息刷到磁盘之前,日志分区收集的消息数
flush.mslong9223372036854775807[0,…]log.flush.interval.ms消息在刷到磁盘之前,保存在内存中的最长时间,单位是ms
index.interval.bytesint4096[0,…]log.index.interval.bytes执行 fetch 操作后,扫描最近的 offset 运行空间的大小。
设置越大,代表扫描速度越快,但是也更耗内存。
(一般情况下不需要设置此参数)
message.max.bytesint1000012[0,…]message.max.byteslog中能够容纳消息的最大字节数
min.cleanable.dirty.ratiodouble0.5[0,…,1]log.cleaner.min.cleanable.ratio日志清理的频率控制,占该log的百分比。
越大意味着更高效的清理,同时会存在空间浪费问题
retention.byteslong-1log.retention.bytestopic每个分区的最大文件大小。
一个 topic 的大小限制 = 分区数 * log.retention.bytes。
-1 表示没有大小限制。
retention.msint604800000[-1,…]log.retention.minutes日志文件保留的分钟数。
数据存储的最大时间超过这个时间会根据 log.cleanup.policy 设置的策略处理数据
segment.bytesint1073741824[14,…]log.segment.bytes每个 segment 的大小 (默认为1G)
segment.index.bytesint10485760[0,…]log.index.size.max.bytes对于segment日志的索引文件大小限制(默认为10M)

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐