Conclusion: add spring.kafka.listener.type=batch to the configuration file.

The full error message is as follows:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.kafka.gateway.consumer.heartBeatConsumer.listensGroupPro(org.apache.kafka.clients.consumer.ConsumerRecords<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)]
Bean [com.kafka.gateway.consumer.heartBeatConsumer@3cceaa40]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}], failedMessage=GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for 
heartbeat4-0@71, kafka_groupId=gateway2}], failedMessage=GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2636) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2606) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2567) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2481) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2403) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2282) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1956) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1351) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1342) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1234) [spring-kafka-2.8.0.jar:2.8.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_211]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[na:1.8.0_211]
	at java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:1.8.0_211]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:374) ~[spring-kafka-2.8.0.jar:2.8.0]
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.0.jar:2.8.0]
		at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.0.jar:2.8.0]
		at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.0.jar:2.8.0]
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2586) [spring-kafka-2.8.0.jar:2.8.0]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}], failedMessage=GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2586) [spring-kafka-2.8.0.jar:2.8.0]
	... 12 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.0.jar:2.8.0]
	... 15 common frames omitted

My consumer was written like this:

@KafkaListener(topics = "heartbeat4", groupId = "gateway2")
public void listensGroupPro(ConsumerRecords<String, String> records, Acknowledgment ack) {
    // method body omitted in the original post
}

Of course, it did not start out this simple. I gradually stripped out configuration while chasing this bug.

Since the error was a type conversion exception, I switched to:

@KafkaListener(topics = "heartbeat4", groupId = "gateway2")
public void listensGroupPro(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    System.out.println(records);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    // Manually commit the offset; internally this uses a synchronous commit with up to 3 attempts
    ack.acknowledge();
}

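It still failed, this time inside the for loop. Bizarrely, System.out.println printed fine, but when I set a breakpoint I found that my string had been split apart!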
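
A plausible explanation, stated here as an assumption rather than something verified in this post: while the container is still in single mode, the String payload may go through a generic type conversion that turns a comma-separated String into a collection of Strings. Because generic types are erased at runtime, such a list can be handed to a parameter declared as List<ConsumerRecord<String, String>>; printing it works, but the first cast inside the for loop fails. The sketch below reproduces only that mechanism in plain Java. FakeRecord and convertLikeSingleMode are hypothetical names, and no Spring or Kafka classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HeapPollutionSketch {

    /** Hypothetical stand-in for ConsumerRecord<String, String>. */
    static final class FakeRecord {
        private final String value;

        FakeRecord(String value) {
            this.value = value;
        }

        String value() {
            return value;
        }
    }

    /**
     * Assumed behaviour, not Spring's actual code: split the payload on commas
     * and return the resulting Strings under the wrong generic type.
     */
    @SuppressWarnings("unchecked")
    static List<FakeRecord> convertLikeSingleMode(String payload) {
        List<Object> parts = new ArrayList<>(Arrays.asList(payload.split(",")));
        // Unchecked cast: legal at compile time, the elements are still Strings.
        return (List<FakeRecord>) (List<?>) parts;
    }

    public static void main(String[] args) {
        String payload = "{\"index\":\"1000\",\"type\":\"6\"}";
        List<FakeRecord> records = convertLikeSingleMode(payload);

        // Printing only calls toString() on each element, so it succeeds.
        System.out.println(records);

        try {
            for (FakeRecord record : records) {
                // The compiler inserts a cast to FakeRecord here, which fails.
                System.out.println(record.value());
            }
        } catch (ClassCastException e) {
            System.out.println("ClassCastException inside the loop");
        }
    }
}
```

If this is what happened, the list seen in the debugger would contain fragments of the JSON text rather than records, which matches the symptom described above.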

 

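That is obviously wrong. I then tried replacing String with ?, which did not help either, and I got lost stepping through the source in the debugger. The many blog posts I read all showed this working normally. In fact it worked for me at first too, but after a restart yesterday batch consumption stopped working! I then noticed that many of those posts put the usual settings in yml and then hand-write factory classes that read the configuration and register a factory in the container: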

factory.setBatchListener(batchListener);

To me, that defeats the purpose of what Spring Kafka provides out of the box. Sure enough, I eventually found this property:

spring.kafka.listener.type=batch 

The default is single.

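I don't know why that is the default, but after changing it, batch consumption worked. What I understand least is why batch consumption worked at first without this setting and then suddenly broke.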
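
To make the difference between the two listener types concrete, here is a toy model in plain Java. It is not Spring Kafka code: dispatchSingle and dispatchBatch are hypothetical names, and the sketch only mirrors the calling convention visible in the stack trace above, where a RecordMessagingMessageListenerAdapter hands the method one record at a time. A method that expects the whole batch therefore cannot be satisfied in single mode.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ListenerTypeSketch {

    /** Toy model of type=single: the handler is called once per record. */
    static <T> int dispatchSingle(List<T> polled, Consumer<T> handler) {
        int calls = 0;
        for (T record : polled) {
            handler.accept(record);
            calls++;
        }
        return calls;
    }

    /** Toy model of type=batch: the handler is called once with the whole poll result. */
    static <T> int dispatchBatch(List<T> polled, Consumer<List<T>> handler) {
        if (polled.isEmpty()) {
            return 0; // nothing polled, nothing to invoke
        }
        handler.accept(polled);
        return 1;
    }

    public static void main(String[] args) {
        List<String> polled = Arrays.asList("msg-0", "msg-1", "msg-2");

        int singleCalls = dispatchSingle(polled, value -> System.out.println("single: " + value));
        int batchCalls = dispatchBatch(polled, values -> System.out.println("batch: " + values));

        System.out.println(singleCalls + " handler calls in single mode, "
                + batchCalls + " in batch mode");
    }
}
```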

Producer code:

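import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/mmm")
public class TestProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // GET /mmm/aaa sends ten identical test messages to the heartbeat4 topic
    @GetMapping(value = "/aaa")
    public void aaa() {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("heartbeat4", "key", "{\"index\":\"1000\",\"ip\":\"666666666666666\",\"timestamp\":1638263084,\"type\":\"6\",\"value\":[]}");
            // send() is asynchronous, so this only confirms that the call returned
            System.out.println("Sent successfully");
        }
    }
}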

Full configuration:

spring:
  kafka:
    bootstrap-servers: your-server
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500

    listener:
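      # RECORD: commit after the listener (ListenerConsumer) has processed each record
      # BATCH: commit after the listener has processed each batch returned by poll()
      # TIME: after each poll() batch has been processed, commit if more than TIME
      #       has elapsed since the last commit
      # COUNT: after each poll() batch has been processed, commit if at least COUNT
      #        records have been processed
      # COUNT_TIME: commit when either the TIME or the COUNT condition is met
      # MANUAL: after each poll() batch has been processed, commit once
      #         Acknowledgment.acknowledge() has been called manually
      # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is
      #                   called manually; this is the mode generally used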
      ack-mode: MANUAL_IMMEDIATE
      type: batch
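
The ack-mode comments above are easier to compare with numbers. The following is a toy model in plain Java that counts how many commits the RECORD, BATCH and COUNT descriptions would produce for a sequence of poll() results. It models only what the comments say, not the container's real implementation; Mode, countCommits and the ackCount parameter are hypothetical names. The MANUAL modes are left out because their commit count depends on when the listener calls acknowledge().

```java
import java.util.Arrays;
import java.util.List;

public class AckModeSketch {

    enum Mode { RECORD, BATCH, COUNT }

    /**
     * Counts commits for the given poll sizes.
     * ackCount is only used by COUNT: commit once at least that many records
     * have been processed since the previous commit, checked after each poll.
     */
    static int countCommits(Mode mode, List<Integer> pollSizes, int ackCount) {
        int commits = 0;
        int pending = 0; // records processed since the last commit (COUNT only)
        for (int size : pollSizes) {
            if (size == 0) {
                continue; // an empty poll gives the listener nothing to process
            }
            switch (mode) {
                case RECORD:
                    commits += size; // one commit per record
                    break;
                case BATCH:
                    commits += 1; // one commit per poll() batch
                    break;
                case COUNT:
                    pending += size;
                    if (pending >= ackCount) {
                        commits++;
                        pending = 0;
                    }
                    break;
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> pollSizes = Arrays.asList(500, 500, 200);
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + countCommits(mode, pollSizes, 1000) + " commits");
        }
    }
}
```

In this model, three polls of 500, 500 and 200 records give 1200 commits under RECORD, 3 under BATCH, and 1 under COUNT with a threshold of 1000, which shows why per-record commits are the most expensive choice.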

If you have any questions, feel free to ask in the comments.

If you repost this, please be sure to credit the original source.
