前两天有个朋友咨询我一个关于 kafka 消息消费慢的问题。最后帮忙解决了,记录下过程。

他的项目上 kafka 使用的是云服务,版本号是 2.12,现有一个生产者和一个消费者,生产十分钟的消息,需要二十分钟才能消费完。

消费者每次拉取1000条消息进行处理,使用的是 newCachedThreadPool 线程池。

1.kafka 相关命令

1)创建topic

–zookeeper: 指定了kafka所连接的zookeeper服务地址
–topic: 指定了所要创建的主题的名称
–partitions: 指定了分区个数
–replication-factor: 指定了副本因子
–create: 创建主题的动作指令

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic test

./kafka-topics.sh 在 kafka 的 bin 文件夹下,下面的命令一样。

2)查看topic列表

./kafka-topics.sh --zookeeper localhost:2181 --list

3)查看某个topic详情

./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test

4)查看消费组

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

5)查看消费者组的信息

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 消费组名

2.kafka 相关参数

max.poll.records:一次 poll 返回的最大记录数默认是500条。
max.poll.interval.ms:两次 poll 方法最大时间间隔这个参数,默认是300s。

这两个值设置要合理,如果一次拉取的消息处理完的时间,超过了 poll 方法最大时间间隔,会导致偏移量报错,导致重复消费。

这两个值在 springboot 上的配置信息为

spring:
  kafka:
    consumer:
      max-poll-records: 500
  	properties:
      max:
        poll:
          interval:
            ms: 300000

300000 单位是毫秒。

这两个值设置合理的话,是能解决消费慢的问题。

3.优化

除了设置上面的两个值,还建议他增加消费者,在同一个消费组中,每个消费者的 ID 设置不同即可,相关参数为

spring:
  kafka:
    consumer:
      client-id: client01
      group-id: groupName

然后朋友优化了他的代码,取消 newCachedThreadPool 线程池,改为了缓存模式,每次缓存1000条消息一起处理。

4.感慨

听朋友说他已经花了一个月时间了还没有解决,我怀疑他在摸鱼。

我这边就花了两个半上午,干活的空档,查了一些资料。

看来我还是可以的。

不过想起来他那将近两倍于我的工资,嗯嗯嗯嗯嗯,

笑中带泪

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐