比較厲害的網(wǎng)站制作公司重慶seo整站優(yōu)化外包服務(wù)
前言
上一章我們用開發(fā)板通過SNTP協(xié)議獲取網(wǎng)絡(luò)協(xié)議,本章我們介紹一下開發(fā)板通過配置MQTT連接到服務(wù)器上,并且訂閱和發(fā)布消息。
什么是MQTT?
MQTT是一種輕量級的消息傳輸協(xié)議,旨在物聯(lián)網(wǎng)(IoT)應(yīng)用中實現(xiàn)設(shè)備間的可靠通信。它使用發(fā)布-訂閱模式,其中包括一個MQTT服務(wù)端(代理或服務(wù)器)和多個MQTT客戶端之間的通信。MQTT協(xié)議具有以下特點:
- 輕量級:MQTT協(xié)議設(shè)計簡單,協(xié)議頭部開銷小,適用于資源受限的設(shè)備和網(wǎng)絡(luò)。
- 低帶寬消耗:MQTT采用二進制編碼,有效地利用網(wǎng)絡(luò)帶寬。
- 異步通信:客戶端可以隨時發(fā)布和訂閱消息,無需等待對方的響應(yīng)。
- 發(fā)布-訂閱模式:消息發(fā)布者將消息發(fā)布到特定的主題,而訂閱者則訂閱感興趣的主題。這種模式支持松耦合的通信和靈活的消息傳遞。
報文介紹
報文格式
MQTT控制報文由三部分組成,分別是固定報頭,可變報頭,有效載荷。

固定報頭

固定報頭最少由兩個字節(jié)組成,第一個字節(jié)的7-4位為協(xié)議類型,3-0位為標(biāo)志位,從第二個字節(jié)開始為剩余長度(包括可變報頭和有效載荷的長度)
協(xié)議類型具體定義可參考下表:


標(biāo)志位可以參考下表:

其中:
DUP1 = 控制報文的重復(fù)分發(fā)標(biāo)志
QoS2 = PUBLISH 報文的服務(wù)質(zhì)量等級
RETAIN3 = PUBLISH 報文的保留標(biāo)志
協(xié)議類型示例如下表:

剩余長度字段最多四個字節(jié),最少一個字節(jié),具體長度如下表所示:

其中,每個字節(jié)的6-0位用于編碼數(shù)據(jù),第7位是標(biāo)志位,為1則表示下一個字節(jié)也是剩余長度字段。
可變報頭
某些控制報文包含可變報頭,它在固定報頭(Fixed header)和有效載荷(Payload)之間。每個協(xié)議的可變報頭都不一樣。
其中大多數(shù)協(xié)議都會有的字段是報文標(biāo)識符。
可變報頭在各個控制報文的詳細內(nèi)容中再展開講解。
有效載荷
有效載荷是除控制報文格式以外的有效信息,CONNECT、PUBLISH、SUBSCRIBE等需要傳遞有效信息的協(xié)議幀都需要。
實例講解
MQTT報文的具體格式可以參考文檔:MQTT Version 3.1.1 (oasis-open.org)

連接MQTT服務(wù)器(客戶端->服務(wù)器)
(以下皆為HEX格式)
//固定報頭
10 21(剩余33個字節(jié))
//可變報頭
00 04 4D 51 54 54 04 C2 00 3C
//clientid,長度8字節(jié),文本內(nèi)容為clientid
00 08 63 6C 69 65 6E 74 69 64
//用戶名,長度4字節(jié),文本內(nèi)容為MQTT
00 04 4D 51 54 54
//密碼,長度5字節(jié),文本內(nèi)容為w5500
00 05 77 35 35 30 30
確認(rèn)連接(服務(wù)器->客戶端)
//連接成功,會話為新會話
20 02 00 00
訂閱主題(客戶端->服務(wù)器)
//固定報頭,剩余長度10字節(jié)
82 0A
//可變報頭
00 01
//有效載荷(長度5字節(jié),內(nèi)容為topic,qos為0)
00 05 74 6F 70 39 63 00
確認(rèn)訂閱(服務(wù)器->客戶端)
//固定報頭,剩余長度3字節(jié)
90 03
//可變報頭
00 01
//有效載荷,回復(fù)訂閱qos為0
00
發(fā)布消息(qos0)
//固定報頭,qos0消息,非重傳,非保留,剩余長度10字節(jié)
30 14
//可變報頭,5個字節(jié)的主題“topic”
00 05 74 6F 70 69 63
//有效載荷“message”
6D 65 73 73 61 67 65
連接方式
開發(fā)板和主機都接在路由器LAN口
連接MQTTX服務(wù)器測試
相關(guān)代碼
我們打開例程中的mqtt_client.c文件,首先可以看到,我們定義了MQTT協(xié)議的收發(fā)報文緩存和MQTT所使用的socket號
#define MQTT_SEND_BUFF_SIZE 2048 // MQTT協(xié)議發(fā)送報文緩存大小
#define MQTT_RECV_BUFF_SIZE 2048 // MQTT協(xié)議接收報文緩存大小
#define MQTT_SOCKET 1 // MQTT使用的SOCKET號uint8_t mqtt_send_buff[MQTT_SEND_BUFF_SIZE] = {0}; // MQTT協(xié)議發(fā)送報文緩存
uint8_t mqtt_recv_buff[MQTT_RECV_BUFF_SIZE] = {0}; // MQTT協(xié)議接收報文緩存
然后定義一個結(jié)構(gòu)體來存放連接參數(shù)和訂閱發(fā)布主題參數(shù)
// MQTT連接和訂閱參數(shù)結(jié)構(gòu)體
typedef struct MQTTCONNECTION
{uint8_t server_ip[4];int port;char clientid[1024];char username[1024];char passwd[1024];char pubtopic[255];char subtopic[255];int QOS;
} mqttconn;// MQTT連接和訂閱參數(shù)
mqttconn mqtt_params = {.server_ip = {54, 244, 173, 190},.port = 1883,.clientid = "9a1d7719a8ac40d29311f26c5c5469dc",.username = "mqtt_username",.passwd = "123456",.pubtopic = "W5500",.subtopic = "W5500",.QOS = 0,
};
網(wǎng)絡(luò)地址參數(shù)如下
static wiz_NetInfo net_info = {.mac = {0x00, 0x08, 0xdc, 0x16, 0xed, 0x2e},.ip = {192, 168, 1, 20},.sn = {255, 255, 255, 0},.gw = {192, 168, 1, 1},.dns = {8, 8, 8, 8},.dhcp = NETINFO_STATIC};
并定義了三個全局變量用來存放連接MQTT的信息
MQTTClient c = {0}; // MQTT客戶端連接信息結(jié)構(gòu)體
Network n = {0}; // 網(wǎng)絡(luò)信息結(jié)構(gòu)體
int connOK; //連接狀態(tài)
此外,還需定義四個函數(shù)
首先是一個1ms的循環(huán)定時器回調(diào)函數(shù),在這個回調(diào)函數(shù)中,我們必須把mqtt_interface.c庫文件中的MilliTimer_Handler()函數(shù)加入到我們的1ms定時器回調(diào)函數(shù)中。
bool repeating_timer_callback(struct repeating_timer *t)
{MilliTimer_Handler();return true;
}
其次是mqtt初始化函數(shù),在這個函數(shù)中,我們連接并且訂閱主題,最后發(fā)布一條消息上去。
void mqtt_init(void)
{int ret;MQTTPacket_connectData data = MQTTPacket_connectData_initializer;NewNetwork(&n, MQTT_SOCKET);ConnectNetwork(&n, mqtt_params.server_ip, 1883);MQTTClientInit(&c, &n, 1000, mqtt_send_buff, MQTT_SEND_BUFF_SIZE, mqtt_recv_buff, MQTT_RECV_BUFF_SIZE);data.willFlag = 0;data.MQTTVersion = 3;data.clientID.cstring = mqtt_params.clientid;data.username.cstring = mqtt_params.username;data.password.cstring = mqtt_params.passwd;data.keepAliveInterval = 30;data.cleansession = 1;// 連接mqtt服務(wù)器,如果連接失敗則繼續(xù)重連connOK = MQTTConnect(&c, &data);printf("Connected:%s\r\n", connOK == 0 ? "success" : "failed");while (connOK){sleep_ms(50);connOK = MQTTConnect(&c, &data);printf("Connected:%s\r\n", connOK == 0 ? "success" : "failed");}// 訂閱主題,如果訂閱失敗則繼續(xù)訂閱ret = MQTTSubscribe(&c, mqtt_params.subtopic, mqtt_params.QOS, messageArrived);printf("Subscribing to %s\r\n", mqtt_params.subtopic);printf("Subscribed:%s\r\n", ret == 0 ? "success" : "failed");while (ret){sleep_ms(50);ret = MQTTSubscribe(&c, mqtt_params.subtopic, mqtt_params.QOS, messageArrived);printf("Subscribing to %s\r\n", mqtt_params.subtopic);printf("Subscribed:%s\r\n", ret == 0 ? "success" : "failed");}sleep_ms(50);// 發(fā)布消息MQTTMessage pubmessage = {.qos = QOS0,.retained = 0,.dup = 0,.id = 0,};pubmessage.payload = "hello mqtt!";pubmessage.payloadlen = strlen(pubmessage.payload);MQTTPublish(&c, mqtt_params.pubtopic, &pubmessage);printf("TX:%s\r\n", pubmessage.payload);
}
然后就是消息回調(diào)函數(shù),服務(wù)器下發(fā)的消息都會進入該函數(shù)中進行處理。
void messageArrived(MessageData *md)
{unsigned char messagebuffer[512];MQTTMessage *message = md->message;if (0)//展示qos等級{memcpy(messagebuffer, (char *)message->payload, (int)message->payloadlen);*(messagebuffer + (int)message->payloadlen + 1) = '\n';printf("%s\r\n", messagebuffer);}if (0)//展示qos等級printf("%.*s", (int)message->payloadlen, (char *)message->payload);elseprintf("%s%.*s%s%s", "Rx:", (int)message->payloadlen, (char *)message->payload, mqtt_params.QOS, "\r\n");
}
最后就是mqtt?;詈瘮?shù),該函數(shù)需要放在主函數(shù)的主循環(huán)中,否則可能導(dǎo)致保活失敗
void keep_mqtt(void)
{if (MQTTYield(&c, 30)){mqtt_init();}
}
在主函數(shù)中,我們只需要初始化網(wǎng)絡(luò)信息和接口,然后開啟1ms循環(huán)定時器,最后初始化mqtt,然后把mqtt?;詈瘮?shù)放入主循環(huán)中即可。
int main()
{struct repeating_timer timer;stdio_init_all();sleep_ms(3000);printf("W5500 mqtt example.\r\n");wizchip_initialize(); // SPI初始化以及鏈路狀態(tài)檢測wizchip_setnetinfo(&net_info); // 設(shè)置網(wǎng)絡(luò)地址信息print_network_information(net_info); // 打印網(wǎng)絡(luò)地址信息add_repeating_timer_ms(1, repeating_timer_callback, NULL, &timer); // 開啟1ms循環(huán)定時器mqtt_init(); // mqtt連接函數(shù)while (true){keep_mqtt(); // mqtt保活}
}
測試效果
將程序編譯燒錄后,打開串行監(jiān)視器,可以看到,成功連接并且訂閱上主題,還發(fā)布了一條信息。

在MQTTX上我們也能收到開發(fā)板發(fā)布的消息,我們在MQTTX發(fā)布一條消息出去。開發(fā)板也同樣能收到。

相關(guān)連接
本章例程鏈接:mqtt_client example