0%

【计算机网络】mqtt 通信协议入门(四)连接服务器实战-底层实现

上一篇 mqtt 协议的文章介绍了项目的整体架构,这篇文章计划将所有的接口都实现,并且能够成功连接服务器端。我会仍然按照自底向上的顺序介绍,下面先回顾一下我们的整体架构:

image-20240716175511507

Linux 下系统层实现

memory 内存管理模块

在 Linux 下,我们可以直接使用 C 库函数,非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
#include "memory.h"

#include <stdlib.h>

void *memory_alloc(int size)
{
return malloc(size);
}

void memory_free(void *ptr)
{
free(ptr);
}

timer 定时器

在 Linux下,我们使用 struct timeval 结构来表示和计算时间,先来看一下这个结构,由秒和微妙表示:

1
2
3
4
5
struct timeval
{
__time_t tv_sec; /* Seconds. */
__suseconds_t tv_usec; /* Microseconds. */
};

使用 timeval 初始化我们的定时器:

1
2
3
4
5
6
7
8
void timer_init(struct timer_t *t)
{
struct timeval *timeval = (struct timeval*)t->timer;
if (!timeval)
timeval = memory_alloc(sizeof(struct timeval));
memset(timeval, 0, sizeof(struct timeval));
t->timer = (void*)timeval;
}

设置我们的定时器的截止时间,这里我们的超时时间由毫秒表示,需要进行简单的时间转换,然后在系统时间的基础上计算出来我们的截止时间,并保存到定时器中:

1
2
3
4
5
6
7
8
9
10
11
void timer_cutoff(struct timer_t *t, unsigned int timeout)
{
struct timeval *base = (struct timeval*)t->timer;
struct timeval interval = {
.tv_sec = timeout / 1000,
.tv_usec = (timeout % 1000) * 1000,
};

gettimeofday(base, NULL);
timeradd(base, &interval, base);
}

在判断是否超时时,也是直接和系统时间进行比较,系统时间一定是单调线性增长的。这里我们比较系统时间和之前设定的截止时间比较,如果系统时间超过了定时器,代表超时,返回 1,否则返回 0,表示没超时。

1
2
3
4
5
6
7
8
9
10
11
int timer_is_expired(struct timer_t *t)
{
struct timeval *base = (struct timeval*)t->timer;
struct timeval now, minus;

gettimeofday(&now, NULL);
timersub(base, &now, &minus);
if (minus.tv_sec > 0 || (minus.tv_sec == 0 && minus.tv_usec > 0))
return 0;
return 1;
}

network 底层网络连接

基于 Linux 下的 socket api,实现底层网络连接的建立和数据读写,首先看看简单的初始化网络抽象:

1
2
3
4
5
6
void network_init(struct network_t *network, char *addr, char *port)
{
network->sockfd = -1;
network->addr = addr;
network->port = port;
}

然后是比较重要的连接建立,这里会根据我们初始化的地址和端口号来创建一个 socket 然后建立连接,细节可以看代码中的注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
int network_connect(struct network_t *network) 
{
int retval = 0;
int sockfd;
char *addr, *port;
struct addrinfo hint, *result, *cur;

addr = network->addr;
port = network->port;

/* 下面可以简单理解为 DNS 协议解析域名过程,最后会根据 hint
的限制返回所有满足要求的 addrinfo */
memset(&hint, 0, sizeof(struct addrinfo));
hint.ai_family = AF_UNSPEC;
hint.ai_socktype = SOCK_STREAM;
hint.ai_protocol = IPPROTO_TCP;
retval = getaddrinfo(addr, port, &hint, &result); /* DNS */
if (retval != 0)
return retval;

/* 根据 addrinfo 列表创建 socket 并尝试建立连接 */
cur = result;
while (cur) {
/* 根据 addrinfo 创建 socket */
sockfd = socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);
if (sockfd < 0) {
retval = sockfd;
continue;
}

retval = connect(sockfd, cur->ai_addr, cur->ai_addrlen);
/* 连接成功 break */
if (retval == 0) {
network->sockfd = sockfd;
break;
}
/* 连接失败 尝试下一个 */
close(sockfd);
cur = cur->ai_next;
}

/* 释放 addrinfo 列表*/
freeaddrinfo(result);
return retval;
}

网络数据的读写,也比较简单,这里设置了一下网络超时时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int network_read(struct network_t *network, char *buf, int count, int timeout)
{
int sockfd = network->sockfd;
struct timeval tv = {
.tv_sec = timeout / 1000,
.tv_usec = (timeout % 1000) * 1000,
};

setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
return read(sockfd, buf, count);
}

int network_write(struct network_t *network, const char *buf, int count, int timeout)
{
int sockfd = network->sockfd;
struct timeval tv = {
.tv_sec = timeout / 1000,
.tv_usec = (timeout % 1000) * 1000,
};

setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
return write(sockfd, buf, count);
}

最后,释放网络连接:

1
2
3
4
5
6
7
8
int network_release(struct network_t *network) {
int retval = 0;

if (network->sockfd)
retval = close(network->sockfd);
memset(network, 0, sizeof(struct network_t));
return retval;
}

序列化和反序列化层

基本接口

为了实现连接报文、心跳报文的序列化和反序列化,我们需要先提供一些基本元素的序列化/反序列化方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* return serialize/deserialize byte count or -1 on error */
int serialize_fix_header(unsigned char *buf, int count, enum type t);

/* return length(>0) */
unsigned int packet_len(unsigned int vlen);
unsigned int mqtt_string_len(struct mqtt_string mstr);

void writec(unsigned char **ptr, unsigned char ch);
void writei(unsigned char **ptr, unsigned int i);
void write_cstring(unsigned char **ptr, const char *str);
void write_lenstring(unsigned char **ptr, struct len_string len_str);
void write_mqttstring(unsigned char **ptr, struct mqtt_string mqtt_str);

void readc(const unsigned char **ptr, unsigned char *ch);
void readi(const unsigned char **ptr, unsigned int *i);
void read_mqttstring(unsigned char **ptr, const struct mqtt_string *mqtt_str);

/* return read/write byte count or -1 on error */
int write_optslen(unsigned char *buf, unsigned int len);
int read_optslen(const unsigned char *buf, unsigned int *len);

其中大部分函数都比较简单且容易理解,这里简要介绍下 write_optslenread_optslen 两个函数,它们负责计算 可变报头+有效负载 长度并进行序列化与反序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int write_optslen(unsigned char *buf, unsigned int len)
{
int idx = 0;

do
{
char ch = len & (128 - 1);
len >>= 7;
/* if there are more digits to encode, set the top bit of this digit */
if (len > 0)
ch |= 0x80;
buf[idx++] = ch;
} while (len > 0);

return idx;
}

int read_optslen(const unsigned char *buf, unsigned int *len)
{
int idx = 0;
int muiltiplier = 1;
unsigned char remain, val;

*len = 0;
do {
if (idx >= 4)
return -1;

char ch = buf[idx++];
remain = ch & 0x80;
val = ch & 0x7f;

*len += val * muiltiplier;
muiltiplier <<= 7;
} while (remain);

return idx;
}

上面的函数严格遵守 mqtt 报文规定的长度编码方式,即

对小于 128 的值它使用单字节编码。更大的值按下面的方式处理。 低 7 位有效位用于编码数据,最高有效位用于指示是否有更多的字节。因此每个字节可以编码 128 个数值 和一个延续位(continuation bit)。剩余长度字段最大 4 个字节。

对于只有固定头部的报文,这里提供了通用的序列化方式,根据报文类型序列化固定头部:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int serialize_fix_header(unsigned char *buf, int count, enum type t)
{
unsigned char *ptr = buf;
header_t header = {0};

if (count < 2)
return -1;

header.bits.type = t;
writec(&ptr, header.byte);
ptr += write_optslen(ptr, 0);

return ptr - buf;
}

连接报文

严格按照 mqtt 协议规定的字段,我们可以实现 CONNECT 报文的序列化操作,细节可以看代码注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
int serialize_connect(unsigned char *buf, int count, struct connect_options *conn_opts)
{
unsigned int len;
unsigned char *ptr = buf;
header_t header = {0};
connect_flags_t flags = {0};

/* 计算 可变报头 + 有效载荷长度 */
len = conn_opts_size(conn_opts);
if (packet_len(len) > count)
return -1;
/* 序列化 固定头部 */
header.bits.type = CONNECT;
writec(&ptr, header.byte);
/* 序列化 可变长度 */
ptr += write_optslen(ptr, len);

/* 可变报头部分的序列化 */
if (conn_opts->proto_version == 3) {
write_cstring(&ptr, "MQIsdp");
writec(&ptr, 3);
} else if (conn_opts->proto_version == 4) {
write_cstring(&ptr, "MQTT");
writec(&ptr, 4);
}

flags.bits.clean_session = conn_opts->clean_session;
flags.bits.will_flag = conn_opts->will_flag ? 1 : 0;
if (conn_opts->will_flag) {
flags.bits.will_qos = conn_opts->will_qos;
flags.bits.will_retain = conn_opts->will_retain;
}
flags.bits.password_flag = conn_opts->password_flag;
flags.bits.username_flag = conn_opts->username_flag;

writec(&ptr, flags.byte);
writei(&ptr, conn_opts->keep_alive_interval);

/* 有效负载部分的序列化,根据 flags 标识确定 */
write_mqttstring(&ptr, conn_opts->client_id);
if (conn_opts->will_flag) {
write_mqttstring(&ptr, conn_opts->will_topic);
write_mqttstring(&ptr, conn_opts->will_message);
}
if (conn_opts->username_flag)
write_mqttstring(&ptr, conn_opts->username);
if (conn_opts->password_flag)
write_mqttstring(&ptr, conn_opts->password);
/* 返回序列化后的长度 */
return ptr - buf;
}

连接响应报文的反序列化比较简单,这里一并讲,细节可以看代码中的注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int deserialize_connack(struct connack_options *connack_opts, const unsigned char *buf, int count)
{
unsigned int len;
const unsigned char* ptr = buf;
header_t header = {0};
connack_flags_t flags = {0};
/* 反序列化固定头部 */
readc(&ptr, &header.byte);
if (header.bits.type != CONNACK)
return 0;
/* 反序列化 可变长度 */
ptr += read_optslen(ptr, &len);
if (len < 2)
return 0;
/* 反序列化 可变报头 包括 sp 和 ack */
readc(&ptr, &flags.byte);
connack_opts->session_present = flags.bits.session_present;
readc(&ptr, &connack_opts->return_code);

return 1;
}

断开连接报文只有固定头部部分,它的序列化直接调用上面的通用接口:

1
2
3
4
int serialize_disconnect(unsigned char *buf, int count)
{
return serialize_fix_header(buf, count, DISCONNECT);
}

PING 报文

PINGREQ 和 PINGRESP 报文均只有固定头部部分,实现比较简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int serialize_pingreq(unsigned char *buf, unsigned int count)
{
return serialize_fix_header(buf, count, PINGREQ);
}

int deserialize_pingresp(const unsigned char *buf, unsigned int count)
{
const unsigned char *ptr = buf;
header_t header = {0};
/* 反序列化头部 */
readc(&ptr, &header.byte);
if (header.bits.type != PINGRESP)
return -1;
return 2;
}

MQTT 协议层

mqtt 协议层代码量较哒,出于篇幅考虑,新开一篇文章介绍。