如何使用 MQTT 报文实现发布订阅功能

摘要:
MQTT协议通过交换预定义的MQTT控制消息进行通信。以MQTTX为例,演示如何通过MQTT消息实现发布和订阅功能。客户端和服务器都必须使用ClientId来标识它们之间的MQTT会话相关状态。没有有效负载的订阅消息违反了协议。使用MQTTX连接到代理。emqx Io:1883代理,并创建主题为testtopic/#且Qos等于2的订阅。订阅消息包含原因代码列表,用于指定授予的最大Qos级别或订阅消息请求的每个订阅的错误。每个原因代码对应于Subscribe消息中的主题过滤器。

MQTT 协议通过交换预定义的MQTT控制报文来通信。下面以 MQTTX 为例,展示如何通过 MQTT 报文实现发布订阅功能。

Connect 连接

MQTT 协议基于 TCP/IP 协议,MQTT Broker 和 Client 都有需要有 TCP/IP 地址。

Broker

WX201911281137582x.png

如果你暂时没有一个可用的 MQTT Broker,EMQ X 提供了一个公共 Broker 地址用于测试:broker.emqx.io:1883

Client

WX201911281139012x.png

MQTTX 工具中 Client 的配置其实是 MQTT 协议中 Connect 报文的配置,下面解释一下相关配置项:

Client ID

服务端使用 ClientId 识别客户端。连接服务端的每个客户端都有唯一的 ClientId 。客户端和服务端都必须使用 ClientId 识别两者之间的 MQTT 会话相关的状态。

ClientId 必须存在,但是服务端可以允许客户端提供一个零字节的 ClientId,如果这样做了,服务端必须将这看作特殊情况并分配唯一的 ClientId 给那个客户端。然后正常处理这个 CONNECT 报文。

Username/Password

MQTT 可以通过发送用户名和密码来进行相关的认证和授权,但是,如果此信息未加密,则用户名和密码是以明文的方式发送的。EMQ X 不仅支持SSL/TLS 加密,还提供了 emqx-auth-username 插件对密码进行加密。

Keep Alive

保持连接(Keep Alive)是一个以秒为单位的时间间隔,它是指在客户端传输完成一个控制报文的时刻到发送下一个报文的时刻,两者之间允许空闲的最大时间间隔。客户端负责保证控制报文发送的时间间隔不超过保持连接的值。如果没有任何其它的控制报文可以发送,客户端必须发送一个PINGREQ报文。

如果 Keep Alive 的值非零,并且服务端在一点五倍的 Keep Alive 时间内没有收到客户端的控制报文,它必须断开客户端的网络连接,认为网络连接已断开。

Clean Session

客户端和服务端可以保存会话状态,以支持跨网络连接的可靠消息传输,这个标志告诉服务器这次连接是不是一个全新的连接。

客户端的会话状态包括:

  • 已经发送给服务端,但是还没有完成确认的 QoS 1 和 QoS 2 级别的消息
  • 已从服务端接收,但是还没有完成确认的 QoS 2 级别的消息。

服务端的会话状态包括:

  • 会话是否存在,即使会话状态的其它部分都是空。
  • 客户端的订阅信息。
  • 已经发送给客户端,但是还没有完成确认的 QoS 1 和 QoS 2 级别的消息。
  • 即将传输给客户端的 QoS 1和 QoS 2 级别的消息。
  • 已从客户端接收,但是还没有完成确认的 QoS 2 级别的消息。
  • 可选,准备发送给客户端的 QoS 0 级别的消息。

如果 CleanSession 标志被设置为 1,客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。会话仅持续和网络连接同样长的时间。

如果 CleanSession 标志被设置为 0,服务端必须基于当前会话(使用 ClientId 识别)的状态恢复与客户端的通信。如果没有与这个客户端标识符关联的会话,服务端必须创建一个新的会话。当连接断开后,客户端和服务端必须保存会话信息。

Connack 确认连接请求

客户端发送 Connect 报文请求对服务器的连接,服务器必须发送 Connack 报文作为对 来自客户端的 Connect 报文的回应。如果客户端在合理的时间内没有收到服务端的CONNACK报文,客户端应该关闭网络连接。合理的时间取决于应用的类型和通信基础设施。在 MQTTX 中,可以通过 Connection Timeout 来设置合理的超时时间。

Connect.png

Connack 报文包含 Session Present 和 Connect Return code 两个重要的标志。

Session Present

Session Present 标志表示当前会话是否是一个新的会话,如果服务端收到 CleanSession 标志为1的连接,Connack报文中的 SessionPresent 标志为 0 。如果服务端收到一个 CleanSession 为0的连接,SessionPresent 标志的值取决于服务端是否已经保存了 ClientId 对应客户端的会话状态。如果服务端已经保存了会话状态,Connack 报文中的 SessionPresent 标志为 1,如果服务端没有已保存的会话状态,Connack 报文中的 SessionPresent 标志为 0.

Connect Return code

Connect Return code 表示服务器对此次 Connect 的回应,0 表示连接已被服务器接受。如果服务端收到一个合法的 CONNECT 报文,但出于某些原因无法处理它,服务端应该尝试发送一个包含非零返回码(表格中的某一个)的 CONNACK 报文。如果服务端发送了一个包含非零返回码的CONNACK 报文,那么它必须关闭网络连接。

返回码响应描述
00x00连接已接受连接已被服务端接受
10x01连接已拒绝,不支持的协议版本服务端不支持客户端请求的MQTT协议级别
20x02连接已拒绝,不合格的客户端标识符客户端标识符是正确的UTF-8编码,但服务端不允许使用
30x03连接已拒绝,服务端不可用网络连接已建立,但MQTT服务不可用
40x04连接已拒绝,无效的用户名或密码用户名或密码的数据格式无效
50x05连接已拒绝,未授权客户端未被授权连接到此服务器
6-255保留

如果认为上表中的所有连接返回码都不太合适,那么服务端必须关闭网络连接,不需要发送CONNACK 报文。

Subscribe 订阅主题

客户端向服务端发送 Subscribe 报文用于创建一个或多个订阅。每个订阅注册客户端关心的一个或多个主题。为了将应用消息转发给与那些订阅匹配的主题,服务端发送 Publish 报文给客户端。Subscribe 报文为每个订阅指定了最大的 QoS 等级,服务端根据这个发送应用消息给客户端。

WX201911281425432x.png

Subscribe 报文的有效载荷必须包含至少一对主题过滤器 和 QoS 等级字段组合。没有有效载荷的 Subscribe 报文是违反协议的。

使用 MQTTX 连接 broker.emqx.io:1883 的 Broker 并创建主题为testtopic/# ,Qos 等于 2 的订阅。

WX201911281439252x.png

Suback 订阅确认

服务端发送 Suback 报文给客户端,用于确认它已收到并且正在处理 Subscribe 报文。

Subscribe.png

Suback 报文包含一个原因码列表,用于指定授予的最大QoS等级或 Subscribe 报文所请求的每个订阅发生的错误,每个原因码对应 Subscribe 报文中的一个主题过滤器。Suback 报文中的原因码顺序必须与 Subscribe 报文中的主题过滤器顺序相匹配

允许的返回码值:

  • 0x00 - 最大QoS 0
  • 0x01 - 成功 – 最大QoS 1
  • 0x02 - 成功 – 最大 QoS 2
  • 0x80 - Failure 失败

Publish 发布消息

Publish 报文是指从客户端向服务端或者服务端向客户端传输一个应用消息,服务器收到 Publish 报文后根据主题过滤器将消息转发给其他客户端。

Publish.png

尝试使用 MQTTX 发布一条主题为 testtopic/mytopic ,内容为 {"msg": "hello world"} 的消息,由于之前已经订阅了 testtopic/# 这一主题,所以立即接收到了 Broker 转发回来的这条消息

WX201911281441422x.png

Topic

主题名(Topic Name)用于识别消息应该被发布到哪一个会话,服务端发送给订阅客户端的 Publish 报文的主题名必须匹配该订阅的主题过滤器。

QoS

QoS 表示应用消息分发的服务质量等级保证

QoS值Bit 2Bit 1描述
000最多分发一次
101至少分发一次
210只分发一次
-11保留位

Publish 报文不能将 QoS 所有的位设置为 1。如果服务端或客户端收到 QoS 所有位都为 1 的 Publish 报文,它必须关闭网络连接。

关于不同等级的QoS的工作原理,请查阅MQTT 5.0 协议介绍 - QoS 服务质量

Retain

如果客户端发给服务端的 Publish 报文的保留(RETAIN)标志被设置为 1,服务端必须存储这个应用消息和它的服务质量等级(QoS),以便它可以被分发给未来的主题名匹配的订阅者 。一个新的订阅建立时,对每个匹配的主题名,如果存在最近保留的消息,它必须被发送给这个订阅者。如果服务端收到一条保留(RETAIN)标志为1的Q消息,它必须丢弃之前为那个主题保留的任何消息,并将这个新的消息当作那个主题的新保留消息。

保留标志为 1 且有效载荷为零字节的 Publish 报文会被服务端当作正常消息处理,它会被发送给订阅主题匹配的客户端。此外,同一个主题下任何现存的保留消息必须被移除,因此这个主题之后的任何订阅者都不会收到一个保留消息

服务端发送 Publish 报文给客户端时,如果消息是作为客户端一个新订阅的结果发送,它必须将报文的保留标志设为 1。当一个 Publish 报文发送给客户端是因为匹配一个已建立的订阅时,服务端必须将保留标志设为 0,不管它收到的这个消息中保留标志的值是多少。

如果客户端发给服务端的 Publish 报文的保留标志位 0,服务端不能存储这个消息也不能移除或替换任何现存的保留消息。

Payload

有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的,可以发送图像,任何编码的文本,加密的数据以及几乎所有二进制数据。

免责声明:文章转载自《如何使用 MQTT 报文实现发布订阅功能》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇CAD数据导入Arcgis10.1的依赖关系foxmail密码获取下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

软件版本号规范

原文:https://blog.csdn.net/master_yao/article/details/51274163 1.软件版本阶段说明 oBase版:此版本表示该软件仅仅是一个假页面链接,通常包括所有的功能和页面布局,但是页面中的功能都没有做完整的实现,只是做为整体网站的一个基础架构。 oAlpha版:此版本表示该软件在此阶段主要是以实现软件功能为...

APP专项测试

一、功能测试 功能测试主要根据软件需求说明书或用户需求等资料,编写测试用例,以验证各个功能点的实现情况,具体实施过程参考如下: 根据被测试模块的功能点,设计相应的测试用例进行覆盖,例如涉及到用户输入的地方要考虑到边界,用户使用场景类要考虑到正常和异常的场景,业务相关联的模块需要联合测试等; 随时关注跟踪需求的变化和理解需求,对需求理解有误时及时更改相关测...

Linux OpenSSL安装和使用示例

由于使用的是Linux系统,在使用沙箱时需要生成公钥和私钥,更具官方说明,Linux只能使用OpenSSL。 一、下载&安装 到 OpenSSL 官网下载源码进行编译,然后安装。 下载源码 openssl-3.0.0-alpha9.tar.gz wget https://www.openssl.org/source/openssl-3.0.0-...

uniapp app头部渐变

在pages.json中配置:{ "pages": [ //pages数组中第一项表示应用启动页, { "path": "pages/login/login", "style": { "navigationBarTitleText": "登录",...

vue3 + ant-design-vue2 + vuex+mitt快速配置指南

1. 目前能和vue3配合的UI只有ant-design-vue2 2.vue3移除了event bus,使用mitt来替代 3.有了Composition API基本上不需要使用vuex了,但是某种情况 下vuex还是比较好用的。vue3的provide和inject数据回溯不容易去debug. 使用vue-cli4.5以下,生成一个javascript...

分享几种Linux软件的安装方法

Linux软件安装由于不同的Linux分支,安装方法也互不相同,介绍几种常见的安装方法。 http://wenku.baidu.com/link?url=hrOBvu_P-joieXLZfbUjkyRXMHC_CgeAZWjTTtiKKZZhcmNBTILoH2he0TJ9GuhCr75ud4IDuZohhHjzK3B_YPhCkWJ30umXLzdJZG...