参考:https://www.jianshu.com/p/5370fff55cff

在使用Kafka时,经常会遇到队列中积压了大批量消息的情况。如果一条一条地消费,效率太低,这时就需要批量消费消息。

批量监听器

从 Spring Kafka 1.1 版本开始,@KafkaListener 可以配置为批量接收 Kafka 主题(topic)中的消息。要让监听器容器工厂创建批量监听器,需要将 batchListener 属性设置为 true,代码如下:

/**
 * Builds the listener container factory used by batch listeners.
 *
 * <p>The factory consumes String keys and values, takes its consumer settings
 * from {@link #consumerConfigs()}, and has batch listening switched on.
 *
 * @return the batch-enabled listener container factory
 */
@Bean
KafkaListenerContainerFactory<?> batchFactory() {
  DefaultKafkaConsumerFactory<String, String> consumers =
      new DefaultKafkaConsumerFactory<>(consumerConfigs());

  ConcurrentKafkaListenerContainerFactory<String, String> batchContainers =
      new ConcurrentKafkaListenerContainerFactory<>();
  // Enable batch listening on containers created by this factory.
  batchContainers.setBatchListener(true);
  batchContainers.setConsumerFactory(consumers);
  return batchContainers;
}

/**
 * Assembles the Kafka consumer properties used by {@code batchFactory()}.
 *
 * <p>{@code bootstrapServers}, {@code groupId} and {@code autoOffsetReset} are
 * read from names declared outside this snippet (presumably injected fields —
 * confirm against the enclosing configuration class).
 *
 * @return a mutable map of consumer configuration entries
 */
@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> settings = new HashMap<>();

  // Connection and consumer-group identity.
  settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

  // Number of messages received per poll.
  settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

  // Commit interval and timeouts, all in milliseconds.
  settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
  settings.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);

  // Keys and values are both deserialized as Strings.
  settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  return settings;
}

批量接收

在 @KafkaListener 注解中,通过 containerFactory 属性指定上面定义的 batchFactory 工厂 Bean。

/**
 * Receives a batch of records from the "teemo" topic and forwards their
 * values, as strings, to {@code updateES}.
 *
 * <p>Records whose value is {@code null} are skipped, and {@code updateES} is
 * not called when no record in the batch carries a value.
 *
 * @param list the records delivered in this batch
 */
@KafkaListener(topics = "teemo", id = "consumer", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<?, ?>> list) {
  List<String> payloads = new ArrayList<>();
  for (ConsumerRecord<?, ?> entry : list) {
    Object value = entry.value();
    if (value == null) {
      continue;
    }
    // Collect the message text.
    payloads.add(value.toString());
  }
  if (payloads.isEmpty()) {
    return;
  }
  // Update the index.
  updateES(payloads);
}

综合示例 

package org.fiend.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;

/**
 * Example Kafka consumers: one single-record listener and three batch
 * listeners (whole topic, partition 0 only, partition 1 only).
 *
 * <p>The batch listeners take a {@code List} of records, so they name the
 * batch-enabled container factory bean explicitly; the single-record listener
 * uses the default container factory.
 *
 * @author langpf 2020/2/25
 */
@Component
public class KafkaReceiver {
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);

    /**
     * Bean name of the listener container factory that has batch listening
     * enabled (the {@code batchFactory()} bean method).
     */
    private static final String BATCH_FACTORY = "batchFactory";

    /**
     * Receives messages one record at a time and logs each record that
     * carries a non-null value.
     *
     * <p>Alternative listener declarations kept for reference:
     * <pre>
     * &#64;KafkaListener(id = "hades", autoStartup = "${listener.auto.startup}", topics = "oop, pui, que", concurrency = "2")
     * &#64;KafkaListener(id = "hades", autoStartup = "false", topics = "oop, pui, que", concurrency = "2")
     * &#64;KafkaListener(id = "id1", topicPartitions = { &#64;TopicPartition(topic = Constants.TOPIC_NAME, partitions = { "1" }) })
     * </pre>
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(topics = {Constants.TOPIC_NAME})
    public void listen(ConsumerRecord<?, ?> record) {
        // No cast to String here: the record is declared as <?, ?>, so the
        // value is logged as an Object to avoid a ClassCastException.
        logRecordAndMessage(record);
    }

    /**
     * Receives messages in batches and logs each record that carries a
     * non-null value.
     *
     * @param records the records delivered in this batch
     */
    @KafkaListener(topics = {Constants.TOPIC_NAME}, containerFactory = BATCH_FACTORY)
    public void batchListen(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            logRecordAndMessage(record);
        }
    }

    /**
     * Receives messages in batches from partition 0 only.
     *
     * @param records the records delivered in this batch
     */
    @KafkaListener(id = "id0", containerFactory = BATCH_FACTORY,
            topicPartitions = {@TopicPartition(topic = Constants.TOPIC_NAME, partitions = {"0"})})
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        logPartitionBatch("Id0", "p0", records);
    }

    /**
     * Receives messages in batches from partition 1 only.
     *
     * @param records the records delivered in this batch
     */
    @KafkaListener(id = "id1", containerFactory = BATCH_FACTORY,
            topicPartitions = {@TopicPartition(topic = Constants.TOPIC_NAME, partitions = {"1"})})
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        logPartitionBatch("Id1", "p1", records);
    }

    /** Logs the record and its value, but only when the value is non-null. */
    private static void logRecordAndMessage(ConsumerRecord<?, ?> record) {
        Object message = record.value();
        if (message == null) {
            return;
        }
        log.info("----------------- record ={}", record);
        log.info("------------------ message ={}", message);
    }

    /**
     * Logs the calling thread id and batch size, then every record in the
     * batch; the value is logged separately only when it is non-null.
     */
    private static void logPartitionBatch(
            String listenerName, String partitionLabel, List<ConsumerRecord<?, ?>> records) {
        log.info("{} Listener, Thread ID: {}", listenerName, Thread.currentThread().getId());
        log.info("{} records size {}", listenerName, records.size());

        for (ConsumerRecord<?, ?> record : records) {
            log.info("Received: {}", record);
            Object message = record.value();
            if (message != null) {
                log.info("{} Received message={}", partitionLabel, message);
            }
        }
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐