0%

【计算机网络】mqtt 通信协议入门(五)连接服务器实战-协议层实现

这篇文章介绍了”连接服务器实战“中协议层的实现,该层实现 mqtt 协议规定报文的收发,主要调用了我们前面已经实现的系统层和序列化反序列化层的接口。

接口

首先还是回顾一下我们需要实现的接口:

1
2
3
4
int mqtt_init(struct mqtt_session *session, struct session_init_params *init_params);
int mqtt_connect(struct mqtt_session *session);
int mqtt_disconnect(struct mqtt_session *session);
int mqtt_pingreq(struct mqtt_session *session);
  • mqtt_init :初始化会话对象 session
  • mqtt_connect :连接到服务器
  • mqtt_disconnect :从服务器断开
  • mqtt_pingreq :保持连接

初始化会话对象

函数功能比较易懂,只是参数过多导致比较复杂,但基本任务就是根据 init_params 初始化 session 对象,必要时并为其成员分配内存空间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
int mqtt_init(struct mqtt_session *session, struct session_init_params *init_params)
{
int retval = 0;
unsigned char will_flag, username_flag, password_flag;
struct connect_options *conn_opts;
struct network_t *net;

if (!session || !init_params)
return -1;
memset(session, 0, sizeof(struct mqtt_session));

/* 为指针成员分配内存 */
conn_opts = (struct connect_options*)memory_alloc(sizeof(struct onnect_options));
if (!conn_opts) {
return -1;
}
memset(conn_opts, 0, sizeof(struct connect_options));

net = (struct network_t*)memory_alloc(sizeof(struct network_t));
if (!net) {
retval = -1;
goto free_conn_opts;
}
memset(net, 0, sizeof(struct network_t));

/* 根据 init_params 初始各个字段 */
conn_opts->proto_version = init_params->conn_params.proto_version;
conn_opts->clean_session = init_params->conn_params.clean_session;
will_flag = init_params->conn_params.will_flag;
conn_opts->will_flag = will_flag;
conn_opts->will_qos = init_params->conn_params.will_qos;
conn_opts->will_retain = init_params->conn_params.will_retain;
username_flag = conn_opts->password_flag = init_params->conn_params.password_flag;
password_flag = conn_opts->username_flag = init_params->conn_params.username_flag;
conn_opts->keep_alive_interval = init_params->conn_params.keep_alive_interval;

conn_opts->client_id.cstring = init_params->conn_params.client_id;
if (will_flag ) {
conn_opts->will_topic.cstring = init_params->conn_params.will_topic;
conn_opts->will_message.cstring = init_params->conn_params.will_message;
}
if (username_flag) {
conn_opts->username.cstring = init_params->conn_params.username;
}
if (password_flag) {
conn_opts->password.cstring = init_params->conn_params.password;
}

session->conn_opts = conn_opts;

network_init(net, init_params->network_params.addr, init_params->network_params.port);
session->network = net;

session->net_timeout = init_params->net_timeout;
session->read_buflen = init_params->read_buflen;
session->write_buflen = init_params->write_buflen;

/* 分配读写缓冲区 */
session->read_buf = (unsigned char*)memory_alloc(session->read_buflen);
if (!session->read_buflen) {
retval = -1;
goto free_net;
}
session->write_buf = (unsigned char*)memory_alloc(session->write_buflen);
if (!session->write_buf) {
retval = -1;
goto free_readbuf;
}

/* 设置成初始状态 */
session->state = SESSION_INITIALIZE;

/* 初始化定时器 */
if (!timer_init(&session->last_send_timer)) {
retval = -1;
goto free_writebuf;
}

if (!timer_init(&session->last_recv_timer)) {
retval = -1;
goto free_last_sendt;
}

return retval;

free_last_sendt:
timer_destory(&session->last_send_timer);
free_writebuf:
memory_free(session->write_buf);
free_readbuf:
memory_free(session->read_buf);
free_net:
memory_free(session->network);
free_conn_opts:
memory_free(conn_opts);
return retval;
}

建立与服务器的连接

为了能够与服务器建立连接,我们除了需要发送序列化的 CONNECT 报文外,还要确保收到了服务端发送的 CONNACK 报文。 上述过程涉及到数据包的发送和介接收,我们可以优先实现这两个函数:

发送数据包

该函数会在规定时间 net_timeout 内,尽力发送长为 length 的数据包,数据被保存在 session 的写缓冲区内:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int mqtt_send_packet(struct mqtt_session *session, int length, struct timer_t* timer)
{
int len = 0;
int sent = 0;
const unsigned char *wbuf = session->write_buf;

timer_init(timer);
timer_cutoff(timer, session->net_timeout);

/* 在规定时间内循环发送数据,直到时间截止或发送完毕 length 字节 */
while ((sent < length) && (!timer_is_expired(timer))) {
len = network_write(session->network, wbuf + sent, length, timer_remain(timer));
if (len <= 0)
break;
sent += len;
}

/* 更新计时器,用来确定保活报文的发送时机 */
if (sent == length) {
timer_cutoff(&session->last_send_timer, session->conn_opts->keep_alive_interval * 1000);
return 0;
}

return -1;
}

值得注意的是,该函数负责所有类型数据包的发送,所以,在这里可以统计 last_send_timer 计时器的时间。

接收数据包

与发送数据包相比,接收一个来自网络的数据包需要考虑的事情更多,比如,需要判断数据包类型、 session 对象的读缓冲区大小是否足够,当缓冲区不足时如何处理等。下面是实现,细节都在注释中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static int mqtt_recv_packet(struct mqtt_session *session, enum type *t, struct timer_t *timer)
{
int retval = 0;
unsigned char *rbuf;
unsigned int recv, optslen;
const unsigned int fix_headerlen = 1;
header_t header = {0};

if (!session || !session->read_buf)
return -1;

timer_init(timer);
timer_cutoff(timer, session->net_timeout);

rbuf = session->read_buf;

/* 读取固定头部,只读一个字节 */
recv = network_read(session->network, rbuf, fix_headerlen, timer_remain(timer));
if (recv != fix_headerlen)
return recv;
rbuf += recv;

/* 计算剩余长度 */
retval = mqtt_read_optslen(session, &optslen, timer);
if (retval < 0)
return retval;

/* 缓冲区大小和网络数据包大小 */
if (session->read_buflen < fix_headerlen + retval + optslen) {
/* 缓冲区太小,读出脏数据 */
mqtt_read_corrupt(session, timer, optslen);
return -1;
}

/* 正常读入数据包到缓冲区 */
rbuf += write_optslen(rbuf, optslen);
retval = network_read(session->network, rbuf, optslen, timer_remain(timer));
if (retval != optslen)
return -1;

/* 设置数据包类型 */
header.byte = session->read_buf[0];
*t = header.bits.type;

/* 类似的,设置计时器 */
timer_cutoff(&session->last_recv_timer, session->conn_opts->keep_alive_interval * 1000);

return 0;
}

首先,和发送数据类似,在这里可以统计 last_send_timer 计时器的时间。此外还有两个需要注意的点:

  • 计算剩余长度:由于我们不知道 session 的写缓冲区大小是否合适,所以需要先计算报文的总大小,这在之前序列化反序列化时遇到过,这里的实现是类似的,只是数据是从网络中读到的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static int mqtt_read_optslen(struct mqtt_session *session, unsigned int *len, struct timer_t *timer)
{
int retval, idx = 0;
int muiltiplier = 1;
unsigned char remain, val, ch;

*len = 0;
do {
if (idx >= 4)
return -1;

retval = network_read(session->network, &ch, 1, timer_remain(timer));
if (retval < 1)
return -1;

remain = ch & 0x80;
val = ch & 0x7f;
*len += val * muiltiplier;
muiltiplier <<= 7;
} while (remain);

return idx;
}
  • 读出损坏数据:当缓冲区无法容纳数据包时,我们将这个数据视为无效,但需要将其从网络缓冲区中读出并丢弃,以免影响后续报文传输:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void mqtt_read_corrupt(struct mqtt_session *session, struct timer_t *timer, unsigned int remain)
{
unsigned int len, rbuflen, safelen;

rbuflen = session->read_buflen;
safelen = remain < rbuflen ? remain : rbuflen;

do {
len = network_read(session->network, session->read_buf, safelen, timer_remain(timer));
if (len == 0)
break;
remain -= len;
safelen = remain < rbuflen ? remain : rbuflen;
} while (!timer_is_expired(timer) && remain > 0);
}

建立连接主函数

三大部分:调用网络层接口完成底层网络连接;序列化并发送 CONNECT 报文;等待并读取 CONNACK 报文。最后如果 CONNACK 报文的返回正确,那我们连接就建立成功了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
int mqtt_connect(struct mqtt_session *session)
{
int retval = 0;
unsigned char *wbuf;
unsigned int buflen;
struct timer_t conn_timer = {0};
struct connack_options ack_opts = {0};

if (!session)
return -1;

if (!timer_init(&conn_timer))
return -1;
timer_cutoff(&conn_timer, session->net_timeout);

wbuf = session->write_buf;
buflen = session->write_buflen;

/* 建立底层网络连接 */
retval = network_connect(session->network);
if (retval < 0)
goto destory_timer;

/* CONNECT 的序列化和发送 */
buflen = serialize_connect(wbuf, buflen, session->conn_opts);
retval = mqtt_send_packet(session, buflen, &conn_timer);
if (retval < 0)
goto destory_timer;

/* CONNACK 的接收和反序列化 */
retval = wait_connack(session, &conn_timer);
if (retval < 0)
goto destory_timer;

retval = deserialize_connack(&ack_opts, session->read_buf, session->read_buflen);
if (!retval) {
retval = -1;
goto destory_timer;
}

if (ack_opts.return_code != ACK_RC_SUCCESS) {
printf("error:\tCONNACK return_code is:\t%d\n", ack_opts.return_code);
retval = -1;
goto destory_timer;
}

/* 连接成功 设置会话状态 */
session->state = SESSION_CONNECTED;
printf("connect to server successful!\n");

destory_timer:
timer_destory(&conn_timer);
return retval;
}

其中用到了我们之前没提到的新函数 wait_connack :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int wait_connack(struct mqtt_session *session,  struct timer_t *timer)
{
int retval = 0;
enum type type;

if (!session || !session->read_buf)
return -1;
do {
if (timer_is_expired(timer))
return -1;
retval = mqtt_recv_packet(session, &type, timer);
} while (type != CONNACK || retval != 0);

return retval;
}

函数也比较简单,就是在 conn_tiemr 超时之前,一直等待 CONNACK 报文。

值得一提的是,发送完 CONNECT 报文后,我们是一直在以循环等待,或者称为轮询,的方式读取 CONNACK 报文的,这种方式被称为 同步 IO ,与之对应的是 异步 IO。这里我们稍微解释一下,我们简单的使用三个过程描述我们的连接:

  1. 发送 CONNECT;2.等待 CONNACK;3. 做其他事。

对于同步 IO,这三个过程一定是按照 1 -> 2 -> 3 的顺序发生的,在没有等到 CONNACK 之前是不能去左其他事情的。异步 IO 则不然,假设做完 1 之后,我们可以使用一些手段,无需等待 CONNACK 返回,就去做其他事,这些手段包括 回调函数通知 等机制。这里只是先将这个概念提出,后续我们会看到是如何实现异步处理的。

但是,重要的一定在于,连接的建立无需使用异步机制,这是因为在等到 CONNACK 返回前,我们往往没有其他事情可做,换句话说,等到 CONNACK 是我们做其他事情的前提,我们最好循环等待。

保持活跃

有了前面的基础,发送 PINGREQ 报文就再简单不过了,直接看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int mqtt_pingreq(struct mqtt_session *session)
{
int retval = 0;
unsigned int len;
struct timer_t ping_timer = {0};

if (!session || !session->write_buf)
return -1;

if (!timer_init(&ping_timer))
return -1;

retval = serialize_pingreq(session->write_buf, session->write_buflen);
if (retval < 0) {
goto destory_timer;
} else {
len = retval;
retval = 0;
}

retval = mqtt_send_packet(session, len, &ping_timer);
if (retval < 0)
goto destory_timer;

printf("sent pingreq to server successful!\n");

destory_timer:
timer_destory(&ping_timer);
return retval;
}

这里发送完 PINGREQ 报文直接就表示 ping 成功是比较武断的,至少要再次收到 PINGRESP 报文,才能表示服务端真正接收到了我们的心跳包。然而,我们可以思考一下,是否还应该像等待 CONNACK 报文那样,等待 PINGRESP 报文呢?应该使用异步还是同步方式呢?同步方式容易实现,仿造 wait_connack 写一个类似函数就行了,如果是异步如何实现呢?这里我们就先不实现了,后续再补充。

断开连接

一样很简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int mqtt_disconnect(struct mqtt_session *session)
{
int retval = 0;
unsigned int len;
struct timer_t disconn_timer = {0};

if (!session || !session->write_buf)
return -1;

if (!timer_init(&disconn_timer))
return -1;

retval = serialize_disconnect(session->write_buf, session->write_buflen);
if (retval < 0) {
goto destory_timer;
} else {
len = retval;
retval = 0;
}

retval = mqtt_send_packet(session, len, &disconn_timer);
if (retval < 0)
goto destory_timer;

printf("disconnect from server successful!\n");
session->state = SESSION_INVALID;

destory_timer:
timer_destory(&disconn_timer);
return retval;
}

总结

到这里,整个 ”连接到服务器“ 小任务的主要代码就算完成了,下一篇文章计划写一下代码的编译、调试和测试运行,下下一篇可能会讨论一下如何实现 PINGRESP 的接收,以上。