🚔MQTT 协议入门

  • MQTT协议

    特点 角色 MQTT协议中的订阅、主题、会话简介 MQTT主题过滤规则 MQTT报文类型 MQTT协议连接过程 消息的存储与丢弃

  • 客户端

    fosesource mqtt-client 下载与使用 客户端消息发布方式 API及参数说明

  • 服务端

    服务端的描述 消息发布过程 服务端安装与配置 配置说明

MQTT协议

文中资料引述: 1. 菜鸟学院/MQTT入门介绍 2. cnblogs/MQTT介绍与使用 3. MQTT协议中文版

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上;

特点 MQTT协议工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。

  • 对负载内容屏蔽的消息传输。

  • 使用TCP/IP提供网络连接。

  • 有三种消息发布服务质量等级 (QoS)

    角色

  • 实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

  • MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:  

    Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload); payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

MQTT协议中的订阅、主题、会话

  • 会话(Session)每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

  • 主题名(Topic Name)连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

  • 订阅(Subscription)订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

  • 主题筛选器(Topic Filter)一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

  • 负载(Payload)消息订阅者所具体接收的内容。 MQTT主题过滤规则

  • 主题层级分隔符 — "/" 主题层级分隔符使得主题名结构化。如果存在分隔符,它将主题名分割为多个主题层级。斜杠("/" U+002F)用于分割主题的每个层级,为主题名提供一个分层结构。当客户端订阅指定的主题过滤器包含两种通配符时,主题层级分隔符就很有用了。主题层级分隔符可以出现在主题过滤器或主题名字的任何位置。相邻的主题层次分隔符表示一个零长度的主题层级。

  • 多层通配符 — "#" “#”是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。多层通配符必须位于它自己的层级或者跟在主题层级分隔符后面。不管哪种情况,它都必须是主题过滤器的最后一个字符 . 例如,如果客户端订阅主题 “ china/beijing/# ”,它会收到使用下列主题名发布的消息: china/beijing china/beijing/chaoyang china/beijing/chaoyang/chinabank

    主题过滤规则

    描述

    school/#

    也匹配单独的 “school” ,因为 # 包括它的父级。

    #

    是有效的,会收到所有的应用消息。

    school/teacher/#

    有效的。

    school/teacher#

    无效的。

    school/teacher/#/lever

    无效的,必须是主题过滤器的最后一个字符。

  • 单层通配符 — "+" 加号是只能用于单个主题层级匹配的通配符。在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。然而它必须占据过滤器的整个层级 。可以在主题过滤器中的多个层级中使用它,也可以和多层通配符一起使用。

    主题过滤规则

    匹配的topic

    china/+

    只能匹配 china/guangzhou

    china/+/+/chinabank

    china/beijing/chaoyang/chinabank

  • 通配符 — "$" 表示匹配一个字符。 注: 单层通配符和多层通配符只能用于订阅(subscribe)消息而不能用于发布(publish)消息,层级分隔符两种情况下均可使用。

MQTT报文类型

名称

报文流动方向

描述

RESERVED

0

x

保留

CONNECT

1

客户端到服务端

客户端请求连接服务端

CONNACK

2

服务端到客户端

连接报文确认

PUBLISH

3

双向

发布消息

PUBACK

4

双向

QoS 1消息发布收到确认

PUBREC

5

双向

发布收到(保证交付第一步)

PUBREL

6

双向

发布释放(保证交付第二步)

PUBCOMP

7

双向

QoS 2消息发布完成(保证交互第三步)

SUBSCRIBE

8

客户端到服务端

客户端订阅请求

SUBACK

9

服务端到客户端

订阅请求报文确认

UNSUBSCRIBE

10

客户端到服务端

客户端取消订阅请求

UNSUBACK

11

服务端到客户端

取消订阅报文确认

PINGREQ

12

客户端到服务端

心跳请求

PINGRESP

13

服务端到客户端

心跳响应

DISCONNECT

14

客户端到服务端

客户端断开连接

RESERVED

15

x

保留

MQTT服务质量等级QoS详解:

"至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。QoS Level 0 wireshark数据包:

"至少一次",确保消息到达,但消息重复可能会发生。QoS Level 1 wireshark数据包:

"只有一次", 确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。 QoS Level 2 wireshark数据包:

采用订阅者和发布者的方式,分别测试三种消息级别,在订阅者掉线的情况(发布者先启动,推送消息后关闭,再启动订阅者)

  • 订阅者QoS2/发布者QoS2 数据包示例

  • 订阅者QoS1/发布者QoS1 数据包示例

  • 订阅者QoS2/发布者QoS0 数据包示例(消息等级与推送端有关)

数据传递时,消息的存储与丢弃图解:

客户端

fosesource mqtt-client 简介 fosesource 官方 github

下载与使用

  • 可以直接使用maven 方式导入到项目中

    ```xml

    org.fusesource.mqtt-clientmqtt-client1.14

  • 通过下载github源码,导入到项目中

在moquette-mqtt 中三种方式实现发布消息的方式:

  • 采用阻塞式的连接的(BlockingConnection)

  • 采用回调式的连接 (CallbackConnection)

  • 采用Future样式的连接(FutureConnection)

    代码示例以及API

服务端

一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。

接受来自客户端的网络连接。 接受客户端发布的应用消息。 处理客户端的订阅和取消订阅请求。 转发应用消息给符合条件的已订阅客户端。

支持MQTT协议的服务器软件:active/moquette Apache ActiveMQ

来自apache基金会,支持 STOMP,AMQP,MQTT,Openwire,SSL和WebSockets 多种协议,后期拓展性会比较好,当前只做候选不进行细节研究。 Moquette Moquette采用Java编写的MQTT标准服务端,支持QoS0/QoS1/QoS2服务质量等级,底层采用Netty进行协议编码与解码。Moquette的集群配置使用的是hazelcast。Hazelcast是基于java编写的数据同步工具。在moquette中,用于不同节点消息的同步。

Moquette 的安装与配置 1. 从github下载源码; 2. 运行对应环境的编译脚本;(windows 使用 bat 脚本运行 ,unix 采用 shell 脚本运行) 3. 编译完成后,distribution/build 得到编译后的压缩包。 默认目录结构:

bin 目录是Moquette服务对应windows与unix 环境的启动脚本 lib 运行所需的jar文件 config 配置文件:

moquette.conf 服务核心配置文件 moquette-log.properties 服务器日志打印配置 password_file.conf 用户名与密码配置 acl.conf 用户权限控制,此文件在解压时不会存在,需要自己手工创建

**moquette.conf 配置:**
属性名称 | 默认值 | 属性描述
--- | --- | ---
port | 1883 | 服务端口号
host |  0.0.0.0 | IP绑定 
acl_file | config/acl.conf |用户权限控制 
password_file |config/password_file.conf | 鉴权文件 
netty.epoll | true | epoll的启用
allow_anonymous | false | 是否需要鉴权

**password_file.conf 配置:**
采用 username:sha256(password) 的格式存放,一行一个用户

**acl.conf 用户权限配置:**
```shell
    user root
    topic write user/log
    pattern writeread user/log/+
    user client
    topic read user/log
    pattern read user/log/+  
```  
 基于文件的权限配置说明:
> [user root] 指示一个用户root。其后的条目,代表该用户的相关topic的读写权限,一直到另一个user结束。
> [topic write user/log] 代表对 user/log主题具有write权限,topic命令指定特定的主题名称,不能带有通配符。
> [pattern writeread user/log/+] 使用通配符的方式对 user/log/* 条目具有读写权限。  
> []() 权限分类: write read writeread
  1. 采用Docker 进行部署运行

    • 在解压根目录下新建Dockfile文件

         FROM openjdk:latest
         COPY . /usr/local/mqtt
         WORKDIR /usr/local/mqtt/bin
         CMD ["./moquette.sh"]
    • 打包docker 镜像/运行镜像

         docker run -dit --name moquette \
         -p 1883:1883 -p 8080:8080 \
         -v $PWD/config:/usr/local/mqtt/config \
         -v $PWD/logs:/usr/local/mqtt/logs \
         moquette

最后更新于