在接触一些新概念、新技术时,不仅仅要学习其理论,也要结合实践来加深印象。之前的文章详细介绍了连接的建立与保持相关报文,这篇文章就在上一篇文章的基础上,实现一个能够进行连接建立和保持的简易 mqtt 客户端,随着学习的深入,我们也会一点一点完善它。
初步设计
良好的软件架构设计能够让软件有更好的拓展性和可维护性,使软件更有生命力。虽然我们本篇文章的目的很明确,功能也很简单,但是为了日后能更好的拓展它,本文还是进行了简单的分层设计,如下图:
系统层
系统层位于最底层,目的是使我们的软件与具体的操作系统解耦,能有更好的跨平台性。根据我们的功能:连接建立和保持,我这里将其用的的功能分为三个模块:
network:该模块为我们提供基础的网络服务(TCP/IP),包括建立关闭连接,读写网络数据报等功能,值得一提的是,这里我们是基于 Socket API 实现的。
timer:该模块为我们提供及时服务,用来判断当前是否需要发送保活报文
memory :该模块为我们提供堆内存的管理功能
序列化/反序列化层
序列化和反序列化层,该层数据包转换为适合传输的字节流格式。然后,该字节流可以通过网络发送给 MQTT 代理。
MQTT 协议接口层
mqtt 协议接口层为应用程序提供协议规定的接口,这篇文章我们仅实现 connect 连接建立、disconnect 连接关闭和 pingreq 保活(keep alive)三个接口。
客户端
最上层是使用 mqtt 的客户端,专注于实现业务逻辑,无需关系底层的具体实现,只是使用标准 mqtt API 进行通信。使用这种方式实现的应用程序拥有良好的可移植性。
各层的抽象和接口
系统层提供的接口
network
首先是数据结构的定义:
1 | struct network_t { |
使用 ip 地址 addr
和端口号 port
来唯一确定我们的服务器,使用 sockfd
记录我们打开的 socket。接下来是进行网络通信的 api 接口,涉及了 network_t
的初始化,以及建立、断开连接、读写 socket 等功能,如下:
1 | void network_init(struct network_t *network, char *addr, char *port); |
memory
堆内存的申请与释放
1 | void *memory_alloc(int size); |
timer
定时器数据结构,使用一个 void *
指针,保存各个平台下的定时器结构。
1 | struct timer_t { |
接口包括定时器的创建,设置超时截止时间,判断是否超时三个功能:
1 | void timer_init(struct timer_t *t); |
序列化/反序列化层
序列化和反序列化层需要为不同类型的控制报文实现必要的序列化和反序列化功能,本篇文章目前只涉及 CONNECT 的序列化、CONNACK 的反序列化、PINGREQ 的序列化以及 PINGRESP 的反序列化。
为了实现上述功能,我们必须对控制协议的各个段进行抽象,具体的报文结构我们在之前的文章已经详细介绍过了,具体可以查看 mqtt 报文结构 和 mqtt 连接控制报文详解 这两篇文章。
报文固定头部抽象
抽象出报文的公共头部,内容如下:
1 | typedef union { |
对于 type
字段,可以为每个报文定义 enum
变量:
1 | enum type |
连接报文抽象
连接报文的特有部分可以抽象为下面的结构体:
1 | struct connect_options { |
其中,对于字符串类型的成员,在编码时,有些是直接使用 char*
编码,有些使用长度和字符串两部分确定,这里我使用了 struct mqtt_string
将二者抽象出来,如下:
1 | struct len_string { |
连接确认报文抽象
连接确认报文的特有部分可以抽象为下面的结构体:
1 | struct connack_options { |
其余三种的抽象
断开连接报文、心跳请求报文、心跳响应报文,这些报文只有固定头部,无需抽象特有部分。
一些细节
需要注意的一点在于,虽然我们对报文进行了抽象,但是其中的字段仅仅代表着其在报文中的取值,而不能代表其在报文中的大小和位置——这是为了方便在代码中处理这些结构——这就意味着,我们需要定义一些其他的结构来帮助我们进行序列化和反序列化。
举个例子,在 CONNECT 实际报文中,报文标志位相关的字段每个仅占用 1-2 位,所以,在序列化时需要将 unsigned char
类型的数据转换为字节中的某一位,比如下面的结构可以帮助我们实现类似功能:
1 | /* 直接序列化到报文中的报文标志位 */ |
序列化和反序列化接口
对于任意一种报文,本文涉及的序列化和反序列化接口如下:
1 | int serialize_connect(unsigned char *buf, int count, struct connect_options *conn_opts); |
这些接口的参数列表都比较容易理解,返回值需要提一嘴的是,对于序列化接口,返回值为成功序列化到 buf
中的字节数,而对于反序列化接口来说,返回值用来判断是否反序列化成功,其中 1
(true)代表成功,0
(false)代表失败。
MQTT 协议接口层
在任何网络通信中,客户端和服务器端都需要对连接(称为会话)进行维护,这是很合理而且必要的。下面我们来思考一下连接状态需要如何抽象,需要使用哪些字段来维护。
CONNECT 报文参数
如上文所示,CONNECT 中携带了一些影响整个通信过程的标志,例如,用户名,密码,遗嘱设置等。所以,需要在会话中保存这些参数:
1 | struct mqtt_session { |
会话状态
对于一个会话可能有多种状态,比如连接未建立前,连接建立后,连接断开后等,我们简单的设置如下状态:
1 | enum session_state { |
同时在会话中维护自身状态:
1 | struct mqtt_session { |
使用底层网络连接
对于一个会话而言,其想要进行网络连接,则需要系统层提供的网络支持,这个在上文已经提到,我们使用了 network_t
抽象出了底层的网络连接:
1 | struct mqtt_session { |
底层网络连接超时时间
为了避免网络报文读写长时间阻塞,上文我们提供的网络接口需要设置超时时间,这与需要我们在会话中维护:
1 | struct mqtt_session { |
定时器和最大发布间隔
mqtt 协议规定了两个发布报文之间的最大时间间隔,如果超过时间间隔而且没有要发布的报文的话,需要发送 PINGREQ 报文。这也对会话需要维护的内容提出了要求:一方面,我们需要保存此处会话要求的超时时间,幸运的是,这一部分已经在 CONNECT 报文中被保存了;另一方面,我们还需要记录上一次发送和接收报文的时间,使用它们来判断当前是否已经超过最大发布间隔。
1 | struct mqtt_session { |
读写缓冲区
序列化之后和反序列化之前的报文保存在哪里呢?很明显,我还需要两个缓冲区,其中读缓冲区用来保存接收到的后的报文,写缓冲区用来保存待发送的报文:
1 | struct mqtt_session { |
初版 session
经过上面的分析,最基础的会话结构如下,后续我们还会对其进行扩充:
1 | struct mqtt_session { |
mqtt 协议接口
- 会话初始化。使用初始化参数创建一个会话,接口如下:
1 | int mqtt_init(struct mqtt_session *session, struct session_init_params *init_params); |
其中 session_init_params
结构体是暴露给用户的可选参数,和我们之前提到了 session
结构体并没有本质的区别,这里就不展示了。
- 剩下的是建立连接、断开连接已经发送心跳请求的接口:
1 | int mqtt_connect(struct mqtt_session *session); |
错误处理
为了专注于核心逻辑,这里我们暂时简化错误处理,我们简单的使用 -1
代表出现错误,并没有对错误进行分类,使用 0
表示成功,如下:
1 | typedef enum { |
啰里啰唆半天终于把整体结构讲完了,当然还有很多不完善的地方,就留着后面边实现边扩充吧。