When a Kafka listener is set up to acknowledge messages manually (manual ack), the following error is thrown:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.roomdis.micros.kafka.consumer.MessageConsumer.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)]
Bean [com.roomdis.micros.kafka.consumer.MessageConsumer@27068a50]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}], failedMessage=GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:178) ~[spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.1.6.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_77]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}], failedMessage=GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}]
    ... 10 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] t

 

Solution

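The listener method declares an Acknowledgment parameter, but the container only passes one in when its ack mode is MANUAL or MANUAL_IMMEDIATE. Otherwise the adapter tries to convert the String payload into an Acknowledgment and fails, as the trace shows. Add these two lines of configuration to disable auto commit and switch the container to manual acknowledgment: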
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
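
For context, here is a minimal sketch of where those two lines might sit in a consumer configuration for spring-kafka 1.1.x (the version in the stack trace). The class name KafkaConsumerConfig, the broker address localhost:9092, and the group id demo-group are assumptions for illustration and do not come from the original project. The topic name kefuLogger and the listener signature are taken from the error output above.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;

// Hypothetical configuration class; names and connection values are assumptions.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group id
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // First line of the fix: the Kafka client must not commit offsets on its own.
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(propsMap);
    }

    // "kafkaListenerContainerFactory" is the bean name @KafkaListener looks up by default.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Second line of the fix: a manual ack mode makes the container
        // pass an Acknowledgment to the listener method.
        factory.getContainerProperties()
               .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public MessageConsumer messageConsumer() {
        return new MessageConsumer();
    }

    // Listener with the same signature as the method named in the stack trace.
    public static class MessageConsumer {

        @KafkaListener(topics = "kefuLogger")
        public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
            System.out.println("Received: " + record.value());
            // Commit the offset only after the record has been processed.
            ack.acknowledge();
        }
    }
}
```

Note that in later spring-kafka releases the AckMode enum lives on ContainerProperties rather than AbstractMessageListenerContainer, so the second line may need adjusting after an upgrade.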
