最近使用了python的MQTT进行小量数据的传输,非常好用,所以记录一下。

1、MQTT

MQTT实际上是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

设想在物联网设备中,很多事微型终端,不会拥有太多的计算资源和带宽,所以MQTT这种轻量级的传输协议非常适合来进行小微数据的传输,

MQTT中的设备有三种角色:Broke、Publish和Subscribe。Broke即代理,其作用是接受任意终端要发送的信息,并转发到所有订阅了该消息的设备;

Publish即发送消息的设备,Subscribe即订阅消息的设备,实际上同一个终端既可以是Publish也可以是Subscribe,因为它既可以发送消息,也可以接受消息。
在这里插入图片描述
另一个重要的概念是订阅,实际上一个MQTT的Broker中可能连接了很多设备,我想在两台设备之间传递,而其他设备不接受该消息怎么办?实际上我可以在终端向Broker发送消息时,定义消息的Topic(主题)比如“test topic”,然后附带本次消息的内容。Broker就会接收到本次消息及其消息的Topic,然后Broker就会看哪些设备订阅了该Topic(设备在连接到Broker时就要指明想要接受的Topic),然后将信息转发给订阅了该Topic的设备。

所以想要在同一Broker下的任意几个设备互相通信,进入要定义一个Topic,大家发送的消息都带上该Topic即可。

2、python中安装MQTT

直接使用pip安装:
pip3 install -i https://pypi.doubanio.com/simple paho-mqtt
关于使用的一个参考链接:mqtt协议------paho-mqtt协议

3、连接到Broker

现在假设我们有两个终端需要信息传输,我们需要将两个终端都连接到同一个Broker中

# python 3.6
import random
import time

from paho.mqtt import client as mqtt_client

# 免费的Broker
broker = 'broker.emqx.io'
port = 1883

client_id = f'python-mqtt-{random.randint(0, 1000)}'	# 设定唯一设备号,不设则mqtt随机生成

# 连接函数
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
    # flags是一个包含代理回复的标志的字典;
	# rc的值决定了连接成功或者不成功(0为成功)
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)	# 实例化对象
    client.on_connect = on_connect	# 设定回调函数,当Broker响应连接时,就会执行给定的函数
    client.connect(broker, port)	# 连接
    return client

关于loop_start(),根据文档大概意思是接受或发送都会造成网络阻塞,使用loop_start()会创建新线程自动调用loop(),而loop()则是调用在select()中等待发送或接受
在这里插入图片描述
总之在持续发送或者持续接受时,使用loop_start()即可

3、发送函数

设定循环向Topic发送信息,间隔为1s


# 定义发送信息的函数
topic = "/python/mqtt"	# 自定义一个Topic
def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)	# 指定信息的tpoic和信息内容,并发送
        # result: [0, 1]
        status = result[0]	# 解析响应内容
        if status == 0:	# 发送成功
            print(f"Send `{msg}` to topic `{topic}`")
        else:	# 发送失败
            print(f"Failed to send message to topic {topic}")
        msg_count += 1

4、发送方(Publish)完整代码

# python 3.6
import random
import time

from paho.mqtt import client as mqtt_client

# 免费的Broker
broker = 'broker.emqx.io'
port = 1883

client_id = f'python-mqtt-{random.randint(0, 1000)}'	# 设定唯一设备号,不设则mqtt随机生成

# 连接函数
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
    # flags是一个包含代理回复的标志的字典;
	# rc的值决定了连接成功或者不成功(0为成功)
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)	# 实例化对象
    client.on_connect = on_connect	# 设定回调函数,当Broker响应连接时,就会执行给定的函数
    client.connect(broker, port)	# 连接
    return client

# 定义发送信息的函数
topic = "/python/mqtt"	# 自定义一个Topic
def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)	# 指定信息的tpoic和信息内容,并发送
        # result: [0, 1]
        status = result[0]	# 解析响应内容
        if status == 0:	# 发送成功
            print(f"Send `{msg}` to topic `{topic}`")
        else:	# 发送失败
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


if __name__ == '__main__':
    client = connect_mqtt()		# 连接
    client.loop_start()	# 新线程loop
    publish(client)	# 发送

5、接受方(Subscribe)完整代码

Subscribe实际上和Publish差不多,只不过把pulish()函数变为subscribe(),并多一个订阅收到消息响应的回调函数,用于处理接收到的消息

# python3.6

import random
from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'	# 与Publish一样
port = 1883	# 与Publish一样
topic = "/python/mqtt"  # 准备订阅该Topic
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'	# 生成一个设备号


# 与小节3一直,都是连接到Borker
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


# 订阅函数,设定要订阅的Topic,以及设定接受信息后的回调函数
def subscribe(client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message

if __name__ == '__main__':
	client = connect_mqtt()	# 连接
    subscribe(client)	# 订阅设置
    # 网络阻塞,不断接受并调用回调函数处理结果,意味着代码会一直卡在这里接受,所以可以使用多线程来使用
    client.loop_forever()	

比如你可以这样:

import threading

# 这个函数负责连接mqtt,并使用loop_forever()卡住持续接受消息
def mqtt_receive():
    client = connect_mqtt()  # 连接
    subscribe(client)	# 订阅设置
    client.loop_forever()


th=threading.Thread(target=mqtt_receive)
th.start()	# 创新新线程

如此你的程序就不会卡住了

6、总结

MQTT使用简单,对于微小数据的传输很方便,但是需要连接到Internet才行,因为需要连接到一个公共Borker,当然你也可以自己创建一个Broker。

对于Publish和Subscribe谁先运行其实无所谓,因为Publish是发送,而Subscribe是在等待谁往订阅的Topic发送信息。

如何在 Python 中使用 MQTT
mqtt协议------paho-mqtt协议 文档

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐