Kafka消费者处理消息失败后,无限消费该失败消息的问题
结论:如果消费者开启了批量消费的配置,那么必须同时配置一个当批量消费出现异常的处理器。否则仅配置启动消费者批量消费是会出现问题的。@Bean("batchConsumerFactory")public KafkaListenerContainerFactory<?> consumerFactory() {ConcurrentKafkaListenerContainerFactory&l
·
结论:如果消费者开启了批量消费的配置,那么必须同时配置一个当批量消费出现异常的处理器。否则仅配置启动消费者批量消费是会出现问题的。
@Bean("batchConsumerFactory")
public KafkaListenerContainerFactory<?> consumerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
// 设置为批量消费,每个批次数量在Kafka配置参数中设置
factory.setBatchListener(true);
// 批量消费失败时的处理函数
factory.setBatchErrorHandler((e, consumerRecords) -> {
for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
log.error("批量消费异常:{}, 请求参数:{}", e.getMessage(), consumerRecord);
}
});
// 设置消费者poll方法长轮询时间
factory.getContainerProperties().setPollTimeout(500);
return factory;
}
更多推荐
所有评论(0)