经过前面的努力,我们的程序已经能够正常连接服务器,并且可以实现保活功能。接下来我们继续深入学习。这篇文章的主题是 Topic 和订阅与取消订阅,对应 SUBSCRIBE 与 UNSUBSCRIEB 报文,同时还需要接收服务端响应的 SUBACK 和 UNSUBACK 报文,这些报文的结构在上一篇文章已经介绍了,本文我们研究一下如何实现。
SUBSCRIBE 订阅 序列化/反序列化 SUBSCRIBE 报文的序列化 有了 CONNECT ,PINGREQ 等报文序列化的基础,很容易就能把 SUBSCRIBE 报文的序列化方法实现,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 int serialize_sub (unsigned char *buf, unsigned int buflen, unsigned short packet_id, \ struct mqtt_string filters[], enum qos qoss[], unsigned int count) { unsigned char *ptr = buf; header_t header = {0 }; unsigned int len = 0 ; header.divide.type = SUBSCRIBE; header.divide.reserved = 0x2 ; writec(&ptr, header.byte); len = sub_opts_len(filters, count); if (packet_len(len) + len > buflen) return -1 ; ptr += write_optslen(ptr, len); writei(&ptr, packet_id); for (int i = 0 ; i < count; i++) { write_mqttstring(&ptr, filters[i]); writei(&ptr, qoss[i]); } return ptr - buf; }
其中剩余长度的计算如下:
1 2 3 4 5 6 7 8 9 10 11 static unsigned int sub_opts_len (struct mqtt_string filters[], unsigned int count) { const unsigned int packet_id_len = 2 , qos_len = 1 ; unsigned int len = 0 ; len += packet_id_len; for (int i = 0 ; i < count; i++) len += 2 + mqtt_string_len(filters[i]) + qos_len; return len; }
SUBACK 报文的反序列化 SUBACK 报文会携带 packet_id 以及 返回码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int deserialize_suback (unsigned int *packet_id, unsigned char *rc, unsigned int *count, \ const unsigned char *buf) { const unsigned char *ptr = buf; header_t header = {0 }; unsigned int len; readc(&ptr, &header.byte); if (header.divide.type != SUBACK) return -1 ; ptr += read_optslen(ptr, &len); readi(&ptr, packet_id); *count = len - 2 ; for (int i = 0 ; i < *count; i++) readc(&ptr, rc + i); return ptr - buf; }
协议接口层 在实现协议层的 subscribe 前,先来考虑这样一个问题:应该以什么方式等待 SUBACK 响应报文?
在之前的文章中,我们已经展示了异步等待的好处,也实现了 PINGRESP 报文的异步等待。但是,在之前的实现中,PINGREQ 报文是系统根据定时器来确定何时发送的,也就是说,PINGREQ 报文总是会在合适的时间由系统发送,除了这一点,PINGRESP 与 SUBACK 对于 PINGREQ 和 subscribe 报文的响应最大的不太在于:
SUBACK 会对拥有特定 packet_id 的 SUBSCRIBE 报文响应,是一对一的
PINGRESP 会对所有的 PINGRESP 进行响应,没有一一对应的关系
举一个实际的例子,假设我们很短时间内连续发送了两个 PINGREQ 报文,只收到了一条 PINGRESP 报文,但是只要 PINGRESP 报文是在 keep_alive_interval 之内回复的,那我们就认为保活成功了。换句话说,服务器使用一条 PINGRESP 报文,回复了客户端两条 PINGREQ 报文!
同样的例子对于 subscribe 报文和 SUBACK 报文是不适用的!因为二者使用了 “packet_id” 做完一一对应的标识
所以,在等待 SUBACK 报文时,不能像等待 PINGRESP 那样处理,我们还需要将所有发送的 subscribe 报文保存下来 ,等收到 SUBACK 时,再扫描这些报文找到 “packet_id” 匹配的那一个。很明显,我们需要在 session 对象中维护一个等待响应队列 结构,可以使用链表实现。下面的伪代码描述了订阅的过程:
1 2 3 4 5 6 subscribe() { ... list_add(); }
在等待 SUBACK 时,我们仍可使用异步的方式进行,这里直接扩充 ack_handler
线程的工作内容即可,下面的伪代码描述了扩充后的异步线程功能,(这是在之前的框架上修改的,可以参考异步等待的原理与实现 ):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 while (1 ){ else if (type == SUBACK) { list_remove(); } for_each_entry_in_waitlist() { if (expired(entry->timer)) { list_remove(); printf ("xxx entry subscribe timeout\n" ); } } }
list_head 结构 由于等待队列的出现,我们需要相应的数据结构来实现它,这里我们封装一个链表数据结构,并提供一下简单的使用接口,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 struct list_head { struct list_head *prev , *next ; }; static inline void list_init (struct list_head *list ) { list ->next = list ; list ->prev = list ; } static inline void __list_add(struct list_head *new, struct list_head *prev, struct list_head *next) { next->prev = new; new->next = next; new->prev = prev; prev->next = new; } static inline void list_add (struct list_head *new, struct list_head *head) { __list_add(new, head, head->next); } static inline void list_add_tail (struct list_head *new, struct list_head *head) { __list_add(new, head->prev, head); } static inline void __list_del(struct list_head * prev, struct list_head * next){ next->prev = prev; prev->next = next; } static inline void list_del (struct list_head *entry) { __list_del(entry->prev, entry->next); } static inline int list_empty (const struct list_head *head) { return head->next == head; } #define offset_of(type, number) \ ((size_t)&(((type *)0)->number)) #define container_of(ptr, type, number) \ ((type *)((unsigned char *)ptr - offset_of(type, number))) #define list_entry(list, type, number) \ container_of(list, type, number) #define list_for_each(curr, list) \ for (curr = (list)->next; curr != (list); curr = curr->next) #define list_for_each_safe(curr, next, list) \ for (curr = (list)->next, next = curr->next; curr != (list); \ curr = next, next = curr->next)
将 list_head 加入 session 作为等待队列的头结点,并在 session_init 函数中初始化:
1 2 3 4 5 struct mqtt_session { ... struct list_head wait_ack_list ; ... }
1 2 3 4 5 6 session_init() { ... list_init(&session->wait_ack_list); ... }
packet_id 的维护 发送 SUBSCRIBE 和 UNSUBSCRIBE 报文都需要为报文提供 packet_id 这一字段,mqtt 对其要求为 16 位的非零数字,在收到回复前不能复用,我们需要在 session 对象中维护当前可用 packet_id:
1 2 3 4 struct mqtt_session { ... unsigned short packet_id; }
通过下面的函数获取:
1 2 3 4 5 6 7 static unsigned int get_next_id (struct mqtt_session *session) { if (session->packet_id == MQTT_MAX_PACKET_ID) session->packet_id = MQTT_MIN_PACKET_ID; session->packet_id++; return session->packet_id; }
订阅报文接口 正如 mqtt 协议规定的那样,SUBSCRIBE 报文需要携带其想要订阅的主题,即携带一个主题过滤器,我们是这样规定的:
1 2 3 4 5 struct topic_filter_t { char **filters; enum qos *qoss ; unsigned int count; };
接下来是 mqtt_subscribe 函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 int mqtt_subscribe (struct mqtt_session *session, struct topic_filter_t *tf) { int retval = 0 ; struct mqtt_string *filters ; unsigned int count, packet_len; struct timer_t timer = {0 }; unsigned short packet_id; if (!session || !session->write_buf) return -1 ; if (!tf) return -1 ; if (!timer_init(&timer)) return -1 ; count = tf->count; filters = (struct mqtt_string*)memory_alloc(sizeof (struct mqtt_string) * count); if (!filters) { retval = -1 ; goto destory_timer; } for (int i = 0 ; i < count; i++) (filters + i)->cstring = tf->filters[i]; packet_id = get_next_id(session); retval = serialize_sub(session->write_buf, session->write_buflen, packet_id, filters, tf->qoss, count); if (retval < 0 ) goto free_filters; else packet_len = retval; retval = mqtt_send_packet(session, packet_len, &timer); if (retval < 0 ) goto free_filters; ack_wait_list_add(session, SUBACK, packet_id); printf ("main thread: sending a SUBSCRIBE packet id is %d\n" , packet_id); free_filters: memory_free(filters); destory_timer: timer_destory(&timer); return retval; }
和以前的报文发送函数相比,发送 SUBSCRIBE 报文没有什么不同,唯一的不同点在于:它使用了 ack_wait_list_add
函数,在等待队列上创建了一个实体,该实体包含下面的元素:
1 2 3 4 5 6 struct ack_wait_entry { struct list_head list ; enum type type ; unsigned short packet_id; struct timer_t timer ; };
list 成员用来将所有的 ack_wait_entry
连成一个链表
type 成员代表了想要等待的报文类型,对于 SUBSCRIBE 报文而言,其想要等待的是 SUBACK 类型的报文
packet_id 成员代表了 SUBSCRIBE 报文和 SUBACK 报文的报文标识符
timer 成员用来定时,当想要的 ack 报文迟迟没有等到时,需要为这个等待实体设置一个超时时间,在超时后需要将其从等待队列删除
下面是 ack_wait_list_add
函数的实现,该函数创建一个 ack_wait_entry
实体,并把它加入到 session 的等待队列中,在这一切之前需要判断队列中是否已经存在一个相同的实体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 static int ack_wait_list_add (struct mqtt_session *session, enum type t, unsigned short packet_id) { int retval = 0 ; struct ack_wait_entry *entry ; if (ack_wait_entry_exist(&session->wait_ack_list, t, packet_id)) { return -1 ; } entry = memory_alloc(sizeof (struct ack_wait_entry)); if (!entry) return -1 ; memset (entry, 0 , sizeof (struct ack_wait_entry)); if (!timer_init(&entry->timer)) { retval = -1 ; goto free_entry; } timer_cutoff(&entry->timer, session->net_timeout); entry->packet_id = packet_id; entry->type = t; list_add(&entry->list , &session->wait_ack_list); return 0 ; free_entry: memory_free(entry); return retval; }
用来判断等待队列是否存在相同实体的函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static int ack_wait_entry_exist (struct list_head *head, enum type t, unsigned short packet_id) { int bool = 0 ; struct ack_wait_entry *entry ; struct list_head *cur ; list_for_each(cur, head) { entry = list_entry(cur, struct ack_wait_entry, list ); if (entry->type == t && entry->packet_id == packet_id) { bool = 1 ; break ; } } return bool ; }
在辅助线程的主函数中,我们出来处理 ack 报文,还需要扫描等待队列,移除超时的报文:
1 2 3 4 5 6 7 8 9 static void mqtt_ack_handler (void *arg ) { ... while (1 ) { handle_ack(session, &thread_timer); ack_wait_scan(session); } }
ack_wait_scan 函数实现如下,它辅助扫描等待队列,移除定时器过期的实体,并释放实体的内存空间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static void ack_wait_scan (struct mqtt_session *session) { struct list_head *cur , *next ; struct ack_wait_entry *entry ; list_for_each_safe(cur, next, &session->wait_ack_list) { entry = list_entry(cur, struct ack_wait_entry, list ); if ((entry->type == SUBACK || entry->type == UNSUBACK) && timer_is_expired(&entry->timer)) { printf ("ack_handler thread: SUB or UNSUB failed packet id is %d\n" , entry->packet_id); list_del(cur); timer_destory(&entry->timer); memory_free(entry); } } }
在 handle_ack
中还要加入对 SUBACK 和 UNSUBACK 的响应:
1 2 3 4 5 6 7 8 9 10 11 static int handle_ack (struct mqtt_session *session, struct timer_t *timer) { ... if (type == PINGRESP) session->pingresp_waiting = 0 ; else if (type == SUBACK) retval = suback_handler(session); else if (type == UNSUBACK) retval = unsuback_handler(session); ... }
我们继续看,这两个函数的处理是类似的,它们会反序列化对应报文,并删除等待队列中的对应实体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static int suback_handler (struct mqtt_session *session) { int retval = 0 ; unsigned int packet_id, count; unsigned char return_code; retval = deserialize_suback(&packet_id, &return_code, &count, session->read_buf); if (retval < 0 ) return -1 ; ack_wait_list_del(session, SUBACK, packet_id); if (return_code != QoSF) printf ("ack_handler thread: received SUBACK packet is %d\n" , packet_id); else printf ("ack_handler thread: received SUBACK[ack = 0x80] packet is %d\n" , packet_id); return retval; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static int unsuback_handler (struct mqtt_session *session) { int retval = 0 ; unsigned int packet_id; retval = deserialize_unsuback(&packet_id, session->read_buf); if (retval < 0 ) return -1 ; ack_wait_list_del(session, UNSUBACK, packet_id); printf ("ack_handler thread: received UNSUBACK packet is %d\n" , packet_id); return retval; }
具体删除等待队列中指定实体的任务由 ack_wait_list_del
函数实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 static int ack_wait_list_del (struct mqtt_session *session, enum type t, unsigned short packet_id) { struct ack_wait_entry *entry = NULL ; struct list_head *cur ; list_for_each(cur, &session->wait_ack_list) { entry = list_entry(cur, struct ack_wait_entry, list ); if (entry->type == t && entry->packet_id == packet_id) { list_del(cur); break ; } } if (!entry) { timer_destory(&entry->timer); memory_free(entry); } return 0 ; }
UNSUBSCRIBE 取消订阅 序列化/反序列化 UNSUBSCRIBE 报文的序列化 和 SUBSCRIBE 报文的序列化类似,不同的地方在于其有效负载中无 QoS 信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 int serialize_unsub (unsigned char *buf, unsigned int buflen, unsigned short packet_id, \ struct mqtt_string filters[], unsigned int count) { unsigned char *ptr = buf; header_t header = {0 }; unsigned int len = 0 ; header.divide.type = UNSUBSCRIBE; header.divide.reserved = 0x2 ; writec(&ptr, header.byte); len = unsub_opts_len(filters, count); if (packet_len(len) + len > buflen) return -1 ; ptr += write_optslen(ptr, len); writei(&ptr, packet_id); for (int i = 0 ; i < count; i++) write_mqttstring(&ptr, filters[i]); return ptr - buf; }
计算剩余长度
1 2 3 4 5 6 7 8 9 10 11 static unsigned int unsub_opts_len (struct mqtt_string filters[], unsigned int count) { const unsigned int packet_id_len = 2 ; unsigned int len = 0 ; len += packet_id_len; for (int i = 0 ; i < count; i++) len += 2 + mqtt_string_len(filters[i]); return len; }
UNSUBACK 报文的反序列化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 int deserialize_unsuback (unsigned int *packet_id, const unsigned char *buf) { const unsigned char *ptr = buf; header_t header = {0 }; unsigned int len; readc(&ptr, &header.byte); if (header.divide.type != UNSUBACK) return -1 ; ptr += read_optslen(ptr, &len); readi(&ptr, packet_id); return ptr - buf; }
协议接口层 取消订阅接口 取消订阅的实现,几乎和订阅的实现一致,这里不在赘述,看代码即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 int mqtt_unsubscribe (struct mqtt_session *session, struct topic_filter_t *tf) { int retval = 0 ; struct mqtt_string *filters ; unsigned int count, packet_len; struct timer_t timer = {0 }; unsigned short packet_id; if (!session || !session->write_buf) return -1 ; if (!tf) return -1 ; if (!timer_init(&timer)) return -1 ; count = tf->count; filters = (struct mqtt_string*)memory_alloc(sizeof (struct mqtt_string) * count); if (!filters) { retval = -1 ; goto destory_timer; } for (int i = 0 ; i < count; i++) (filters + i)->cstring = tf->filters[i]; packet_id = get_next_id(session); retval = serialize_unsub(session->write_buf, session->write_buflen, packet_id, filters, count); if (retval < 0 ) goto free_filters; else packet_len = retval; retval = mqtt_send_packet(session, packet_len, &timer); if (retval < 0 ) goto free_filters; ack_wait_list_add(session, UNSUBACK, packet_id); printf ("main thread: sending a UNSUB packet id is %d\n" , packet_id); free_filters: memory_free(filters); destory_timer: timer_destory(&timer); return retval; }