SpringBoot 的支持

SpringBoot 已经提供了对 AMQP 协议完全支持的 spring-boot-starter-amqp 依赖,引入此依赖即可快速方便的在 SpringBoot 中使用 RabbitMQ。参考:Spring AMQP

特点

  • 用于异步处理消费消息的监听器容器。
  • 用于发送和接收消息的 RabbitTemplate。
  • RabbitAdmin 用于自动声明队列、交换和绑定。

RabbitAdmin

作用

  • declareExchange:创建交换机。
  • deleteExchange:删除交换机。
  • declareQueue:创建队列。
  • deleteQueue:删除队列。
  • purge:清空队列。
  • declareBinding:新建绑定关系。
  • removeBinding:删除绑定关系。
  • getQueueProperties:查询队列属性。

加粗的为常用。

创建方式

// 连接工厂
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");

// 使用连接工厂创建 RabbitAdmin
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

RabbitAdmin 方法

编程式实现

@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "exchange.cat.dog";
    public static final String EXCHANGE_DLX  = "exchange.dlx";
    public static final String QUEUE_NAME    = "queue.cat";
    public static final String QUEUE_DLX     = "queue.dlx";
    public static final String KEY_NAME      = "key.yingduan";
    public static final String KEY_DLX       = "#";

    @PostConstruct
    void rabbitAdmin() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
      	
      	// 创建 RabbitAdmin
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

        // 声明交换机
        DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME);
        rabbitAdmin.declareExchange(directExchange);

        // 声明队列
        Queue queue = new Queue(QUEUE_NAME);
        rabbitAdmin.declareQueue(queue);

        // 声明绑定关系
        // 目的地名称、目的地类型、绑定交换机、绑定 key、参数
        Binding binding = new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
        rabbitAdmin.declareBinding(binding);
    }

}

声明式实现(推荐)

@Slf4j
@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "exchange.cat.dog";
    public static final String EXCHANGE_DLX  = "exchange.dlx";
    public static final String QUEUE_NAME    = "queue.cat";
    public static final String QUEUE_DLX     = "queue.dlx";
    public static final String KEY_NAME      = "key.yingduan";
    public static final String KEY_DLX       = "#";

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
        return connectionFactory;
    }

    @Bean
    RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    Exchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    Binding binding() {
        // 目的地名称、目的地类型、绑定交换机、绑定 key、参数
        return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
    }

}

注意,以上配置再启动 SpringBoot 并不会立马创建交换机、队列、绑定,SpringBoot AMQP 有懒加载,需要等到使用 connection 时才会创建。什么是使用 connection

  • 比如创建 connection
@Bean
ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  connectionFactory.setHost("127.0.0.1");
  connectionFactory.setUsername("admin");
  connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
  connectionFactory.createConnection();
  return connectionFactory;
}
  • 再比如监听了队列
@RabbitListener(queues = {"test"})
void test() {
  log.info("【测试监听消息】");
}

死信队列机制

死信队列需要在创建 Queue 时指定对应属性:

@Bean
Queue queue() {
  // 配置声明队列时使用的参数
  Map<String, Object> args = new HashMap<>(1);
  // 设置死信队列指向的交换机
  args.put("x-dead-letter-exchange", EXCHANGE_DLX);

  return new Queue(QUEUE_NAME, true, false, false, args);
}

RabbitTemplate

RabbitTemplate 是 SpringBoot AMQP 提供的快速发 RabbitMQ 消息的模板类,与 RestTemplate 有类似之处,意指方便、简单、快速的发 RabbitMQ 消息。

创建

@Bean
RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
  return new RabbitTemplate(connectionFactory);
}

发送消息

// 通过 Spring 到处注入使用即可。
rabbitTemplate.send(EXCHANGE_NAME, KEY_NAME, new Message("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)))
rabbitTemplate.convertAndSend(EXCHANGE_NAME, KEY_NAME, "HelloWorld 中国");
Message message = rabbitTemplate.sendAndReceive(RabbitConfig.EXCHANGE_NAME, RabbitConfig.KEY_NAME, new Message("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)));
  • send(final String exchange, final String routingKey, final Message message)(常用)

普通的消息发送,Message 的带参构造中可以传递参数,比如消息过期时间。

  • convertAndSend(String exchange, String routingKey, final Object object)(常用)

可以转换 Java 对象成 AMQP 消息进行发送。

  • Message sendAndReceive(final String exchange, final String routingKey, final Message message)

阻塞等待 5 秒钟,返回的 Message 就是服务端返回的数据,阻塞时间可以使用 rabbitTemplate.setReplyTimeout(10000) 设置。

发送端确认机制 和 消息返回机制

之前的《RabbitMQ 消息百分百投递方案》中有详细的记录过非 SpringBoot 的发送端确认机制消息返回机制。那改成 SpringBoot AMQP 之后肯定也是支持的。之前推荐使用同步单条消息确认机制,可以准确知道是哪一条消息出现问题方便做处理。同步多条异步 都不好确定是具体哪一条出现问题。

SpringBoot AMQP 提供的需要先配置 connectionFactory

@Bean
ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  connectionFactory.setHost("127.0.0.1");
  connectionFactory.setUsername("admin");
  connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
  // 发送端确认的类型
  connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
  // 开启消息返回机制
  connectionFactory.setPublisherReturns(true);
  return connectionFactory;
}

RibbitTemplate 中配置回调函数:

@Bean
RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

  // 开启 Mandatory
  rabbitTemplate.setMandatory(true);
  // 配置 发送端确认 回调函数
  rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);
  });
  // 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
  rabbitTemplate.setReturnsCallback(returned -> {
    log.info("【消息返回】- 入参; returned: ${}$", returned);
  });

  return rabbitTemplate;
}

show:

2022-07-05 13:24:16.549  INFO 59768 --- [nectionFactory1] com.lynchj.rabbitmq.config.RabbitConfig  : 【消息返回】- 入参; returned: $ReturnedMessage [message=(Body:'[B@3589027c(byte[17])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=exchange.cat.dog, routingKey=key.yingduan]$
2022-07-05 13:24:16.550  INFO 59768 --- [nectionFactory2] com.lynchj.rabbitmq.config.RabbitConfig  : 【发送端确认】- 入参; correlationData: $null$, ack: $true$, cause: $null$

ConfirmType.CORRELATED

上面的配置,在发送端确认时是无法区分消息是哪一个的,观察日志也能看出来,就打印了一个 ack 的值。要想关联上对应的消息需要做如下配置:

// 发送端确认的类型从 SIMPLE 更改为 CORRELATED
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

CORRELATED:指有关联性的。

在发送消息是修改如下:

// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.send(
  RabbitConfig.EXCHANGE_NAME,
  RabbitConfig.KEY_NAME,
  new Message("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)),
  correlationData
);

show:

2022-07-05 13:30:09.779  INFO 54416 --- [nectionFactory1] com.lynchj.rabbitmq.config.RabbitConfig  : 【发送端确认】- 入参; correlationData: $CorrelationData [id=976c94a6-2fa8-45dd-84e1-691c0db31460]$, ack: $true$, cause: $null$

SimpleMessageListenerContainer

SimpleMessageListenerContainer 可以帮助在开发中高效的监听消息,可以设置坚挺队列、设置消费者数量、重回队列、消息确认模式等等。主要功能如下:

  • 设置同时监听多个队列、自动启动、自动配置RabbitMQ。
  • 设置消费者数量(最大数量、最小数量、批量消费)。
  • 设置消息确认模式、是否重回队列、异常捕获。
  • 设置是否独占、其他消费者属性等。
  • 设置具体的监听器、消息转换器等。
  • 支持动态设置,运行中修改监听器配置。

代码实现

@Bean
SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
  SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
  // 监听队列,可多个
  messageListenerContainer.setQueueNames(QUEUE_NAME);
  // 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
  messageListenerContainer.setConcurrentConsumers(1);
  // 并发处理的线程最大数目,不能小于 concurrentConsumers
  messageListenerContainer.setMaxConcurrentConsumers(1);
  // Ack 的方式
  messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  // 消费端限流
  messageListenerContainer.setPrefetchCount(1);
  // 设置监听消息处理方法
  messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
    log.info("【消费消息】- 入参;message: ${}$", message);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  });

  return messageListenerContainer;
}

MessageListenerAdapter

上边实现的消费者监听是通过 messageListenerContainer.setMessageListener() 方法实现,业务代码写到了配置的代码中,耦合性比较强,更优雅一点的做法是使用 MessageListenerAdapter

@Bean
SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
  SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
  // 监听队列,可多个
  messageListenerContainer.setQueueNames(QUEUE_NAME);
  // 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
  messageListenerContainer.setConcurrentConsumers(1);
  // 并发处理的线程最大数目,不能小于 concurrentConsumers
  messageListenerContainer.setMaxConcurrentConsumers(1);
  // Ack 的方式
  messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  // 消费端限流
  messageListenerContainer.setPrefetchCount(1);
  // 设置监听消息处理方法
  /*messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("【消费消息】- 入参;message: ${}$", message);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        });*/

  // 创建消息监听适配器
  MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
  messageListenerAdapter.setDelegate(handleMessage);
  
  // 设置处理消息的适配器
  messageListenerContainer.setMessageListener(messageListenerAdapter);

  return messageListenerContainer;
}

handleMessage 是注入的另一个类:

@Slf4j
@Component
public class HandleMessage {

    void handleMessage(byte[] message) throws IOException {
        log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
    }

}

经过包装一层 MessageListenerAdapter 适配器,可以把真实的处理方法解耦出去,MessageListenerAdaptersetDelegate() 方法设置了任意一个 Object,等到有消息消费时,会调用到这个 Object 的 handleMessage 方法,这个方法名是 MessageListenerAdapter 内部的一个常量:

ORIGINAL_DEFAULT_LISTENER_METHOD

也可以通过调用 MessageListenerAdaptersetDefaultListenerMethod() 方法来更改默认调用方法名。

还可以配置监听多个队列,并给不同的队列设置不同的处理方法:

// 监听多个队列
messageListenerContainer.setQueueNames("cat", "dog", "queue.dog.cat");

// 创建消息监听适配器
MessageListenerAdapter adapter = new MessageListenerAdapter(handleMessage);
// 设置真实处理业务消息的默认方法名称,如果没有设置,那么默认的处理器中的默认方式是 handleMessage 方法
adapter.setDefaultListenerMethod("onMessage");

// 配置队列与真实处理业务消息的方法对应名称
Map<String, String> queueOrTagToMethodName = new HashMap<>(8);
queueOrTagToMethodName.put("cat", "onCat");
queueOrTagToMethodName.put("dog", "onDog");
queueOrTagToMethodName.put("queue.dog.cat", "onInfo");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);

// 设置处理消息的适配器
messageListenerContainer.setMessageListener(adapter);
@Slf4j
@Component
public class HandleMessage {

    void handleMessage(byte[] message) {
        log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
    }

    void onCat(byte[] message) {
        log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
    }

    void onDog(byte[] message) {
        log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
    }

    void onInfo(byte[] message) {
        log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
    }
    
}

注意:美中不足的是 MessageListenerAdapter 中适配的真实处理业务消息的方法入参只能是 byte[]

MessageConverter

先说说其作用,之前收发消息时,使用了 Byte[] 数组作为消息体,而在编写业务逻辑时,需要使用 Java 对象,这样就避免不了要来回从 Byte[] <> String <> Java 对象之间的相互转换。MessageConverter 就是用来在收发消息时自动转换 AMQP 内部消息和 Java 对象的

MessageConverter 本身是接口,无法直接使用,不过 AMQP 内已经提供了一个其实现 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter 方便直接使用,一般其况下使用这个就足够了,因为项目中大部分应该都是 JSON 形式的数据。当然,如果出现一些比较少见的格式,也可以自定义,只需要重写 toMessagefromMessage 即可。

Jackson2JsonMessageConverter

Student Model:

@NoArgsConstructor
@AllArgsConstructor
@Data
public class Student {

    private String name;

    private Integer age;

}

发消息的方法:

void send() throws Exception {
  // 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。
  CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

  Student student = new Student("大漠知秋", 18);
  ObjectMapper objectMapper = new ObjectMapper();
  String s = objectMapper.writeValueAsString(student);

  rabbitTemplate.send(
    RabbitConfig.EXCHANGE_NAME,
    RabbitConfig.KEY_NAME,
    new Message(s.getBytes(StandardCharsets.UTF_8)),
    correlationData
  );
}

消费者监听配置:

@Bean
SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
  SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
  // 监听队列,可多个
  messageListenerContainer.setQueueNames(QUEUE_NAME);
  // 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
  messageListenerContainer.setConcurrentConsumers(1);
  // 并发处理的线程最大数目,不能小于 concurrentConsumers
  messageListenerContainer.setMaxConcurrentConsumers(1);
  // Ack 的方式
  messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  // 消费端限流
  messageListenerContainer.setPrefetchCount(1);

  // 创建消息监听适配器
  MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
  messageListenerAdapter.setDelegate(handleMessage);

  // 配置 MessageConverter
  Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
  messageListenerAdapter.setMessageConverter(messageConverter);

  // 设置处理消息的适配器
  messageListenerContainer.setMessageListener(messageListenerAdapter);

  return messageListenerContainer;
}

HandleMessage

@Slf4j
@Component
public class HandleMessage {

    void handleMessage(Student student) {
        log.info("【消费消息】- 入参;message: ${}$", student);
    }

}

如果仅仅是这样配置上 MessageConverter 就启动的话会报如下错:

Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to invoke target method 'handleMessage' with argument type = [class java.util.LinkedHashMap], value = [{{name=大漠知秋, age=18}}]

Jackson2JsonMessageConverter 默认转换的 Java 对象为 LinkedHashMap,而在 handleMessage 处理方法中的参数是 Student,所以就报错了。需要指定一下类型:

// 配置 MessageConverter
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
messageConverter.setClassMapper(new ClassMapper() {
  @Override
  public void fromClass(Class<?> clazz, MessageProperties properties) {

  }

  @Override
  public Class<?> toClass(MessageProperties properties) {
    return Student.class;
  }
});
messageListenerAdapter.setMessageConverter(messageConverter);

show:

2022-07-05 17:19:22.724  INFO 34140 --- [enerContainer-1] c.lynchj.rabbitmq.handle.HandleMessage   : 【消费消息】- 入参;message: $Student(name=大漠知秋, age=18)$

@RabbitListener(终极监听方案)

使用此方案做监听消息功能,就可以把之前的 SimpleMessageListenerContainer 进行监听的方案舍弃掉了,就是这么的喜新厌旧,不过之前的 SimpleMessageListenerContainer 也不是一无是处,学过之后可以更好的理解内部的一些逻辑。

@RabbitListener 的特点:

  • RabbitListener 是 SpringBoot 架构中监听消息的终极方案
  • RabbitListener 使用注解声明,对业务代码无侵入。
  • RabbitListener 可以在 SpringBoot 配置文件中进行配置。

@RabbitListener 本身是 Java 中的注解,可以搭配其他注解一起使用:

  • @Exchange:自动声明 Exchange。
  • @Queue:自动声明队列。
  • @QueueBinding:自动声明绑定关系。

基本使用

首先在 RabbitConfig 中新增创建 RabbitListenerContainerFactory 的 Bean,看名字应该就知道是用来替换掉 SimpleMessageListenerContainer 的工厂。方便后边使用 @RabbitListener 时创建 ListenerContainer

@Slf4j
@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "exchange.cat.dog";
    public static final String EXCHANGE_DLX  = "exchange.dlx";
    public static final String QUEUE_NAME    = "queue.cat";
    public static final String QUEUE_DLX     = "queue.dlx";
    public static final String KEY_NAME      = "key.yingduan";
    public static final String KEY_DLX       = "#";

    public static final String RABBIT_ADMIN                      = "rabbitAdmin";
    public static final String RABBIT_LISTENER_CONTAINER_FACTORY = "rabbitListenerContainerFactory";

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
        // 发送端确认的类型
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // 开启消息返回机制
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = RABBIT_ADMIN)
    RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    Exchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    Queue queue() {
        // 配置声明队列时使用的参数
        Map<String, Object> args = new HashMap<>(1);
        // 设置死信队列指向的交换机
        args.put("x-dead-letter-exchange", EXCHANGE_DLX);

        return new Queue(QUEUE_NAME, true, false, false, args);
    }

    @Bean
    Binding binding() {
        // 目的地名称、目的地类型、绑定交换机、绑定 key、参数
        return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
    }

    @Bean
    RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 开启 Mandatory
        rabbitTemplate.setMandatory(true);
        // 配置 发送端确认 回调函数
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);
        });
        // 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
        rabbitTemplate.setReturnsCallback(returned -> {
            log.info("【消息返回】- 入参; returned: ${}$", returned);
        });

        return rabbitTemplate;
    }

    @Bean(name = RABBIT_LISTENER_CONTAINER_FACTORY)
    RabbitListenerContainerFactory rabbitListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        listenerContainerFactory.setConnectionFactory(connectionFactory);
        return listenerContainerFactory;
    }

}

发送端:

@Slf4j
@Component
public class PublisherConfirm {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    void send() throws Exception {
        // 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        Student student = new Student("大漠知秋", 18);
        ObjectMapper objectMapper = new ObjectMapper();
        String s = objectMapper.writeValueAsString(student);

        rabbitTemplate.send(
                RabbitConfig.EXCHANGE_NAME,
                RabbitConfig.KEY_NAME,
                new Message(s.getBytes(StandardCharsets.UTF_8)),
                correlationData
        );
    }

}

消费者端:

@Slf4j
@Component
public class RabbitListenerTest {

    @RabbitListener(
            containerFactory = RabbitConfig.RABBIT_LISTENER_CONTAINER_FACTORY,
            queues = {RabbitConfig.QUEUE_NAME}
    )
    void listenCat(@Payload Message message) {
        log.info("【消费消息】- 入参;message: ${}$", message);
        log.info("【消费消息】- student: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
    }

}

show:

2022-07-05 18:03:03.101  INFO 60560 --- [ntContainer#0-1] c.l.rabbitmq.handle.RabbitListenerTest   : 【消费消息】- 入参;message: $(Body:'[B@251d4c4b(byte[32])' MessageProperties [headers={spring_listener_return_correlation=0e26e018-0e0a-43af-a197-67428c8fc800, spring_returned_message_correlation=e8937d7d-1257-46a2-93ec-e65bd2fac5ad}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.cat.dog, receivedRoutingKey=key.yingduan, deliveryTag=1, consumerTag=amq.ctag-A8eb9Qb1Uwyrdz6KcsvwBA, consumerQueue=queue.cat])$
2022-07-05 18:03:03.101  INFO 60560 --- [ntContainer#0-1] c.l.rabbitmq.handle.RabbitListenerTest   : 【消费消息】- student: ${"name":"大漠知秋","age":18}$

使用 bindings 创建 Exchange、Queue、Binding

简化后的 RabbitConfig

@Slf4j
@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "exchange.cat.dog";
    public static final String EXCHANGE_DLX  = "exchange.dlx";
    public static final String QUEUE_NAME    = "queue.cat";
    public static final String QUEUE_DLX     = "queue.dlx";
    public static final String KEY_NAME      = "key.yingduan";
    public static final String KEY_DLX       = "#";

    public static final String RABBIT_ADMIN                      = "rabbitAdmin";
    public static final String RABBIT_LISTENER_CONTAINER_FACTORY = "rabbitListenerContainerFactory";

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
        // 发送端确认的类型
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // 开启消息返回机制
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = RABBIT_ADMIN)
    RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 开启 Mandatory
        rabbitTemplate.setMandatory(true);
        // 配置 发送端确认 回调函数
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);
        });
        // 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
        rabbitTemplate.setReturnsCallback(returned -> {
            log.info("【消息返回】- 入参; returned: ${}$", returned);
        });

        return rabbitTemplate;
    }

    @Bean(name = RABBIT_LISTENER_CONTAINER_FACTORY)
    RabbitListenerContainerFactory rabbitListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        listenerContainerFactory.setConnectionFactory(connectionFactory);
        return listenerContainerFactory;
    }

}

RabbitListenerTest

@Slf4j
@Component
public class RabbitListenerTest {

    @RabbitListener(
            containerFactory = RabbitConfig.RABBIT_LISTENER_CONTAINER_FACTORY,
            // 指定 RabbitAdmin,创建 Exchange、Queue、Binding 时使用
            admin = RabbitConfig.RABBIT_ADMIN,
            // 绑定关系,没有的 Exchange、Queue、Binding 没有的会自动创建。
            bindings = {
                    // 第一个绑定关系,可以多个
                    @QueueBinding(
                            // 队列
                            value = @Queue(
                                    // 队列名
                                    name = RabbitConfig.QUEUE_NAME,
                                    // 队列参数
                                    arguments = {
                                            // 队列中消息超时时间
                                            @Argument(
                                                    name = "x-message-ttl",
                                                    value = "1000",
                                                    type = "java.lang.Integer"
                                            ),
                                            // 死信队列配置信息
                                            @Argument(
                                                    name = "x-dead-letter-exchange",
                                                    value = RabbitConfig.EXCHANGE_DLX,
                                                    // 默认值就是 String, 也可以不写
                                                    type = "java.lang.String"
                                            )
                                    }
                            ),
                            // 交换机
                            exchange = @Exchange(
                                    // 交换机名
                                    name = RabbitConfig.EXCHANGE_NAME
                            ),
                            // 绑定 Key
                            key = {RabbitConfig.KEY_NAME}
                    )
            }
    )
    void listenCat(@Payload Message message) {
        log.info("【消费消息】- 入参;message: ${}$", message);
        log.info("【消费消息】- student: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
    }

}

这种方式注解写到崩溃,不建议使用。。。

SpringBoot 使用 RabbitMQ 终极方案

SpringBoot 的开发原则就是约定大于配置,上面的代码中,还存在着不少 @Bean 的配置代码,这显然很不 SpringBoot,应该把一些常规配置,配置到 .yml.properties 中,让项目可以从配置文件中自动加载好 Bean,对此,SpringBoot AMQP 包提供了对应的支持。

配置文件(.yml/.properties)

spring:
  rabbitmq:
    host: 'localhost'
    port: 5672
    username: 'admin'
    password: 'kzh_mxg4vfb2QRP*xkv'
    virtual-host: '/'
    # 发送端确认机制开启,并且使用关联性的类型
    publisher-confirm-type: correlated
    # 开启消息返回机制
    publisher-returns: true
    template:
      # 开启委托,配合 publisher-returns 使用
      mandatory: true
    listener:
      simple:
        # Ack 模式
        acknowledge-mode: manual
        # 消费者限流
        prefetch: 10
        # 并发处理的线程最小数目,不能大于 max-concurrency
        concurrency: 3
        # 并发处理的线程最大数目,不能小于 concurrency
        max-concurrency: 5

RabbitConfig

主要是配置发送端确认回调消息返回回调ExchangeQueueBinding 的创建。

@Slf4j
@Configuration
public class RabbitConfig implements InitializingBean {

    public static final String EXCHANGE_NAME = "exchange.cat.dog";
    public static final String EXCHANGE_DLX  = "exchange.dlx";
    public static final String QUEUE_NAME    = "queue.cat";
    public static final String QUEUE_DLX     = "queue.dlx";
    public static final String KEY_NAME      = "key.yingduan";
    public static final String KEY_DLX       = "#";

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 发送端确认 回调配置
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);
        });

        // 消息返回 回调配置,只有在 Exchange 无法路由到 Queue 时回调
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("【消息返回】- 入参; returned: ${}$", returned);
        });
    }

    @Bean
    Exchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    Queue queue() {
        // 配置声明队列时使用的参数
        Map<String, Object> args = new HashMap<>(1);
        // 设置死信队列指向的交换机
        args.put("x-dead-letter-exchange", EXCHANGE_DLX);
        // 设置队列内消息过期时间,单位:毫秒
        args.put("x-message-ttl", 15000);

        return new Queue(QUEUE_NAME, true, false, false, args);
    }

    @Bean
    Binding binding() {
        // 目的地名称、目的地类型、绑定交换机、绑定 key、参数
        return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
    }

    @Bean
    TopicExchange dlxExchange() {
        return new TopicExchange(EXCHANGE_DLX);
    }

    @Bean
    Queue dlxQueue() {
        return new Queue(QUEUE_DLX);
    }

    @Bean
    Binding dlxBinding() {
        // 目的地名称、目的地类型、绑定交换机、绑定 key、参数
        return new Binding(QUEUE_DLX, Binding.DestinationType.QUEUE, EXCHANGE_DLX, KEY_DLX, null);
    }

}

发送消息

@Slf4j
@RestController
@RequestMapping("/send")
public class SendController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendOne/{name}")
    void sendOne(@PathVariable(name = "name") String name) throws JsonProcessingException {
        log.info("【sendOne】- 入参: ${}$", name);
        // 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        Student student = new Student(name, 18);
        ObjectMapper objectMapper = new ObjectMapper();
        String s = objectMapper.writeValueAsString(student);

        MessageProperties messageProperties = new MessageProperties();
        // 消息过期时间 10 秒
        messageProperties.setExpiration("10000");

        rabbitTemplate.send(
                RabbitConfig.EXCHANGE_NAME,
                RabbitConfig.KEY_NAME,
                new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties),
                correlationData
        );
    }

}

监听消息

@Slf4j
@Component
public class RabbitListeners {

    @RabbitListener(queues = {RabbitConfig.QUEUE_NAME})
    void listenCat(String content, @Payload Message message, Channel channel) throws IOException, InterruptedException {
        log.info("【消费消息】- 入参;content: ${}$, message: ${}$, channel: ${}$", content, message, channel);
        log.info("【消费消息】- student: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
        TimeUnit.SECONDS.sleep(3);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

Student

@NoArgsConstructor
@AllArgsConstructor
@Data
public class Student {

    private String name;

    private Integer age;

}

说明

这一套下来包括了发送端确认消息返回手动 Ack消费者限流消息过期死信队列。可以有效地保证消息的发送、路由、消费能够正常执行。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐