吐血整理的kafka配置文件信息,请君来看

 Kafka官方配置

        如果你的生产环境连接的是阿里云的Kafka,需要提供SSL凭证,额外的SSL相关参数请参考阿里云。

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 10.200.8.29:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      bootstrap-servers: 10.200.8.29:9092
      # 可重试错误的重试次数,例如“连接错误”、“无主且未选举出新Leader”
      retries: 1 #生产者发送消息失败重试次数
      # 多条消息放同一批次,达到多达就让Sender线程发送
      batch-size: 16384 # 同一批次内存大小(默认16K)
      # 发送消息的速度超过发送到服务器的速度,会导致空间不足。send方法要么被阻塞,要么抛异常
      # 取决于如何设置max.block.ms,表示抛出异常前可以阻塞一段时间
      buffer-memory: 314572800 #生产者内存缓存区大小(300M = 300*1024*1024)
      #acks=0:无论成功还是失败,只发送一次。无需确认
      #acks=1:即只需要确认leader收到消息
      #acks=all或-1:ISR + Leader都确定收到
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的编解码方法
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #value的编解码方法
      #开启事务,但是要求ack为all,否则无法保证幂等性
      #transaction-id-prefix: "COLA_TX"
      #额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
      properties:
        #自定义拦截器,注意,这里结尾时classes(先于分区器,快递先贴了标签再指定地址)
        interceptor.classes: cn.com.controller.TimeInterceptor
        #自定义分区器
        #partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner
        #即使达不到batch-size设定的大小,只要超过这个毫秒的时间,一样会发送消息出去
        linger.ms: 1000
        #最大请求大小,200M = 200*1024*1024,与服务器broker的message.max.bytes最好匹配一致
        max.request.size: 209715200
        #Producer.send()方法的最大阻塞时间(115秒)
        # 发送消息的速度超过发送到服务器的速度,会导致空间不足。send方法要么被阻塞,要么抛异常
        # 取决于如何设置max.block.ms,表示抛出异常前可以阻塞一段时间
        max.block.ms: 115000
        #该配置控制客户端等待服务器的响应的最长时间。
        #如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则会使请求失败。 
        #此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。
        request.timeout.ms: 115000
        #等待send回调的最大时间。常用语重试,如果一定要发送,retries则配Integer.MAX
        #如果超过该时间:TimeoutException: Expiring 1 record(s) .. has passed since batch creation
        delivery.timeout.ms: 120000
        # 生产者在服务器响应之前能发多少个消息,若对消息顺序有严格限制,需要配置为1
        # max.in.flight.requests.per.connection: 1
         
 
    #https://kafka.apache.org/documentation/#consumerconfigs
    consumer:
      bootstrap-servers: 10.200.8.29:9092
      group-id: auto-dev #消费者组
      #消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费
      # earliest:无提交记录,从头开始消费
      #latest:无提交记录,从最新的消息的下一条开始消费
      auto-offset-reset: earliest 
      enable-auto-commit: false #是否自动提交偏移量offset
      auto-commit-interval: 1S #前提是 enable-auto-commit=true。自动提交的频率
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 2
      properties:
        #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
        session.timeout.ms: 120000
        #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
        max.poll.interval.ms: 300000
        #配置控制客户端等待请求响应的最长时间。 
        #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
        #或者如果重试次数用尽,则请求失败。
        request.timeout.ms: 60000
        #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
        allow.auto.create.topics: true
        #poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一
        heartbeat.interval.ms: 40000 
        #每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节
        #0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制
        #仍然会返回该消息,以确保消费者可以进行
        #max.partition.fetch.bytes=1048576  #1M

    listener:
      #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
      missing-topics-fatal: true #如果至少有一个topic不存在,true启动失败。false忽略
      #type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-records
      type: batch
      concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
 
    template:
      default-topic: "COLA"

你看我都这么努力的分享知识给你了,鼓励一下又何妨O(∩_∩)O

你的打赏是对我最好的支持!

                  

 

 

生产环境遇到的问题

问题1:nested exception is org.apache.kafka.common.errors.RecordTooLargeException: 

nested exception is org.apache.kafka.common.errors.RecordTooLargeException: 

很明显,消息体目前过大。需要提高Kafka服务器能够接收的最大数据大小。

解决方法:提高Kafka服务器单条消息最大大小。配置参数 message.max.bytes

我的方法:生产环境大概每份文件不超过5M,调整服务器broker参数message.max.bytes=5*1024*1024

官方文档

##brokerConfiguration

message.max.bytes
The largest record batch size allowed by Kafka 
(after compression if compression is enabled). 
If this is increased and there are consumers 
older than 0.10.2, the consumers' fetch size 
must also be increased so that they can fetch
 record batches this large. In the latest message
 format version, records are always grouped into 
batches for efficiency. In previous message format
 versions, uncompressed records are not grouped
 into batches and this limit only applies to a
 single record in that case.
This can be set per topic with the topic
 level max.message.bytes config.

Type:	int
Default:	1048588 (1M)
Valid Values:	[0,...]
Importance:	high
Update Mode:	cluster-wide

问题2:Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.

异常类:org.apache.kafka.common.errors.DisconnectException

异常类描述:Server disconnected before a request could be completed.

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.clients.FetchSessionHandler[442] - [Consumer clientId=consumer-1, groupId=auto-dev] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.

问题定位:由于消息读写频率较高,因此kafka服务器的负载达到上限。所以有些消费者实例一直就poll不到数据。达到配置的时间【session.timeout.ms】、【request.timeout.ms】,消费者会被踢出Kafka。因此日志不断打印该错误。(其实另一方面也提示消费时间过长)

解决方法:提高consumer参数【session.timeout.ms】、【request.timeout.ms】的值。

我的方法:在application.yaml中为【session.timeout.ms】、【request.timeout.ms】这两个参数均调整到4分钟,即240000ms。

官方文档

##ConsumerConfiguration

request.timeout.ms
The configuration controls the maximum amount 
of time the client will wait for the response of a request. 
If the response is not received before the timeout elapses 
the client will resend the request if necessary or 
fail the request if retries are exhausted.

Type:	int
Default:	30000 (30 seconds)
Valid Values:	[0,...]
Importance:	medium
##ConsumerConfiguration

session.timeout.ms
The timeout used to detect client failures 
when using Kafka's group management facility. 
The client sends periodic heartbeats to indicate 
its liveness to the broker. If no heartbeats are 
received by the broker before the expiration of 
this session timeout, then the broker will 
remove this client from the group and initiate 
a rebalance. Note that the value must be 
in the allowable range as configured in the 
broker configuration by 
group.min.session.timeout.ms and group.max.session.timeout.ms.

Type:	int
Default:	45000 (45 seconds)
Valid Values:	
Importance:	high

问题3:TimeoutException: Expiring 1 record(s) .. has passed since batch creation

问题定位:消息已经投递到kafka,但是kafka来不及处理。提示kafka写性能达到瓶颈。

我的方法:可以从两方面修改。

(1)控制代码发送频率,即控制调用kafkaTemplate.send方法的频率。作为有经验的开发人员,不要问我怎么控制发送频率。代码里加入Thread.sleep就可以不是么

(2)在生产者参数中提高【delivery.timeout.ms】的时间。默认2分钟kafka没有响应则报异常。我们可以增加该时间,单位毫秒。但是有上限,不能超过【request.timeout.ms】与【linger.ms】之和。请阅读下面我贴过来的官方文档。

官方文档

##ProduerConfiguration

delivery.timeout.ms

An upper bound on the time to report success or 
failure after a call to send() returns. 
This limits the total time that a record will 
be delayed prior to sending, the time to await 
acknowledgement from the broker (if expected), 
and the time allowed for retriable send failures.
 The producer may report failure to send a record 
earlier than this config if either an unrecoverable
 error is encountered, the retries have been exhausted,
 or the record is added to a batch which reached 
an earlier delivery expiration deadline. 
The value of this config should be greater than 
or equal to the sum of request.timeout.ms and linger.ms.

Type:	int
Default:	120000 (2 minutes)
Valid Values:	[0,...]
Importance:	medium

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐