kafka基本操作


添加和删除 topics
使用 topic 工具来添加和修改 topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

replication-factor 控制有多少服务器将复制每个写入的消息。如果设置了3个复制因子,那么只能最多2个相关的服务器能出问题,否则将无法访问数据。我们建议使用2或3个复制因子,以便在不中断数据消费的情况下透明的调整集群.

删除一个 topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my_topic_name

修改 topics

使用相同的 topic 工具,可以修改 topic 的配置或分区
要添加分区,你可以做如下操作
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my_topic_name --partitions 40

请注意,分区的一个用处是对数据进行语义分区,并且添加分区不会更改现有数据的分区,因此如果依赖该分区,则可能会影响消费者。也就是说,如果数据是通过 hash(key)%number_of_partitions 进行分区的,那么这个分区可能会通过添加分区进行混洗,但Kafka不会尝试以任何方式自动重新分配数据。

增加一个配置项:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my_topic_name --alter --add-config x=y
删除一个配置项:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my_topic_name --alter --delete-config x

配置管理:kafka-configs.sh专门用来对配置进行操作的。动态修改,使用entity-type参数来指定操作配置的类型,使用entity-name参数来指定操作配置的名称。

检查 consumer 位置

有时观察到消费者的位置是有用的。有一个工具,可以显示 consumer 群体中所有 consumer 的位置,以及他们所在日志的结尾。要在名为my-group的 consumer 组上运行此工具,消费一个名为my-topic的 topic 将如下所示:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

注意:这将仅显示使用Java consumer API(基于非ZooKeeper的 consumer)的 consumer 的信息。
 
topic名字                      分区id      当前已消费的条数    总条数     未消费的条数          消费id       								主机ip                       	客户端id
TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
my-topic                       0          2               4               2          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
my-topic                       1          2               3               1          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2

--describe用于查询显示详情

这个工具也适用于基于ZooKeeper的 consumer:
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group

注意:这只会显示关于使用ZooKeeper的 consumer 的信息(不是那些使用Java consumer API的消费者)。
 
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
  topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4

管理 Consumer 组

通过 ConsumerGroupCommand 工具,我们可以列出,描述或删除 consumer 组。请注意,删除仅在组元数据存储在ZooKeeper中时可用。当使用新的 consumer API ( broker 协调分区处理和重新平衡)时, 当该组的最后一个提交偏移量过期时,该组将被删除。 例如,要列出所有 topic 中的所有 consumer 组:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

test-consumer-group

如前所述,为了查看偏移量,我们这样“describe”consumer 组:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

      CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
      consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
      consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
      consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0

有许多额外的“描述”选项可以用来提供关于消费者群体的更详细的信息:

  • --members:此选项提供使用者组中所有活动成员的列表
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
      CONSUMER-ID                                    HOST            CLIENT-ID      #PARTITIONS 
      consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
      consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
      consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0
  • --members --verbose:除了上面的“--members”选项报告的信息之外,该选项还提供分配给每个成员的分区
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
      CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
      consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
      consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
      consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -
  • --offsets:这是默认描述选项,并提供与“--describe”选项相同的输出。
  • --state:此选项提供有用的组级信息。
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
      COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
      localhost:9092 (0)        range                     Stable               4

若要手动删除一个或多个使用者组,可以使用“--delete”选项:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.

若要重置使用者组的偏移量,可以使用“--reset-offsets”选项。此选项一次支持一个使用者组。它需要定义以下作用域: --all-topics--topic。必须选择一个范围,除非使用“--from-file”方案。另外,首先要确保使用者实例是非活动的.
它有3个执行选项:

  • (default) 显示要重置的偏移量
  • --execute执行--reset-offsets过程。
  • --export将结果导出为 CSV 格式

--reset-offsets还有以下场景可供选择(必须至少选择一个场景) :

  • --to-datetime <String: datetime> 将偏移量从日期时间重置为偏移量。格式: ‘YYYY-MM-DDTHH:mm:SS.sss’
  • --to-earliest将偏移量重置为最早的偏移量
  • --to-latest将偏移量重置为最新偏移量
  • --shift-by <Long: number-of-offsets>将偏移电流的偏移量重置为 n’,其中 n’可以是正的也可以是负的。
  • --from-file将偏移量重置为 CSV 文件中定义的值
  • --to-current将偏移量重置为当前偏移量。
  • --by-duration <String: duration> 将偏移量重置为按当前时间戳的持续时间进行偏移。格式:‘PnDTnHnMnS’
  • --to-offset将偏移量重置为特定的偏移量。

请注意,超出范围的偏移量将被调整到可用的偏移量末端。例如,如果偏移端为10,偏移移位请求为15,那么实际上将选择10的偏移量。

例如,将使用者组的偏移量重置为最新的偏移量:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

  TOPIC                          PARTITION  NEW-OFFSET
  topic1                         0          0

如果正在使用老的consumer 并在ZooKeeper中存储组元数据(即 offsets.storage = zookeeper ),则传递 --zookeeper 而不是bootstrap-server:
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

修改broker配置

从 Kafka 版本1.1开始,一些代理配置可以在不重新启动代理的情况下进行更新。有关每个代理配置的更新模式,请参见Broker Configs中的 Dynamic Update Mode 列。

  • read-only:需要重新启动broker才能进行更新
  • per-broker:可以为每个broker动态更新
  • cluster-wide:可以作为集群范围的默认值动态更新。也可以作为测试的每个代理值更新。

要更改broker id 0的当前代理配置(例如,日志清理器线程的数量) :
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2

为了描述broker id 0的当前动态broker 配置:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2

为了描述broker id 0的当前动态broker配置:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe

有些配置可以配置为集群范围的默认配置,以在整个集群中维护一致的值。集群中的所有broker都将处理集群默认更新。例如,更新所有broker上的日志清理器线程:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2

为了描述当前配置的动态集群范围的默认配置:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe

所有在集群级别可配置的配置也可以在每个代理级别配置(例如用于测试)。如果在不同级别定义了配置值,则使用以下优先顺序:

  • 动态的每个broker配置
  • 动态集群范围的默认配置
  • 来自 server.properties 的静态broker配置
  • kafka 默认值

kafka-server-start.sh

程序启动文件
> bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
这个命令后面可以有多个参数,第一个是可选参数,该参数可以让当前命令以后台服务方式执行,第二个必须是 Kafka 的配置文件。后面还可以有多个–override开头的参数,其中的property可以是Broker Configs中提供的所有参数。这些额外的参数会覆盖配置文件中的设置。
例如下面使用同一个配置文件,通过参数覆盖启动多个Broker。

> bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=0 --override log.dirs=/tmp/kafka-logs-1 --override listeners=PLAINTEXT://:9092 --override advertised.listeners=PLAINTEXT://localhost:9092
> bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=1 --override log.dirs=/tmp/kafka-logs-2 --override listeners=PLAINTEXT://:9093 --override advertised.listeners=PLAINTEXT://localhost:9093

上面这种用法只是用于演示,真正要启动多个Broker 应该针对不同的 Broker 创建相应的 server.properties 配置。

kafka-console-consumer.sh

运行控制台consumer客户端来读取刚才创建的消息,该命令支持的参数如下。

选项描述
--blacklist排除在消费之外的topics黑名单
--bootstrap-server 必需 (除非使用旧consumer) : 要连接到的服务器
--consumer-property将表单 key = value 中的用户定义属性传递给consumer的机制
--consumer.config用户配置属性文件 ,注意,[ consumer-property ]优先于此配置
--csv-reporter-enabled如果设置了,将启用 CSV 度量报告器
--delete-consumer-offsets如果指定了,则在启动时删除 zookeep 中的consumer路径
--enable-systest-events除了记录消耗的消息之外,还要记录consumer的生命周期事件。(这是针对系统测试的)
--formatter用于格式化要显示的kafka消息的类的名称。(默认值: kafka.tools.DefaultMessageFormatter)
--from-beginning如果consumer还没有可以使用已建立的consumer offset,则从日志中出现的最早的消息开始,而不是从最新的消息开始
--key-deserializer键反序列化
--max-messages退出前要使用的最大消息数。如果没有设定,消费就是持续的
--metrics-dir如果设置了 --csv-reporter-enabled,并且设置了此参数,则将在此处输出 csv 指标
--new-consumer使用新的消费者实现,这是默认设置。
--offset要使用的offset id (非负数) ,或者“earliest”表示从开始使用,或者“latest”表示从结束使用(默认值: latest)
--partition要使用的partition
--property用于初始化消息格式化程序的属性
--skip-message-on-error如果在处理消息时出现错误,请跳过它而不是暂停。
--timeout-ms如果指定了,则在指定的时间间隔内没有消息可用时退出。
--topic要使用的topic id
--value-deserializer值反序列化
--whitelist消费topics的白名单
--zookeeper必需 (仅当使用旧的consumer时) :, host: port 中的 zookeep 连接的连接字符串。可以给出多个 URL 以允许故障转移。

--bootstrap-server 必须指定,通常--topic也要指定查看的topic。如果想要从头查看消息,还可以指定--from-beginning参数。一般使用的命令如下。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
还可以通过下面的命令指定partition查看:
>> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --partition 0

kafka-console-producer.sh

producer把一些事件写进topic

选项描述
--batch-size如果未同步发送,则在单个批处理中发送的消息数。(默认值: 200)
--broker-list必需: 表单 HOST1: PORT1,HOST2: PORT2中的broker列表字符串。
--compression-codec压缩编解码器: “ none”、“ gzip”、“ snappy”或“ lz4”。如果没有指定值,则默认为‘ gzip
--key-serializer用于序列化键的消息编码器实现的类名。(默认值: kafka.serializer.DefaultEncoder)
--line-reader读取行中所使用的类的类名。默认情况下,每行都作为单独的消息读取。(默认值: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-msproducer在发送请求期间阻塞的最大时间(默认值: 60000)
--max-memory-bytesproducer用来缓冲等待发送到服务器的记录的总内存。(默认值: 33554432)
--max-partition-memory-bytespartition分配的缓冲区大小。当收到的记录小于这个规模的producer将试图乐观地组合在一起,直到达到这个规模。(默认值: 16384)
--message-send-max-retriesBrokers可能由于多种原因接收不到消息,暂时不可用只是其中之一。此属性指定数字在producer放弃并删除此消息之前收回。(默认值: 3)
--metadata-expiry-ms即使我们没有看到任何领导层的变化,我们也会强制刷新元数据的时间(以毫秒为单位)。(默认值: 300000)
--old-producer使用旧的producer实现。
--producer-property将表单 key = value 中的用户定义属性传递给producer的机制。
--producer.configProducer配置属性文件。请注意,[ producer-property ]优先于此配置。
--property将表单 key = value 中的用户定义属性传递给消息读取器的机制。这允许为用户定义的消息读取器进行自定义配置。
--queue-enqueuetimeout-ms事件队列超时(默认值:2147483647)
--queue-size如果设置并且producer异步模式下运行,这将给出等待足够批处理大小的消息队列的最大数量。(默认值: 10000)
--request-required-acksproducer请求的必要条件(default: 1)
--request-timeout-msproducer请求的返回超时。值必须是非负和非零(默认值: 1500)
--retry-backoff-ms在每次重试之前,producer刷新相关topics的元数据。由于leader需要一点时间,此属性指定producer在刷新元数据之前等待的时间量。(默认值: 100)
--socket-buffer-sizeTcp RECV 的大小。(默认值: 102400)
--sync如果向 brokers 发送的 set 消息请求是同步的,那么在它们到达时一次发送一个
--timeout如果设置并且producer在异步模式下运行,这将给出消息队列等待足够批处理大小的最长时间。该值以 ms 表示(默认值: 1000)
--topic必需: 要向produce生成消息的topic id
--value-serializer用于序列化值的消息编码器实现的类名。(默认值: kafka.serializer.DefaultEncoder)

其中 --broker-list--topic 是两个必须提供的参数
使用标准输入方式:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

从文件读取:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < file-input.txt

kafka-topics.sh

Create添加, delete删除, describe描述, 和 修改 topic.

选项描述
--alter更改partitions数、复制分配,and/or
--config正在创建或更改topic的topic配置重写。以下是有效配置列表:cleanup.policy、compression.type、delete.retention.ms、file.delete.delay.ms、flush.messages、flush.ms、follower.replication.throttled.replicas 、index.interval.bytes、leader.replication.throttled.replicas、max.message.bytes 、message.format.version、message.timestamp.difference.max.ms、message.timestamp.type 、min.cleanable.dirty.ratio、min.compaction.lag.ms、min.insync.replicas、preallocate、retention.bytes、retention.ms、segment.bytes、segment.index.bytes 、 segment.jitter.ms、segment.ms 、unclean.leader.election.enable
--create 创建
--delete 删除
--delete-config要为现有主题删除的主题配置重写(请参见--config 选项下的配置列表)。
--describe列出给定topic的详细信息。
--disable-rack-aware禁用机架感知副本分配
--force取消控制台提示
--help打印使用信息
--if-exists如果在更改或删除topic时设置,则只有在topic存在时才执行操作
--if-not-exists如果在创建topic时设置,则仅当topic不存在时才执行操作
--list列出所有可用的topic
--partitions正在创建或更改主题的分区数(警告:如果为具有密钥的主题(分区)增加了分区消息的逻辑或排序将受到影响)
--replica-assignment正在创建或更改的topic的手动partition到broker分配的列表
--replication-factor正在创建的topic中每个partition的复制因子
--topic要创建、修改或描述的topic。除了--create 选项之外,还可以接受正则表达式
--topics-with-overrides 如果在描述topic时设置,则只显示覆盖了配置的topic
--unavailable-partitions如果在描述topics时设置,则只显示其leader不可用的partitions
--under-replicated-partitions如果在描述topics时设置,则只显示在复制partitions 下
--zookeeper必需 :, host: port 中的 zookeep 连接的连接字符串。可以给出多个 URL 以允许故障转移

下面是几种常用的 topic 命令。

描述主题的配置:
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test_topic

设置保留时间:

# Deprecated way
> bin/kafka-topics.sh  --zookeeper localhost:2181 --alter --topic test_topic --config retention.ms=1000

# Modern way
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=1000

扩展群集

将服务器添加到Kafka集群非常简单,只需为其分配唯一的 broker ID并在新服务器上启动Kafka即可。但是,这些的服务器不会自动分配到任何数据分区,除非将分区移动到这些分区,否则直到创建新 topic 时才会提供服务。所以通常将机器添加到群集中时,希望将一些现有数据迁移到这些机器上。

迁移数据的过程是手动启动的,但是完全自动化。在迁移数据时,Kafka会将新服务器添加为正在迁移的分区的 follower,并允许它完全复制该分区中的现有数据。当新服务器完全复制了此分区的内容并加入了同步副本时,其中一个现有副本将删除其分区的数据。

分区重新分配工具可用于跨 broker 移动分区。理想的分区分布将确保所有 broker 的数据负载和分区大小比较均衡。分区重新分配工具不具备自动分析Kafka集群中的数据分布并移动分区以获得均匀负载的功能。因此,管理员必须找出哪些 topic 或分区应该移动。

分区重新分配工具可以以3种互斥方式运行:

  • --generate: 在此模式下,给定一个 topic 列表和一个 broker 列表,该工具会生成一个候选重新分配,以将指定的 topic 的所有分区移动到新的broker。此选项仅提供了一种便捷的方式,可以根据 tpoc 和目标 broker 列表生成分区重新分配计划。
  • --execute: 在此模式下,该工具基于用户提供的重新分配计划启动分区重新分配。(使用–reassignment-json-file选项)。这可以是由管理员制作的自定义重新分配计划,也可以是使用–generate选项提供的自定义重新分配计划。
  • --verify: 在此模式下,该工具将验证最近用 --execute 模式执行间的所有分区的重新分配状态。状态可以是成功完成,失败或正在进行

优雅的关机

Kafka集群将自动检测到任何 broker 关机或故障,并为该机器上的分区选择新的 leader。无论服务器出现故障还是因为维护或配置更改而故意停机,都会发生这种情况。 对于后一种情况,Kafka支持更优雅的停止服务器的机制,而不仅仅是杀死它。 当一个服务器正常停止时,它将采取两种优化措施:

  • 它将所有日志同步到磁盘,以避免在重新启动时需要进行任何日志恢复活动(即验证日志尾部的所有消息的校验和)。由于日志恢复需要时间,所以从侧面加速了重新启动操作。
  • 它将在关闭之前将以该服务器为 leader 的任何分区迁移到其他副本。这将使 leader 角色传递更快,并将每个分区不可用的时间缩短到几毫秒。

只要服务器的停止不是通过直接杀死,同步日志就会自动发生,但控制 leader 迁移需要使用特殊的设置:
controlled.shutdown.enable=true
请注意,只有当 broker 托管的分区具有副本(即,复制因子大于1 且至少其中一个副本处于活动状态)时,对关闭的控制才会成功。 这通常是你想要的,因为关闭最后一个副本会使 topic 分区不可用。

Balancing leadership

每当一个 borker 停止或崩溃时,该 borker 上的分区的leader 会转移到其他副本。这意味着,在 broker 重新启动时,默认情况下,它将只是所有分区的跟随者,这意味着它不会用于客户端的读取和写入。

为了避免这种不平衡,Kafka有一个首选副本的概念。如果分区的副本列表为1,5,9,则节点1首选为节点5或9的 leader ,因为它在副本列表中较早。可以通过运行以下命令让Kafka集群尝试恢复已恢复副本的领导地位:
bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092
由于运行此命令可能很乏味,也可以通过以下配置来自动配置Kafka:
auto.leader.rebalance.enable=true

垮机架均衡副本

机架感知功能可以跨不同机架传播相同分区的副本。这扩展了 Kafka 为 broker 故障提供的容错担保,弥补了机架故障,如果机架上的所有 broker 都失败,则可以限制数据丢失的风险。该功能也可以应用于其他 broker 分组,例如EC2中的可用区域。
可以通过向 broker 配置添加属性来指定 broker 属于的特定机架:
broker.rack=my-rack-id
当 topic 创建,修改或副本重新分配时, 机架约束将得到保证,确保副本跨越尽可能多的机架(一个分区将跨越 min(#racks,replication-factor) 个不同的机架)。
用于向 broker 分配副本的算法可确保每个 broker 的 leader 数量将保持不变,而不管 broker 在机架之间如何分布。这确保了均衡的吞吐量

但是,如果 broker 在机架间分布不均 ,副本的分配将不均匀。具有较少 broker 的机架将获得更多复制副本,这意味着他们将使用更多存储并将更多资源投入复制。因此,每个机架配置相同数量的 broker 是明智的。

集群之间镜像数据

我们指的是通过“镜像”复制Kafka集群之间的数据的过程,以避免与在单个集群中的节点之间发生的复制混淆。Kafka附带了一个在Kafka集群之间镜像数据的工具 mirror-maker该工具从源集群中消费数据并产生数据到目标集群。 这种镜像的常见用例是在另一个数据中心提供副本.
可以运行许多这样的镜像进程来提高吞吐量和容错能力(如果一个进程死亡,其他进程将承担额外的负载)。

源群集中的 topic 中读取数据,并将其写入目标群集中具有相同名称的 topic。事实上,镜像只不过是把一个 Kafka 的 consumer 和 producer 联合在一起了。

源和目标集群是完全独立的实体:它们可以有不同数量的分区,偏移量也不会相同。由于这个原因,镜像集群并不是真正意义上的容错机制(因为 consumer 的偏移量将会不同)。为此,我们建议使用正常的群集内复制。然而,镜像制作进程将保留并使用消息 key 进行分区,所以在每个 key 的基础上保存顺序

以下示例显示如何从输入群集中镜像单个 topic(名为 my-topic ):
bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist my-topic

请注意,我们使用 –whitelist 选项指定 topic 列表。此选项允许使用任何 Java风格的正则表达式 因此,可以使用 –whitelist ‘A|B’ 来镜像名为A 和 B 的两个 topic 。 或者可以使用 –whitelist ‘*’ 来镜像全部 topic。确保引用的任何正则表达式不会被 shell 尝试将其展开为文件路径。为了方便起见,我们允许使用 ‘,‘而不是’|’ 指定 topic 列表

有时,说出你不想要的东西比较容易。与使用 –whitelist 来表示你想要的相反,通过镜像可以使用 –blacklist 来表示要排除的内容。 这也需要一个正则表达式的参数。但是,当启用 新的 consumer 时,不支持 --blacklist(即 bootstrap.servers )已在 consumer 配置中定义)。

将镜像与配置项 auto.create.topics.enable = true 结合使用,可以创建一个副本群集,即使添加了新的 topic,也可以自动创建和复制源群集中的所有数据。

自动将数据迁移到新机器

可以使用partition reassignment 分区重新分配工具将一些topics从当前的brokers集中移到新添加的brokers中。这在扩展现有集群时通常很有用,因为将整个topics移动到新的brokers比一次移动一个partition更容易。在使用这种方法时, 用户应该提供一个topics,这些topic应该移动到新的brokers和新的brokers目标列表。然后,该工具将给定topics的所有partitions均匀地分布到新的beokers中。在此移动过程中,topic的复制因子保持不变。topics输入列表的所有partitions的副本实际上都从旧的 brokers 移动到新添加的 brokers

例如,下面的示例将topics foo1、 foo2的所有partition移动到新的 brokers 5、6。在此操作结束时,topics foo1和 foo2的所有partitions将只存在于 brokers 5、6上。

由于该工具接受topics 的输入列表作为 json 文件,因此首先需要确定想要移动的topics ,并按如下方式创建 json 文件:

> cat topics-to-move.json
  {"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
  }

Json 文件准备好之后,使用partition reassignment 工具生成候选分配:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[2,1]},
                {"topic":"foo1","partition":1,"replicas":[1,3]},
                {"topic":"foo1","partition":2,"replicas":[3,4]},
                {"topic":"foo2","partition":0,"replicas":[4,2]},
                {"topic":"foo2","partition":1,"replicas":[2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,3]}]
  }

  Proposed partition reassignment configuration

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[6,5]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo1","partition":2,"replicas":[6,5]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[6,5]},
                {"topic":"foo2","partition":2,"replicas":[5,6]}]
  }

该工具生成一个候选分配,它将把所有partitionstopics foo1、 foo2移动到brokers 5、6。但是请注意,此时partition移动还没有开始,它只是告诉当前的赋值和建议的新赋值。如果想要rollback到当前的分配,应该保存它。新的分配应该保存在一个 json 文件中(例如,expand-cluster-reassignment.json)作为工具的输入,使用--execute 选项,如下所示:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[2,1]},
                {"topic":"foo1","partition":1,"replicas":[1,3]},
                {"topic":"foo1","partition":2,"replicas":[3,4]},
                {"topic":"foo2","partition":0,"replicas":[4,2]},
                {"topic":"foo2","partition":1,"replicas":[2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,3]}]
  }

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started partition reassignments for foo1-0,foo1-1,foo1-2,foo2-0,foo2-1,foo2-2

最后,可以在工具中使用 --verify选项来检查分区重新分配的状态。请注意,相同的expand-cluster-reassignment.json (与--execute 选项一起使用)应该与--verify 选项一起使用:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo1,0] is completed
  Reassignment of partition [foo1,1] is still in progress
  Reassignment of partition [foo1,2] is still in progress
  Reassignment of partition [foo2,0] is completed
  Reassignment of partition [foo2,1] is completed
  Reassignment of partition [foo2,2] is completed

自定义分区分配和迁移

expand-cluster-reassignment.json还可以用于有选择地将partition副本移动到特定的brokers。以这种方式使用时,假定用户知道重新分配计划,并且不需要该工具来生成候选重新分配,从而有效地跳过--generate步骤,直接转到--execute 步骤.

例如,下面的示例将topic foo1partition 0 移动到broker 5、6 ,将topic foo2partition 1移动到代理2、3:

第一步是手工构建一个 json 文件中的自定义重新分配计划:

> cat custom-reassignment.json
  {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

然后,使用带有--execute 选项的 json 文件开始重新分配过程:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
                {"topic":"foo2","partition":1,"replicas":[3,4]}]
  }

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started partition reassignments for foo1-0,foo2-1

可以在工具中使用--verify 选项来检查partition重新分配的状态。请注意,相同的自定义重新分配,Json (与--execute 选项一起使用)应该与--verify 选项一起使用:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify

 > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo1,0] is completed
  Reassignment of partition [foo2,1] is completed

停用的brokers

partition reassignment tool还不能为停用brokers自动生成重新分配计划。因此,管理员必须提出一个重新分配计划,以便将broker上托管的所有partitions的副本转移到其他broker上。这可能是相对繁琐的,因为重新分配需要确保所有副本不会从停用的broker移动到另一个broker。为了使这个过程不费吹灰之力,我们计划在将来为停用的brokers添加工具支持。

增加复制因子

增加现有parition的复制因子很容易。只需在自定义重新分配 json 文件中指定额外的副本,并将其与 –execute 选项一起使用,以增加指定partitions的复制因子。

例如,下面的示例将topic foopartition 0的复制因子从1增加到3。在增加复制因子之前,broker 5上只存在partition的副本。作为增加复制因子的一部分,我们将在broker 6和7上添加更多的副本。

第一步是手动构建一个 json 文件中的自定义重新分配计划:

  > cat increase-replication-factor.json
  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

然后,使用带有 --execute 选项的 json 文件开始重新分配过程:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started partition reassignment for foo-0

可以在工具中使用--verify选项来检查分区重新分配的状态。请注意,相同的increase-replication-factor.json (与--execute 选项一起使用)应该与--verify 选项一起使用:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo,0] is completed

还可以使用kafka-topics工具验证复制因子的增加:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe

  > bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
  
  Topic:foo	PartitionCount:1	ReplicationFactor:3	Configs:
    Topic: foo	Partition: 0	Leader: 5	Replicas: 5,6,7	Isr: 5,6,7

在数据迁移过程中限制带宽使用

Kafka 允许对复制流量应用一个节流阀,在用于将副本从一台机器移动到另一台机器的带宽上设置一个上限。这在重新平衡集群、引导新broker或添加或删除broker时非常有用,因为它限制了这些data-intensive 操作对用户的影响。

有两个接口,可用于启动节流阀。最简单和最安全的方法是在调用kafka-reassign-partitions.sh时应用一个节流阀。但kafka-configs.sh 还可以用来直接查看和修改节流阀值。

例如,如果使用下面的命令执行重新平衡操作,它将以不超过50MB/s 的速度移动partition
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000

当你执行这个脚本,你会看到节流阀启动:

  The inter-broker throttle limit was set to 50000000 B/s
  Successfully started partition reassignment for foo1-0

如果在重新平衡期间改变节流阀,比如说增加吞吐量以便更快地完成任务,可以通过重新运行 execute 命令,使用传递相同的reassignment-json-file--additional选项来实现这一点:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --additional --execute --reassignment-json-file bigger-cluster.json --throttle 700000000

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --additional --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
  The inter-broker throttle limit was set to 700000000 B/s

一旦重新平衡完成,管理员就可以使用 --verify 选项检查重新平衡的状态。如果重新平衡已经完成,节流阀将通过--verify命令移除。一旦重新平衡完成,管理员通过运行命令并使用--verify 选项及时删除节流阀,这一点很重要。如果不这样做,则可能会限制常规复制通信量。

如果执行了--verify 选项,并且重新分配已经完成,那么脚本将确认节流阀已经被移除:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify --reassignment-json-file bigger-cluster.json

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --verify --reassignment-json-file bigger-cluster.json
  Status of partition reassignment:
  Reassignment of partition [my-topic,1] is completed
  Reassignment of partition [my-topic,0] is completed

  Clearing broker-level throttles on brokers 1,2,3
  Clearing topic-level throttles on topic my-topic

管理员还可以使用 kafka-configs.sh验证分配。有两对节流配置用于管理节流过程。第一对是指节流阀值本身。这是在代理级别使用动态属性配置的:

  • leader.replication.throttled.rate
  • follower.replication.throttled.rate

然后是枚举的节流副本集的配置对:

  • leader.replication.throttled.replicas
  • follower.replication.throttled.replicas
    topic配置

所有四个配置值都是通过 kafka-reassign-partitions.sh 自动分配的。

查看节流限制配置:
bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers

  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
  Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

这显示了应用于复制协议的leaderfollower的节流。默认情况下,双方被分配相同的节流吞吐量值。

查看节流副本列表:
bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics

  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

在这里,我们看到leader节流应用于broker 102上的partition 1broker 101上的partition 0。同样,follower节流应用于broker 101 上的partition 1broker 102 上的partition 0
查看节流副本列表:
bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics

  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

在这里,我们看到leader节流应用于broker 102上的partition 1broker 101上的partition 0。同样,follower节流应用于broker 101上的partition 1broker 102上的partition 0

在默认情况下,kafka-reassign-partitions.sh 将对重新平衡之前存在的所有副本应用leader节流,其中任何一个副本都可能是leader节流。它将对所有移动目的地应用follower节流。因此,如果有一个partitionbroker 101,102 上有副本,被重新分配到102,103,对于这个分区,一个leader节流将应用于101,102,而一个follower节流将仅应用于103
如果需要,你也可以使用 kafka-configs.sh 上的alter 开关手动改变节流阀配置。

节流复制的安全使用

在使用节流复制时应该注意一些问题,特别是:
(1) 取消节流阀:
一旦重新分配完成,应及时取消节流阀(通过运行 kafka-reassign-partitions.sh --verify核实)。
(2)确保进度:
如果节流阀设置得太低,与传入的写入速率相比,复制可能无法取得进展。这种情况发生在:

max(BytesInPerSec) > throttle

其中 BytesInPerSec 是监视 producer到每个broker的写吞吐量的度量。

在重新平衡期间,管理员可以使用以下指标监视复制是否正在取得进展:

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

在复制过程中,延迟应该不断减小。如果指标没有降低,管理员应该像上面描述的那样增加节流吞吐量。

设定配额

配额覆盖值默认值可以在(user=user1, client-id=clientA),用户或客户端级别配置,如此处所述。 默认情况下,客户端是无限制的配额。可以为每个(user, client-id),用户或客户端组设置自定义配额。

为(user = user1,client-id = clientA)配置自定义配额:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  Updated config for entity: user-principal 'user1', client-id 'clientA'.

为 user = user1配置自定义配额:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
  Updated config for entity: user-principal 'user1'.

为client-id=clientA配置自定义配额:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
  Updated config for entity: client-id 'clientA'.

可以通过指定 --entity-default 选项而不是--entity-name来为每个(user, client-id),用户或客户端ID组设置默认配额。

为user = userA配置默认客户端配额:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
  Updated config for entity: user-principal 'user1', default client-id.

为用户配置默认配额:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
  Updated config for entity: default user-principal.

配置client-id的默认配额:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
  Updated config for entity: default client-id.

以下是如何描述给定(user, client-id)的配额:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

描述给定用户的配额:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

描述给定 client-id的配额:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
  Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

如果未指定实体名称,则描述指定类型的所有实体。 例如,描述所有用户:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

对于(user, client)也是同样的:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
  Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

通过在 broker 上设置这些配置,可以适用于所有client-id的默认配额。只有在Zookeeper中未配置配额覆盖或默认配置时才应用这些属性。默认情况下,每个 client-id 都会收到一个无限制的配额。以下设置每个 producer 和 consumer 客户端的默认配额为10MB/sec。
quota.producer.default=10485760
quota.consumer.default=10485760
请注意,这些属性已被弃用,并可能在未来版本中删除。使用kafka-configs.sh配置的默认值优先于这些属性。

数据中心

有一些部署需要维护一个跨越多个数据中心的数据管道。对此,我们推荐的方法是在每个拥有众多应用实例的数据中心内部署一个本地Kafka集群,在每个数据中心内只与本地的kafka集群进行交互,然后各集群之间通过镜像进行同步,(请参阅镜像制作工具< / a>了解怎么做到这一点)。.
这种部署模式允许数据中心充当一个独立的实体,并允许我们能够集中的管理和调节数据中心之间的复制。在这种部署模式下,即使数据中心间的链路不可用,每个设施也可以独立运行:当发生这种情况时,镜像会落后,直到链路恢复正常并追上时为止。

如果应用程序需要所有数据的全局视图,你可以提供一个聚合数据的集群,使用镜像将所有数据中心的本地集群镜像聚合起来。聚合集群用于需要全部数据集的应用程序读取。

这并是不唯一的部署模式,可以通过广域网读取或者写入到远程的Kafka集群,但是这显然会增加获取集群的延时。

Kafka能在生产端和消费端很轻易的批处理数据,所以即使在高延时的连接中也可以实现高吞吐量。为此.虽然我们可能需要在生产端,消费端还有broker端增加TCP 套接字缓冲区大小,修改如下参数配置socket.send.buffer.bytessocket.receive.buffer.bytes

通常不建议在高延时链路的情况下部署一个跨越多个数据中心的Kafka集群。这将对Kafka写入和ZooKeeper写入产生非常高的复制延时,当各位置节点之间的网络不可用时,Kafka和ZooKeeper也将不保证可用

安全

安全概述

在0.9.0.0版中,Kafka社区添加了一些特性,通过单独使用或者一起使用这些特性,提高了Kafka集群的安全性。目前支持下列安全措施:

1、使用SSL或SASL验证来自客户端(producers和consumers)、其他brokers和工具的连接。Kafka支持以下SASL机制:

  • SASL/GSSAPI (Kerberos) - 从版本0.9.0.0开始
  • SASL/PLAIN - 从版本0.10.0.0开始
  • SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 -从版本0.10.2.0开始
  • SASL/OAUTHBEARER - 从版本2.0开始

2、验证从brokers 到 ZooKeeper的连接
3、对brokers与clients之间、brokers之间或brokers与工具之间使用SSL传输对数据加密(注意,启用SSL时性能会下降,其大小取决于CPU类型和JVM实现)。
4、授权客户端的读写操作
5、授权是可插拔的,并且支持与外部授权服务的集成

值得注意的是,安全是可选的 - 支持非安全集群,也支持需要认证,不需要认证,加密和未加密clients的混合集群。 以下指南介绍了如何在clients和brokers中配置和使用安全功能。

Listener配置

为了保护kafka集群,有必要保护用于与服务器通信的通道。每个服务器必须定义一组listeners,用于接收来自客户端和其他服务器的请求。可以将每个侦听器配置为使用各种机制对客户端进行身份验证,并确保服务器和客户端之间的通信被加密.

Kafka 服务器支持侦听多端口上的连接。这是通过服务器配置中的 listeners 属性配置的,该属性接受以逗号分隔的要启用的 listeners 列表。每个服务器上必须至少定义一个listener。在listeners中定义的每个listener的格式如下:

{LISTENER_NAME}://{hostname}:{port}

LISTENER _ NAME 通常是一个描述性名称,用于定义listener的用途。例如,许多配置使用单独的listener来处理客户端通信,因此它们可能在配置中将相应的侦听器称为 CLIENT

listeners=CLIENT://localhost:9092

每个listener安全协议在一个单独的配置中定义: listener.security.protocol.map。该值是映射到其安全协议的每个listener.security.protocol.map逗号分隔列表。例如,下面的值配置指定 CLIENT listener将使用 SSL,而 BROKER listener将使用plaintext

listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT

安全协议的可选择如下:

  • PLAINTEXT
  • SSL
  • SASL_PLAINTEXT
  • SASL_SSL

PLAINTEXT协议不提供安全性,也不需要任何其他配置。

如果每个必需的listener使用单独的安全协议,那么也可以使用安全协议名作为listeners中的listener名称。使用上面的示例,我们可以使用以下定义跳过 CLIENTBROKER listener的定义:

listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093

建议用户为 listener 提供显式的名称,因为这使得每个 listener 的预期用法更加清晰。

在这个列表中的listener中,可以通过将 inter.broker.listener.name 配置设置为listener的名称来声明listener用于inter-brokerbroker间通信inter-broker listener的主要用途是partition复制。如果没有定义,那么 inter-broker listenersecurity.inter.broker.protocol定义的安全协议决定,该协议默认为 PLAINTEXT

对于依赖于 Zookeep 来存储集群元数据的遗留集群,可以声明一个单独的listener,用于从运行中的控制器向brokers传播元数据。这是由 control.plane.listener.name 决定的。当需要将元数据更新推送到集群中的brokers时,运行中的控制器将使用此listener。使用控制平面listener的好处是它使用一个单独的处理线程,这使得应用程序流量不太可能阻碍元数据更改的及时传播(例如partition leader程序和 ISR 更新)。注意,默认值为 null,这意味着控制器将使用 ·inter.broker.listener· 定义的同一个listener

在 KRaft 集群中,代理是 process.roles中启用了broker角色(role)的任何服务器,控制器是启用了controller 控制器 role 的任何服务器。listener配置取决于roleinter.broker.listener.name 定义的listener专门用于brokers之间的请求。另一方面,控制器必须使用单独的listener,这是由 controller.listener.names 配置定义的。不能将此值设置为与inter-broker listener相同的值。

控制器接收来自其他控制器brokers的请求。因此,即使服务器没有启用controller role(即,它只是一个broker) ,它仍然必须定义controller listener以及配置它所需的任何安全属性。例如,我们可以在独立broker上使用以下配置:

process.roles=broker
listeners=BROKER://localhost:9092
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL

在这个示例中,controller listener仍然配置为使用 SASL _ SSL 安全协议,但是由于broker本身不公开controller listener,因此它不包含在listeners中。本例中将使用的端口来自 controller.quorum.voters 配置,该配置定义了控制器的完整列表。

对于启用了brokercontroller role的 KRaft 服务器,配置是相似的。唯一的区别是controller role必须包含在listeners中:

process.roles=broker,controller
listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL

controller.quorum.voters中定义的端口需要与公开的controller listeners之一完全匹配。例如,这里的 CONTROLLER listener绑定到端口9093controller.quorum.voters定义的连接字符串还必须使用端口9093

控制器将接受对 controller.listener.names 定义的所有listeners的请求。通常只有一个控制器listener,但也可能有更多。例如,这提供了一种方法,通过集群的一个滚动(一个滚动用于公开新的listeners,另一个滚动用于删除旧的listeners)将执行的listeners从一个端口或安全协议更改为另一个端口或安全协议。当定义多个controller listeners时,列表中的第一个listener将用于出站请求。

在 Kafka,为客户使用单独的listener是一种传统做法。这允许在inter-cluster listeners 网络级隔离群集间侦听器。在 KRaft 中的controller listener的情况下,listener应该是隔离的,因为客户机无论如何都不会使用它。客户端需要连接到broker上配置的任何其他listener。为控制器绑定的任何请求都将如下所述转发

使用 SSL 的加密和身份验证

ApacheKafka 允许客户端使用 SSL 进行通信加密和身份验证。默认情况下,SSL 是禁用的,但在需要时可以打开。下面的段落将详细解释如何设置自己的 PKI 基础设施,使用它来创建证书,并配置 Kafka 来使用这些证书。

为每个Kafka broker生成 SSL 密钥和证书

部署一个或多个支持 SSL 的brokers的第一步是为每个服务器生成一个公钥/私钥对。由于 Kafka 期望所有的密钥和证书都存储在密钥存储库中,因此我们将使用 Java 的 keytool 命令执行此任务。该工具支持两种不同的密钥存储格式,一种是 Java 特有的 jks 格式,这种格式现在已经被弃用,另一种是 PKCS12PKCS12是 Java 版本9的默认格式,以确保使用这种格式,而不管使用的是哪个 Java 版本,所有以下命令都明确指定 PKCS12格式。
> keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12
需要在上面的命令中指定两个参数:

  • Keystorefile存储此broker的密钥(以及后面的证书)的 keystore 文件。Keystore 文件包含此broker的私钥和公钥,因此需要保证其安全。理想情况下,这个步骤是在 Kafka broker上运行的,这个密钥将被用在这个broker上,因为这个密钥永远不应该被传输/离开它预定的服务器。
  • validity:密钥的有效时间(以天为单位)。请注意,这与证书的有效期不同,有效期将在签署证书时确定。可以使用相同的密钥来请求多个证书: 如果密钥的有效期为10年,但 CA 将只签署有效期为1年的证书,那么可以随着时间的推移使用具有10个证书的相同密钥。

要获得可以与刚刚创建的私钥一起使用的证书,需要创建一个证书签名请求。当由受信任的 CA 签名时,签名请求会生成实际的证书,然后可以将该证书安装在密钥存储中并用于身份验证目的。
要生成证书签名请求,请对到目前为止创建的所有服务器密钥存储库运行以下命令。
> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
这个命令假设想要向证书添加主机名信息,如果不是这样的话,您可以省略扩展参数-ext SAN=DNS:{FQDN},IP:{IPADDRESS1}。请参阅下面的更多信息。

主机名验证
启用主机名验证后,主机名验证是根据所连接的服务器的实际主机名或 ip 地址检查证书属性的过程,以确保您确实连接到了正确的服务器。
这种检查的主要原因是为了防止中间人攻击。对于kafka来说,这个检查在默认情况下被禁用了很长一段时间,但是在kafka2.0版本中,服务器的主机名验证在默认情况下对客户端连接和broker间连接启用。
服务器主机名验证可以通过将 ssl.endpoint.identification.algorithm设置为空字符串来禁用。
对于动态配置的broker listeners,可以使用 kafka-configs.sh 来禁用主机名验证:
> bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

通常没有很好的理由禁用主机名验证,除了最快的方法“只是让它工作”,然后承诺“修复它以后当有更多的时间”!在正确的时间进行主机名验证并不难,但是一旦集群启动并运行起来,就会变得更加困难

如果启用了主机名验证,客户端将根据以下两个字段之一验证服务器的完全限定域名 (FQDN)ip 地址

  • 通用名(CN)
  • 主题替代名称(SAN)

虽然 Kafka 检查这两个字段,但是自2000年以来一直不赞成使用通用名字字段来验证主机名,如果可能的话应该避免使用。此外,SAN 字段更加灵活,允许在证书中声明 多个DNS 和 IP 条目
另一个优点是,如果 SAN 字段用于主机名验证,则可以为授权目的将公共名称设置为更有意义的值。因为我们需要将 SAN 字段包含在签名的证书中,所以在生成签名请求时将指定它。也可以在生成密钥对时指定它,但这不会自动复制到签名请求中。
要添加 SAN 字段,请将以下参数-ext SAN=DNS:{FQDN},IP:{IPADDRESS}附加到 keytool 命令:
> keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}

创建自己的 CA

在此步骤之后,集群中的每台机器都有一个公钥/私钥对,该对已经可以用于加密流量和证书签名请求,这是创建证书的基础。要添加身份验证功能,此签名请求需要由受信任的中心签名,该中心将在此步骤中创建。
证书颁发机构(CA)负责对证书进行签名。CA 的工作方式类似于发放护照的政府——政府在每本护照上盖章(签名) ,这样护照就很难伪造了。
其他国家的政府则核实这些印章,以确保护照的真实性。类似地,CA 对证书进行签名,加密保证签名的证书在计算上难以伪造。
因此,只要 CA 是一个真正的和可信的权威,客户机就可以有力地保证它们正在连接到真正的机器。

由于 OpenSSL 中的一个 bug,x509模块不会将请求的扩展字段从 CSR 复制到最终的证书中。因为我们希望在证书中使用 SAN 扩展来启用主机名验证,所以我们将改用 ca 模块。这需要在生成 CA 密钥对之前进行一些额外的配置。
将以下清单保存到名为 openssl-ca.cnf 的文件中,并根据需要调整有效性和公共属性的值。

HOME            = .
RANDFILE        = $ENV::HOME/.rnd

####################################################################
[ ca ]
default_ca    = CA_default      # The default ca section

[ CA_default ]

base_dir      = .
certificate   = $base_dir/cacert.pem   # The CA certifcate
private_key   = $base_dir/cakey.pem    # The CA private key
new_certs_dir = $base_dir              # Location for new certs after signing
database      = $base_dir/index.txt    # Database index file
serial        = $base_dir/serial.txt   # The current serial number

default_days     = 1000         # How long to certify for
default_crl_days = 30           # How long before next CRL
default_md       = sha256       # Use public key default MD
preserve         = no           # Keep passed DN ordering

x509_extensions = ca_extensions # The extensions to add to the cert

email_in_dn     = no            # Don't concat the email in the DN
copy_extensions = copy          # Required to copy SANs from CSR to cert

####################################################################
[ req ]
default_bits       = 4096
default_keyfile    = cakey.pem
distinguished_name = ca_distinguished_name
x509_extensions    = ca_extensions
string_mask        = utf8only

####################################################################
[ ca_distinguished_name ]
countryName         = Country Name (2 letter code)
countryName_default = DE

stateOrProvinceName         = State or Province Name (full name)
stateOrProvinceName_default = Test Province

localityName                = Locality Name (eg, city)
localityName_default        = Test Town

organizationName            = Organization Name (eg, company)
organizationName_default    = Test Company

organizationalUnitName         = Organizational Unit (eg, division)
organizationalUnitName_default = Test Unit

commonName         = Common Name (e.g. server FQDN or YOUR name)
commonName_default = Test Name

emailAddress         = Email Address
emailAddress_default = test@test.com

####################################################################
[ ca_extensions ]

subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid:always, issuer
basicConstraints       = critical, CA:true
keyUsage               = keyCertSign, cRLSign

####################################################################
[ signing_policy ]
countryName            = optional
stateOrProvinceName    = optional
localityName           = optional
organizationName       = optional
organizationalUnitName = optional
commonName             = supplied
emailAddress           = optional

####################################################################
[ signing_req ]
subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints       = CA:FALSE
keyUsage               = digitalSignature, keyEncipherment

然后创建一个数据库和序列号文件,这些文件将用于跟踪哪些证书是用这个 CA 签署的。这两个文件都只是与 CA 密钥位于同一目录中的文本文件。

> echo 01 > serial.txt
> touch index.txt

完成这些步骤之后,现在就可以生成将用于稍后签署证书的 CA 了。
> openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM
CA 仅仅是一个公共/私有密钥对和由其自身签名的证书,并且只用于签名其他证书。
这个密钥对应该保持非常安全,如果有人获得访问它的权限,他们可以创建和签署您的基础设施信任的证书,这意味着他们将能够在连接到任何信任这个 CA 的服务时模拟任何人。
下一步是将生成的 CA 添加到 客户端的信任库 ,以便客户端可以信任这个 CA:
> keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

注意: 如果你在 Kafka 代理配置中将 ssl.client.auth 设置为“requested”或“required”,那么你必须为 Kafka brokers提供一个信任存储,它应该有客户密钥签名的所有 CA 证书。

> keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
与步骤1中存储每台计算机自身标识的密钥存储区不同,客户端的信任存储区存储客户端应该信任的所有证书。将证书导入到信任存储中还意味着信任由该证书签名的所有证书。如上所述,信任政府(CA)也意味着信任它颁发的所有护照(证书)。此属性称为信任链,在将 SSL 部署到大型Kafka 集群时尤其有用。您可以使用单个 CA 对集群中的所有证书进行签名,并让所有计算机共享信任 CA 的同一信任存储库。这样所有机器都可以认证其他机器。

签署证书

然后和 CA 签字:
> openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}
最后,您需要将 CA 的证书和已签名的证书导入密钥存储库:

> keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
> keytool -keystore {keystore} -alias localhost -import -file cert-signed

参数的定义如下:

  • Keystore: 密钥库的位置
  • CA 证书: CA 的证书
  • 证书签名请求: 用服务器密钥创建的 csr
  • 服务器证书: 将服务器的已签名证书写入的文件

这将给您留下一个名为 truststore.jks 的信任库——这对所有客户机和brokers都是一样的,并且不包含任何敏感信息,因此没有必要保护它。
此外,每个节点将有一个 server.keystore.jks 文件,其中包含节点密钥、证书和 CA 证书,有关如何使用这些文件的信息,请参考配置 Kafka Brokers 和配置 Kafka Client。
关于这个主题的一些工具帮助,请查看 easyRSA 项目,该项目有大量的脚本来帮助完成这些步骤。

PEM 格式的 SSL 密钥和证书
从2.7.0开始,可以直接以 PEM 格式为 Kafka brokers客户机配置 SSL 密钥和信任存储。这避免了在文件系统中存储单独文件的需要,并且利用了Kafka 配置的密码保护特性。除了 JKS 和 PKCS12之外,PEM 还可以用作基于文件的密钥和信任存储的存储类型。要在broker客户机配置中直接配置 PEM 密钥存储,应该在 ssl.keystore.key 中提供 PEM 格式的私钥,并且应该在 ssl.keystore.certificate.chain 中提供 PEM 格式的证书链。为了配置信任存储,应该在 ssl.truststore.certificates 中提供信任证书,例如 CA 的公共证书。由于 PEM 通常存储为多行 base-64字符串,因此配置值可以包含在 Kafka 配置中,作为多行字符串,其中行以反斜杠 (‘’) 结尾,用于连续行。

存储密码配置 ssl.keystore.passwordssl.truststore.password 不用于 PEM。如果使用密码对私钥进行加密,则必须在 ssl.key.password 中提供密钥密码。私钥可能以未加密的形式提供,而不需要密码。在production中,在这种情况下,应该使用 Kafka 的密码保护功能对配置进行加密或外部化。请注意,当使用 OpenSSL 之类的外部工具进行加密时,默认的 SSL 引擎工厂对于解密加密的私钥的能力是有限的。像 BouncyCastle 这样的第三方库可以集成到定制的 SslEngineFactory 中,以支持范围更广的加密私钥。

Production 中常见的陷阱
配置Kafka Brokers

如果没有为代理间通信启用 SSL (请参阅下文了解如何启用它) ,那么 PLAINTEXT 和 SSL 端口都是必需的。
listeners=PLAINTEXT://host.name:port,SSL://host.name:port
在broker端需要以下 SSL 配置:

ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234

注意: 从技术上讲,ssl.trust store.password 是可选的,但强烈推荐使用。如果没有设置密码,对信任库的访问仍然可用,但是禁用完整性检查。值得考虑的可选设置:

  • ssl.client.auth=none (“required” => 客户端身份验证是必需的, “requested” => 客户端身份验证是请求的,没有证书的客户端仍然可以连接。不鼓励使用“requested”,因为它提供了一种错误的安全感,配置错误的客户端仍然可以成功连接。)
  • ssl.cipher.suites (可选). 密码套件是一种命名的认证、加密、 MAC 和密钥交换算法的组合,用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置。(默认为空列表)
  • ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 (列出您将从客户端接受的 SSL 协议。请注意,为了支持 TLS,不推荐使用 SSL,不建议在生产环境中使用 SSL)
  • ssl.keystore.type=JKS
  • ssl.truststore.type=JKS
  • ssl.secure.random.implementation=SHA1PRNG

如果希望为broker间通信启用 SSL,请将以下内容添加到 server.properties 文件(它默认为 PLAINTEXT):
security.inter.broker.protocol=SSL

由于一些国家的导入规定,Oracle 的实现限制了默认情况下可用的加密算法的强度。如果需要更强的算法(例如,具有256位密钥的 AES) ,则必须获得JCE Unlimited Strength Jurisdiction Policy文件(JCE 无限强度管辖权策略)并将其安装在 JDK/JRE 中。有关更多信息,请参见 JCA 提供程序文档。

JRE/JDK 将有一个用于加密操作的默认伪随机数生成器(PRNG) ,因此不需要配置 ssl.secure.random.implementation所使用的实现。但是,一些实现存在性能问题(值得注意的是,Linux 系统上选择的缺省设置 NativePRNG 使用全局锁)。在 SSL 连接的性能成为问题的情况下,考虑显式地设置要使用的实现。SHA1PRNG 实现是非阻塞的,并且在重负载(每秒产生50MB 的消息,加上每个broker的复制流量)下显示了非常好的性能特征。

启动broker之后,您应该能够在 server.log 中看到:

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

要快速检查服务器密钥存储库和信任存储库是否设置正确,可以运行以下命令:
> openssl s_client -debug -connect localhost:9093 -tls1
(注意: TLSv1应该列在 ssl.enabled.protocols 下)
在这个命令的输出中,您应该看到服务器的证书:

-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

如果证书没有显示,或者有任何其他错误消息,那么密钥存储库没有正确设置。

配置 Kafka Client

SSL 只支持新的kafka Producer Consumer,不支持旧的 API。SSL 的配置对于Producer Consumer都是相同的。

如果broker中不需要客户端身份验证,那么下面是一个最小配置示例:

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234

注意: 从技术上讲,ssl.truststore.password 是可选的,但强烈推荐使用。如果没有设置密码,对信任库的访问仍然可用,但是禁用完整性检查。如果需要客户端身份验证,则必须像步骤1那样创建密钥存储库,并且还必须配置以下内容:

ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

根据我们的需求和broker配置,还可能需要其他配置设置:

  • ssl.provider(可选)。用于 SSL 连接的安全提供程序的名称。默认值是 JVM 的默认安全提供程序。
  • ssl.cipher.suites(可选)。密码套件是一种命名的认证、加密、 MAC 和密钥交换算法的组合,用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置。
  • ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1。它应该列出在broker端配置的至少一个协议
  • ssl.truststore.type=JKS
  • ssl.keystore.type=JKS

使用控制台producer和控制台consumer的示例:

> kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
> kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐