写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!

由于Kafka的写性能非常高,因此肯定会碰到Kafka消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。

之前配置生产者我没有在yml文件中直接配置,而是新建了一个配置类,在配置类中配置生产者,这样确实有点麻烦了,所以消费者就直接在yml进行配置了。

1、先看下批量消费,想要批量消费,首先要开启批量消费,通过listener.type属性设置为batch即可开启,看下代码吧:

spring:
  kafka:
    consumer:
      group-id: test-consumer-group
      bootstrap-servers: 118.190.152.59:9092
      max-poll-records: 5 # 一次 poll 最多返回的记录数
    listener:
      type: batch # 开启批量消费

如上:如上设置了启用批量消费和批量消费每次最多消费记录数。这里设置 max-poll-records是5,并不是说如果没有达到5条消息,我们就一直等待。而是说一次poll最多返回的记录数为5。

配置好后,接着对消费者监听这边代码稍作修改,改成使用 List 来接收:

@KafkaListener(topics = {"mytopic"})
public void consumer(List<String> message){
	System.out.println("接收到的消息:" + message);
}

如果使用ConsumerRecord类接收,也是一样的使用 List 来接收,就不贴代码了。

新增测试方法测试下效果:

@Transactional
@GetMapping("/send13")
public void test13() {
	for (int i = 0; i < 23; i++) {
		kafkaTemplate.send(topic, "msg-" + i);
	}
}

启动项目,访问
http://localhost:8080/send13结果如下:

2、再来看下并发消费,为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态

spring:
  kafka:
    consumer:
      group-id: test-consumer-group
      bootstrap-servers: 118.190.152.59:9092
      max-poll-records: 5 # 一次 poll 最多返回的记录数
    listener:
      type: batch # 开启批量监听
      concurrency: 3 # 设置并发数

我们设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,
还有1条线程分配到1个partition。

看下项目启动日志:

如上:有一个线程监听3,2分区,一个线程监听4,另一个线程监听1,0分区。感兴趣的小伙伴可以测试下消费信息的效率。

补充一下:

像之前配置生产者一样,通过自定义配置类的方式也是可以的,但是相对yml配置来说还是有点麻烦的,直接贴下代码记录一下:

/**
 * 消费者配置
 */
@Configuration
public class KafkaConsumerConfig {
 
    /**
     * 消费者配置
     * @return
     */
    public Map<String,Object> consumerConfigs(){
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your ip:9092");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
 
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //并发数量
        factory.setConcurrency(3);
        //开启批量监听
        factory.setBatchListener(true);
        return factory;
    }
}

 同时监听器通过@KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下:

@KafkaListener(topics = {"mytopic"},containerFactory = "batchFactory")
public void consumer(List<String> message){
	System.out.println("接收到的消息:" + message);
}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐