RabbitMQ学习笔记:死信队列、延迟队列和优先级队列
环境window10虚拟机、centos7工具:SecureCRT 8.0、Intellij IDEA死信队列以下三种情况会消息会变成死信:① 消息过期时间到了 – 消息过期了② 队列满了③ 消息被拒绝– 即消费者调用Basic.Reject/Basic.Nack死信队列和普通队列基本是没有什么区别的。正常情况下,消息都是先到正常的队列里面,然后在满足上面三个条件的任意一个的情况后,就会进入到死信
环境
window10
虚拟机、centos7
工具:SecureCRT 8.0、Intellij IDEA
死信队列
以下三种情况会消息会变成死信:
① 消息过期时间到了 – 消息过期了
② 队列满了
③ 消息被拒绝 – 即消费者调用Basic.Reject/Basic.Nack
死信队列和普通队列基本是没有什么区别的。正常情况下,消息都是先到正常的队列里面,然后在满足上面三个条件的任意一个的情况后,就会进入到死信队列里面去。
当消息在一个队列中变成死信后,它能被重新发送到另一个交换器中,这个交换器就是DLX
,绑定DLX
的队列就称之为死信队列。
创建死信队列
// 先创建一个死信交换器,其实和普通交换器一模一样
channel.exchangeDeclare("dlx_exchange", "direct");
// 这里是关键
Map<String, Object> args = new HashMap<>();
// 通过x-dead-letter-exchange来指定死信交换器
args.put("x-dead-letter-exchange", "dlx_exchange");
// 也可以指定路由键,一旦指定,那么死信队列和死信交换器绑定时,路由键得一样
args.put("x-dead-letter-routing-key", "yutao");
// 声明正常队列,通过args参数来指定死信交换器
channel.queueDeclare("my_queue", false, false, false, args);
需要注意的地方:
① args.put("x-dead-letter-exchange", "dlx_exchange");
通过参数来设置死信交换器的。
② 参数里可以设置路由键args.put("x-dead-letter-routing-key", "yutao");
,但是设置后,就需要和channel.queueBind("queue.dlx", "exchange.dlx", "yutao");
中的路由键保持一致。如果没有设置,那么queueBind()
中的路由键就得和channel.basicPublish("exchange.normal", "yutao", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
中的路由键保持一致。
用代码来理解:
// 先创建一个死信交换器,其实和普通交换器一模一样
channel.exchangeDeclare("dlx_exchange", "direct");
// 通过参数设置死信队列路由键的情况
Map<String, Object> propArgs = new HashMap<>();
propArgs.put("x-dead-letter-exchange", "exchange.dlx");
propArgs.put("x-dead-letter-routing-key", "routing_key");
channel.queueBind("queue.dlx", "exchange.dlx", "routing_key");
// 没有通过参数设置死信路由键的情况
channel.queueBind("queue.dlx", "exchange.dlx", "routing_key");
channel.basicPublish("exchange.normal", "routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
使用死信队列
思路:比如正常队列设置10秒,消息过期,之后rabbitmq会把消息发送到绑定的死信交换器中,接着死信交换器会把消息发送到绑定的死信队列中。
// 省略连接connection和channel
// 创建一个type="direct",持久化的,非自动删除的交换器
channel.exchangeDeclare("exchange.dlx", "direct", true);
// 声明一个正常交换器
channel.exchangeDeclare("exchange.normal", "fanout", true);
// 设置死信队列的参数
Map<String, Object> propArgs = new HashMap<>();
propArgs.put("x-message-ttl", 10000);
propArgs.put("x-dead-letter-exchange", "exchange.dlx");
propArgs.put("x-dead-letter-routing-key", "yutao");
// 声明一个正常队列
channel.queueDeclare("queue.normal", true, false, false, propArgs);
// 将正常交换器和正常队列进行绑定
channel.queueBind("queue.normal", "exchange.normal", "normal");
// 声明死信队列
channel.queueDeclare("queue.dlx", true, false, false, null);
// 将死信交换器和死信队列进行绑定
channel.queueBind("queue.dlx", "exchange.dlx", "yutao");
// 发送一条持久化的消息
String message = "测试死信队列 Hello world";
channel.basicPublish("exchange.normal", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
消费端就是去把消费队列改为死信队列,即可:
channel.basicConsume("queue.dlx", consumer);
总结
最后用一张图来描述下,使用死信队列的流程:
延迟队列
基于上面的死信队列,其实还可以利用它的特性去实现一个延迟队列。
① 创建正常队列,队列设置过期时间分别为:5秒、10秒、30秒、1分钟四个队列。
② 为这四个正常队列绑定四个死信队列。
③ 消费端监听死信队列,就得到了相应的延迟消息。
比如 我希望这条消息延迟5秒发送,那么生产者就把消息发送到5秒的正常队列,时间到了后,rabbitmq会将消息重新转发到死信队列,这样消费端就监听到了。
优先级队列
可以通过参数x-max-priority
对队列设置优先级:
Map<String, Object> map = new HashMap<>();
map.put("x-max-priority", 10);
channel.queueDeclare("queue.priority", true, false, false, map);
接着需要在发送消息中设置消息的当前优先级。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority", "rk_priority", properties, ("message").getBytes());
优先级默认为0,上面的消息设置的优先级为5,最高为队列设置的最大优先级。
如果消息很少,消息并没有发生堆积,那么对消息设置优先级是没有意义的,因为其立马被消费掉了
参考地址:《RabbitMQ实战指南》
更多推荐
所有评论(0)