spring boot整合Kafka批量消费、并发消费
写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!由于Kafka的写性能非常高,因此肯定会碰到Kafka消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。之前配置生产者我没有在yml文件中直接配置,而是新建了一个配置类,在配置类中配置生产者,这样确实有点麻烦了,所以消费者就直接在yml进行配置了。1、先看下批
写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!
由于Kafka的写性能非常高,因此肯定会碰到Kafka消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。
之前配置生产者我没有在yml文件中直接配置,而是新建了一个配置类,在配置类中配置生产者,这样确实有点麻烦了,所以消费者就直接在yml进行配置了。
1、先看下批量消费,想要批量消费,首先要开启批量消费,通过listener.type属性设置为batch即可开启,看下代码吧:
spring:
kafka:
consumer:
group-id: test-consumer-group
bootstrap-servers: 118.190.152.59:9092
max-poll-records: 5 # 一次 poll 最多返回的记录数
listener:
type: batch # 开启批量消费
如上:如上设置了启用批量消费和批量消费每次最多消费记录数。这里设置 max-poll-records是5,并不是说如果没有达到5条消息,我们就一直等待。而是说一次poll最多返回的记录数为5。
配置好后,接着对消费者监听这边代码稍作修改,改成使用 List 来接收:
@KafkaListener(topics = {"mytopic"})
public void consumer(List<String> message){
System.out.println("接收到的消息:" + message);
}
如果使用ConsumerRecord类接收,也是一样的使用 List 来接收,就不贴代码了。
新增测试方法测试下效果:
@Transactional
@GetMapping("/send13")
public void test13() {
for (int i = 0; i < 23; i++) {
kafkaTemplate.send(topic, "msg-" + i);
}
}
启动项目,访问
http://localhost:8080/send13结果如下:
2、再来看下并发消费,为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。
spring:
kafka:
consumer:
group-id: test-consumer-group
bootstrap-servers: 118.190.152.59:9092
max-poll-records: 5 # 一次 poll 最多返回的记录数
listener:
type: batch # 开启批量监听
concurrency: 3 # 设置并发数
我们设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,
还有1条线程分配到1个partition。
看下项目启动日志:
如上:有一个线程监听3,2分区,一个线程监听4,另一个线程监听1,0分区。感兴趣的小伙伴可以测试下消费信息的效率。
补充一下:
像之前配置生产者一样,通过自定义配置类的方式也是可以的,但是相对yml配置来说还是有点麻烦的,直接贴下代码记录一下:
/**
* 消费者配置
*/
@Configuration
public class KafkaConsumerConfig {
/**
* 消费者配置
* @return
*/
public Map<String,Object> consumerConfigs(){
Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your ip:9092");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
//并发数量
factory.setConcurrency(3);
//开启批量监听
factory.setBatchListener(true);
return factory;
}
}
同时监听器通过@KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下:
@KafkaListener(topics = {"mytopic"},containerFactory = "batchFactory")
public void consumer(List<String> message){
System.out.println("接收到的消息:" + message);
}
更多推荐
所有评论(0)