结论:如果消费者开启了批量消费的配置,那么必须同时配置一个当批量消费出现异常的处理器。否则仅配置启动消费者批量消费是会出现问题的。

@Bean("batchConsumerFactory")
    public KafkaListenerContainerFactory<?> consumerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        // 设置为批量消费,每个批次数量在Kafka配置参数中设置
        factory.setBatchListener(true);
        // 批量消费失败时的处理函数
        factory.setBatchErrorHandler((e, consumerRecords) -> {
            for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
                log.error("批量消费异常:{}, 请求参数:{}", e.getMessage(), consumerRecord);
            }
        });
        // 设置消费者poll方法长轮询时间
        factory.getContainerProperties().setPollTimeout(500);
        return factory;
    }
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐