配置参数说明

生产者配置参数

  • bootstrap.serversmetadata.broker.list:设置kafka broker ip端口,可设置一个或多个,格式为host1:port1,host2:port2

  • retries:生产者发送消息到broker失败后的重试次数,默认值为0

  • retry.backoff.ms:发送消息失败后到重新发送消息的时间间隔,默认值为100 ms

  • buffer.memory:设置RecordAccumulator缓存大小,默认值为32 MB

  • batch.size:数据累积到batch.size大小后才会发送数据,默认值为16 KB

  • linger.ms:数据没有达到batch.size大小但已超过linger.ms时,立即发送消息,默认值为0 ms;提高并发量时可适当增加,如10 ms

  • max.block.ms:最大阻塞时间,RecordAccumulator缓存不足时或者元数据不可用时,发送数据到broker会阻塞或返回异常,此参数的默认值为60000 ms

  • request.timeout.ms:等待请求响应的超时时间,默认值为30000 ms

  • request.required.acks:指定分区中成功写入消息的副本数量

    • 0:发送消息后立即返回成功,不等待broker端响应结果

    • 1:发送消息后broker端分区leader成功写入后就会得到响应

    • -1all:发送消息后不仅要等待broker端分区leader成功写入,还要等待该分区所有ISR(同步副本集)成功写入,才会得到响应

  • max.request.size:发送消息的最大大小,默认值为1 MB

  • compression.type:指定消息的压缩算法,默认值为none,可以配置为gzipsnappylz4zstd(kafka 2.1.0开始支持,需对应的库文件)。

发送消息流程:
在这里插入图片描述

消费者配置参数

  • bootstrap.serversmetadata.broker.list:设置kafka broker ip端口,可设置一个或多个,格式为host1:port1,host2:port2

  • group.id: 消费者组名称

  • enable.auto.commit:是否自动提交,默认值为true

  • auto.commit.interval.ms:自动提交的时间间隔,默认5000 ms

  • auto.offset.reset:找不到记录的偏移量或记录的偏移量发生越界时,设置此参数确定从何处开始消费

    • earliest:当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费。
    • latest:当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费该分区下新产生的数据。
    • none:topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
  • fetch.min.bytes:一次拉取数据请求中数据的最小大小,默认1 B

  • fetch.max.bytes:一次拉取数据请求中数据的最大大小,默认52428800 B,即50 MB

  • fetch.max.wait.ms:一次拉取数据请求中最大等待时间,默认500 ms

  • max.partition.fetch.bytes:一次拉取数据请求中每个分区数据的最大大小,默认1048576 B, 即1 MB

  • max.poll.records:一次拉取数据请求中数据的最大数量,默认500条

  • request.timeout.ms:等待请求响应的超时时间,默认值为30000 ms

  • partition.assignment.strategy:配置分区分配策略,默认策略range;多个消费者订阅一个主题时,建议采用range;多个消费者订阅相同列表主题时,建议采用roundrobin;多个消费者订阅不同的主题列表时,建议采用cooperative-sticky
    tips:消费者的顺序可通过配置client.id参数,如设置"0"、“1”、“2”,消费者排序就是c0、c1、c2

    • range:针对每个主题,对消费者、分区进行排序,按分区数是否整除消费者数进行区分:

      • 当分区数除以消费者没有余数时,按顺序依次分配,如6个分区,3个消费者:c1消费0、1分区,c2消费2、3,c3消费4、5分区
        在这里插入图片描述

      • 当分区数除以消费者有余数时,那么排在前面的消费者会多分配一个分区,如8个分区,3个消费者:c1消费0、1、2分区,c2消费3、4、5,c3消费6、7分区
        在这里插入图片描述

    • roundrobin:将所有主题的分区进行排序,然后按照消费者的排序轮询分配,如果存在消费者未订阅对应主题,即消费者的topic列表不同,在轮询时直接跳过,

      • 消费者topic列表相同,如2个topic(t0、t1),每个topic 4个分区,三个消费者,所有主题的分区排序为t0-0、t0-1、t0-2、t0-3、t1-0、t1-1、t1-2、t1-3,消费者排序:c1、c2、c3:c1消费t0-0、t0-3、t1-2分区,c2消费t0-1、t1-0、t1-3,c3消费t0-2、t1-1分区
        在这里插入图片描述

      • 消费者topic列表不同,如2个topic(t0、t1),每个topic 4个分区,三个消费者,所有主题的分区排序为t0-0、t0-1、t0-2、t0-3、t1-0、t1-1、t1-2、t1-3,消费者排序为c1、c2、c3,c1订阅t0,c2订阅t1,c3订阅t0、t1,则分配情况为:c1消费t0-0、t0-2分区,c2消费t1-0、t1-2,c3消费t0-1、t0-3、t1-1、t1-3分区
        在这里插入图片描述

    • cooperative-sticky:执行新的分区分配之前,尽量在上一次分配上做最小改动;目标:

      • 分区的分配尽量的均衡

      • 每一次重分配的结果尽量与上一次分配结果保持一致

        注意:两个目标发生冲突时优先保证第一个目标

        • 消费者topic列表不同,如2个topic(t0、t1),每个topic 4个分区,三个消费者,所有主题的分区排序为t0-0、t0-1、t0-2、t0-3、t1-0、t1-1、t1-2、t1-3,消费者排序为c1、c2、c3,c1订阅t0,c2订阅t1,c3订阅t0、t1,则分配情况为:c1消费t0-0、t0-2分区,c2消费t1-0、t1-1、t1-2,c3消费t0-1、t0-3、t1-3分区
          在这里插入图片描述
          此时将消费者1关闭,重平衡结果为:c2消费t1-0、t1-1、t1-2、t1-3,c3消费t0-1、t0-2、t0-3、t0-3分区
          在这里插入图片描述

使用场景

生产者

  • 保证消息的高可靠性

    会影响高并发的性能

    配置项:

    request.required.acks:可设置成1,也可设置成 -1或all,保证broker端接收到了消息

    retries:重试次数

    retry.backoff.ms:重试间隔时间

  • 节约网络带宽,提高吞吐量

    会增加CPU的使用率,最好与broker端的压缩格式一致,不然会导致broker CPU负载增加(需转换压缩格式进行存储)

    配置项:

compression.type:压缩消息进行发送

  • 增加高并发的性能

    会滞后消息的发送

    配置项:

batch.size:可根据实际消息大小及并发量进行配置

linger.ms:如10ms,不建议设置太大,越大消息滞后时间越长

消费者

  • 增加高并发的性能

    可能会出现上一条的情况

    配置项:

    enable.auto.commit:设置为true,不用在处理完消息后手动提交

    auto.commit.interval.ms:减少自动提交的间隔,避免消息处理了但偏移量还没提交

  • 消费者加入新的消费者组时,可定义从何处开始消费

    配置项:

    auto.offset.reset:具体参考配置说明

  • 分布式服务含有多个消费者处理不同分区数据时,尽量保证负载均衡

    配置项:

    partition.assignment.strategy:具体参考配置说明

    auto.offset.reset:具体参考配置说明

  • 分布式服务含有多个消费者处理不同分区数据时,尽量保证负载均衡

    配置项:

    partition.assignment.strategy:具体参考配置说明

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐