方法一:使用spring-integration-mqtt

与springboot集成度更高,灵活程度不如org.eclipse.paho.client.mqttv3

<!--mqtt 相关依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

 application.yml

spring:
  mqtt:
    enable: true
    url: tcp://127.0.0.1:1883
    username:
    password:
    #MQTT-连接服务器默认客户端ID
    #生产者和消费者各自单独连接服务器,必须使用不同的clientid(客户端id);如果使用同一个clientid,会出现 Lost connection: 已断开连接; retrying...
    consumerclientid: clientid1234231212_c
    producerclientid: clientid1234231212_p
    #超时时间,单位ms(在MqttConfig中用作入站适配器的completionTimeout)
    timeout: 5000
    #心跳间隔(Keep Alive),单位为秒;服务端超过1.5倍该间隔未收到任何报文即判定连接断开
    keepalive: 2
    # deviceId
    #deviceId: your_deviceId
    # mqtt-topic
    producertopic: /master/info
    consumertopic: /+/sys/motor, /+/sys/media, /+/sys/general, /+/sys/rc, /+/sys/rtk, /+/mission, /+/power, /+/v3/sys/general

 MqttConfig.class

package org.young.common.mqtt;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;


/**
 * Spring Integration wiring for one MQTT producer and one MQTT consumer.
 *
 * <p>The configuration is only active when {@code spring.mqtt.enable=true}. It declares the shared
 * Paho connection options and client factory, an outbound channel plus handler for publishing, and
 * an inbound adapter plus channel plus handler for consuming.
 */
@Configuration
@IntegrationComponentScan
@ConditionalOnProperty(value = "spring.mqtt.enable", havingValue = "true")
public class MqttConfig {

    private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);

    // Credentials are optional: an absent or blank value means "connect without credentials".
    @Value("${spring.mqtt.username:}")
    private String username;

    @Value("${spring.mqtt.password:}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.producerclientid}")
    private String producerClientId;

    @Value("${spring.mqtt.producertopic}")
    private String producerTopic;

    // Producer and consumer open separate connections, so each needs its own client id.
    // Sharing one id results in "Lost connection ...; retrying..." on the clients.
    @Value("${spring.mqtt.consumerclientid}")
    private String consumerClientId;

    @Value("${spring.mqtt.consumertopic}")
    private String[] consumerTopic;

    // Used as the completion timeout of the inbound adapter.
    @Value("${spring.mqtt.timeout}")
    private int timeout;

    // Keep-alive (heartbeat) interval handed to the Paho connect options.
    @Value("${spring.mqtt.keepalive}")
    private int keepalive;

    /** Bean name of the inbound (consumer) channel. */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
    /** Bean name of the outbound (producer) channel. */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

    /**
     * Builds the Paho connection options shared by producer and consumer.
     *
     * <p>User name and password are applied only when they are non-blank. The sample configuration
     * leaves both empty, and some Paho releases reject a blank user name with an
     * {@link IllegalArgumentException}, which would abort application start-up.
     *
     * @return the configured {@link MqttConnectOptions}
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        if (username != null && !username.trim().isEmpty()) {
            mqttConnectOptions.setUserName(username);
        }
        if (password != null && !password.isEmpty()) {
            mqttConnectOptions.setPassword(password.toCharArray());
        }
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(keepalive);
        return mqttConnectOptions;
    }

    /**
     * Creates the Paho client factory used by both the outbound handler and the inbound adapter.
     *
     * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /******************************* Producer *******************************************/

    /**
     * Channel that carries messages to be published (producer side).
     *
     * @return {@link MessageChannel}
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler that publishes every message arriving on {@link #CHANNEL_NAME_OUT} to the broker.
     * Messages without an explicit topic go to the configured producer topic.
     *
     * @return the outbound {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(producerClientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(producerTopic);
        return messageHandler;
    }

    /******************************* Consumer *******************************************/

    /**
     * Channel that receives messages delivered by the broker (consumer side).
     *
     * @return {@link MessageChannel}
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter that subscribes to the configured consumer topics and forwards received
     * messages to {@link #mqttInboundChannel()}.
     *
     * @return {@link org.springframework.integration.core.MessageProducer}
     */
    @Bean
    public MessageProducer inbound() {
        // Several topics can be subscribed at once; consumerTopic holds all of them.
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), consumerTopic);
        adapter.setCompletionTimeout(timeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // Route received messages into the inbound channel.
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * Handler invoked for every message arriving on {@link #CHANNEL_NAME_IN}.
     *
     * @return {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        // Option 1: handle the message inline with an anonymous MessageHandler.
//        return new MessageHandler() {
//            @Override
//            public void handleMessage(Message<?> message) throws MessagingException {
//                String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
//                String msg = message.getPayload().toString();
//                logger.info("接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg);
//            }
//        };
        // Option 2: delegate to a dedicated handler class.
        return new MqttConsumer();
    }

    // To run several clients, declare one channel/adapter/handler triple per client.
    // Channel 2
//    @Bean
//    public MessageChannel mqttInputChannelTwo() {
//        return new DirectChannel();
//    }
//    // Client 2, listening on topics hello2 and hello3
//    @Bean
//    public MessageProducer inbound1() {
//        MqttPahoMessageDrivenChannelAdapter adapter =
//                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),
//                        "hello2","hello3");
//        adapter.setCompletionTimeout(timeout);
//        adapter.setConverter(new DefaultPahoMessageConverter());
//        adapter.setQos(1);
//        adapter.setOutputChannel(mqttInputChannelTwo());
//        return adapter;
//    }
//
//    // Receive the data through channel 2
//    @Bean
//    @ServiceActivator(inputChannel = "mqttInputChannelTwo")
//    public MessageHandler handlerTwo() {
//        return new MqttConsumer();
//    }
}

MqttConsumer.class
package org.young.common.mqtt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

/**
 * Message handler for MQTT messages received through Spring Integration; it logs the topic and
 * payload of every message it is given.
 */
@Component
@ConditionalOnProperty(value = "spring.mqtt.enable", havingValue = "true")
public class MqttConsumer implements MessageHandler {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs the received topic header and the payload of the given message.
     *
     * @param message the message delivered on the inbound channel
     * @throws MessagingException declared by the {@link MessageHandler} contract
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        Object topicHeader = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        Object body = message.getPayload();
        // String.valueOf keeps a missing header or payload printable as "null".
        String receivedTopic = String.valueOf(topicHeader);
        String text = String.valueOf(body);
        logger.info("接收到 mqtt消息,主题:{} 消息:{}", receivedTopic, text);
    }
}
MqttProducer.class
package org.young.common.mqtt;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Messaging gateway for publishing MQTT messages. Every call is sent to the
 * {@code mqttOutboundChannel} request channel.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
@Component
@ConditionalOnProperty(value = "spring.mqtt.enable", havingValue = "true")
public interface MqttProducer {

    /**
     * Notes that apply to all overloads below.
     *
     * <p>{@code payload} / {@code data} is the content of the message. {@code topic} is the topic
     * to publish to; it can be chosen freely per call, or omitted to use the default topic from the
     * configuration file.
     *
     * <p>{@code qos} selects the MQTT delivery guarantee:
     * 0 - the message is not re-sent if the subscriber misses it, so it may be lost;
     * 1 - delivery is retried until the message is received, so the subscriber may see duplicates;
     * 2 - like 1 plus de-duplication, so the subscriber receives the message once.
     * Throughput differs accordingly: QoS 0 is the fastest and QoS 2 the slowest.
     */

    /** Publishes {@code data} without an explicit topic header (default topic). */
    void sendToMqtt(String data);

    /** Publishes {@code payload} to the given {@code topic}. */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /** Publishes {@code payload} to the given {@code topic} with the given {@code qos}. */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

使用的Controller

/**
 * Publishes {@code data} to {@code topic} through the Spring Integration gateway.
 *
 * @param topic target topic; a blank value is rejected with an error response
 * @param data  message content
 * @return a success response after the message was handed to the gateway, otherwise an error response
 */
@PostMapping("/send")
    public ResponseEntity send(String topic, String data) {
        this.logger.info("开始发送mqtt消息,主题:{},消息:{}", topic, data);
        // Guard clause: nothing can be published without a topic.
        if (!StringUtils.isNotBlank(topic)) {
            return ResponseUtil.buildError("topic is blank");
        }
        mqttProducer.sendToMqtt(data, topic);
        this.logger.info("发送mqtt消息完成,主题:{},消息:{}", topic, data);
        return ResponseUtil.buildSuccess("execute successful");
    }

方法二:使用org.eclipse.paho.client.mqttv3

<dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
#与spring-integration-mqtt的实现方式相比,不同之处在于可以在运行时实时订阅主题
mqtt:
  enable: false
  url: tcp://127.0.0.1:1883
  username:
  password:
  #MQTT-连接服务器默认客户端ID
  clientid: clientid123123
  #连接超时
  timeout: 5000
  #心跳时间
  keepalive: 2
  # mqtt-topic
  topic: your_topic
MqttClientConfig.class
package org.young.common.mqtt.pahomqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

/**
 * Settings for the plain Eclipse Paho client, bound from properties with the {@code mqtt} prefix.
 * The instance also holds the currently active {@link MqttClient}.
 *
 * <p>Of the two approaches shown, the Paho client can subscribe at any time at runtime, whereas
 * spring-integration-mqtt completes its subscriptions during initialisation.
 *
 * <p>How {@code @ConditionalOnProperty} decides whether this configuration is active:
 * <ul>
 *   <li>Key present and {@code havingValue} set: matches only when the value equals
 *       {@code havingValue}.</li>
 *   <li>Key present and {@code havingValue} not set: matches unless the value is
 *       {@code false}.</li>
 *   <li>Key absent: {@code matchIfMissing = true} matches, {@code false} does not.</li>
 * </ul>
 */
@Configuration
@ConditionalOnProperty(value = "mqtt.enable", havingValue = "true")
@ConfigurationProperties(prefix = "mqtt")
public class MqttClientConfig {
    private String url;
    private String clientid;
    private String username;
    private String password;
    private String topic;
    private int timeout;
    private int keepalive;

    private MqttClient client;

    /** @return the broker URL */
    public String getUrl() {
        return this.url;
    }

    /** @param url the broker URL */
    public void setUrl(String url) {
        this.url = url;
    }

    /** @return the client id used when connecting */
    public String getClientid() {
        return this.clientid;
    }

    /** @param clientid the client id used when connecting */
    public void setClientid(String clientid) {
        this.clientid = clientid;
    }

    /** @return the user name */
    public String getUsername() {
        return this.username;
    }

    /** @param username the user name */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the password */
    public String getPassword() {
        return this.password;
    }

    /** @param password the password */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the configured topic */
    public String getTopic() {
        return this.topic;
    }

    /** @param topic the configured topic */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the configured timeout */
    public int getTimeout() {
        return this.timeout;
    }

    /** @param timeout the configured timeout */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /** @return the keep-alive interval */
    public int getKeepalive() {
        return this.keepalive;
    }

    /** @param keepalive the keep-alive interval */
    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }

    /** @return the client stored by the last {@link #setClient(MqttClient)} call, or {@code null} */
    public MqttClient getClient() {
        return this.client;
    }

    /** @param client the client instance to store */
    public void setClient(MqttClient client) {
        this.client = client;
    }
}
MqttClientHandler.class
package org.young.common.mqtt.pahomqtt;

import lombok.SneakyThrows;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Paho callback that reports connection loss, delivery completion and received messages on the
 * console.
 */
public class MqttClientHandler  implements MqttCallback {

    /**
     * Called when the connection to the broker is lost. Reconnection is usually triggered from
     * here; this implementation only reports the cause so that the loss is no longer silent.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause)  {
        System.err.println("MQTT connection lost: " + cause);
    }

    /**
     * Called when delivery of a published message has finished.
     *
     * @param token the delivery token of the published message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    /**
     * Called for every message received on a subscribed topic.
     *
     * @param topic   the topic the message arrived on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        // Decode explicitly as UTF-8 so the result does not depend on the platform default charset.
        System.out.println("接收消息内容 : " + new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
    }
}
MqttClientService.class
package org.young.common.mqtt.pahomqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * Service around a plain Eclipse Paho {@link MqttClient}: connects on start-up using
 * {@link MqttClientConfig} and offers publish / subscribe helpers that can be called at runtime.
 *
 * <p>Failures are logged and swallowed (best effort); the public methods do not propagate
 * {@link MqttException}.
 */
@Service
@ConditionalOnProperty(value = "mqtt.enable", havingValue = "true")
public class MqttClientService {

    private static final Logger logger = LoggerFactory.getLogger(MqttClientService.class);

    @Autowired
    MqttClientConfig clientConfig;


    /** Connects to the broker once the bean has been constructed. */
    @PostConstruct
    public void init(){
        connectWithConfiguredSettings();
    }

    /**
     * Creates a new client, stores it in {@link MqttClientConfig} and connects it to the broker.
     * Any failure is logged; no exception is propagated, so start-up continues without a connection.
     *
     * @param url       broker URL
     * @param clientID  client id to connect with
     * @param username  user name; skipped when {@code null} or blank
     * @param password  password; skipped when {@code null} or empty
     * @param timeout   value passed to {@code setConnectionTimeout}
     *                  (NOTE(review): Paho documents this in seconds - confirm the configured unit)
     * @param keepalive value passed to {@code setKeepAliveInterval}
     */
    public void connect(String url, String clientID, String username, String password, int timeout, int keepalive){
        try {
            MqttClient client = new MqttClient(url, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            // Short-circuit checks: the previous non-short-circuit '|' dereferenced null values.
            if (username != null && !username.trim().isEmpty()) {
                options.setUserName(username);
            }
            if (password != null && !password.isEmpty()) {
                options.setPassword(password.toCharArray());
            }
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            // NOTE(review): a client stored by an earlier call is replaced here without being closed.
            clientConfig.setClient(client);
            client.setCallback(new MqttClientHandler());
            client.connect(options);
        } catch (Exception e) {
            // Deliberately broad: connecting is best effort and must not abort bean initialisation.
            logger.error("Failed to connect to MQTT broker {} with client id {}", url, clientID, e);
        }
    }

    /**
     * Retries connecting every two seconds until the stored client reports that it is connected.
     *
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     */
    public void replayConnect () throws InterruptedException {
        logger.warn("断开连接,建议重连");
        int tryTimes = 1;
        while (!isConnected()) {
            Thread.sleep(2000);
            logger.info("重试第{}次", tryTimes);
            // The existing client is intentionally not disconnected first; a fresh connect is issued.
            connectWithConfiguredSettings();
            logger.info("连接完成");
            tryTimes++;

        }
    }

    /**
     * Publishes with QoS 0 and the retained flag off.
     *
     * @param topic       topic to publish to
     * @param pushMessage message content
     */
    public void publish(String topic,String pushMessage){
        publish(0, false, topic, pushMessage);
    }

    /**
     * Publishes a message and waits for the delivery to complete. Errors are logged, not thrown.
     *
     * @param qos         MQTT quality of service for the message
     * @param retained    whether the broker should retain the message
     * @param topic       topic to publish to
     * @param pushMessage message content, encoded as UTF-8
     */
    public void publish(int qos,boolean retained,String topic,String pushMessage){
        MqttClient client = clientConfig.getClient();
        if (client == null) {
            logger.error("MQTT client is not initialised; cannot publish to topic {}", topic);
            return;
        }
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // Explicit charset so the bytes on the wire do not depend on the platform default.
        message.setPayload(pushMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        MqttTopic mTopic = client.getTopic(topic);
        if(null == mTopic){
            logger.error("topic not exist");
            // Stop here: continuing would dereference the null topic.
            return;
        }
        try {
            MqttDeliveryToken token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttException e) {
            // MqttPersistenceException is a subclass of MqttException and is covered here as well.
            logger.error("Failed to publish to topic {}", topic, e);
        }
    }

    /**
     * Subscribes to a topic with QoS 0.
     *
     * @param topic topic filter to subscribe to
     */
    public void subscribe(String topic){
        subscribe(topic,0);
    }

    /**
     * Subscribes to a topic. Errors are logged, not thrown.
     *
     * @param topic topic filter to subscribe to
     * @param qos   maximum quality of service for the subscription
     */
    public void subscribe(String topic,int qos){
        MqttClient client = clientConfig.getClient();
        if (client == null) {
            logger.error("MQTT client is not initialised; cannot subscribe to topic {}", topic);
            return;
        }
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("Failed to subscribe to topic {}", topic, e);
        }
    }

    /** Connects using the values currently held by {@link MqttClientConfig}. */
    private void connectWithConfiguredSettings() {
        this.connect(clientConfig.getUrl(), clientConfig.getClientid(), clientConfig.getUsername(), clientConfig.getPassword(), clientConfig.getTimeout(), clientConfig.getKeepalive());
    }

    /** Returns whether a client exists and reports itself as connected. */
    private boolean isConnected() {
        MqttClient client = clientConfig.getClient();
        return client != null && client.isConnected();
    }
}

使用Controller

/**
 * Publishes {@code data} to {@code topic} through the Paho-based service, then issues a
 * subscription at runtime.
 *
 * @param topic target topic; a blank value is rejected with an error response
 * @param data  message content
 * @return a success response after publish and subscribe were invoked, otherwise an error response
 */
@PostMapping("/send")
    public ResponseEntity send(String topic, String data) {
        this.logger.info("开始发送mqtt消息,主题:{},消息:{}", topic, data);
        // Guard clause: nothing can be published without a topic.
        if (!StringUtils.isNotBlank(topic)) {
            return ResponseUtil.buildError("topic is blank");
        }
        mqttClientService.publish(topic, data);
        // Subscribing after start-up is possible with the Paho client; the topic is a sample value.
        mqttClientService.subscribe("wrwerwrewer");
        this.logger.info("发送mqtt消息完成,主题:{},消息:{}", topic, data);
        return ResponseUtil.buildSuccess("execute successful");
    }

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐