MQTT&Aliyun
本篇文章,我们将详细介绍如何在W5500芯片上面实现MQTT协议。并通过实战例程,为大家讲解如何使用W5500的MQTT协议连接阿里云平台,实现与阿里云物模型的数据交互。
该例程用到的其他网络协议,例如DHCP,DNS,请参考相关章节。有关初始化过程,请参考Network Install章节,这里将不再赘述。
MQTT简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种轻量级的、基于发布/订阅模式的消息传输协议, 广泛应用于物联网(IoT)领域,尤其是在网络带宽受限、设备资源有限的环境中。它由IBM在1999年提出,主要用于远程监控和控制系统中的设备通信。MQTT协议具有低带宽、低功耗、低延迟等优点,特别适用于嵌入式系统和物联网设备的通信。
MQTT特点
- 轻量级:MQTT协议采用极简的协议头,减少了消息头的大小,适合带宽有限和计算资源有限的设备。
- 可靠性:MQTT支持三种服务质量(QoS)级别,能够确保数据可靠送达。
- 实时性:MQTT适合低延迟应用场景,消息会尽可能地实时推送到订阅者。
- 保留消息:MQTT支持“保留消息”功能,代理会保存最后一条发布的消息,当有新设备订阅某个主题时,代理会立即发送保留的消息。
- 持久会话:MQTT允许客户端在断开后恢复会话状态,断开期间的消息可以在客户端重新连接后继续接收。
MQTT应用场景
- 工业自动化:在工业环境中,生产设备、传感器、控制器等都需要进行数据交换, MQTT协议适用于在复杂的工业自动化系统中提供实时通信。
- 智能电网:智能电网应用中,电力设备(如变电站、智能电表、开关设备等) 通过以太网与控制中心进行实时数据交换,使用MQTT协议进行远程监控和控制。
- 远程设备监控与管理:适用于需要远程监控和管理的设备,如远程气象站、环境监测设备等。通过MQTT协议实时获取设备的传感器数据。
MQTT 发布/订阅模式
发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式, 它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。
MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作, 发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。
在 MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者, 所以只能将消息转发给当前的订阅者, 如果当前不存在任何订阅,那么消息将被直接丢弃。
MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。
- 发布者(Publisher):负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。
- 订阅者(Subscriber):订阅者通过订阅主题接收消息,且可一次订阅多个主题。
- 代理(Broker):负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外, 代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。
- 主题(Topic):主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠 / 进行分层,比如 sensor/1/temperature。 一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。
MQTT协议的通信流程如下图:

MQTT QoS详解
QoS 0:消息最多传递一次,可能会丢失,且不做重试。这是最基本的QoS级别,不保证消息的传送或顺序。 使用场景:实时传输的更新数据,如温度、湿度等,消息丢失对应用的影响较小。
QoS 1:消息至少传送一次。为了确保消息到达,发送方会重复发送消息,直到收到接收方的确认应答(PUBACK)。可能会导致消息重复接收。
QoS 2:确保消息只能被传递一次,且只传递一次。该级别保证了消息的可靠性、唯一性和顺序性。通过四次握手过程确保消息不丢失、不会重复。
适用场景:高安全要求的应用,如支付系统、重要的设备控制等,不能容忍消息重复或丢失。
MQTT消息
在 MQTT 中,客户端可以在连接时在服务端中注册一个遗嘱消息,与普通消息类似,我们可以设置遗嘱消息的主题、有效载荷等等。当该客户端意外断开连接,服务端就会向其他订阅了相应主题的客户端发送此遗嘱消息。这些接收者也因此可以及时地采取行动。 这一特性通常用于检测和响应客户端故障或掉线事件,特别适合在需要高可靠性的物联网系统和实时监控场景中使用。
有关MQTT协议的报文,可以参考:Introduction · MQTT协议中文版,本文不再赘述。
阿里云物联网平台简介
阿里云物联网平台(Alibaba Cloud IoT Platform)是阿里云推出的一款面向物联网(IoT)行业的综合性服务平台, 旨在帮助企业实现物联网设备的连接、管理、监控和数据分析等多种功能。通过该平台,企业可以将各种智能设备接入云端,并进行统一的管理和数据处理。
在设备接入上,支持 MQTT、CoAP 等多种协议,具备直连、网关代理等接入方式,能实现全球毫秒级就近接入。设备管理方面,覆盖设备全生命周期,借助设备影子保障弱网下状态一致,设备可灵活转移归属。数据服务中,实现冷热存储分离, 支持可视化数据解析与 EB 级离线分析。监控运维能力强,提供近百项监控指标,能一键跟踪消息流转,保障 OTA 升级成功率。
阿里云物模型介绍
阿里云物模型是阿里云物联网平台(IoT Platform)提供的一种用于描述和管理设备功能的抽象模型。物模型通过定义设备的属性、服务和事件,帮助开发者统一描述设备能力,简化设备管理和数据交互流程。 阿里云物模型是设备数字化管理的核心概念之一,广泛应用于智慧城市、工业物联网、智能家居等领域。
MQTT连接阿里云收发数据流程
1.准备阶段
注册与实名认证:用户需要在阿里云平台注册账号,并完成实名认证。
创建产品和添加物模型:登录阿里云物联网平台,创建产品并在产品下添加以下物模型功能。

创建设备:在刚刚创建的产品下创建一个设备。
2.记录参数
连接参数:在刚刚创建的设备详情页中找到MQTT连接参数:clientId,username,passwd,mqttHostUrl,port。

订阅主题: /sys/ieojgBm5q2c/${deviceName}/thing/service/property/set(属性设置主题)
发布主题: /sys/ieojgBm5q2c/${deviceName}/thing/event/property/post(上报消息主题)
请注意:
上面两个主题中的${deviceName}需要替换成设备名。

3.连接、订阅和发布消息
接着我们可以使用上面记录的连接参数进行连接,当连接成功后,订阅上面的订阅主题。并通过发布主题上报物模型数据。
在阿里云平台,如果产品创建阶段选择的数据格式为Alink JSON格式时,接收和发送数据格式都会遵守下面这个格式:
{
"method": "thing.event.property.post",
"id": "2241348",
"params": {
"prop_float": 1.25,
"prop_int16": 4658,
"prop_bool": 1
},
"version": "1.0"
}
method表示该消息的操作类型是上报设备属性事件;id:值为"2241348",这是一个唯一的标识符; params是一个包含设备属性数据的对象用于上报物模型数据;version:值为 "1.0",表示该消息所遵循的协议版本。
4.接收消息处理
接收消息:当接收到消息时,我们只需要按照上面的json格式进行解析,然后进行相应的处理即可。
实现过程
接下来,我们看看在W5500上如何实现MQTT连接阿里云,并进行订阅、发布消息以及接收消息处理。
步骤1:注册MQTT定时中断函数MilliTimer_Handler()到1ms定时器中断中
void TIM3_IRQHandler(void)
{
static uint32_t tim3_1ms_count = 0;
if (TIM_GetITStatus(TIM3, TIM_IT_Update) != RESET)
{
tim3_1ms_count++;
MilliTimer_Handler();
if (tim3_1ms_count >= 1000)
{
DHCP_time_handler();
DNS_time_handler();
tim3_1ms_count = 0;
}
TIM_ClearITPendingBit(TIM3, TIM_IT_Update);
}
}
步骤2:mqtt初始化
mqttconn mqtt_params = {
.mqttHostUrl = "iot-06z00dbroeg8dx3.mqtt.iothub.aliyuncs.com",
.server_ip = {
0,
}, /*Define the Connection Server IP*/
.port = 1883, /*Define the connection service port number*/
.clientid = "ieojgBm5q2c.W5100S_W5500|securemode=2,signmethod=hmacsha256,timestamp=1726890300625|", /*Define the client ID*/
.username = "W5100S_W5500&ieojgBm5q2c", /*Define the user name*/
.passwd = "8cdae1ce2d9391cfc4cf25539ef32b6d0b33558243aee7bc5aff073928a32568", /*Define user passwords*/
.pubtopic = "/sys/ieojgBm5q2c/W5100S_W5500/thing/event/property/post", /*Define the publication message*/
.subtopic = "/sys/ieojgBm5q2c/W5100S_W5500/thing/service/property/set", /*Define subscription messages*/
.pubQoS = QOS0, /*Defines the class of service for publishing messages*/
};
/**
* @brief Initializing the MQTT client side
*
* Initialize the MQTT client side with the given parameters, including network configuration and MQTT connection parameters.
*
* @param sn socket number
* @param send_buf send buffer pointer
* @param recv_buf recv buffer pointer
*/
void mqtt_init(uint8_t sn, uint8_t *send_buf, uint8_t *recv_buf)
{
wiz_NetInfo get_info = {0};
wizchip_getnetinfo(&get_info);
/* DNS parsing */
if (do_dns(send_buf, (uint8_t *)mqtt_params.mqttHostUrl, mqtt_params.server_ip))
{
while (1)
{
}
}
NewNetwork(&n, sn); /*Obtain network configuration information*/
ConnectNetwork(&n, mqtt_params.server_ip, mqtt_params.port); /*Connect to the MQTT server*/
MQTTClientInit(&c, &n, 1000, send_buf, MQTT_ETHERNET_MAX_SIZE, recv_buf, MQTT_ETHERNET_MAX_SIZE);
data.will = willdata;
data.willFlag = 0; /* will flag: If the will annotation bit is 0, the following will-related settings are invalid*/
willdata.qos = mqtt_params.willQoS; /* will QoS */
willdata.topicName.lenstring.data = mqtt_params.willtopic; /* will topic */
willdata.topicName.lenstring.len = strlen(willdata.topicName.lenstring.data); /* will topic len */
willdata.message.lenstring.data = mqtt_params.willmsg; /* will message */
willdata.message.lenstring.len = strlen(willdata.message.lenstring.data); /* will message len */
willdata.retained = 0;
willdata.struct_version = 3;
data.MQTTVersion = 4;
data.clientID.cstring = mqtt_params.clientid;
data.username.cstring = mqtt_params.username;
data.password.cstring = mqtt_params.passwd;
data.keepAliveInterval = 30;
data.cleansession = 1;
}
在这个函数中,需要传入使用的socket号以及收发缓存数组。进入初始化函数后,首先会使用DNS解析MQTT服务器域名,然后进行MQTT参数初始化, 将连接的参数填充到data结构体中。
请注意:
当data.willFlag=0时,则是关闭遗嘱主题功能,反之则为开启。关闭遗嘱主题后,下面的参数失效。
data和willdata结构体如下所示:
//data结构体
typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1 */
unsigned char MQTTVersion;
MQTTString clientID;
unsigned short keepAliveInterval;
unsigned char cleansession;
unsigned char willFlag;
MQTTPacket_willOptions will;
MQTTString username;
MQTTString password;
} MQTTPacket_connectData;
//willdata结构体
/**
* Defines the MQTT "Last Will and Testament" (LWT) settings for
* the connect packet.
*/
typedef struct
{
/** The eyecatcher for this structure. must be MQTW. */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** The LWT topic to which the LWT message will be published. */
MQTTString topicName;
/** The LWT payload. */
MQTTString message;
/**
* The retained flag for the LWT message (see MQTTAsync_message.retained).
*/
unsigned char retained;
/**
* The quality of service setting for the LWT message (see
* MQTTAsync_message.qos and @ref qos).
*/
char qos;
} MQTTPacket_willOptions;
步骤3:在主循环中执行do_mqtt()函数
void do_mqtt(void)
{
uint8_t ret;
switch (run_status)
{
case CONN: {
ret = MQTTConnect(&c, &data); /* Connect to the MQTT server */
printf("Connect to the MQTT server: %d.%d.%d.%d:%d\r\n", mqtt_params.server_ip[0], mqtt_params.server_ip[1], mqtt_params.server_ip[2], mqtt_params.server_ip[3], mqtt_params.port);
printf("Connected:%s\r\n\r\n", ret == SUCCESSS ? "success" : "failed");
if (ret != SUCCESSS)
{
run_status = ERR;
}
else
{
run_status = SUB;
}
break;
}
case SUB: {
ret = MQTTSubscribe(&c, mqtt_params.subtopic, mqtt_params.subQoS, messageArrived); /* Subscribe to Topics */
printf("Subscribing to %s\r\n", mqtt_params.subtopic);
printf("Subscribed:%s\r\n\r\n", ret == SUCCESSS ? "success" : "failed");
if (ret != SUCCESSS)
{
run_status = ERR;
}
else
{
run_status = PUB_MESSAGE;
}
break;
}
case PUB_MESSAGE: {
pubmessage.qos = QOS0;
pubmessage.payload = "{\"id\":\"123\",\"version\":\"1.0\",\"params\":{\"CurrentTemperature\":26.6,},\"method\":\"thing.event.property.post\"}";
pubmessage.payloadlen = strlen(pubmessage.payload);
ret = MQTTPublish(&c, (char *)&(mqtt_params.pubtopic), &pubmessage); /* Publish message */
if (ret != SUCCESSS)
{
run_status = ERR;
}
else
{
printf("publish:%s,%s\r\n\r\n", mqtt_params.pubtopic, (char *)pubmessage.payload);
run_status = KEEPALIVE;
}
break;
}
case KEEPALIVE: {
if (MQTTYield(&c, 30) != SUCCESSS) /* keepalive MQTT */
{
run_status = ERR;
}
}
case RECV: {
if (mqtt_recv_flag)
{
mqtt_recv_flag = 0;
json_decode(mqtt_recv_msg);
}
break;
}
case ERR: /* Running error */
printf("system ERROR!");
delay_ms(1000);
break;
default:
break;
}
}
do_mqtt()函数会执行一个状态机,按照以下步骤进行工作:
1.CONN:执行连接操作,使用我们初始化好的参数使用MQTTConnect()函数去执行连接服务器操作,连接成功后进入订阅主题步骤。
2.SUB:在这一步,我们会使用MQTTSubscribe()函数订阅阿里云的物模型设置属性主题,在这个函数中, 需要传入客户端结构体,订阅的主题名称,订阅的QoS等级,以及订阅主题的回调函数messageArrived()。订阅成功后进入发布消息步骤。
在收到订阅主题的消息后,会执行回调函数messageArrived(), 在这里我们打印出接收到的消息主题以及消息内容,并将消息拷贝到mqtt_recv_msg数组中,具体函数内容如下:
void messageArrived(MessageData *md)
{
char topicname[64] = {0};
char msg[512] = {0};
sprintf(topicname, "%.*s", (int)md->topicName->lenstring.len, md->topicName->lenstring.data);
sprintf(msg, "%.*s", (int)md->message->payloadlen, (char *)md->message->payload);
printf("recv:%s,%s\r\n\r\n", topicname, msg);
mqtt_recv_flag = 1;
memset(mqtt_recv_msg, 0, sizeof(mqtt_recv_msg));
memcpy(mqtt_recv_msg, msg, strlen(msg));
}
3.PUB_MESSAGE:在这一步,我们会发布当前温度的数据到阿里云的物模型中。成功后进入保活和接收数据部分。
4.KEEPALIVE和RECV:在这一步,会不断执行保活和监听接收操作,当收到数据之后,会进入json_decode()函数进行处理。
请注意:
必须不断执行KEEPALIVE的步骤进行保活。
json_decode()函数主要是使用CJSON将接收到的数据进行解析,然后执行对应的操作,具体内容如下:
void json_decode(char *msg)
{
cJSON *jsondata = NULL;
cJSON *params = NULL;
cJSON *LED = NULL;
jsondata = cJSON_Parse(msg);
if (jsondata == NULL)
{
printf("json parse fail.\r\n");
return;
}
params = cJSON_GetObjectItem(jsondata, "params");
LED = cJSON_GetObjectItem(params, "LEDSwitch");
if (LED->valueint == 1)
{
printf("LED ON\r\n");
}
else
{
printf("LED OFF\r\n");
}
cJSON_Delete(jsondata);
}
运行结果
请注意:
因为本示例需要访问互联网,请确保W5500的网络环境及配置能够正常访问互联网。
烧录例程运行后,首先进行了PHY链路检测,然后通过DHCP打印设置网络信息,最后连接MQTT服务器进行回环测试,如下图所示:

总结
本文讲解了如何在 W5500 芯片上实现 MQTT 协议并连接阿里云平台,通过实战例程展示了从准备工作、连接配置到消息订阅、发布及接收处理的完整过程。文章详细介绍了 MQTT 协议的概念、特点、应用场景、发布 / 订阅模式、QoS 级别,以及阿里云物联网平台和物模型相关知识,帮助读者理解其在物联网设备与云端数据交互中的实际应用价值。 下一篇文章将讲解如何在W5500上实现MQTT协议并连接OneNET平台,并实现与OneNET物模型的数据交互,敬请期待!