又到了显摆分享技术的时候了

这篇文章基于前两篇文章之上,使用物联网行业开源的MQTT服务器接收数据,使Web行业热门的Springboot框架项目订阅与发布与数据入库与展示

如果对数据上传不是很了解的可以看我之前的文章

Stm32f103c8t6+ESP8266-01s+DHT11 实现向服务器上传温湿度数据

Springboot+STM32+ESP8266 使用HTTP的GET与POST发送请求向Springboot项目上传数据并展示

这篇文章的流程为:

1.搭建开源的mqtt服务器

2.创建Springboot项目

3.Springboot中整合MQTT

4.使用Springboot订阅与发布数据

5.Springboot将订阅的数据入库

6.开发实时订阅/发布展示页面

我会分为两篇文章分享,当然如果两篇放不下就再来一篇

第二篇也写好了可以实现整个搭建过程

[Java]SpringBoot2整合mqtt服务器EMQ实现消息订阅发布入库(二)

目录

一.开源物联网服务器EMQ

为什么用MQTT

搭建mqtt服务

EMQ管理页面

小结

二.搭建Springboot2+mqtt服务工程

搭建mqtt环境

实现mqtt功能代码

小结

三.总结


一.开源物联网服务器EMQ

为什么用MQTT

提到物联网能想到的就是万物互联上云,而所谓的上云就是数据上传到网络服务器,上传多使用的是MQTT协议

刚刚接触ESP8266模块的时候首先知道的数据发送协议为TCP UDP,而这两个协议都是基于HTTP协议基础上使用的

而MQTT也是基于HTTP协议的TCP之上诞生的

那为什么物联网行业都推荐使用Mqtt来发送数据呢

我的理解两个方面一个是省流量省带宽,一个是心跳包

省流量省带宽:因为数据报文头字节数小,协议的字节小,由于一条数据的字节数非常少,发送所需的流量少,有些移动设备只需要2G网络就可以上传数据

心跳包: 物联网设备需要时刻知道设备是否在线,如果设备离线没有及时反馈会影响小则影响数据上传,大则影响设备生产,通过发送小巧的心跳包让服务器知道设备是否在线

其实上面都是都是废话用来占文章长度的,真正的干货在下面

搭建mqtt服务

直接使用开源的Mqtt搭建方案 官网地址: EMQ 开源mqtt服务器搭建方案官网

这页面,这样式,这感觉,我就做不出来

直接点击免费试用 会看到有三个版本,作为勤俭节约的好青年,我选择了 EMQ X Broker 

有财力的或者为解决企业级问题的可以选择试用EMQ X Enterprise 并且直接关掉这篇文章了,我就是为了用免费也能将数据入库才写的文章

我这里对各版本的差别描述的比价笼统具体的差异可以看官方文档 : 具体不同版本差异可见文档

没问题了就下一步

我们选择第一个 EMQ X Broker 

 下面就会让你选择要下载的版本,使用的什么平台以及平台版本什么的,下载也是直接在服务器上直接执行写好的指令,等待下载完成,按照安装运行的步骤走完,mqtt就搭建好并启动了

EMQ管理页面

启动后在浏览器输入你的IP:18083

如172.0.0.1:18038,就会跳转到mqtt的管理页面,初始账号admin 密码public

登录成功后会看到

非常酷炫是吧,老酷炫了,我都看着累眼睛,而且都是英文这是啥啊(地铁老头手机),不是我英语不好我是为了大众考虑,我们需要修改一下

小结

Mqtt已经搭建起来了,如有物联网设备都可用通过ip:1883来访问平台

如172.0.0.1:1883向平台发送指定主题的数据,也可以通过平台订阅指定的主题

具体EMQ平台上左侧菜单都有什么功能可以自行百度,如我有时间(除非我真闲得慌)也会整理一下

因为这个版本功能很少,多数功能都是通过Springboot来实现

二.搭建Springboot2+mqtt服务工程

由于考虑有的开发人员是用eclipse有的使用idea所以这里

是不是以为我要把两种编译器创建Springboot的方法写出来,是不是让我猜中了,恭喜你这段我不写

搭建mqtt环境

在pom.xml里引入mqtt包

        <!--MQTT使用包-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>

在application.yml里添加mqtt配置参数

spring:
  mqtt:
    broker: tcp://{这里写EMQ部署的地址}:1883
    clientId: 123456 #客户端的id
    username: 登录账号
    password: 登录密码
    timeout: 2000
    KeepAlive: 20
    topics: ServerId_02 #主题
    qos: 1 #心跳包级别

实现mqtt功能代码

创建一个类读取application.yml中的配置

这里说一下我这里用了@Getter与@Setter注解会自动创建getset方法,用不了就用idea自带的方法创建getset方法

@Getter
@Setter
@Component
@Configuration
public class MqttConfiguration {

    @Value("${spring.mqtt.broker}")
    private String host;

    @Value("${spring.mqtt.clientId}")
    private String clientId;

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.timeout}")
    private int timeout;

    @Value("${spring.mqtt.KeepAlive}")
    private int KeepAlive;

    @Value("${spring.mqtt.topics}")
    private String topics;

    @Value("${spring.mqtt.qos}")
    private int qos;
    
}

在启动类Application添加mqtt初始化方法

@SpringBootApplication
public class Application implements ApplicationRunner{

    //读取mqtt配置
    @Autowired
    private MqttConfiguration mqttConfiguration;


    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
}

    /**
     * mqtt 初始化
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        if(true){
            if (log.isInfoEnabled()){
                log.info("===============>>>Mqtt is run starting:<<==================");
            }
            MqttPushClient mqttPushClient = new MqttPushClient();
            mqttPushClient.connect(mqttConfiguration);
        }
    }


}

Mqttbeanbak类

@Component
public class Mqttbeanbak {
    @Autowired
    private MqttConfiguration mqttConfiguration;
    @Bean("mqttPushClient")
    public MqttPushClient getMqttPushClient() {
        MqttPushClient mqttPushClient = new MqttPushClient();
        return mqttPushClient;
    }
}

 MqttPushClient类

@Slf4j
public class MqttPushClient  {
    private static MqttClient client;

    public static MqttClient getClient() {
        return client;
    }

    public static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }




    /**
     * 编辑连接信息
     * @param userName
     * @param password
     * @param outTime
     * @param KeepAlive
     * @return
     */
    private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) {
        //MQTT连接设置
        MqttConnectOptions option = new MqttConnectOptions();
        //设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
        option.setCleanSession(false);
        //设置连接的用户名
        option.setUserName(userName);
        //设置连接的密码
        option.setPassword(password.toCharArray());
        //设置超时时间 单位为秒
        option.setConnectionTimeout(outTime);
        //设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        option.setKeepAliveInterval(KeepAlive);
        //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
        //option.setWill(topic, "close".getBytes(StandardCharsets.UTF_8), 2, true);
        option.setMaxInflight(1000);
        return option;
    }

    /**
     * 发起连接
     */
    public void connect(MqttConfiguration mqttConfiguration) {
        MqttClient client;
        try {
            client = new MqttClient(mqttConfiguration.getHost(), mqttConfiguration.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = getOption(mqttConfiguration.getUsername(), mqttConfiguration.getPassword(),
                    mqttConfiguration.getTimeout(), mqttConfiguration.getKeepAlive());
            MqttPushClient.setClient(client);
            try {
                client.setCallback(new PushCallback(this, mqttConfiguration));
                if (!client.isConnected()) {
                    client.connect(options);
                    log.info("================>>>MQTT连接成功<<======================");

                } else {//这里的逻辑是如果连接不成功就重新连接
                    client.disconnect();
                    client.connect(options);
                    log.info("===================>>>MQTT断连成功<<<======================");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 断线重连
     *
     * @throws Exception
     */
    public Boolean reConnect() throws Exception {
        Boolean isConnected = false;
        if (null != client) {
            client.connect();
            if (client.isConnected()) {
                isConnected = true;
            }
        }
        return isConnected;
    }
    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic, String pushMessage) {

        publish(0, false, topic, pushMessage);

    }
    /**
     * 发布
     *
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        try {
            message.setPayload(pushMessage.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            log.error("===============>>>MQTT topic 不存在<<=======================");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复),
     * retained 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, String topic, String pushMessage) {

        publish(qos, false, topic, pushMessage);

    }

    /**
     *
     * 订阅多个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String[] topic, int[] qos) {
        try {
            MqttPushClient.getClient().unsubscribe(topic);
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 清空主题
     * @param topic
     */
    public void cleanTopic(String topic) {
        try {
            MqttPushClient.getClient().unsubscribe(topic);
        } catch (MqttException e) {
            log.error(e.getMessage());
            e.printStackTrace();
        }
    }




}

MqttSender类

@Component(value = "mqttSender")
@Slf4j
public class MqttSender
{

    @Async
    public void send(String queueName, String msg) {
        log.info("=====================>>>>发送主题"+queueName);
        publish(2,queueName, msg);
    }


    /**
     * 发布,默认qos为0,非持久化
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic,String pushMessage){

        publish(1, false, topic, pushMessage);

    }

    /**
     * 发布
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos,boolean retained,String topic,String pushMessage){
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        try {
            message.setPayload(pushMessage.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if(null == mTopic){
            log.error("===================>>>MQTT topic 不存在<<=================");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            log.error("============>>>publish fail",e);
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复),
     * retained 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, String topic, String pushMessage){
        publish(qos, false, topic, pushMessage);
    }
}

PushCallback类

这个类要多留意,因为订阅的数据会进入messageArrived方法,如果想将数据处理并保存需要在这个类里操作

由于该类不能使用@Autowired调用数据处理数据入库方法,所以需要特殊处理

下一篇文章会特别详细的讲解

@Slf4j
@Component
public class PushCallback implements MqttCallback {


    private MqttPushClient client;
    private MqttConfiguration mqttConfiguration;


    public PushCallback(MqttPushClient client ,MqttConfiguration mqttConfiguration) {
        this.client = client;
        this.mqttConfiguration = mqttConfiguration;
    }

    @Override
    public void connectionLost(Throwable cause) {
        /** 连接丢失后,一般在这里面进行重连 **/
        if(client != null) {
            while (true) {
                try {
                    log.info("==============》》》[MQTT] 连接断开,5S之后尝试重连...");
                    Thread.sleep(5000);
                    MqttPushClient mqttPushClient = new MqttPushClient();
                    mqttPushClient.connect(mqttConfiguration);
                    if(MqttPushClient.getClient().isConnected()){
                        log.info("=============>>重连成功");
                    }
                    break;
                } catch (Exception e) {
                    log.error("=============>>>[MQTT] 连接断开,重连失败!<<=============");
                    continue;
                }
            }
        }
        log.info(cause.getMessage());
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        //publish后会执行到这里
        log.info("publish后会执行到这里");
        log.info("pushComplete==============>>>" + token.isComplete());
    }


    /**
     * 监听对应的主题消息
     * @param topic
     * @param message
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        String Payload = new String(message.getPayload());

        log.info("============》》接收消息主题 : " + topic);
        log.info("============》》接收消息Qos : " + message.getQos());
        log.info("============》》接收消息内容 : " + Payload);
        log.info("============》》接收ID : " + message.getId());
        log.info("接收数据结束 下面可以执行数据处理操作");

    }
}

MqttController类

@Controller
@Slf4j
@ResponseBody
@RequestMapping("/mqtt")
public class MqttController {

    //发送逻辑
    @Autowired
    private MqttSender mqttSender;
    
    //订阅逻辑
    @Autowired
    private MqttPushClient mqttPushClient;

    @RequestMapping("/sendmqtt")
    public String sendmqtt(){
        String TOPIC1="testtest1";
        String JSON = "{'gender':'man','hobby':'girl'}";
        log.info("    本机主题:"+TOPIC1+" 发送数据为:"+JSONObject.toJSONString(JSON));
        mqttSender.send(TOPIC1, JSON);
        log.info("     发送结束");
        return "发送结束";
    }

    @RequestMapping("/subscriptionmqtt")
    public String subscriptionmqtt(){
        int Qos=1;
        String TOPIC1="testtest1";
        String[] topics={TOPIC1};
        int[] qos={Qos};
        mqttPushClient.subscribe(topics,qos);
        return "订阅主题";
    }
}

整合的mqtt的代码就是这些,下面是测试是否可以与mqtt服务器通信

在没有订阅时直接发送数据

发送数据访问  http://127.0.0.1:8080/mqtt/sendmqtt 

现在订阅该主题并在次访问 http://127.0.0.1:8085/mqtt/subscriptionmqtt 

再次发送数据就会看到由于订阅了该主题,我们向该主题发送的数据就会被订阅,就会接收到发送的 

小结

可以看到订阅方法已经可以读到发送的数据了,

有人可能想到在接收方法中使用@Autowired注解注入数据入库方法,这样数据就可以存入数据库了,

那么恭喜你你会看到你的mqtt连接会不断的短线重连并且报空指针

你会发现使用@Autowired注入的方法是空的,这就是下一篇文章要讲的内容了

三.总结

到目前为止简单的EMQ服务部署以及Springboot的mqtt环境已经搭建好了,可以实现简单的数据发送主题订阅

也算是回归我的本行进行开发,从硬件单片机到软件程序开发,有趣非常有趣

我也希望能与志同道合的人相互交流分享技术,最好能交个女朋友,本人已经有女朋友了(2021年11月)

哎,可能写博客是我为数不多的可以展示自我的方式了

想要交流技术的可以直接私信我,我脾气很好的(点头)

下一篇 解决注入方法报空指针并将订阅接到的数据入库 , 再开发一个实时的可发送/订阅的工具页面

第二篇也写好了可以实现整个搭建过程

[Java]SpringBoot2整合mqtt服务器EMQ实现消息订阅发布入库(二)

先把页面发出来显摆展示一下

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐