0%

【计算机网络】mqtt 通信协议入门(三)连接服务器实战-设计

在接触一些新概念、新技术时,不仅仅要学习其理论,也要结合实践来加深印象。之前的文章详细介绍了连接的建立与保持相关报文,这篇文章就在上一篇文章的基础上,实现一个能够进行连接建立和保持的简易 mqtt 客户端,随着学习的深入,我们也会一点一点完善它。

初步设计

良好的软件架构设计能够让软件有更好的拓展性和可维护性,使软件更有生命力。虽然我们本篇文章的目的很明确,功能也很简单,但是为了日后能更好的拓展它,本文还是进行了简单的分层设计,如下图:

image-20240716175526244

系统层

系统层位于最底层,目的是使我们的软件与具体的操作系统解耦,能有更好的跨平台性。根据我们的功能:连接建立和保持,我这里将其用的的功能分为三个模块:

  • network:该模块为我们提供基础的网络服务(TCP/IP),包括建立关闭连接,读写网络数据报等功能,值得一提的是,这里我们是基于 Socket API 实现的。

  • timer:该模块为我们提供及时服务,用来判断当前是否需要发送保活报文

  • memory :该模块为我们提供堆内存的管理功能

序列化/反序列化层

序列化和反序列化层,该层数据包转换为适合传输的字节流格式。然后,该字节流可以通过网络发送给 MQTT 代理。

MQTT 协议接口层

mqtt 协议接口层为应用程序提供协议规定的接口,这篇文章我们仅实现 connect 连接建立、disconnect 连接关闭和 pingreq 保活(keep alive)三个接口。

客户端

最上层是使用 mqtt 的客户端,专注于实现业务逻辑,无需关系底层的具体实现,只是使用标准 mqtt API 进行通信。使用这种方式实现的应用程序拥有良好的可移植性。

各层的抽象和接口

系统层提供的接口

network

首先是数据结构的定义:

1
2
3
4
5
struct network_t {
char *addr;
char *port;
int sockfd;
};

使用 ip 地址 addr 和端口号 port 来唯一确定我们的服务器,使用 sockfd 记录我们打开的 socket。接下来是进行网络通信的 api 接口,涉及了 network_t 的初始化,以及建立、断开连接、读写 socket 等功能,如下:

1
2
3
4
5
void network_init(struct network_t *network, char *addr, char *port);
int network_connect(struct network_t *network);
int network_read(struct network_t *network, char *buf, int count, int timeout);
int network_write(struct network_t *network, const char *buf, int count, int timeout);
int network_release(struct network_t *network);

memory

堆内存的申请与释放

1
2
void *memory_alloc(int size);
void memory_free(void *ptr);

timer

定时器数据结构,使用一个 void * 指针,保存各个平台下的定时器结构。

1
2
3
struct timer_t {
void *timer;
};

接口包括定时器的创建,设置超时截止时间,判断是否超时三个功能:

1
2
3
void timer_init(struct timer_t *t); 
void timer_cutoff(struct timer_t *t, unsigned int timeout);
int timer_is_expired(struct timer_t *t);

序列化/反序列化层

序列化和反序列化层需要为不同类型的控制报文实现必要的序列化和反序列化功能,本篇文章目前只涉及 CONNECT 的序列化、CONNACK 的反序列化、PINGREQ 的序列化以及 PINGRESP 的反序列化。

为了实现上述功能,我们必须对控制协议的各个段进行抽象,具体的报文结构我们在之前的文章已经详细介绍过了,具体可以查看 mqtt 报文结构mqtt 连接控制报文详解 这两篇文章。

报文固定头部抽象

抽象出报文的公共头部,内容如下:

1
2
3
4
5
6
7
8
9
typedef union {
unsigned char byte;
struct bits {
unsigned char retain : 1;
unsigned char qos : 2;
unsigned char dup : 1;
unsigned char type : 4;
};
} header_t;

对于 type 字段,可以为每个报文定义 enum 变量:

1
2
3
4
5
6
enum type
{
CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC,
PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE,
UNSUBACK, PINGREQ, PINGRESP, DISCONNECT
};

连接报文抽象

连接报文的特有部分可以抽象为下面的结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct connect_options {
/* 可变报头部分 */
unsigned char proto_version;
unsigned char clean_session;
unsigned char will_flag;
unsigned char will_qos;
unsigned char will_retain;
unsigned char password_flag;
unsigned char username_flag;
unsigned short keep_alive_interval;
/* 有效负载部分 */
struct mqtt_string client_id;
struct mqtt_string will_topic;
struct mqtt_string will_message;
struct mqtt_string username;
struct mqtt_string password;
};

其中,对于字符串类型的成员,在编码时,有些是直接使用 char* 编码,有些使用长度和字符串两部分确定,这里我使用了 struct mqtt_string 将二者抽象出来,如下:

1
2
3
4
5
6
7
8
struct len_string {
int len;
char *string;
};
struct mqtt_string {
char *cstring;
struct len_string len_str;
};

连接确认报文抽象

连接确认报文的特有部分可以抽象为下面的结构体:

1
2
3
4
struct connack_options {
unsigned char session_present;
unsigned char return_code;
};

其余三种的抽象

断开连接报文、心跳请求报文、心跳响应报文,这些报文只有固定头部,无需抽象特有部分。

一些细节

需要注意的一点在于,虽然我们对报文进行了抽象,但是其中的字段仅仅代表着其在报文中的取值,而不能代表其在报文中的大小和位置——这是为了方便在代码中处理这些结构——这就意味着,我们需要定义一些其他的结构来帮助我们进行序列化和反序列化。

举个例子,在 CONNECT 实际报文中,报文标志位相关的字段每个仅占用 1-2 位,所以,在序列化时需要将 unsigned char 类型的数据转换为字节中的某一位,比如下面的结构可以帮助我们实现类似功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
/* 直接序列化到报文中的报文标志位 */
typedef union {
unsigned char byte;
struct bits {
unsigned char reserved : 1;
unsigned char clean_session : 1;
unsigned char will_flag : 1;
unsigned char will_qos : 2;
unsigned char will_retain : 1;
unsigned char password_flag : 1;
unsigned char username_flag : 1;
};
} connect_flags;

序列化和反序列化接口

对于任意一种报文,本文涉及的序列化和反序列化接口如下:

1
2
3
4
5
int serialize_connect(unsigned char *buf, int count, struct connect_options *conn_opts);
int deserialize_connack(struct connack_options *connack_opts, const char *buf, int count);
int serialize_disconnect(unsigned char *buf, int count);
int serialize_pingreq(unsigned char *buf, int count);
int deserialize_pingresp(unsigned char *buf, int count);

这些接口的参数列表都比较容易理解,返回值需要提一嘴的是,对于序列化接口,返回值为成功序列化到 buf 中的字节数,而对于反序列化接口来说,返回值用来判断是否反序列化成功,其中 1 (true)代表成功,0 (false)代表失败。

MQTT 协议接口层

在任何网络通信中,客户端和服务器端都需要对连接(称为会话)进行维护,这是很合理而且必要的。下面我们来思考一下连接状态需要如何抽象,需要使用哪些字段来维护。

CONNECT 报文参数

如上文所示,CONNECT 中携带了一些影响整个通信过程的标志,例如,用户名,密码,遗嘱设置等。所以,需要在会话中保存这些参数:

1
2
3
4
struct mqtt_session {
struct connect_options *conn_params;
...
};

会话状态

对于一个会话可能有多种状态,比如连接未建立前,连接建立后,连接断开后等,我们简单的设置如下状态:

1
2
3
4
5
enum session_state {
SESSION_INVALID = -1, /* 已断开的无效会话 */
SESSION_INITIALIZE = 0, /* 已初始化未连接的会话 */
SESSION_CONNECTED = 1, /* 成功连接的会话 */
}

同时在会话中维护自身状态:

1
2
3
4
5
struct mqtt_session {
...
enum session_state state;
...
};

使用底层网络连接

对于一个会话而言,其想要进行网络连接,则需要系统层提供的网络支持,这个在上文已经提到,我们使用了 network_t 抽象出了底层的网络连接:

1
2
3
4
5
struct mqtt_session {
...
struct network_t *network;
...
};

底层网络连接超时时间

为了避免网络报文读写长时间阻塞,上文我们提供的网络接口需要设置超时时间,这与需要我们在会话中维护:

1
2
3
4
5
struct mqtt_session {
...
unsigned int net_timeout;
...
};

定时器和最大发布间隔

mqtt 协议规定了两个发布报文之间的最大时间间隔,如果超过时间间隔而且没有要发布的报文的话,需要发送 PINGREQ 报文。这也对会话需要维护的内容提出了要求:一方面,我们需要保存此处会话要求的超时时间,幸运的是,这一部分已经在 CONNECT 报文中被保存了;另一方面,我们还需要记录上一次发送和接收报文的时间,使用它们来判断当前是否已经超过最大发布间隔。

1
2
3
4
5
6
struct mqtt_session {
...
struct timer_t last_send_timer;
struct timer_t last_recv_timer;
...
};

读写缓冲区

序列化之后和反序列化之前的报文保存在哪里呢?很明显,我还需要两个缓冲区,其中读缓冲区用来保存接收到的后的报文,写缓冲区用来保存待发送的报文:

1
2
3
4
5
6
7
8
struct mqtt_session {
...
unsigned char *read_buf;
unsigned char *write_buf;
unsigned int read_buflen;
unsigned int write_buflen;
...
};

初版 session

经过上面的分析,最基础的会话结构如下,后续我们还会对其进行扩充:

1
2
3
4
5
6
7
8
9
10
11
12
struct mqtt_session {
struct connect_options *conn_params;
enum session_state state;
struct network_t *network;
unsigned int net_timeout;
struct timer_t last_send_timer;
struct timer_t last_recv_timer;
unsigned char *read_buf;
unsigned char *write_buf;
unsigned int read_buflen;
unsigned int write_buflen;
};

mqtt 协议接口

  • 会话初始化。使用初始化参数创建一个会话,接口如下:
1
int mqtt_init(struct mqtt_session *session, struct session_init_params *init_params);

其中 session_init_params 结构体是暴露给用户的可选参数,和我们之前提到了 session 结构体并没有本质的区别,这里就不展示了。

  • 剩下的是建立连接、断开连接已经发送心跳请求的接口:
1
2
3
int mqtt_connect(struct mqtt_session *session);
int mqtt_disconnect(struct mqtt_session *session);
int mqtt_pingreq(struct mqtt_session *session);

错误处理

为了专注于核心逻辑,这里我们暂时简化错误处理,我们简单的使用 -1 代表出现错误,并没有对错误进行分类,使用 0 表示成功,如下:

1
2
3
4
typedef enum {
MQTT_ERROR = -1,
MQTT_SUCCESS = 0,
} mqtt_error_t;

啰里啰唆半天终于把整体结构讲完了,当然还有很多不完善的地方,就留着后面边实现边扩充吧。