这篇文章介绍了”连接服务器实战“中协议层的实现,该层实现 mqtt 协议规定报文的收发,主要调用了我们前面已经实现的系统层和序列化反序列化层的接口。
接口 首先还是回顾一下我们需要实现的接口:
1 2 3 4 int mqtt_init (struct mqtt_session *session, struct session_init_params *init_params) ;int mqtt_connect (struct mqtt_session *session) ;int mqtt_disconnect (struct mqtt_session *session) ;int mqtt_pingreq (struct mqtt_session *session) ;
mqtt_init
:初始化会话对象 session
mqtt_connect
:连接到服务器
mqtt_disconnect
:从服务器断开
mqtt_pingreq
:保持连接
初始化会话对象 函数功能比较易懂,只是参数过多导致比较复杂,但基本任务就是根据 init_params
初始化 session
对象,必要时并为其成员分配内存空间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 int mqtt_init (struct mqtt_session *session, struct session_init_params *init_params) { int retval = 0 ; unsigned char will_flag, username_flag, password_flag; struct connect_options *conn_opts ; struct network_t *net ; if (!session || !init_params) return -1 ; memset (session, 0 , sizeof (struct mqtt_session)); conn_opts = (struct connect_options*)memory_alloc(sizeof (struct onnect_options)); if (!conn_opts) { return -1 ; } memset (conn_opts, 0 , sizeof (struct connect_options)); net = (struct network_t *)memory_alloc(sizeof (struct network_t )); if (!net) { retval = -1 ; goto free_conn_opts; } memset (net, 0 , sizeof (struct network_t )); conn_opts->proto_version = init_params->conn_params.proto_version; conn_opts->clean_session = init_params->conn_params.clean_session; will_flag = init_params->conn_params.will_flag; conn_opts->will_flag = will_flag; conn_opts->will_qos = init_params->conn_params.will_qos; conn_opts->will_retain = init_params->conn_params.will_retain; username_flag = conn_opts->password_flag = init_params->conn_params.password_flag; password_flag = conn_opts->username_flag = init_params->conn_params.username_flag; conn_opts->keep_alive_interval = init_params->conn_params.keep_alive_interval; conn_opts->client_id.cstring = init_params->conn_params.client_id; if (will_flag ) { conn_opts->will_topic.cstring = init_params->conn_params.will_topic; conn_opts->will_message.cstring = init_params->conn_params.will_message; } if (username_flag) { conn_opts->username.cstring = init_params->conn_params.username; } if (password_flag) { conn_opts->password.cstring = init_params->conn_params.password; } session->conn_opts = conn_opts; network_init(net, init_params->network_params.addr, init_params->network_params.port); session->network = net; session->net_timeout = init_params->net_timeout; session->read_buflen = init_params->read_buflen; session->write_buflen = init_params->write_buflen; session->read_buf = (unsigned char *)memory_alloc(session->read_buflen); if (!session->read_buflen) { retval = -1 ; goto free_net; } session->write_buf = (unsigned char *)memory_alloc(session->write_buflen); if (!session->write_buf) { retval = -1 ; goto free_readbuf; } session->state = SESSION_INITIALIZE; if (!timer_init(&session->last_send_timer)) { retval = -1 ; goto free_writebuf; } if (!timer_init(&session->last_recv_timer)) { retval = -1 ; goto free_last_sendt; } return retval; free_last_sendt: timer_destory(&session->last_send_timer); free_writebuf: memory_free(session->write_buf); free_readbuf: memory_free(session->read_buf); free_net: memory_free(session->network); free_conn_opts: memory_free(conn_opts); return retval; }
建立与服务器的连接 为了能够与服务器建立连接,我们除了需要发送序列化的 CONNECT 报文外,还要确保收到了服务端发送的 CONNACK 报文。 上述过程涉及到数据包的发送和介接收,我们可以优先实现这两个函数:
发送数据包 该函数会在规定时间 net_timeout
内,尽力发送长为 length
的数据包,数据被保存在 session 的写缓冲区内:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 static int mqtt_send_packet (struct mqtt_session *session, int length, struct timer_t * timer) { int len = 0 ; int sent = 0 ; const unsigned char *wbuf = session->write_buf; timer_init(timer); timer_cutoff(timer, session->net_timeout); while ((sent < length) && (!timer_is_expired(timer))) { len = network_write(session->network, wbuf + sent, length, timer_remain(timer)); if (len <= 0 ) break ; sent += len; } if (sent == length) { timer_cutoff(&session->last_send_timer, session->conn_opts->keep_alive_interval * 1000 ); return 0 ; } return -1 ; }
值得注意的是,该函数负责所有类型数据包的发送,所以,在这里可以统计 last_send_timer
计时器的时间。
接收数据包 与发送数据包相比,接收一个来自网络的数据包需要考虑的事情更多,比如,需要判断数据包类型、 session 对象的读缓冲区大小是否足够,当缓冲区不足时如何处理等。下面是实现,细节都在注释中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 static int mqtt_recv_packet (struct mqtt_session *session, enum type *t, struct timer_t *timer) { int retval = 0 ; unsigned char *rbuf; unsigned int recv, optslen; const unsigned int fix_headerlen = 1 ; header_t header = {0 }; if (!session || !session->read_buf) return -1 ; timer_init(timer); timer_cutoff(timer, session->net_timeout); rbuf = session->read_buf; recv = network_read(session->network, rbuf, fix_headerlen, timer_remain(timer)); if (recv != fix_headerlen) return recv; rbuf += recv; retval = mqtt_read_optslen(session, &optslen, timer); if (retval < 0 ) return retval; if (session->read_buflen < fix_headerlen + retval + optslen) { mqtt_read_corrupt(session, timer, optslen); return -1 ; } rbuf += write_optslen(rbuf, optslen); retval = network_read(session->network, rbuf, optslen, timer_remain(timer)); if (retval != optslen) return -1 ; header.byte = session->read_buf[0 ]; *t = header.bits.type; timer_cutoff(&session->last_recv_timer, session->conn_opts->keep_alive_interval * 1000 ); return 0 ; }
首先,和发送数据类似,在这里可以统计 last_send_timer
计时器的时间。此外还有两个需要注意的点:
计算剩余长度:由于我们不知道 session 的写缓冲区大小是否合适,所以需要先计算报文的总大小,这在之前序列化反序列化时遇到过,这里的实现是类似的,只是数据是从网络中读到的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static int mqtt_read_optslen (struct mqtt_session *session, unsigned int *len, struct timer_t *timer) { int retval, idx = 0 ; int muiltiplier = 1 ; unsigned char remain, val, ch; *len = 0 ; do { if (idx >= 4 ) return -1 ; retval = network_read(session->network, &ch, 1 , timer_remain(timer)); if (retval < 1 ) return -1 ; remain = ch & 0x80 ; val = ch & 0x7f ; *len += val * muiltiplier; muiltiplier <<= 7 ; } while (remain); return idx; }
读出损坏数据:当缓冲区无法容纳数据包时,我们将这个数据视为无效,但需要将其从网络缓冲区中读出并丢弃,以免影响后续报文传输:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static void mqtt_read_corrupt (struct mqtt_session *session, struct timer_t *timer, unsigned int remain) { unsigned int len, rbuflen, safelen; rbuflen = session->read_buflen; safelen = remain < rbuflen ? remain : rbuflen; do { len = network_read(session->network, session->read_buf, safelen, timer_remain(timer)); if (len == 0 ) break ; remain -= len; safelen = remain < rbuflen ? remain : rbuflen; } while (!timer_is_expired(timer) && remain > 0 ); }
建立连接主函数 三大部分:调用网络层接口完成底层网络连接;序列化并发送 CONNECT 报文;等待并读取 CONNACK 报文。最后如果 CONNACK 报文的返回正确,那我们连接就建立成功了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 int mqtt_connect (struct mqtt_session *session) { int retval = 0 ; unsigned char *wbuf; unsigned int buflen; struct timer_t conn_timer = {0 }; struct connack_options ack_opts = {0 }; if (!session) return -1 ; if (!timer_init(&conn_timer)) return -1 ; timer_cutoff(&conn_timer, session->net_timeout); wbuf = session->write_buf; buflen = session->write_buflen; retval = network_connect(session->network); if (retval < 0 ) goto destory_timer; buflen = serialize_connect(wbuf, buflen, session->conn_opts); retval = mqtt_send_packet(session, buflen, &conn_timer); if (retval < 0 ) goto destory_timer; retval = wait_connack(session, &conn_timer); if (retval < 0 ) goto destory_timer; retval = deserialize_connack(&ack_opts, session->read_buf, session->read_buflen); if (!retval) { retval = -1 ; goto destory_timer; } if (ack_opts.return_code != ACK_RC_SUCCESS) { printf ("error:\tCONNACK return_code is:\t%d\n" , ack_opts.return_code); retval = -1 ; goto destory_timer; } session->state = SESSION_CONNECTED; printf ("connect to server successful!\n" ); destory_timer: timer_destory(&conn_timer); return retval; }
其中用到了我们之前没提到的新函数 wait_connack
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static int wait_connack (struct mqtt_session *session, struct timer_t *timer) { int retval = 0 ; enum type type ; if (!session || !session->read_buf) return -1 ; do { if (timer_is_expired(timer)) return -1 ; retval = mqtt_recv_packet(session, &type, timer); } while (type != CONNACK || retval != 0 ); return retval; }
函数也比较简单,就是在 conn_tiemr
超时之前,一直等待 CONNACK 报文。
值得一提的是,发送完 CONNECT 报文后,我们是一直在以循环等待,或者称为轮询,的方式读取 CONNACK 报文的,这种方式被称为 同步 IO
,与之对应的是 异步 IO
。这里我们稍微解释一下,我们简单的使用三个过程描述我们的连接:
发送 CONNECT;2.等待 CONNACK;3. 做其他事。
对于同步 IO
,这三个过程一定是按照 1 -> 2 -> 3
的顺序发生的,在没有等到 CONNACK 之前是不能去左其他事情的。异步 IO
则不然,假设做完 1 之后,我们可以使用一些手段,无需等待 CONNACK 返回,就去做其他事,这些手段包括 回调函数 ,通知 等机制。这里只是先将这个概念提出,后续我们会看到是如何实现异步处理的。
但是,重要的一定在于,连接的建立无需使用异步机制 ,这是因为在等到 CONNACK 返回前,我们往往没有其他事情可做,换句话说,等到 CONNACK 是我们做其他事情的前提,我们最好循环等待。
保持活跃 有了前面的基础,发送 PINGREQ 报文就再简单不过了,直接看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 int mqtt_pingreq (struct mqtt_session *session) { int retval = 0 ; unsigned int len; struct timer_t ping_timer = {0 }; if (!session || !session->write_buf) return -1 ; if (!timer_init(&ping_timer)) return -1 ; retval = serialize_pingreq(session->write_buf, session->write_buflen); if (retval < 0 ) { goto destory_timer; } else { len = retval; retval = 0 ; } retval = mqtt_send_packet(session, len, &ping_timer); if (retval < 0 ) goto destory_timer; printf ("sent pingreq to server successful!\n" ); destory_timer: timer_destory(&ping_timer); return retval; }
这里发送完 PINGREQ 报文直接就表示 ping 成功是比较武断的,至少要再次收到 PINGRESP 报文,才能表示服务端真正接收到了我们的心跳包。然而,我们可以思考一下,是否还应该像等待 CONNACK 报文那样,等待 PINGRESP 报文呢?应该使用异步还是同步方式呢?同步方式容易实现,仿造 wait_connack
写一个类似函数就行了,如果是异步如何实现呢?这里我们就先不实现了,后续再补充。
断开连接 一样很简单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int mqtt_disconnect (struct mqtt_session *session) { int retval = 0 ; unsigned int len; struct timer_t disconn_timer = {0 }; if (!session || !session->write_buf) return -1 ; if (!timer_init(&disconn_timer)) return -1 ; retval = serialize_disconnect(session->write_buf, session->write_buflen); if (retval < 0 ) { goto destory_timer; } else { len = retval; retval = 0 ; } retval = mqtt_send_packet(session, len, &disconn_timer); if (retval < 0 ) goto destory_timer; printf ("disconnect from server successful!\n" ); session->state = SESSION_INVALID; destory_timer: timer_destory(&disconn_timer); return retval; }
总结 到这里,整个 ”连接到服务器“ 小任务的主要代码就算完成了,下一篇文章计划写一下代码的编译、调试和测试运行,下下一篇可能会讨论一下如何实现 PINGRESP 的接收,以上。