引入依赖包

<!--mqtt包-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

application.properties(连接信息配置)
例:主题自定义

# mqtt
mqtt.host=tcp://127.0.0.1:1883
mqtt.TOPIC1=RcsGrroup81 (订阅的主题)
mqtt.TOPIC2=RcsGrroup82
mqtt.TOPIC3=RcsGrroup83
mqtt.TOPICAll=RcsGrroup81,RcsGrroup82,RcsGrroup83
mqtt.qos=1 (设置消息发布质量)
mqtt.clientid=subClient_xc (用户id)

MqttConsumer (消费端配置)

@Slf4j
@Component
public class MqttConsumer implements ApplicationRunner {

    @Value("${mqtt.host}")
    private String HOST;

    @Value("${mqtt.TOPICAll}")
    public String TOPICAll;

    @Value("${mqtt.qos}")
    private Integer qos;

    @Value("${mqtt.clientid}")
    private String clientid;

    @Value("${mqtt.userName}")
    private String userName;

    @Value("${mqtt.passWord}")
    private String passWord;

    private static MqttClient client;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("初始化并启动mqtt......"+clientid);
        this.connect();
    }

    /**
     * 连接mqtt服务器
     */
    private void connect() {
        try {
            // 1 创建客户端
            getClient();
            // 2 设置配置
            MqttConnectOptions options = getOptions();
            String[] topic = TOPICAll.split(",");
            // 3 消息发布质量
            int[] qos = getQos(topic.length);
            // 4 最后设置
            create(options, topic, qos);
        } catch (Exception e) {
            log.error("mqtt连接异常:" + e);
        }
    }

    /**
     *  创建客户端  --- 1 ---
     */
    public void getClient() {
        try {
            if (null == client) {
                client = new MqttClient(HOST, clientid, new MemoryPersistence());
            }
            log.info("--创建mqtt客户端");
        } catch (Exception e) {
            log.error("创建mqtt客户端异常:" + e);
        }
    }

    /**
     *  生成配置对象,用户名,密码等  --- 2 ---
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        //options.setUserName(PropertiesUtil.MQTT_USER_NAME);
        //options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
        // 设置超时时间
        options.setConnectionTimeout(1000);
        // 设置会话心跳时间
        options.setKeepAliveInterval(20);
        // 是否清除session
        options.setCleanSession(true);
        //是否自动重连
        options.setAutomaticReconnect(true);
        log.info("--生成mqtt配置对象");
        return options;
    }

    /**
     *  qos   --- 3 ---
     */
    public int[] getQos(int length) {

        int[] qos = new int[length];
        for (int i = 0; i < length; i++) {
            /**
             *  MQTT协议中有三种消息发布服务质量:
             *
             * QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
             * QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。
             * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
             */
            qos[i] = 1;
        }
        log.info("--设置消息发布质量");
        return qos;
    }

    /**
     *  装在各种实例和订阅主题  --- 4 ---
     */
    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
        try {
            client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
            log.info("--添加回调处理类");
            client.connect(options);
        } catch (Exception e) {
            log.info("装载实例或订阅主题异常:" + e);
        }
    }
    /**
     * 订阅某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            log.info("topic:" + topic);
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}

MqttConsumerCallback (mqtt回调处理类)

/**
 * mqtt回调处理类
 */
@Slf4j
public class MqttConsumerCallback implements MqttCallbackExtended {


    private MqttClient client;
    private MqttConnectOptions options;
    private String[] topic;
    private int[] qos;

    public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
        this.client = client;
        this.options = options;
        this.topic = topic;
        this.qos = qos;
    }


    /**
     * 断开重连
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.info("MQTT连接断开,发起重连......");
        try {
            if (null != client && !client.isConnected()) {
                //清除信息
                StaticData.data.clear();
                client.reconnect();
                log.error("尝试重新连接");
            } else {
                //清除信息
                StaticData.data.clear();
                client.connect(options);
                log.error("尝试建立新连接");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 接收到消息调用令牌中调用
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {

        //log.info("deliveryComplete---------" + Arrays.toString(topic));
    }
    /**
     * 消息处理
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
            String msg = new String(message.getPayload());
            log.info("收到topic:" + topic + " 消息:" + msg);
            //TODO 自定义-业务处理
            //JSONObject jsonObject = JSONObject.parseObject(msg);
            //StaticData.data.put(topic,jsonObject.toJSONString());
        } catch (Exception e) {
            log.info("处理mqtt消息异常:" + e);
        }
    }

    /**
     * mqtt连接后订阅主题
     */
    @Override
    public void connectComplete(boolean b, String s) {
        try {
            if (null != topic && null != qos) {
                if (client.isConnected()) {
                    client.subscribe(topic, qos);
                    log.info("mqtt连接成功,客户端ID:"+client.getClientId() );
                    log.info("--订阅主题::" + Arrays.toString(topic));
                } else {
                    log.info("mqtt连接失败,客户端ID:"+client.getClientId() );
                }
            }
        } catch (Exception e) {
            log.info("mqtt订阅主题异常:" + e);
        }
    }
}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐