kafka no record information is available

springcloud-config-bus报错

[KafkaConsumerDestination{consumerDestinationName='springCloudBus', partitions=1, dlqName='null'}.container-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
	at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
	at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1401)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)
	at com.sun.proxy.$Proxy257.commitSync(Unknown Source)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2366)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2361)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2347)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2161)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
	... 4 common frames omitted

在这种情况下,异常是在记录处理之外(处理完成后)引起的。由于重新平衡导致提交失败,而且因为记录不再可用,所以无论如何也无法重新查找。它只是重新调用异常,并将无限期地重试。

解决

kafka加入max.poll.interval.ms配置,配置如下:

spring:
  kafka:
  	bootstrap-servers: 192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092
  	consumer:
      #用于标识此使用者所属的使用者组的唯一字符串。
      group-id: xxx
      #ID在发出请求时传递给服务器: 用于服务器端日志记录。
      client-id: xxx
      #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
      #可选的值为latest, earliest, none
      auto-offset-reset: earliest
      #如果'enable.auto.commit'true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
      auto-commit-interval: 5000ms
      #如果为true,则消费者的偏移量将在后台定期提交,默认值为true
      enable-auto-commit: true
      #如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器在回答获取请求之前将阻塞的最长时间(以毫秒为单位)
      #默认值为500
      fetch-max-wait: 500ms
      #服务器应以字节为单位返回获取请求的最小数据量,默认值为1,对应的kafka的参数为fetch.min.bytes。
      fetch-min-size: 1
      #心跳与消费者协调员之间的预期时间(以毫秒为单位),默认值为3000
      heartbeat-interval: 3000ms
      #密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #一次调用poll()操作时返回的最大记录数,默认值为500
      max-poll-records: 500
    properties:
      max:
        poll:
          interval:
            ms: 600000
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐