上一篇 mqtt 协议的文章介绍了项目的整体架构,这篇文章计划将所有的接口都实现,并且能够成功连接服务器端。我会仍然按照自底向上的顺序介绍,下面先回顾一下我们的整体架构:
Linux 下系统层实现 memory 内存管理模块 在 Linux 下,我们可以直接使用 C 库函数,非常简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 #include "memory.h" #include <stdlib.h> void *memory_alloc (int size) { return malloc (size); } void memory_free (void *ptr) { free (ptr); }
timer 定时器 在 Linux下,我们使用 struct timeval
结构来表示和计算时间,先来看一下这个结构,由秒和微妙表示:
1 2 3 4 5 struct timeval { __time_t tv_sec; __suseconds_t tv_usec; };
使用 timeval 初始化我们的定时器:
1 2 3 4 5 6 7 8 void timer_init (struct timer_t *t) { struct timeval *timeval = (struct timeval*)t->timer; if (!timeval) timeval = memory_alloc(sizeof (struct timeval)); memset (timeval, 0 , sizeof (struct timeval)); t->timer = (void *)timeval; }
设置我们的定时器的截止时间,这里我们的超时时间由毫秒表示,需要进行简单的时间转换,然后在系统时间的基础上计算出来我们的截止时间,并保存到定时器中:
1 2 3 4 5 6 7 8 9 10 11 void timer_cutoff (struct timer_t *t, unsigned int timeout) { struct timeval *base = (struct timeval*)t->timer; struct timeval interval = { .tv_sec = timeout / 1000 , .tv_usec = (timeout % 1000 ) * 1000 , }; gettimeofday(base, NULL ); timeradd(base, &interval, base); }
在判断是否超时时,也是直接和系统时间进行比较,系统时间一定是单调线性增长的。这里我们比较系统时间和之前设定的截止时间比较,如果系统时间超过了定时器,代表超时,返回 1
,否则返回 0
,表示没超时。
1 2 3 4 5 6 7 8 9 10 11 int timer_is_expired (struct timer_t *t) { struct timeval *base = (struct timeval*)t->timer; struct timeval now , minus ; gettimeofday(&now, NULL ); timersub(base, &now, &minus); if (minus.tv_sec > 0 || (minus.tv_sec == 0 && minus.tv_usec > 0 )) return 0 ; return 1 ; }
network 底层网络连接 基于 Linux 下的 socket api,实现底层网络连接的建立和数据读写,首先看看简单的初始化网络抽象:
1 2 3 4 5 6 void network_init (struct network_t *network, char *addr, char *port) { network->sockfd = -1 ; network->addr = addr; network->port = port; }
然后是比较重要的连接建立,这里会根据我们初始化的地址和端口号来创建一个 socket
然后建立连接,细节可以看代码中的注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 int network_connect (struct network_t *network) { int retval = 0 ; int sockfd; char *addr, *port; struct addrinfo hint , *result , *cur ; addr = network->addr; port = network->port; memset (&hint, 0 , sizeof (struct addrinfo)); hint.ai_family = AF_UNSPEC; hint.ai_socktype = SOCK_STREAM; hint.ai_protocol = IPPROTO_TCP; retval = getaddrinfo(addr, port, &hint, &result); if (retval != 0 ) return retval; cur = result; while (cur) { sockfd = socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol); if (sockfd < 0 ) { retval = sockfd; continue ; } retval = connect(sockfd, cur->ai_addr, cur->ai_addrlen); if (retval == 0 ) { network->sockfd = sockfd; break ; } close(sockfd); cur = cur->ai_next; } freeaddrinfo(result); return retval; }
网络数据的读写,也比较简单,这里设置了一下网络超时时间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 int network_read (struct network_t *network, char *buf, int count, int timeout) { int sockfd = network->sockfd; struct timeval tv = { .tv_sec = timeout / 1000 , .tv_usec = (timeout % 1000 ) * 1000 , }; setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof (struct timeval)); return read(sockfd, buf, count); } int network_write (struct network_t *network, const char *buf, int count, int timeout) { int sockfd = network->sockfd; struct timeval tv = { .tv_sec = timeout / 1000 , .tv_usec = (timeout % 1000 ) * 1000 , }; setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof (struct timeval)); return write(sockfd, buf, count); }
最后,释放网络连接:
1 2 3 4 5 6 7 8 int network_release (struct network_t *network) { int retval = 0 ; if (network->sockfd) retval = close(network->sockfd); memset (network, 0 , sizeof (struct network_t )); return retval; }
序列化和反序列化层 基本接口 为了实现连接报文、心跳报文的序列化和反序列化,我们需要先提供一些基本元素的序列化/反序列化方法,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int serialize_fix_header (unsigned char *buf, int count, enum type t) ;unsigned int packet_len (unsigned int vlen) ;unsigned int mqtt_string_len (struct mqtt_string mstr) ;void writec (unsigned char **ptr, unsigned char ch) ;void writei (unsigned char **ptr, unsigned int i) ;void write_cstring (unsigned char **ptr, const char *str) ;void write_lenstring (unsigned char **ptr, struct len_string len_str) ;void write_mqttstring (unsigned char **ptr, struct mqtt_string mqtt_str) ;void readc (const unsigned char **ptr, unsigned char *ch) ;void readi (const unsigned char **ptr, unsigned int *i) ;void read_mqttstring (unsigned char **ptr, const struct mqtt_string *mqtt_str) ;int write_optslen (unsigned char *buf, unsigned int len) ;int read_optslen (const unsigned char *buf, unsigned int *len) ;
其中大部分函数都比较简单且容易理解,这里简要介绍下 write_optslen
与 read_optslen
两个函数,它们负责计算 可变报头+有效负载 长度并进行序列化与反序列化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 int write_optslen (unsigned char *buf, unsigned int len) { int idx = 0 ; do { char ch = len & (128 - 1 ); len >>= 7 ; if (len > 0 ) ch |= 0x80 ; buf[idx++] = ch; } while (len > 0 ); return idx; } int read_optslen (const unsigned char *buf, unsigned int *len) { int idx = 0 ; int muiltiplier = 1 ; unsigned char remain, val; *len = 0 ; do { if (idx >= 4 ) return -1 ; char ch = buf[idx++]; remain = ch & 0x80 ; val = ch & 0x7f ; *len += val * muiltiplier; muiltiplier <<= 7 ; } while (remain); return idx; }
上面的函数严格遵守 mqtt 报文规定的长度编码方式,即
对小于 128 的值它使用单字节编码。更大的值按下面的方式处理。 低 7 位有效位用于编码数据,最高有效位用于指示是否有更多的字节。因此每个字节可以编码 128 个数值 和一个延续位(continuation bit)。剩余长度字段最大 4 个字节。
对于只有固定头部的报文,这里提供了通用的序列化方式,根据报文类型序列化固定头部:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 int serialize_fix_header (unsigned char *buf, int count, enum type t) { unsigned char *ptr = buf; header_t header = {0 }; if (count < 2 ) return -1 ; header.bits.type = t; writec(&ptr, header.byte); ptr += write_optslen(ptr, 0 ); return ptr - buf; }
连接报文 严格按照 mqtt 协议规定的字段,我们可以实现 CONNECT 报文的序列化操作,细节可以看代码注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 int serialize_connect (unsigned char *buf, int count, struct connect_options *conn_opts) { unsigned int len; unsigned char *ptr = buf; header_t header = {0 }; connect_flags_t flags = {0 }; len = conn_opts_size(conn_opts); if (packet_len(len) > count) return -1 ; header.bits.type = CONNECT; writec(&ptr, header.byte); ptr += write_optslen(ptr, len); if (conn_opts->proto_version == 3 ) { write_cstring(&ptr, "MQIsdp" ); writec(&ptr, 3 ); } else if (conn_opts->proto_version == 4 ) { write_cstring(&ptr, "MQTT" ); writec(&ptr, 4 ); } flags.bits.clean_session = conn_opts->clean_session; flags.bits.will_flag = conn_opts->will_flag ? 1 : 0 ; if (conn_opts->will_flag) { flags.bits.will_qos = conn_opts->will_qos; flags.bits.will_retain = conn_opts->will_retain; } flags.bits.password_flag = conn_opts->password_flag; flags.bits.username_flag = conn_opts->username_flag; writec(&ptr, flags.byte); writei(&ptr, conn_opts->keep_alive_interval); write_mqttstring(&ptr, conn_opts->client_id); if (conn_opts->will_flag) { write_mqttstring(&ptr, conn_opts->will_topic); write_mqttstring(&ptr, conn_opts->will_message); } if (conn_opts->username_flag) write_mqttstring(&ptr, conn_opts->username); if (conn_opts->password_flag) write_mqttstring(&ptr, conn_opts->password); return ptr - buf; }
连接响应报文的反序列化比较简单,这里一并讲,细节可以看代码中的注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 int deserialize_connack (struct connack_options *connack_opts, const unsigned char *buf, int count) { unsigned int len; const unsigned char * ptr = buf; header_t header = {0 }; connack_flags_t flags = {0 }; readc(&ptr, &header.byte); if (header.bits.type != CONNACK) return 0 ; ptr += read_optslen(ptr, &len); if (len < 2 ) return 0 ; readc(&ptr, &flags.byte); connack_opts->session_present = flags.bits.session_present; readc(&ptr, &connack_opts->return_code); return 1 ; }
断开连接报文只有固定头部部分,它的序列化直接调用上面的通用接口:
1 2 3 4 int serialize_disconnect (unsigned char *buf, int count) { return serialize_fix_header(buf, count, DISCONNECT); }
PING 报文 PINGREQ 和 PINGRESP 报文均只有固定头部部分,实现比较简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 int serialize_pingreq (unsigned char *buf, unsigned int count) { return serialize_fix_header(buf, count, PINGREQ); } int deserialize_pingresp (const unsigned char *buf, unsigned int count) { const unsigned char *ptr = buf; header_t header = {0 }; readc(&ptr, &header.byte); if (header.bits.type != PINGRESP) return -1 ; return 2 ; }
MQTT 协议层 mqtt 协议层代码量较哒,出于篇幅考虑,新开一篇文章介绍。