Springboot 结合 mqttredis对接硬件以及做消息分发,最佳实践

一,认识

需要了解EMQX 基本知识原理,不了解的可以查看我之间的博客,以及网上的资料,这里不在过多撰述。

二,开发思路

这里以对接雷达水位计为例:

说一下思路, 这里场景各种设备连接 EMQX ,然后通过 EMQX 上报数据,和接收服务器下发的指令。

我们需要部署一个 EMQX 服务器, 设备配置我们的服务器ip和端口连接到 EMQX 。

那么我们开发EMQX 的思路应该是什么样子的。

  1. mqtt 客户端订阅相关主题;

  2. 数据库保存数据设备产品项目定义主题,存到redis;

  3. 通过主题做出相关数据分析;

三,准备工作

3.1 引入Springboot-mqtt依赖

Springboot 依赖, MQTT依赖

	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>

	<dependency>
        <!-- mqtt -->
        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
		</dependency>
    </dependency>

其他相关依赖 不在撰写, 数据库依赖以及 工具类依赖 ,自己按需引用

四,编写代码

4.1 编写MQTT配置类

不在过多解释代码,每行都有注释

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

/**
 * 功能描述: 配置类
 *
 * @Author keLe
 * @Date 2022/10/31
 */
@Data
@Component
@Configuration
@PropertySource("classpath:application.yml")
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    /**服务器地址url*/
    private String host;

    /**客户端唯一ID*/
    private String clientId;

    /**用户名*/
    private String userName;

    /**密码*/
    private String passWord;

    /**超时时间*/
    private Integer timeOut;

    /**保活时间*/
    private Integer keepaLive;

    /**是否清除会话*/
    private boolean clearSession;
}

appliction.yml

mqtt:
    host: tcp://xx.xx.xx.xx:1883 #MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
    clientId: ${random.int}  #MQTT-连接服务器默认客户端ID
    userName: admin   #MQTT-用户名
    passWord: admin #MQTT-密码
    default-topic: test #MQTT-默认的消息推送主题,实际可在调用接口时指定
    timeOut: 1000 #连接超时
    keepaLive: 30   #设置会话心跳时间
    clearSession: true  #清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)

4.2 编写MQTT客户端,处理创建,连接,订阅,发布等功能

import com.joygis.mqtt.MqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Arrays;

/**
 * 功能描述: mqtt客户端
 *
 * @Author keLe
 * @Date 2022/10/31
 */
@Slf4j
@Component
public class MqttCustomerClient {

    @Resource
    private MqttCallback mqttCallback;

    @Resource
    private MqttProperties mqttProperties;

    /**
     * 连接配置
     */
    private MqttConnectOptions options;

    /**
     * MQTT异步客户端
     */
    public static MqttAsyncClient client;

    /**
     * 功能描述: 客户端连接
     *
     * @Author keLe
     * @Date 2022/10/31
     */
    public void connect() {
        if (mqttProperties == null) {
            log.error("【mqtt异常】:连接失败,配置文件缺失。");
            return;
        }
        //设置配置
        if (options == null) {
            setOptions();
        }
        //创建客户端
        if (client == null) {
            createClient();
        }
        while (!client.isConnected()) {
            try {
                IMqttToken token = client.connect(options);
                token.waitForCompletion();
            } catch (Exception e) {
                log.error("【mqtt异常】:mqtt连接失败,message={}", e.getMessage());
            }
        }
    }

    /**
     * 功能描述: 创建客户端
     *
     * @Author keLe
     * @Date 2022/10/31
     */
    private void createClient() {
        if (client == null) {
            try {
              /*host为主机名,clientId是连接MQTT的客户端ID,MemoryPersistence设置clientId的保存方式
                默认是以内存方式保存*/
                client = new MqttAsyncClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence());
                //设置回调函数
                client.setCallback(mqttCallback);
                log.info("【mqtt】:mqtt客户端启动成功");
            } catch (MqttException e) {
                log.error("【mqtt异常】:mqtt客户端连接失败,error={}", e.getMessage());
                e.printStackTrace();
            }
        }
    }

    /**
     * 功能描述: 设置连接属性
     *
     * @Author keLe
     * @Date 2022/10/31
     */
    private void setOptions() {
        if (options != null) {
            options = null;
        }
        if (mqttProperties == null) {
            log.error("【mqtt异常】连接失败,失败原因:配置文件缺失。");
            return;
        }
        options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(mqttProperties.getUserName());
        options.setPassword(mqttProperties.getPassWord().toCharArray());
        options.setConnectionTimeout(mqttProperties.getTimeOut());
        options.setKeepAliveInterval(mqttProperties.getKeepaLive());
        //设置自动重新连接
        options.setAutomaticReconnect(true);
        options.setCleanSession(mqttProperties.isClearSession());
    }

    /**
     * 功能描述: 断开与mqtt的连接
     *
     * @Author keLe
     * @Date 2022/10/31
     */
    public synchronized void disconnect() {
        //判断客户端是否null 是否连接
        if (client != null && client.isConnected()) {
            try {
                IMqttToken token = client.disconnect();
                token.waitForCompletion();
            } catch (MqttException e) {
                log.error("【mqtt异常】: 断开mqtt连接发生错误,message={}", e.getMessage());
            }
        }
        client = null;
    }

    /**
     * 功能描述: 重新连接MQTT
     *
     * @Author keLe
     * @Date 2022/10/31
     */
    public synchronized void refresh() {
        disconnect();
        setOptions();
        createClient();
        connect();
    }

    /**
     * 功能描述: 发布
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     * @Author keLe
     * @Date 2022/10/31
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        log.info("【mqtt】:发布主题" + topic);
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());

        try {
            IMqttDeliveryToken token = client.publish(topic,message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            log.error("【mqtt异常】: 发布主题时发生错误 topic={},message={}",topic,e.getMessage());
        }
    }

    /**
     * 功能描述: 订阅某个主题
     * @param topic 主题
     * @param qos   消息质量
     *              Qos1:消息发送一次,不确保
     *              Qos2:至少分发一次,服务器确保接收消息进行确认
     *              Qos3:只分发一次,确保消息送达和只传递一次
     * @Author keLe
     * @Date 2022/10/31
     */
    public void subscribe(String topic, int qos){
        log.info("【mqtt】:订阅了主题 topic={}",topic);
        try {
            IMqttToken token = client.subscribe(topic, qos);
            token.waitForCompletion();
        }catch (MqttException e){
            log.error("【mqtt异常】:订阅主题 topic={} 失败 message={}",topic,e.getMessage());
        }
    }

    /**
     * 功能描述: 订阅某些主题
     * @param topic 主题
     * @param qos   消息质量
     *              Qos1:消息发送一次,不确保
     *              Qos2:至少分发一次,服务器确保接收消息进行确认
     *              Qos3:只分发一次,确保消息送达和只传递一次
     * @Author keLe
     * @Date 2022/10/31
     */
    public void subscribe(String[] topic,int[] qos){
        log.info("【mqtt】:订阅了主题 topic={}", Arrays.toString(topic));
        try {
            IMqttToken token = client.subscribe(topic,qos);
            token.waitForCompletion();
        }catch (MqttException e){
            log.error("【mqtt异常】:订阅主题 topic={} 失败 message={}",topic,e.getMessage());
        }
    }

    /**是否处于连接状态*/
    public boolean isConnected(){
        return client != null && client.isConnected();
    }
}

4.3 编写MQTT 回调监听器

/**
 * 功能描述: 消费监听
 *
 * @Author keLe
 * @Date 2022/10/31
 */
@Slf4j
@Component
public class MqttCallback implements MqttCallbackExtended {

    @Resource
    private MqttService mqttService;

    @Override
    public void connectionLost(Throwable throwable) {
        log.error("【mqtt异常】:断开连接....");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        mqttService.subscribeCallback(topic,message);
    }

    /**
     * 功能描述: 发布消息后,到达MQTT服务器,服务器回调消息接收
     * @param token  Mqtt传递令牌
     * @Author keLe
     * @Date 2022/10/31
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("【mqtt】交付完成:{}",token.isComplete());
    }

    /**
     * 功能描述: 监听mqtt连接消息
     * @param reconnect 是否重连
     * @param serverUrl 服务地址
     * @Author keLe
     * @Date 2022/10/31
     */
    @Override
    public void connectComplete(boolean reconnect, String serverUrl) {
        log.info("mqtt已经连接!!");
        //连接后,可以在此做初始化事件,或订阅
        try {
            mqttService.subscribe(MqttCustomerClient.client);
        } catch (MqttException e) {
            log.error("======>>>>>订阅主题失败 error={}",e.getMessage());
        }
    }
}

4.4 编写MQTT 业务接口,处理订阅发布

import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.joygis.common.constant.Constants;
import com.joygis.common.core.redis.RedisCache;
import com.joygis.mqtt.client.MqttCustomerClient;
import com.joygis.mqtt.domian.SubscribeConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@Service
public class MqttService {

    @Resource
    private MqttCustomerClient mqttCustomerClient;

    @Autowired
    private RedisCache redisCache;

    /**
     * 功能描述: 订阅主题
     * @param client MQTT异步客户端
     * @Author keLe
     * @Date 2022/10/31
     */
    public void subscribe(MqttAsyncClient client) throws MqttException {
        //获取主题
        List<String> cacheList = redisCache.getCacheList(Constants.SUB_CONFIG_KEY + "topic");
        if(CollectionUtil.isEmpty(cacheList)){
            log.error("【mqtt异常】:redis缓存中,无法获取主题相关信息!");
            return;
        }
        String[] topicFilters = cacheList.toArray(new String[cacheList.size()]);
        int[] qos = new int[cacheList.size()];
        for(int i = 0 ; i<cacheList.size() ; i++){
            qos[i] = 1;
        }
        // 订阅
        client.subscribe(topicFilters, qos);
        log.info("mqtt订阅了设备信息和物模型主题");
    }

    /**
     * 功能描述: 消息回调方法
     * @param topic  主题
     * @param mqttMessage 消息体
     * @Author keLe
     * @Date 2022/10/31
     */
    @Async
    public void subscribeCallback(String topic, MqttMessage mqttMessage) throws InterruptedException {
        /**测试线程池使用*/
        log.info("====>>>>线程名--{}",Thread.currentThread().getName());
        /**模拟耗时操作*/
        // Thread.sleep(1000);
        // subscribe后得到的消息会执行到这里面
        String message = new String(mqttMessage.getPayload());
        log.info("接收消息主题 : " + topic);
        log.info("接收消息Qos : " + mqttMessage.getQos());
        log.info("接收消息内容 : " + message);
        String key = Constants.SUB_CONFIG_KEY+topic;
        SubscribeConfig subscribeConfig = redisCache.getCacheObject(key);
        //TODO 这里使用通过数据取到相应的bean 动态去调用接口解析数据
    }

    /**
     * 功能描述: 发布设备状态
     * @Author keLe
     * @Date 2022/10/31
     * @param  productId 产品id
     * @param  deviceNum 设备编号
     * @param  deviceStatus 设备状态
     * @param  isShadow 影子模式
     * @param  rssi 编号
     */
    public void publishStatus(Long productId, String deviceNum, int deviceStatus, int isShadow,int rssi) {
        String message = "{\"status\":" + deviceStatus + ",\"isShadow\":" + isShadow + ",\"rssi\":" + rssi + "}";
        mqttCustomerClient.publish(1, false, "/" + productId + "/" + deviceNum + "", message);
    }

    /**
     * 功能描述: 发布设备状态
     * @Author keLe
     * @Date 2022/10/31
     * @param  productId 产品id
     * @param  deviceNum 设备编号
     */
    public void publishInfo(Long productId, String deviceNum) {
        mqttCustomerClient.publish(1, false, "/" + productId + "/" + deviceNum + "", "");
    }

    /**
     * 功能描述: 发布设备状态
     * @Author keLe
     * @Date 2022/10/31
     * @param  productId 产品id
     * @param  deviceNum 设备编号
     */
    public void publishFunction(Long productId, String deviceNum, List<String> thingsList) {
        if (thingsList == null) {
            mqttCustomerClient.publish(1, true, "/" + productId + "/" + deviceNum + "", "");
        } else {
            mqttCustomerClient.publish(1, true, "/" + productId + "/" + deviceNum + "", JSON.toJSONString(thingsList));
        }

    }
}

4.5 Redis初始化,加载数据库 topic

@Service
public class SubscribeConfigServiceImpl implements ISubscribeConfigService {
    @Autowired
    private RedisCache redisCache;

    @Resource
    private SubscribeConfigMapper subscribeConfigMapper;

    @PostConstruct
    public void init() {
        loadingConfigCache();
    }
    
    @Override
    public void loadingConfigCache() {
        List<SubscribeConfig> configsList = subscribeConfigMapper.selectSubscribeConfigList(new SubscribeConfig());
        if(CollectionUtil.isNotEmpty(configsList)){
            for (SubscribeConfig config : configsList) {
                redisCache.setCacheObject(getCacheKey(config.getTopic()), config);
            }
            List<String> topicList = configsList.stream().map(SubscribeConfig::getTopic).collect(Collectors.toList());
            redisCache.deleteObject(getCacheKey("topic"));
            redisCache.setCacheList(getCacheKey("topic"), topicList);
        }
    }

    /**
     * 设置cache key
     *
     * @param configKey 参数键
     * @return 缓存键key
     */
    private String getCacheKey(String configKey) {
        return Constants.SUB_CONFIG_KEY + configKey;
    }
    
}

4.6 Springboot 启动,MQTT也自启动,任务定时器池也启动

package com.joygis.iot.config;
import com.joygis.mqtt.client.MqttCustomerClient;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 功能描述: spring 容器创建完成之后,开始创建mqtt客户端
 *
 * @Author keLe
 * @Date 2022/10/31
 */
@Order(value = 1 )
@Component
public class MqttStart implements ApplicationRunner {

    @Resource
    private MqttCustomerClient mqttCustomerClient;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        mqttCustomerClient.connect();
    }
}

五 ,测试

启动服务

在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐