1、概述

SpringBoot项目中RabbitMq的使用可参考这篇博客:
https://blog.csdn.net/qq_35387940/article/details/100514134
写的非常的细致。

本文着重讲解:

  • 同一个消费者类可同时监听消费多个队列

  • 同一个消费者类中可使用方法重载来实现消费队列中不同类型的数据

2、代码示例

第一步:

创建springboot项目,引入依赖,添加rabbitmq相关配置,具体步骤参考文首的博客链接。

第二步:

创建rabbitmq配置类:RabbitConfig.java(名字随便起,记得类名上面添加@Configuration注解)

配置类中创建:
两个队列:队列ATestQueueA,队列BTestQueueB
两个交换机:一个直连交换机TestDirectExchange、一个扇形交换机TestFanoutExchange
绑定规则:直连交换机绑定队列A,扇形交换机绑定队列B
如图:
在这里插入图片描述

package com.tzq.testRabbitMq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author : Tangzhiqian
 * @CreateTime : 2021/4/26
 *
 **/

@Configuration
public class DirectRabbitConfig {

    //队列A
    @Bean
    public Queue testQueueA() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("TestQueueA",true);
    }
     //队列B
    @Bean
    public Queue testQueueB() {
        return new Queue("TestQueueB",true);
    }


    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange testDirectExchange() {
        return new DirectExchange("TestDirectExchange",true,false);
    }
    //扇形交换机 起名:testFanoutExchange
    @Bean
    FanoutExchange testFanoutExchange() {
        return new FanoutExchange("TestFanoutExchange",true,true);
    }


    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(testQueueA()).to(testDirectExchange()).with("Test");
    }
    @Bean
    Binding bindingTopic() {
        return BindingBuilder.bind(testQueueB()).to(testFanoutExchange());
    }


}

第三步:

创建定时任务类:向交换机定时发送数据;
TestDirectExchange发送String格式数据
TestFanoutExchange发送Map格式数据

package com.tzq.testRabbitMq.schedule;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

@Component
public class ScheduleTestRabbitMqSender {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Scheduled(cron = "0/6 * * * * ?")
    public void SendRabbitmqMessage() {

        Map<String, Object> mapA = new HashMap<>();
        mapA.put("message", "mapA的数据,string格式");

        Map<String, Object> mapB = new HashMap<>();
        mapB.put("message", "mapB的数据,map格式");
		
		//发送string类型数据
        rabbitTemplate.convertAndSend("TestDirectExchange","Test", mapA.toString());
        //发送map类型数据
        rabbitTemplate.convertAndSend("TestFanoutExchange",null, mapB);
        System.out.println("数据发送成功!!!");
    }
}

定时任务:及记得在启动类上加@EnableScheduling
注解

第四步:

创建队列消费的类。

  • 监听多个队列,在类上添加多个@RabbitListener(queues = ${QueueName})
  • 监听多种类型数据,重载被@RabbitHandler注解的方法,参数不同即可。
package com.tzq.testRabbitMq.schedule;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * @Author tangzhiqian
 * @CreateTime 2021/4/26 9:53
 */
@Component
@RabbitListener(queues = "TestQueueA")//监听的队列名称 TestQueueA
@RabbitListener(queues = "TestQueueB")//监听的队列名称 TestQueueB
public class DirectReceiver {

    @RabbitHandler
    public void process(String message) {
        System.out.println("消费者收到String类型消息  : " + message);
    }

    @RabbitHandler
    public void process(Map message) {
        System.out.println("消费者收到map类型消息  : " + message);
    }
}
第五步:

运行程序:
在这里插入图片描述
消费者同时消费到了两个队列中的数据,不同的数据类型,也可以用不同的方法形参来接收。

源码:
https://download.csdn.net/download/TAaron666/18141264

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐