结论:如果消费者开启了批量消费的配置,那么必须同时配置一个当批量消费出现异常的处理器。否则仅配置启动消费者批量消费是会出现问题的。

@Bean("batchConsumerFactory")
    public KafkaListenerContainerFactory<?> consumerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        // 设置为批量消费,每个批次数量在Kafka配置参数中设置
        factory.setBatchListener(true);
        // 批量消费失败时的处理函数
        factory.setBatchErrorHandler((e, consumerRecords) -> {
            for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
                log.error("批量消费异常:{}, 请求参数:{}", e.getMessage(), consumerRecord);
            }
        });
        // 设置消费者poll方法长轮询时间
        factory.getContainerProperties().setPollTimeout(500);
        return factory;
    }
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐