SpringBoot + Mqtt协议,实现多个主题订阅及消息推送功能
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。mqtt 与 MQ 的区别:mqtt:一种通信协议,类似人类交谈中的汉语、英语、俄语中的一种语言规范MQ:一种通信通道,也叫消息队列,类似人类交谈中的用电话、email、微信的
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。
mqtt 与 MQ 的区别:
mqtt:一种通信协议,类似人类交谈中的汉语、英语、俄语中的一种语言规范
MQ:一种通信通道,也叫消息队列,类似人类交谈中的用电话、email、微信的一种通信方式
详细区别:
有三个基本概念:消息、消息协议、消息队列。
消息:信息的载体
消息协议:为了让消息发送者和消息接收者都能够明白消息所承载的信息(消息发送者需要知道如何构造消息;消息接收者需要知道如何解析消息),它们就需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。所以,有效的消息一定具有某一种格式;而没有格式的消息是没有意义的。
消息队列:消息从发送者到接收者的方式也有两种。一种为即时消息通讯,也就是说消息从一端发出后(消息发送者)立即就可以达到另一端(消息接收者),这种方式的具体实现就是我们已经介绍过的RPC(当然单纯的http通讯也满足这个定义);另一种为延迟消息通讯,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列,如RabbitMQ。
相关mqtt的理论东西,有兴趣的可以详情了解,下面我大概说下如何搭建服务。
1)添加pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2)application.yml文件中添加配置信息 ,订阅主题有多重形式,大致分为这几类
2.1)具体某个主题 ;test1
2.2)具体多个主题,可以用逗号分隔;test1,test2
2.3)主题层级分隔符: / ,单层通配符: + ,单层通配符 “+” 只匹配主题的一层;
test/+ ,可以匹配到test/开头的,且没有第三级主题下的所有主题
2.4)主题层级分隔符: / ,多层通配符: # ,多层通配符"#"是一个匹配主题中任意层次数的通配符;
test/# ,可以匹配到test/开头的所有主题
spring:
mqtt:
subscribe:
url: tcp://IP:PORT
username: emqx
password: public
client:
inid: mqttClientInputId
outid: mqttClientOutputId
default:
topic: mqtt/+/test1,mqtt/+/test2
completionTimeout: 3000
- mqtt订阅消息配置文件 MqttSubscribeConfig.java,如下
package com.mqtt.test.config;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttSubscribeConfig {
@Value("${spring.mqtt.subscribe.username}")
private String username;
@Value("${spring.mqtt.subscribe.password}")
private String password;
@Value("${spring.mqtt.subscribe.url}")
private String hostUrl;
// 订阅主题的客户端ID
@Value("${spring.mqtt.subscribe.client.inid}")
private String clientId;
@Value("${spring.mqtt.subscribe.default.topic}")
private String defaultTopic;
@Value("${spring.mqtt.subscribe.completionTimeout}")
private int completionTimeout ; //连接超时
@Bean
public MqttConnectOptions getReceiverMqttConnectOptionsForSub(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
List<String> hostList = Arrays.asList(hostUrl.trim().split(","));
String[] serverURIs = new String[hostList.size()];
hostList.toArray(serverURIs);
mqttConnectOptions.setServerURIs(serverURIs);
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub());
return factory;
}
//接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
String[] topics = new String[topicList.size()];
topicList.toArray(topics);
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId,receiverMqttClientFactoryForSub(),topics);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
Map<String,Object> msgMap = JSONObject.parseObject(msg);
// 这里可以处理接收的数据
log.info("\n----------------------------START---------------------------\n" +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
"\n-----------------------------END----------------------------");
};
}
}
4)mqtt配置推送消息的服务 ,MqttSendConfig.java
package com.mqtt.test.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.util.Arrays;
import java.util.List;
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttSendConfig {
@Value("${spring.mqtt.subscribe.username}")
private String username;
@Value("${spring.mqtt.subscribe.password}")
private String password;
@Value("${spring.mqtt.subscribe.url}")
private String hostUrl;
// 推送消息的客户端ID
@Value("${spring.mqtt.subscribe.client.outid}")
private String clientId;
@Value("${spring.mqtt.subscribe.default.topic}")
private String defaultTopic;
@Value("${spring.mqtt.subscribe.completionTimeout}")
private int completionTimeout ; //连接超时
@Bean
public MqttConnectOptions getReceiverMqttConnectOptionsForSend(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
List<String> hostList = Arrays.asList(hostUrl.trim().split(","));
String[] serverURIs = new String[hostList.size()];
hostList.toArray(serverURIs);
mqttConnectOptions.setServerURIs(serverURIs);
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory receiverMqttClientFactoryForSend() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, receiverMqttClientFactoryForSend());
messageHandler.setAsync(false);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
5)mqtt推送服务的接口类,可以通过注入该接口调用消息推送功能,推送消息至指定主题
package com.mqtt.test.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param message 消息主体
*/
void publishMqttMessageWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic);
/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息处理的几种机制。<br> 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param message 消息主体
*/
void publishMqttMessageWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos);
}
注意事项;
mqtt主题订阅和mqtt消息推送的配置中客户端ID是不能一样的,如果一样的话推送消息会报错,无法成功推送消息
更多推荐
所有评论(0)