Python 是一種跨平臺的計算機(jī)程序設(shè)計語言,是ABC 語言的替代品,屬于面向?qū)ο蟮膭討B(tài)類型語言。它最初被設(shè)計用于編寫自動化腳本,隨著版本的不斷更新和語言新功能的添加,越來越多被用于獨(dú)立的、大型項目的開發(fā)。
MQTT 是一個物聯(lián)網(wǎng)傳輸協(xié)議,用于輕量級的發(fā)布/訂閱式消息傳輸,旨在為低帶寬和不穩(wěn)定的網(wǎng)絡(luò)環(huán)境中的物聯(lián)網(wǎng)設(shè)備提供可靠的網(wǎng)絡(luò)服務(wù)。其輕量、簡單、開放和易于實現(xiàn)等特點(diǎn),使得它適用范圍更加廣泛。
本文主要介紹如何在 Python 項目中使用paho-mqtt客戶端庫 ,實現(xiàn)客戶端與MQTT服務(wù)器的連接、訂閱、取消訂閱、收發(fā)消息等功能。
一、項目準(zhǔn)備
本項目使用 Python 3.10進(jìn)行開發(fā)測試。
用戶可用以下命令來確認(rèn) Python的版本:
python3 --version
Python 3.10.9
測試設(shè)備:
瑞科慧聯(lián)(RAK)網(wǎng)關(guān)RAK7268 V2、帶溫濕度傳感器的數(shù)據(jù)采集器Sensor Hub
二、選擇 MQTT 客戶端庫
paho-mqtt是目前 Python 中使用較多的 MQTT 客戶端庫。它為 Python 2.7 或 3.x 版本以上的客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支持,還提供了一些幫助程序功能。這使得消息發(fā)布到 MQTT 服務(wù)器變得更簡單。
三、Pip 安裝 Paho MQTT 客戶端
Pip 是 Python 包管理工具。該工具提供了對 Python 包的查找、下載、安裝、卸載的功能。
pip3install paho.mqtt
四、Python MQTT 使用
1、連接 MQTT 服務(wù)器
本文將使用瑞科慧聯(lián)LoRaWAN?網(wǎng)關(guān)提供的內(nèi)置 MQTT服務(wù),該服務(wù)基于 Mosquitto的開源消息代理。服務(wù)器接入信息如下:
- Broker:192.168.230.1
- TCP Port:1883
2、導(dǎo)入 Paho MQTT客戶端
from paho.mqtt import client as mqtt
3、設(shè)置 MQTT Broker 連接參數(shù)
設(shè)置 MQTT Broker 連接地址,端口以及 topic,同時調(diào)用 Pythonrandom.randint函數(shù)隨機(jī)生成 MQTT 客戶端 id。
MQTT_SERVER_IP ="192.168.230.1"
MQTT_PORT =1883
4、編寫 MQTT 連接函數(shù)
編寫連接回調(diào)函數(shù) on_connect,該函數(shù)將在客戶端連接后會被調(diào)用。在該函數(shù)中可以依據(jù)rc來判斷客戶端是否連接成功。同時可創(chuàng)建一個 MQTT 客戶端連接到broker.emqx.io。
defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):
"""連接MQTT服務(wù)器"""
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
mqttClient=mqtt.Client(client_id)
mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)
mqttClient.on_message=on_message # 返回訂閱消息回調(diào)函數(shù)
MQTT_HOST=MQTT_SERVER_IP # MQTT服務(wù)器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username","password") # mqtt服務(wù)器賬號密碼
mqttClient.connect(MQTT_HOST,MQTT_PORT,60)
mqttClient.loop_start() # 啟用線程連接
returnmqttClient
5、發(fā)布消息
定義一個 while 循環(huán)語句,在循環(huán)中設(shè)置每秒調(diào)用 MQTT 客戶端publish函數(shù)向/python/mqtt主題發(fā)送消息。
ddefon_publish():
# 發(fā)布消息
msg_count=0
whileTrue:
time.sleep(1)
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
topic='application/1/device/0000000000000444/tx'# 發(fā)布的主題,訂閱時需要使用這個主題才能訂閱此消息
msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'
result=mqttClient.publish(topic,msg)
status=result[0]
ifstatus==0:
print('第{}條消息發(fā)送成功'.format(msg_count))
else:
print('第{}條消息發(fā)送失敗'.format(msg_count))
msg_count+=1
6、訂閱消息
編寫消息回調(diào)函數(shù) on_message,函數(shù)將在客戶端從 MQTT Broker 收到消息后被調(diào)用,并打印出訂閱的 topic 名稱以及接收到的消息內(nèi)容。
defon_subscribe():
"""訂閱主題:mqtt/demo"""
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
whileTrue:
mqttClient.subscribe("application/#",2)
time.sleep(1)
7、完整代碼
消息訂閱代碼
#!/usr/bin/python
frompaho.mqttimportclientasmqtt
importtime
importjson
# from settings import *
importbase64
"""
網(wǎng)關(guān)通過mqtt發(fā)出數(shù)據(jù)
json - ok
probuf - no
"""
MQTT_SERVER_IP="192.168.230.1"
MQTT_PORT=1883
defon_connect(client,userdata,flags,rc):
"""一旦連接成功, 回調(diào)此方法"""
rc_status= ["連接成功","協(xié)議版本錯誤","無效的客戶端標(biāo)識","服務(wù)器無法使用","用戶名或密碼錯誤","無授權(quán)"]
print("connect:",rc_status[rc])
defon_message(client,userdata,msg):
"""一旦訂閱到消息, 回調(diào)此方法"""
print("主題"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))
print("主題"+msg.topic +" 消息"+str(msg.payload.decode()))
try:
temp=json.loads(msg.payload.decode())
# client.disconnect()
deveui=temp['devEUI']
print("devEUI: ",deveui)
data=temp['data']
print("解碼前的data為: ",data)
data_decode=base64.b64decode(data).hex()
print("解碼后的data為: ",data_decode)
str1=data_decode[4:]
ifstr1[0:4]=="0167":
a=int(str1[4:8],16)*0.1
print("溫度:",a,"℃")
ifstr1[8:12]=="0268":
b=int(str1[12:16],16)
print("濕度:",b,"%RH")
elifstr1[0:4]=="0268":
c=int(str1[4:8],16)
print("濕度:",c,"%RH")
exceptExceptionase:
print(e)
defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):
"""連接MQTT服務(wù)器"""
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
mqttClient=mqtt.Client(client_id)
mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)
mqttClient.on_message=on_message # 返回訂閱消息回調(diào)函數(shù)
MQTT_HOST=MQTT_SERVER_IP # MQTT服務(wù)器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username","password") # mqtt服務(wù)器賬號密碼
mqttClient.connect(MQTT_HOST,MQTT_PORT,60)
mqttClient.loop_start() # 啟用線程連接
returnmqttClient
defon_subscribe():
"""訂閱主題:mqtt/demo"""
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
whileTrue:
mqttClient.subscribe("application/#",2)
# allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")
# mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)
time.sleep(1)
if__name__=='__main__':
on_subscribe()
消息發(fā)布代碼
#!/usr/bin/python
frompaho.mqttimportclientasmqtt
importtime
importjson
# from settings import *
importbase64
"""
網(wǎng)關(guān)通過mqtt發(fā)出數(shù)據(jù)
json - ok
probuf - no
"""
MQTT_SERVER_IP="192.168.230.1"
MQTT_PORT=1883
defon_connect(client,userdata,flags,rc):
"""一旦連接成功, 回調(diào)此方法"""
rc_status= ["連接成功","協(xié)議版本錯誤","無效的客戶端標(biāo)識","服務(wù)器無法使用","用戶名或密碼錯誤","無授權(quán)"]
print("connect:",rc_status[rc])
defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):
"""連接MQTT服務(wù)器"""
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
mqttClient=mqtt.Client(client_id)
mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)
MQTT_HOST=MQTT_SERVER_IP # MQTT服務(wù)器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username","password") # mqtt服務(wù)器賬號密碼
mqttClient.connect(MQTT_HOST,MQTT_PORT,60)
mqttClient.loop_start() # 啟用線程連接
returnmqttClient
defon_publish():
# 發(fā)布消息
msg_count=0
whileTrue:
time.sleep(1)
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
topic='application/x/device/x/tx'# 發(fā)布的主題,訂閱時需要使用這個主題才能訂閱此消息
msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要發(fā)布的消息內(nèi)容
result=mqttClient.publish(topic,msg)
status=result[0]
ifstatus==0:
print('第{}條消息發(fā)送成功'.format(msg_count))
else:
print('第{}條消息發(fā)送失敗'.format(msg_count))
msg_count+=1
if__name__=='__main__':
on_publish()
測試
消息發(fā)布
運(yùn)行 MQTT消息發(fā)布代碼,將看到客戶端連接成功,并且成功將消息發(fā)布。
消息訂閱
通過瑞科慧聯(lián)帶溫濕度傳感器的 Sensor hub進(jìn)行數(shù)據(jù)傳輸,訂閱并解析數(shù)據(jù)結(jié)果如下:
五、總結(jié)
至此,我們完成了使用paho-mqtt客戶端連接到LoRaWAN?網(wǎng)關(guān)內(nèi)置 MQTT服務(wù)器,并實現(xiàn)了測試客戶端與 MQTT 服務(wù)器的連接、消息發(fā)布和訂閱并解析。
與 C ++ 或 Java 之類的高級語言不同,Python 比較適合設(shè)備側(cè)的業(yè)務(wù)邏輯實現(xiàn)。使用 Python 可以減少代碼上的邏輯復(fù)雜度,降低與設(shè)備的交互成本。未來,我們相信在物聯(lián)網(wǎng)領(lǐng)域 Python 將會有更廣泛的應(yīng)用!
-
物聯(lián)網(wǎng)
+關(guān)注
關(guān)注
2909文章
44557瀏覽量
372754 -
python
+關(guān)注
關(guān)注
56文章
4792瀏覽量
84627 -
MQTT
+關(guān)注
關(guān)注
5文章
650瀏覽量
22487
發(fā)布評論請先 登錄
相關(guān)推薦
評論