RabbitMq(2)之入门
1. 相关概念通常谈到队列服务,会有三个概念:发消息者、队列、收消息者,RabbitMq在这个基础概念之上,多做了一层抽象,在发送消息和接收消息之间加入了交换器(Exchange)。发送消息者跟收消息者没有直接联系,发消息者把消息给交换器,交换器根据调度策略把消息再给队列虚拟主机:一个虚拟机持有一组交换机、队列和绑定。再rabbitmq中,只能以虚拟主机的粒度进行控制权限,如果想要禁止A组访问B
1. 相关概念
通常谈到队列服务,会有三个概念:发消息者、队列、收消息者,RabbitMq在这个基础概念之上,多做了一层抽象,在发送消息和接收消息之间加入了交换器(Exchange)。发送消息者跟收消息者没有直接联系,发消息者把消息给交换器,交换器根据调度策略把消息再给队列
- 虚拟主机:一个虚拟机持有一组交换机、队列和绑定。再rabbitmq中,只能以虚拟主机的粒度进行控制权限,如果想要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建虚拟主机
- 交换机:Exchange用于转发消息,它不会做存储,如果没有Queue bind绑定到Exchange的话,会直接丢弃Producer发送过来的消息。消息到交换机需要通过 路由键 进行转发到具体的队列中
- 绑定:交换机需要和队列相绑定,多对多关系
2. 交换机
交换机不存储消息,只做消息的转换,在启用ack模式后,交换机找不到队列会返回错误
- Direct:直接交换机,”先匹配,再投送“,即在绑定时设置一个routing_key,消息的routing_key匹配时才会被交换机进行投送
- Topic:按规则转发消息
- Headers:设置 header attribute参数类型的交换机
- Fanout:转发消息到所有绑定队列
2.1 Direct直接交换机
rabbitmq默认交换机,采用全文匹配的方式,例如:T - D1 绑定ey为 QD1,如果发送消息中的路由键匹配到QD1,就会进行转发
2.2 Topic交换机
Topic Exchange转发消息主要是根据通配符。这种交换机会和队列绑定一种格式,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息
- 路由键必须是字符串,用句号(.) 隔开,例如:agreements.us
- 路由匹配模式中的(*),主要用于匹配路由键指定位置的单词。例如:agreements.*.b,就会去匹配第一个arrements,以及第三个 b
- 路由匹配模式中的(#), 表示多个单词 。例如: agreements.b.# 只要路由键 agreements.b 开头都会匹配
2.3 Headers交换机
相比较direct和topic固定的使用routing_key,headers则是一个自定义匹配规则类型,在队列与交换器绑定时会定义一组规则,只要消息中的键值对有一对匹配或者全部匹配时,就会被投送到对应队列
2.4 Fanout扇形交换机
Fanout Exchange消息广播的模式,不管路由键或者是路由模式,会把消息发送给绑定给它的全部队列,如果配置了routing_key 会被忽略
3. Spring Boot集成Rabbitmq
3.1 引入依赖包
使用spring封装的amqp进行操作
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 组件对象
- Queue:抽象队列对象,
- durable:是否持久化,如果为false,rabbitmq启动时就会自动删除队列
- exclusive:声明了exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除
- autoDelete:所有客户端都断开连接时自动删除(用于临时使用)
- Message
- Exchange
- Binding
3.3 消息队列
@Configuration
public class QueueConfiguration {
@Bean
public Queue queue() {
return new Queue(RabbitMqConst.HELLO_QUEUE_NAME);
}
}
3.4 生产者
@Component
public class SimpleSender {
@Resource
private AmqpTemplate amqpTemplate;
public void send() {
String context = "helloMQ:" + new Date().getTime();
amqpTemplate.convertAndSend(RabbitMqConst.HELLO_QUEUE_NAME, context);
}
}
3.5 消费者
@Component
@RabbitListener(queues = RabbitMqConst.HELLO_QUEUE_NAME)
public class HelloReceiver {
/**
* 处理方法
* @param hello
*/
@RabbitHandler
public void process(String hello) {
System.out.println("接收到数据:" + hello);
}
}
3.6 测试方法
@SpringBootTest(classes = SpringRabbitmqDemoApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class SpringRabbitmqDemoApplicationTests {
@Resource
private SimpleSender simpleSender;
@Test
public void hello() {
simpleSender.send();
}
}
4. 交换机测试
注意:默认所有创建的队列以及交换机都会持久化,测试代码时可以设置为false,autoDelete可以设置为true
4.1 Direct交换机
@Configuration
public class QueueConfiguration {
/**
* 创建队列:指定队列名称
* @return
*/
@Bean
public Queue queue() {
return new Queue(RabbitMqConst.HELLO_QUEUE_NAME);
}
/**
* 创建一个直接交换机,指定交换机的名称
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(RabbitMqConst.DIRECT_EXCHANGE_NAME);
}
/**
* 创建一个绑定对象,主要的目的是将,交换机和队列绑定起来,hello 是routingkey
* 在发送数据时,只有routingkey对应上了之后才会发送到对应的队列里面
*
* @return
*/
@Bean
public Binding bindingToDirect() {
return BindingBuilder.bind(queue()).to(directExchange()).with("hello");
}
}
@Component
public class SimpleSender {
@Resource
private AmqpTemplate amqpTemplate;
public void sendToDirect() {
String context = "sendDirect:" + new Date().getTime();
//注意第二个参数需要跟Binding中指定的routingkey对应上
amqpTemplate.convertAndSend(RabbitMqConst.DIRECT_EXCHANGE_NAME, "hello", context);
}
}
4.2 Topic交换机
@Configuration
public class QueueConfiguration {
/**
* 创建队列:指定队列名称
* @return
*/
@Bean
public Queue queue() {
// 构造方法接收三个参数:durable(是否持久化), exclusive(是否是特殊化,最对第一个连接可见,断开后就删除), autoDelete(是否自动删除)
return new Queue(RabbitMqConst.HELLO_QUEUE_NAME, false, false, false);
}
/**
* 创建一个topic交换机
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(RabbitMqConst.TOPIC_EXCHANGE_NAME);
}
/**
* 自定routingkey以 * 或 # 进行匹配
* @return
*/
@Bean
public Binding bindingToTopic() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("hello.*(#).world");
}
}
4.3 Fanout交换机
@Configurationpublic class QueueConfiguration { /** * 创建队列:指定队列名称 * @return */ @Bean public Queue queue() { // 构造方法接收三个参数:durable(是否持久化), exclusive(是否是特殊化,最对第一个连接可见,断开后就删除), autoDelete(是否自动删除) return new Queue(RabbitMqConst.HELLO_QUEUE_NAME, false, false, false); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(RabbitMqConst.FANOUT_EXCHANGE_NAME); } /** * 扇形交换机不需要指定routingkey,会转发给绑定的所有队列 * @return */ @Bean public Binding bindingToFanout() { return BindingBuilder.bind(queue()).to(fanoutExchange()); }} public void sendToFanout() { String context = "sendFanout:" + new Date().getTime(); //发送时,如果指定了routingkey也会失效 amqpTemplate.convertAndSend(RabbitMqConst.FANOUT_EXCHANGE_NAME, "hello.1.world", context); }
4.4 Headers交换机
@Configurationpublic class QueueConfiguration { /** * 创建队列:指定队列名称 * @return */ @Bean public Queue queue() { // 构造方法接收三个参数:durable(是否持久化), exclusive(是否是特殊化,最对第一个连接可见,断开后就删除), autoDelete(是否自动删除) return new Queue(RabbitMqConst.HELLO_QUEUE_NAME, false, false, false); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange(RabbitMqConst.HEADERS_EXCHANGE_NAME); } /** * 设置头中数据是否匹配,只有匹配时才进行转发 * @return */ @Bean public Binding bindingToHeaders() { //匹配单个KV值 Binding single = BindingBuilder.bind(queue()).to(headersExchange()).where("msg").matches("hello"); //多个KV值进行匹配 Map<String, Object> map = new HashMap<>(); map.put("msg1", "hello1"); map.put("msg2", "hello2"); Binding many = BindingBuilder.bind(queue()).to(headersExchange()).whereAny(map).match(); return many; }}public void sendToHeaders() { String context = "sendHeaders:" + new Date().getTime(); MessageProperties properties = MessagePropertiesBuilder .newInstance() .setHeader("msg2", "hello2") .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .build(); Message message = MessageBuilder .withBody(context.getBytes()) .andProperties(properties) .build(); amqpTemplate.send(RabbitMqConst.HEADERS_EXCHANGE_NAME, "", message); }
- whereAny:匹配多个KV中的任意一个
- whereAll:所有的KV都必须匹配到
5. 消息确认机制
spring: rabbitmq: host: 192.168.92.190 port: 5672 username: root password: root publisher-confirms: true # 消息发送到交换机是否成功的回调函数 publisher-returns: true # 消息发送到对应队列失败调用的回调 virtual-host: / listener: simple: acknowledge-mode: manual # 采用手动答应 concurrency: 1 # 指定最小的消费者数量 max-concurrency: 1 # 指定最大的消费者数量 retry: enabled: true #开启消息重试
5.1 生产者
@Componentpublic class SimpleSender { @Resource private AmqpTemplate amqpTemplate; public void send() { RabbitTemplate rabbitTemplate = (RabbitTemplate) this.amqpTemplate; //交换机投递到对应队列失败时,根据当前Mandatory 判断是否需要调用 ReturnCallback,配置文件中需要配置publisher-returns:true rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息成功发送到exchange...."); } else { System.out.println("消息发送exchange失败,原因:"+ cause); } }); //交换机转发消息到对应队列失败时调用 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); System.out.printf("消息:%s,发送失败,应答码:%s,原因:%s,交换机:%s,路由键:%s%n", correlationId, replyCode, replyText, exchange, routingKey); }); String context = "helloMQ:" + new Date().getTime(); this.amqpTemplate.convertAndSend(RabbitMqConst.DIRECT_EXCHANGE_NAME, "hello", context); }}
- ConfirmCallback:消息确认函数,当前函数主要的目的是,消息是否发送到交换机时回调的函数
- ReturnCallback:消息返回回调函数,当前消息成功投递到交换机,但是没有投递到对应的队列时调用
5.2 消费者
/** * 处理方法 * @param */ @RabbitHandler @RabbitListener(queues = RabbitMqConst.HELLO_QUEUE_NAME) public void process(Message message, Channel channel) throws IOException { System.out.println("接收到数据:" + new String(message.getBody())); //消费者手动确认消息,如果没有确认,broker会将数据存储后继续发送 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }
6. 死信队列
DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到 x-dead-letter-exchange 对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列
- 消息被拒绝了 Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
- 消息过期了
- 队列达到最大长度
6.1 队列配置
@Configurationpublic class DeadQueueConfiguration { /** * 创建一个正常的队列,设置参数死信队列交换机的名称 * @return */ @Bean public Queue queue() { return QueueBuilder .nonDurable(RabbitMqConst.TEST_DLX_QUEUE_NAME) .withArgument("x-dead-letter-exchange", RabbitMqConst.DLX_EXCHANGE) .build(); } /** * 创建一个直接交换机 * * @return */ @Bean public DirectExchange directExchange() { return new DirectExchange(RabbitMqConst.TEST_DLX_EXCHANGE); } @Bean public Binding bindingToDirect() { return BindingBuilder.bind(queue()).to(directExchange()).with("hello"); } /** * ------------------------------------------------------------------------------- 创建死信队列 * @return */ @Bean public Queue deadQueue() { return QueueBuilder .nonDurable(RabbitMqConst.DLX_QUEUE_NAME) .build(); } @Bean public Exchange deadExchange() { return ExchangeBuilder .topicExchange(RabbitMqConst.DLX_EXCHANGE) .build(); } /** * 将死信队列绑定到对应的交换机上 routingKey 以 #号进行订阅,任意 routingKey都会被转发到队列上 */ @Bean public Binding bindingToDead() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("#").noargs(); }}
@Componentpublic class DeadSender { @Resource private AmqpTemplate amqpTemplate; public void send() { String context = "dead msg:" + new Date().getTime(); MessageProperties properties = MessagePropertiesBuilder .newInstance() //设置过期时间为10秒 .setExpiration("10000") .build(); Message message = MessageBuilder .withBody(context.getBytes()) .andProperties(properties) .build(); //发送到直接交换机上 amqpTemplate.convertAndSend(RabbitMqConst.TEST_DLX_EXCHANGE, "hello", message); }}
test_dlx_queue:为正常队列,发送消息时,指定10秒钟过期,因为没有开消费者,所以就自动转发到了dlx_queue上面
7. 延时队列
7.1 下载插件
wget https://objects.githubusercontent.com/github-production-release-asset-2e65be/924551/bdcc937b-781e-4952-b2e2-a66c8aa94336?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20220215%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20220215T015044Z&X-Amz-Expires=300&X-Amz-Signature=5f16f24403f3cba33a2e4a99f4eeef41bf3cb5ca5c05bd59468e4f5fd9e8b8f3&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=924551&response-content-disposition=attachment%3B%20filename%3Drabbitmq-server-3.9.8-1.el7.src.rpm&response-content-type=application%2Foctet-stream
找到服务安装路径下面的plugins路径,将下载的 rabbitmq_delayed_message_exchange 插件拷贝进去
mv rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/
启动:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
7.2 延迟队列配置
@Configurationpublic class DelayConfiguration { @Bean public Exchange delayExchange() { Map<String, Object> map = new HashMap<>(); //指定路由类型为直接交换机 map.put("x-delayed-type", "direct"); // 第二个交换机的类型自定义设置为 x-delayed-message 延迟队列指定 return new CustomExchange(RabbitMqConst.DELAY_EXCHANGE, "x-delayed-message", true, false, map); } @Bean public Queue delayQueue() { return QueueBuilder .nonDurable(RabbitMqConst.DELAY_QUEUE) //设置队列最大长度为1 .withArgument("x-max-length", 1) .build(); } @Bean public Binding bindingToDelay() { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.test").noargs(); }}
7.3 生产者
@Componentpublic class DelaySender { @Resource private AmqpTemplate amqpTemplate; public void send(Integer delay) { String context = "delay msg:" + new Date().getTime(); MessageProperties properties = MessagePropertiesBuilder .newInstance() .build(); Message message = MessageBuilder .withBody(context.getBytes()) .andProperties(properties) .build(); //发送到直接交换机上 amqpTemplate.convertAndSend(RabbitMqConst.DELAY_EXCHANGE, "delay.test", message, p -> { //进行消息拦截处理,给消息设置延时时间 p.getMessageProperties().setDelay(delay); return p; }); }}
7.3 消费者
@Componentpublic class DelayReceiver { /** * 处理方法 * @param */ @RabbitHandler @RabbitListener(queues = RabbitMqConst.DELAY_QUEUE) public void process(Message message, Channel channel) throws IOException { System.out.println("监听到延时数据,进行处理:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }}
更多推荐
所有评论(0)