MQTT简要使用教程(python)
MQTT使用简答,对于微小数据的传输很方便,但是需要连接到Internet才行,因为需要连接到一个公共Borker,当然你也可以自己创建一个Broker。对于Publish和Subscribe谁先运行其实无所谓,因为Publish是发送,而Subscribe是在等待谁往订阅的Topic发送信息。如何在Python中使用MQTTmqtt协议------paho-mqtt协议文档httpshttps。
最近使用了python的MQTT进行小量数据的传输,非常好用,所以记录一下。
1、MQTT
MQTT实际上是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。
设想在物联网设备中,很多事微型终端,不会拥有太多的计算资源和带宽,所以MQTT这种轻量级的传输协议非常适合来进行小微数据的传输,
MQTT中的设备有三种角色:Broke、Publish和Subscribe。Broke即代理,其作用是接受任意终端要发送的信息,并转发到所有订阅了该消息的设备;
Publish即发送消息的设备,Subscribe即订阅消息的设备,实际上同一个终端既可以是Publish也可以是Subscribe,因为它既可以发送消息,也可以接受消息。
另一个重要的概念是订阅,实际上一个MQTT的Broker中可能连接了很多设备,我想在两台设备之间传递,而其他设备不接受该消息怎么办?实际上我可以在终端向Broker发送消息时,定义消息的Topic(主题)比如“test topic”,然后附带本次消息的内容。Broker就会接收到本次消息及其消息的Topic,然后Broker就会看哪些设备订阅了该Topic(设备在连接到Broker时就要指明想要接受的Topic),然后将信息转发给订阅了该Topic的设备。
所以想要在同一Broker下的任意几个设备互相通信,进入要定义一个Topic,大家发送的消息都带上该Topic即可。
2、python中安装MQTT
直接使用pip安装:
pip3 install -i https://pypi.doubanio.com/simple paho-mqtt
关于使用的一个参考链接:mqtt协议------paho-mqtt协议
3、连接到Broker
现在假设我们有两个终端需要信息传输,我们需要将两个终端都连接到同一个Broker中
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
# 免费的Broker
broker = 'broker.emqx.io'
port = 1883
client_id = f'python-mqtt-{random.randint(0, 1000)}' # 设定唯一设备号,不设则mqtt随机生成
# 连接函数
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
# flags是一个包含代理回复的标志的字典;
# rc的值决定了连接成功或者不成功(0为成功)
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id) # 实例化对象
client.on_connect = on_connect # 设定回调函数,当Broker响应连接时,就会执行给定的函数
client.connect(broker, port) # 连接
return client
关于loop_start()
,根据文档大概意思是接受或发送都会造成网络阻塞,使用loop_start()会创建新线程自动调用loop(),而loop()则是调用在select()中等待发送或接受
总之在持续发送或者持续接受时,使用loop_start()
即可
3、发送函数
设定循环向Topic发送信息,间隔为1s
# 定义发送信息的函数
topic = "/python/mqtt" # 自定义一个Topic
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg) # 指定信息的tpoic和信息内容,并发送
# result: [0, 1]
status = result[0] # 解析响应内容
if status == 0: # 发送成功
print(f"Send `{msg}` to topic `{topic}`")
else: # 发送失败
print(f"Failed to send message to topic {topic}")
msg_count += 1
4、发送方(Publish)完整代码
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
# 免费的Broker
broker = 'broker.emqx.io'
port = 1883
client_id = f'python-mqtt-{random.randint(0, 1000)}' # 设定唯一设备号,不设则mqtt随机生成
# 连接函数
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
# flags是一个包含代理回复的标志的字典;
# rc的值决定了连接成功或者不成功(0为成功)
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id) # 实例化对象
client.on_connect = on_connect # 设定回调函数,当Broker响应连接时,就会执行给定的函数
client.connect(broker, port) # 连接
return client
# 定义发送信息的函数
topic = "/python/mqtt" # 自定义一个Topic
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg) # 指定信息的tpoic和信息内容,并发送
# result: [0, 1]
status = result[0] # 解析响应内容
if status == 0: # 发送成功
print(f"Send `{msg}` to topic `{topic}`")
else: # 发送失败
print(f"Failed to send message to topic {topic}")
msg_count += 1
if __name__ == '__main__':
client = connect_mqtt() # 连接
client.loop_start() # 新线程loop
publish(client) # 发送
5、接受方(Subscribe)完整代码
Subscribe实际上和Publish差不多,只不过把pulish()函数变为subscribe(),并多一个订阅收到消息响应的回调函数,用于处理接收到的消息
# python3.6
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io' # 与Publish一样
port = 1883 # 与Publish一样
topic = "/python/mqtt" # 准备订阅该Topic
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}' # 生成一个设备号
# 与小节3一直,都是连接到Borker
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
# 订阅函数,设定要订阅的Topic,以及设定接受信息后的回调函数
def subscribe(client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
if __name__ == '__main__':
client = connect_mqtt() # 连接
subscribe(client) # 订阅设置
# 网络阻塞,不断接受并调用回调函数处理结果,意味着代码会一直卡在这里接受,所以可以使用多线程来使用
client.loop_forever()
比如你可以这样:
import threading
# 这个函数负责连接mqtt,并使用loop_forever()卡住持续接受消息
def mqtt_receive():
client = connect_mqtt() # 连接
subscribe(client) # 订阅设置
client.loop_forever()
th=threading.Thread(target=mqtt_receive)
th.start() # 创新新线程
如此你的程序就不会卡住了
6、总结
MQTT使用简单,对于微小数据的传输很方便,但是需要连接到Internet才行,因为需要连接到一个公共Borker,当然你也可以自己创建一个Broker。
对于Publish和Subscribe谁先运行其实无所谓,因为Publish是发送,而Subscribe是在等待谁往订阅的Topic发送信息。
更多推荐
所有评论(0)