SpringBoot-消息队列
本篇基于黑马程序员SpringBoot2全套视频教程,springboot零基础到项目实战(SpringBoot2完整版),是自己学习中整理的笔记
消息 :
- 消息的发送方
- 生产者
- 消息接收方
- 消费者
同步消息 : 收到响应消息之后才能继续发送
异步消息 : 不需要响应消息就可以继续发送
企业级应用中广泛使用的三种异步消息传递技术
- JMS: 一个规范 , 消息开发的API
- AMQP: 一个协议 , 规范了数据的传输格式
- MQTT:
SpringBoot整合各种消息技术 :
ActiveMQ:
下载安装:
下载地址 : https://activemq.apache.org/components/classic/download/
- 第一个是启动服务
- 第二个是安装服务
- 第三个是卸载服务
直接双击第一个 , 启动之后 , 访问 http://127.0.0.1:8161/ ,会显示一个管理后台页面 , 默认用户名密码是admin
使用:
-
添加依赖:
-
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
-
配置 yml 文件
-
spring: activemq: #指定本机的端口号 broker-url: tcp://localhost:61616 #指定消息的存储空间 jms: template: default-destination: sichen
-
编写实现类 :
-
@Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Override public void sendMessage(String id) { System.out.println("待发送短信的订单已经纳入处理队列 , id:" + id); //将id存入消息队列里边 , 先转换在发送 jmsMessagingTemplate.convertAndSend("order.queue.id",id); //指定消息队列 , 相当于key值 } @Override public String doMessage() { //从消息队列里边取出id , 取出并转换 , 指定为String类型的 String id = jmsMessagingTemplate.receiveAndConvert("order.queue.id",String.class); System.out.println("以完成短信发送业务 id :" + id); return id; }
-
编写监听器 , 设置只要队列中有消息就消费
-
@Component //需要被Spring管控 public class MessageListener { @JmsListener(destination = "order.queue.id") //指定监听器操作的位置 , 只要消息队列中有消息 , 就会一直运行 @SendTo("下一个消息队列") //将处理后的返回值 , 发送到下一个消息队列里边 public void receive(String id){ //设置消息的处理业务 System.out.println("以完成短信发送业务 id :" + id); } }
设置发布订阅模型 ,
默认的处理模型是点对点的模型 , 一个消息只能有一个消费者消费 , 或超时
切换模型 . 要在配置文件中设置 , 即可
pub-sub-domain: true ;
RabbitMQ:
下载安装:
RabbitMQ是基于Erlang语言编写的 , 需要安装Erlang
下载地址: https://www.erlang.org/downloads
安装一键式安装 , 安装完毕需要重启 , 需要依赖windows组件
环境变量 :
- ERLANG_HOME = 安装路径
- PATH = %ERLANG_HOME%\bin
安装RabbitMQ
https://rabbitmq.com/install-windows.html
启动:
在安装目录下 , 有一个sbin目录 , 这个目录下就是RabbitMQ的可执行文件
启动命令 :
注意需要要使用管理员权限启动
rabbitmq-service.bat start
关闭使用 :
rabbitmq-service.bat stop
开启管理后台页面 :
在控制台 输入 :可以查看携带的所有插件的状态
rabbitmq-plugins.bat list
使用的是这个插件 :
启动这个插件 :
rabbitmq-plugins.bat enable rabbitmq_management
启动之后 , 前边的括号中会有一个E , 如果是这个插件携带的插件 , 前边的括号中是一个 e
RabbitMQ的后台管理端口
- 服务端口号为 5672 ,
- 访问地址是http://localhost:15672
用户名和密码是guest
使用direct(直连交换机模式) :
-
导入坐标 :
-
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <!--注意:这个依赖比较特殊 , 使用的是协议的名称--> </dependency>
-
编写配置 :
-
rabbitmq: host: localhost port: 5672
-
需要在编写一个配置类
-
@Configuration public class RabbitConfigDriect { //定义消息队列 @Bean public Queue driectQueue(){ return new Queue("driect queue"); } //定义交换机 @Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange"); } //将消息队列与交换机进行绑定 , 可以同时绑定多个队列 @Bean public Binding binding(){ return BindingBuilder.bind(directExchange()).to(directExchange()).with("driect"); } }
-
编写实现类 :
-
import com.sichen.service.MessageService; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MessageServiceRabbitMQDirectImpl implements MessageService { @Autowired private AmqpTemplate amqpTemplate; @Override public void sendMessage(String id) { //第一个参数: 指定交换机 , 第二个参数:指定交换机和队列绑定键的名称 , 第三个值是你的消息值 amqpTemplate.convertAndSend("directExchange","driect",id); } @Override public String doMessage() { return null; } }
-
编写监听器类 : 实现自动消费队列中的信息 , 使用多个监听器对消息队列进行监听 , 可以实现消息轮循处理(两个监听器交替处理消息)
-
@Component public class MessageListener { @RabbitListener(queues = "driect queue")//需要指定消息队列的名称 public void receive(){ } }
使用 Topic (主题交换机模式) :
- 配置类 :
定义消息队列 : 可以指定绑定键的名称 , 在使用这个队列的时候进行匹配 , 也就是一个消息可以指定多个队列进行消费 ,
-
实现类 : 可以指定对应的消息队列的表达式 , 使消息到不同的消息队列中
-
监听器类 : 实现指定的消息队列的自动消费
RocketMQ:
下载安装 :
下载地址 : https://rocketmq.apache.org/
安装 :解压缩即可
- 默认端口号是 : 9876
环境变量 :
- ROCKETMQ_HOME:
- PATH
- NAMESRV_ADDR : 127.0.0.1:9876 (建议配置)
启动命名服务器 :
双击bin目录下的mqnamesrv.cmd文件 , 启动命名服务器
启动broker服务器
双击bin目录下的mqbroker.cmd , 启动broker服务器
-
不配置这个命名服务器
- 在每次进行连接的时候 , 生产者和消费者每次都要连接所有的业务服务器 , 这就造成了程序很繁琐 ,
-
配置这个命名服务器之后
- 生产者和消费者只用去访问这个命名服务器即可 , 由命名服务器去进行业务服务器的管理
-
导入依赖 :
-
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> <!--这个坐标SpringBoot并没有收录 , 所以要指定版本--> </dependency>
-
-
配置yml文件 :
-
rocket: name-server: localhost:9876 producer: group: group_rocketmq #定义消息消费者的组的信息
-
-
编写实现类 :
-
@Service public class MessageServiceRocketmqImpl implements MessageService { @Autowired private RocketMQTemplate rocketMQTemplate; @Override //同步的消息处理机制 , 不推荐使用 public void sendMessage(String id) { System.out.println("待发送短信的订单已经纳入处理队列 , id:"+id); rocketMQTemplate.convertAndSend("rocket_id",id); } @Override //我们以后常用的是异步的消息处理 public void sendMessage(String id) { System.out.println("待发送短信的订单已经纳入处理队列 , id:"+id); //rocketMQTemplate.convertAndSend("rocket_id",id); SendCallback callback = new SendCallback() { @Override //成功执行这个方法 public void onSuccess(SendResult sendResult) { System.out.println("消息发送成功"); } @Override //失败执行这个方法 public void onException(Throwable throwable) { System.out.println("消息发送失败"); } }; rocketMQTemplate.asyncSend("rocket_id",id,callback); } }
-
-
编写监听类 :
-
@Configuration @RocketMQMessageListener(consumerGroup = "group_rocketmq" , topic = "rocket_id") //只需要加一个注解即可 : //有两个必填项 : //consumerGroup : yml配置文件中定义的组的名称 //topic : 定义的队列的命名空间 public class MessageListener implements RocketMQListener<String> { @Override //继承的方法 public void onMessage(String s) { } }
-
更多推荐
所有评论(0)