概述:

spring-rabbit2.2版本后添加了批量消费, 网上没找到相关的用法,特此记录一下

环境:

jdk1.8 RabbitMQ3.8.3 新建springboot项目

配置:

引入amqp,如下

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.6.0</version>
</dependency>

修改application.yml配置如下

spring:
  rabbitmq:
    host: mymq.net
    virtual-host: /
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 250
        consumer-batch-enabled: true    #开启批量消费
        batch-size: 16                  #每次批量消费大小

指定批处理MessageListenerConverter(这里必须设置)

@Slf4j
@Component
public class BatchConfig {

    @Bean
    public String cusSimpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactory containerFactory) {
        log.info("设置批量-----");
        containerFactory.setBatchListener(true);
        return "cusSimpleRabbitListenerContainerFactory";
    }
}

配置消费者

@Slf4j
@Component
public class CallCompleteConsumer {

    @RabbitHandler
    @RabbitListener(queues = "my-consume-queue")
    public void onMessage(List<Message> messages, Channel channel) {

        final List<String> messageList = messages.stream()
                .map(each -> new String(each.getBody()))
                .collect(Collectors.toList());
        log.info("[onMessage] {}: {}", messages.size(), messageList);
        try {
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            for (Message message : messages) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

运行:

先创建好对列, 修改参数,启动即可

总结:

    消息批处理比较适合像处理结果批量入库批量操作效率高于单独操作的时候, 如处理结果批量更新到Mysql, 这里说一下ack与Kafka批处理的区别, Kafka批处理ack的时候提交的是offset, 而这里的批处理只针对单个消息的tag, 必须每个都要ack.

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐