SpringBoot集成MQTT (超稳定)
SpringBoot配置MQTT,断开重连....效果稳定
·
引入依赖包
<!--mqtt包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
application.properties(连接信息配置)
例:主题自定义
# mqtt
mqtt.host=tcp://127.0.0.1:1883
mqtt.TOPIC1=RcsGrroup81 (订阅的主题)
mqtt.TOPIC2=RcsGrroup82
mqtt.TOPIC3=RcsGrroup83
mqtt.TOPICAll=RcsGrroup81,RcsGrroup82,RcsGrroup83
mqtt.qos=1 (设置消息发布质量)
mqtt.clientid=subClient_xc (用户id)
MqttConsumer (消费端配置)
@Slf4j
@Component
public class MqttConsumer implements ApplicationRunner {
@Value("${mqtt.host}")
private String HOST;
@Value("${mqtt.TOPICAll}")
public String TOPICAll;
@Value("${mqtt.qos}")
private Integer qos;
@Value("${mqtt.clientid}")
private String clientid;
@Value("${mqtt.userName}")
private String userName;
@Value("${mqtt.passWord}")
private String passWord;
private static MqttClient client;
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("初始化并启动mqtt......"+clientid);
this.connect();
}
/**
* 连接mqtt服务器
*/
private void connect() {
try {
// 1 创建客户端
getClient();
// 2 设置配置
MqttConnectOptions options = getOptions();
String[] topic = TOPICAll.split(",");
// 3 消息发布质量
int[] qos = getQos(topic.length);
// 4 最后设置
create(options, topic, qos);
} catch (Exception e) {
log.error("mqtt连接异常:" + e);
}
}
/**
* 创建客户端 --- 1 ---
*/
public void getClient() {
try {
if (null == client) {
client = new MqttClient(HOST, clientid, new MemoryPersistence());
}
log.info("--创建mqtt客户端");
} catch (Exception e) {
log.error("创建mqtt客户端异常:" + e);
}
}
/**
* 生成配置对象,用户名,密码等 --- 2 ---
*/
public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
//options.setUserName(PropertiesUtil.MQTT_USER_NAME);
//options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
// 设置超时时间
options.setConnectionTimeout(1000);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
// 是否清除session
options.setCleanSession(true);
//是否自动重连
options.setAutomaticReconnect(true);
log.info("--生成mqtt配置对象");
return options;
}
/**
* qos --- 3 ---
*/
public int[] getQos(int length) {
int[] qos = new int[length];
for (int i = 0; i < length; i++) {
/**
* MQTT协议中有三种消息发布服务质量:
*
* QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
* QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。
* QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
*/
qos[i] = 1;
}
log.info("--设置消息发布质量");
return qos;
}
/**
* 装在各种实例和订阅主题 --- 4 ---
*/
public void create(MqttConnectOptions options, String[] topic, int[] qos) {
try {
client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
log.info("--添加回调处理类");
client.connect(options);
} catch (Exception e) {
log.info("装载实例或订阅主题异常:" + e);
}
}
/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
log.info("topic:" + topic);
client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
MqttConsumerCallback (mqtt回调处理类)
/**
* mqtt回调处理类
*/
@Slf4j
public class MqttConsumerCallback implements MqttCallbackExtended {
private MqttClient client;
private MqttConnectOptions options;
private String[] topic;
private int[] qos;
public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
this.client = client;
this.options = options;
this.topic = topic;
this.qos = qos;
}
/**
* 断开重连
*/
@Override
public void connectionLost(Throwable cause) {
log.info("MQTT连接断开,发起重连......");
try {
if (null != client && !client.isConnected()) {
//清除信息
StaticData.data.clear();
client.reconnect();
log.error("尝试重新连接");
} else {
//清除信息
StaticData.data.clear();
client.connect(options);
log.error("尝试建立新连接");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 接收到消息调用令牌中调用
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//log.info("deliveryComplete---------" + Arrays.toString(topic));
}
/**
* 消息处理
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
String msg = new String(message.getPayload());
log.info("收到topic:" + topic + " 消息:" + msg);
//TODO 自定义-业务处理
//JSONObject jsonObject = JSONObject.parseObject(msg);
//StaticData.data.put(topic,jsonObject.toJSONString());
} catch (Exception e) {
log.info("处理mqtt消息异常:" + e);
}
}
/**
* mqtt连接后订阅主题
*/
@Override
public void connectComplete(boolean b, String s) {
try {
if (null != topic && null != qos) {
if (client.isConnected()) {
client.subscribe(topic, qos);
log.info("mqtt连接成功,客户端ID:"+client.getClientId() );
log.info("--订阅主题::" + Arrays.toString(topic));
} else {
log.info("mqtt连接失败,客户端ID:"+client.getClientId() );
}
}
} catch (Exception e) {
log.info("mqtt订阅主题异常:" + e);
}
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)