全部例程

MQTT&OneNET

W55MH32 其他标签

2025/02/12 更新

本篇文章,我们将详细介绍如何在W55MH32芯片上面实现MQTT协议。并通过实战例程,为大家讲解如何使用W55MH32的MQTT协议连接OneNET平台,实现与OneNET物模型的数据交互。

该例程用到的其他网络协议,例如DHCPDNS,请参考相关章节。有关W55MH32的初始化过程,请参考Network Install章节,这里将不再赘述。

MQTT简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种轻量级的、基于发布/订阅模式的消息传输协议, 广泛应用于物联网(IoT)领域,尤其是在网络带宽受限、设备资源有限的环境中。它由IBM在1999年提出,主要用于远程监控和控制系统中的设备通信。MQTT协议具有低带宽、低功耗、低延迟等优点,特别适用于嵌入式系统和物联网设备的通信。

MQTT特点

  • 轻量级:MQTT协议采用极简的协议头,减少了消息头的大小,适合带宽有限和计算资源有限的设备。
  • 可靠性:MQTT支持三种服务质量(QoS)级别,能够确保数据可靠送达。
  • 实时性:MQTT适合低延迟应用场景,消息会尽可能地实时推送到订阅者。
  • 保留消息:MQTT支持“保留消息”功能,代理会保存最后一条发布的消息,当有新设备订阅某个主题时,代理会立即发送保留的消息。
  • 持久会话:MQTT允许客户端在断开后恢复会话状态,断开期间的消息可以在客户端重新连接后继续接收。

MQTT应用场景

  • 工业自动化:在工业环境中,生产设备、传感器、控制器等都需要进行数据交换, MQTT协议适用于在复杂的工业自动化系统中提供实时通信。
  • 智能电网:智能电网应用中,电力设备(如变电站、智能电表、开关设备等) 通过以太网与控制中心进行实时数据交换,使用MQTT协议进行远程监控和控制。
  • 远程设备监控与管理:适用于需要远程监控和管理的设备,如远程气象站、环境监测设备等。通过MQTT协议实时获取设备的传感器数据。

MQTT 发布/订阅模式

发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式, 它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。

MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作, 发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。

在 MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者, 所以只能将消息转发给当前的订阅者, 如果当前不存在任何订阅,那么消息将被直接丢弃。

MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。

  • 发布者(Publisher):负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。
  • 订阅者(Subscriber):订阅者通过订阅主题接收消息,且可一次订阅多个主题。
  • 代理(Broker):负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外, 代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。
  • 主题(Topic):主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠 / 进行分层,比如 sensor/1/temperature。 一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。

MQTT协议的通信流程如下图:

MQTT QoS详解

QoS 0:消息最多传递一次,可能会丢失,且不做重试。这是最基本的QoS级别,不保证消息的传送或顺序。 使用场景:实时传输的更新数据,如温度、湿度等,消息丢失对应用的影响较小。

QoS 1:消息至少传送一次。为了确保消息到达,发送方会重复发送消息,直到收到接收方的确认应答(PUBACK)。可能会导致消息重复接收。

QoS 2:确保消息只能被传递一次,且只传递一次。该级别保证了消息的可靠性、唯一性和顺序性。通过四次握手过程确保消息不丢失、不会重复。

适用场景:高安全要求的应用,如支付系统、重要的设备控制等,不能容忍消息重复或丢失。

MQTT消息

在 MQTT 中,客户端可以在连接时在服务端中注册一个遗嘱消息,与普通消息类似,我们可以设置遗嘱消息的主题、有效载荷等等。当该客户端意外断开连接,服务端就会向其他订阅了相应主题的客户端发送此遗嘱消息。这些接收者也因此可以及时地采取行动。 这一特性通常用于检测和响应客户端故障或掉线事件,特别适合在需要高可靠性的物联网系统和实时监控场景中使用。

有关MQTT协议的报文,可以参考:Introduction · MQTT协议中文版,本文不再赘述。

OneNET物联网平台简介

OneNET是由中国移动打造的物联网开放平台,在物联网应用和真实设备之间搭建高效、稳定安全的应用平台。该平台支持适配各种网络环境和协议类型, 可实现各种传感器和智能硬件的快速接提供丰富的 API和应用模板以支撑各类行业应用和智能硬件的开发。

OneNET物模型介绍

物模型是物联网平台中用于描述和管理设备能力的抽象模型,它定义了设备的属性、行为和事件,使设备与平台、应用之间的交互更加标准化和统一。 在OneNET平台中,物模型是设备管理与数据交互的核心概念,通过物模型,开发者可以方便地对设备进行建模、管理和使用。

MQTT连接OneNET收发数据流程

1.准备阶段

注册与实名认证:用户需要在OneNET平台注册账号,并完成实名认证。

创建产品和添加物模型:登录阿里云物联网平台,创建产品并在产品下添加以下物模型功能。

创建设备:在刚刚创建的产品下创建一个设备。

2.记录参数

连接参数:可在OneNET文档中心中找到MQTT协议所需的连接参数,从中可得: 服务器地址及端口和安全认证三要素:

clientld为我们创建的设备名,usemame为我们创建产品后平台分配的产品ID,password则需使用token生成工具生成,工具说明及下载链接:OneNET Token生成工具

发布主题:$sys/{pid}/{device-name}/thing/property/post(属性上报消息主题)

订阅主题:$sys/{pid}/{device-name}/thing/property/post/reply(属性上报响应主题)

订阅主题:$sys/{pid}/{device-name}/thing/property/set(设置主题)

请注意:

上面两个主题中的{pid}需要替换成产品ID,{device-name}替换成设备名。

3.连接、订阅和发布消息

接着我们可以使用上面记录的连接参数进行连接,当连接成功后,订阅上面的订阅主题。并通过发布主题上报物模型数据。

在OneNET平台,如果产品创建阶段选择的数据格式为OneJson格式时,接收和发送数据格式都会遵守下面这个格式:

{
  "id": "123",
  "version": "1.0",
  "params": {
    "Power": {
      "value": "on",
      "time": 1524448722123
    },
    "WF": {
      "value": 23.6,
      "time": 1524448722123
    }
  },
  "method":"thing.{功能类型}.{方法}"
}

字段id是请求的唯一标识符,值为"123",用于跟踪请求的响应;version表示OneJSON数据格式的版本号,这里为"1.0";params字段包含设备的属性数据, 包含了属性标识符、对应的值和时间戳;method字段指定设备执行的操作类型,这里是thing.{功能类型}.{方法},用于指明设备进行的操作。

4.接收消息处理

接收消息:当接收到消息时,我们只需要按照上面的json格式进行解析,然后进行相应的处理即可。

实现过程

接下来,我们看看在W55MH32上如何实现MQTT连接OneNET,并进行订阅、发布消息以及接收消息处理。

步骤1:注册MQTT定时中断函数MilliTimer_Handler()到1ms定时器中断中


void TIM3_IRQHandler(void)
{
    static uint32_t tim3_1ms_count = 0;
    if (TIM_GetITStatus(TIM3, TIM_IT_Update) != RESET)
    {
        tim3_1ms_count++;
        MilliTimer_Handler();
        if (tim3_1ms_count >= 1000)
        {
            DHCP_time_handler();
            DNS_time_handler();
            tim3_1ms_count = 0;
        }
        TIM_ClearITPendingBit(TIM3, TIM_IT_Update);
    }
}

步骤2:mqtt初始化


mqttconn mqtt_params = {
  .mqttHostUrl = "mqtts.heclouds.com",
  .server_ip = {
      0,
  },                                                                                                                                                     /*Define the Connection Server IP*/
  .port = 1883,                                                                                                                                          /*Define the connection service port number*/
  .clientid = "W5100S_W5500",                                                                                                                            /*Define the client ID*/
  .username = "70TwP2gxl5",                                                                                                                              /*Define the user name*/
  .passwd = "version=2018-10-31&res=products%2F70TwP2gxl5%2Fdevices%2FW5100S_W5500&et=1791400694&method=sha1&sign=0SchVg6Y2MRYn%2B9zItNZwt%2F%2FN4Y%3D", /*Define user passwords*/
  .pubtopic = "$sys/70TwP2gxl5/W5100S_W5500/thing/property/post",                                                                                        /*Define the publication message*/
  .pubtopic_reply = "$sys/70TwP2gxl5/W5100S_W5500/thing/property/post/reply",
  .subtopic = "$sys/70TwP2gxl5/W5100S_W5500/thing/property/set", /*Define subscription messages*/
  .subtopic_reply = "$sys/70TwP2gxl5/W5100S_W5500/thing/property/set_reply",
  .pubQoS = QOS0,                /*Defines the class of service for publishing messages*/
  .willtopic = "/wizchip/will",  /*Define the topic of the will*/
  .willQoS = QOS0,               /*Defines the class of service for Will messages*/
  .willmsg = "wizchip offline!", /*Define a will message*/
  .subQoS = QOS0,                /*Defines the class of service for subscription messages*/
};

/**
 * @brief Initializing the MQTT client side
 *
 * Initialize the MQTT client side with the given parameters, including network configuration and MQTT connection parameters.
 *
 * @param sn socket number
 * @param send_buf send buffer pointer
 * @param recv_buf recv buffer pointer
 */
void mqtt_init(uint8_t sn, uint8_t *send_buf, uint8_t *recv_buf)
{
    wiz_NetInfo get_info = {0};
    wizchip_getnetinfo(&get_info);
    /* DNS parsing */
    if (do_dns(send_buf, (uint8_t *)mqtt_params.mqttHostUrl, mqtt_params.server_ip))
    {
        while (1)
        {
        }
    }
    NewNetwork(&n, sn);                                          /*Obtain network configuration information*/
    ConnectNetwork(&n, mqtt_params.server_ip, mqtt_params.port); /*Connect to the MQTT server*/
    MQTTClientInit(&c, &n, 1000, send_buf, MQTT_ETHERNET_MAX_SIZE, recv_buf, MQTT_ETHERNET_MAX_SIZE);
    data.willFlag = 0;                                                            /* will flag: If the will annotation bit is 0, the following will-related settings are invalid*/
    willdata.qos = mqtt_params.willQoS;                                           /* will QoS */
    willdata.topicName.lenstring.data = mqtt_params.willtopic;                    /* will topic */
    willdata.topicName.lenstring.len = strlen(willdata.topicName.lenstring.data); /* will topic len */
    willdata.message.lenstring.data = mqtt_params.willmsg;                        /* will message */
    willdata.message.lenstring.len = strlen(willdata.message.lenstring.data);     /* will message len */
    willdata.retained = 0;
    willdata.struct_version = 3;
    data.will = willdata;
    data.MQTTVersion = 4;
    data.clientID.cstring = mqtt_params.clientid;
    data.username.cstring = mqtt_params.username;
    data.password.cstring = mqtt_params.passwd;
    data.keepAliveInterval = 30;
    data.cleansession = 1;
}

在这个函数中,需要传入使用的socket号以及收发缓存数组。进入初始化函数后,首先会使用DNS解析MQTT服务器域名,然后进行MQTT参数初始化, 将连接的参数填充到data结构体中。

请注意:

当data.willFlag=0时,则是关闭遗嘱主题功能,反之则为开启。关闭遗嘱主题后,下面的参数失效。

data和willdata结构体如下所示:

//data结构体
typedef struct
{
  /** The eyecatcher for this structure.  must be MQTC. */
  char struct_id[4];
  /** The version number of this structure.  Must be 0 */
  int struct_version;
  /** Version of MQTT to be used.  3 = 3.1 4 = 3.1.1    */
  unsigned char MQTTVersion;
  MQTTString clientID;
  unsigned short keepAliveInterval;
  unsigned char cleansession;
  unsigned char willFlag;
  MQTTPacket_willOptions will;
  MQTTString username;
  MQTTString password;
} MQTTPacket_connectData;

//willdata结构体
/**
 * Defines the MQTT "Last Will and Testament" (LWT) settings for
 * the connect packet.
 */
typedef struct
{
	/** The eyecatcher for this structure.  must be MQTW. */
	char struct_id[4];
	/** The version number of this structure.  Must be 0 */
	int struct_version;
	/** The LWT topic to which the LWT message will be published. */
	MQTTString topicName;
	/** The LWT payload. */
	MQTTString message;
	/**
      * The retained flag for the LWT message (see MQTTAsync_message.retained).
      */
	unsigned char retained;
	/**
      * The quality of service setting for the LWT message (see
      * MQTTAsync_message.qos and @ref qos).
      */
	char qos;
} MQTTPacket_willOptions;

步骤3:进入do_mqtt()函数


void do_mqtt(void)
{
  uint8_t ret;
  switch (run_status)
  {
  case CONN: {
      ret = MQTTConnect(&c, &data); /* Connect to the MQTT server */
      printf("Connect to the MQTT server: %d.%d.%d.%d:%d\r\n", mqtt_params.server_ip[0], mqtt_params.server_ip[1], mqtt_params.server_ip[2], mqtt_params.server_ip[3], mqtt_params.port);
      printf("Connected:%s\r\n\r\n", ret == SUCCESSS ? "success" : "failed");
      if (ret != SUCCESSS)
      {
          run_status = ERR;
      }
      else
      {
          run_status = SUB;
      }
      break;
  }
  case SUB: {
      ret = MQTTSubscribe(&c, mqtt_params.subtopic, mqtt_params.subQoS, messageArrived); /* Subscribe to Topics */
      printf("Subscribing to %s\r\n", mqtt_params.subtopic);
      printf("Subscribed:%s\r\n\r\n", ret == SUCCESSS ? "success" : "failed");
      if (ret != SUCCESSS)
      {
          run_status = ERR;
      }
      else
      {
          run_status = PUB_MESSAGE;
      }
      ret = MQTTSubscribe(&c, mqtt_params.pubtopic_reply, mqtt_params.subQoS, messageArrived); /* Subscribe to Topics */
      printf("Subscribing to %s\r\n", mqtt_params.pubtopic_reply);
      printf("Subscribed:%s\r\n\r\n", ret == SUCCESSS ? "success" : "failed");
      if (ret != SUCCESSS)
      {
          run_status = ERR;
      }
      else
      {
          run_status = PUB_MESSAGE;
      }
      break;
  }
  case PUB_MESSAGE: {
      pubmessage.qos        = QOS0;
      pubmessage.payload    = "{\"id\":\"123\",\"version\":\"1.0\",\"params\":{\"CurrentTemperature\":{\"value\":26.6}}}";
      pubmessage.payloadlen = strlen(pubmessage.payload);
      ret                   = MQTTPublish(&c, (char *)&(mqtt_params.pubtopic), &pubmessage); /* Publish message */
      if (ret != SUCCESSS)
      {
          run_status = ERR;
      }
      else
      {
          printf("publish:%s,%s\r\n\r\n", mqtt_params.pubtopic, (char *)pubmessage.payload);
          run_status = KEEPALIVE;
      }
      break;
  }
  case KEEPALIVE: {
      if (MQTTYield(&c, 30) != SUCCESSS) /* keepalive MQTT */
      {
          run_status = ERR;
      }
      delay_ms(100);
  }
  case RECV: {
      if (mqtt_recv_flag)
      {
          mqtt_recv_flag = 0;
          json_decode(mqtt_recv_msg);
      }
      delay_ms(100);
      break;
  }
  case ERR: /* Running error */
      printf("system ERROR!");
      delay_ms(1000);
      break;

  default:
      break;
  }
}

进入该函数后,程序首先尝试连接到指定的MQTT服务器。如果连接成功,则进入订阅状态;如果连接失败,则直接进入错误状态并打印错误信息。

在订阅状态中,订阅两个主题:一个是用于接收消息的订阅主题,另一个是用于发布回复的发布主题。订阅主题需要传入一个回调函数messageArrived(),当收到订阅主题的消息时会进入回调函数处理。messageArrived()函数如下:


/**
* @brief mqtt Receive message callback function
*
* This function is called when a message is received. The function will parse the message content and process it accordingly.
*
* @param md :message data pointer
*/
void messageArrived(MessageData *md)
{
    char topicname[64] = {0};
    char msg[512] = {0};
    sprintf(topicname, "%.*s", (int)md->topicName->lenstring.len, md->topicName->lenstring.data);
    sprintf(msg, "%.*s", (int)md->message->payloadlen, (char *)md->message->payload);
    printf("recv:%s,%s\r\n\r\n", topicname, msg);
    if (strcmp(topicname, mqtt_params.subtopic) == 0)
    {
        mqtt_recv_flag = 1;
        memset(mqtt_recv_msg, 0, sizeof(mqtt_recv_msg));
        memcpy(mqtt_recv_msg, msg, strlen(msg));
    }
}

如果这两个主题都成功订阅,则进入发布消息状态;如果订阅过程中出现失败,则进入错误状态并打印错误信息。 进入发布消息状态后,构建一条消息并尝试发布。如果发布成功,则进入保持连接状态;如果发布失败,则进入错误状态并打印错误信息。

在保持连接状态中,持续接收消息并检查是否有需要处理的消息。如果有接收到需要处理的消息,则进入json_decode()函数处理。


/**
* @brief Cloud JSON message parsing
*
* Parses the given JSON message and executes the corresponding operation based on the parsed result.
*
* @param msg :JSON message pointer
*/
void json_decode(char *msg)
{
    int ret;
    char replymsg[128] = {0};
    cJSON *id = NULL;
    cJSON *jsondata = NULL;
    cJSON *params = NULL;
    cJSON *LED = NULL;
    jsondata = cJSON_Parse(msg);
    if (jsondata == NULL)
    {
        printf("json parse fail.\r\n");
        return;
    }
    id = cJSON_GetObjectItem(jsondata, "id");
    params = cJSON_GetObjectItem(jsondata, "params");
    LED = cJSON_GetObjectItem(params, "LEDSwitch");
    if (LED->valueint)
    {
        printf("LED ON\r\n");
    }
    else
    {
        printf("LED OFF\r\n");
    }
    pubmessage.qos = QOS0;
    sprintf(replymsg, "{\"id\":\"%s\",\"code\":200,\"msg\":\"success\"}", id->valuestring);
    printf("reply:%s\r\n", replymsg);
    pubmessage.payload = replymsg;
    pubmessage.payloadlen = strlen(replymsg);
    ret = MQTTPublish(&c, mqtt_params.subtopic_reply, &pubmessage); /* Publish message */
    if (ret != SUCCESSS)
    {
        run_status = ERR;
    }
    else
    {
        printf("publish:%s,%s\r\n\r\n", mqtt_params.subtopic_reply, (char *)pubmessage.payload);
    }
    cJSON_Delete(jsondata);
}

在任何步骤中,如果发生错误,都会立即进入错误状态,并打印出详细的错误信息。

运行结果

请注意:

因为本示例需要访问互联网,请确保W55MH32的网络环境及配置能够正常访问互联网。

烧录例程运行后,首先进行了PHY链路检测,接着是通过DHCP获取网络地址信息和DNS解析服务器域名,然后连接服务器和订阅主题,最后发送物模型数据, 随后就接收到OneNET平台响应的上报成功的信息,此时查看平台物模型数据已经得到更新显示。如下图所示:

然后我们使用ONENET平台的在线设备调试功能,调试LWDSwish状态,下发数据,此时W55MH32就接收到下发的消息并进行响应操作了,如下执行LED OFF:

总结

本文讲解了如何在 W55MH32 芯片上实现 MQTT 协议并连接 OneNET 平台,通过实战例程展示了从准备工作、连接配置到消息订阅、发布及接收处理的完整过程。文章详细介绍了 MQTT 协议的概念、特点、应用场景、发布 / 订阅模式、QoS 级别,以及 OneNET 物联网平台和物模型相关知识, 帮助读者理解其在物联网设备与云端数据交互中的实际应用价值。

下一篇文章将讲解如何在 W55MH32 上实现多路 Socket设置为 TCP 客户端模式,并进行连接同一个服务器测试, 解析多路 Socket 连接的核心原理及应用,同时讲解在 W55MH32 上实现该功能的具体步骤与要点,敬请期待!

下载本章例程

我们提供完整的工程文件以及配套开发板,方便你随时测试,快速完成产品开发:

开发环境: Keil MDK5 配套开发板