前面的文章已经完成了 mqtt 协议所规定的所有报文的发送与接收功能,但是为了适应较差的网络环境,mqtt 协议还规定了断开连接后的网络重连功能。此外,为了满足 QoS1 和 QoS2 的通信要求,mqtt 也对消息的超时重发进行了规定。这篇文章,我们的主题就是实现网络重连和消息超时重发功能,也是 mqtt 协议入门系列的最后一篇文章。
消息分发重试 mqtt 协议规定 在 mqtt 协议的规定中,有如下描述:
客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发 任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 。这是唯一要求客户端或 服务端重发消息的情况。
从中我们可知,客户端需要在下面三种情况下进行消息重发操作:
1、QoS1 服务质量的 PUBLISH 报文,在等待 PUBACK 时超时
2、QoS2 服务质量的 PUBLISH 报文,在等待 PUBREC 时超时
3、QoS2 服务质量的 PUBREC 报文,在等待 PUBREL 时超时
4、QoS2 服务质量的 PUBREL 报文,在等待 PUBCOMP 时超时
实现 显然,我们需要做的就是修改这些报文在超时情况下的处理,但是在这之前,我们需要有地方保存需要重发的报文。稍微分析一下可知,我们需要在扫描 ack 等待队列时处理超时情况,那么在等待队列实体 ack_wait_entry 中添加待重发的报文最合适不过了,如下:
1 2 3 4 5 struct ack_wait_entry { ... unsigned int resend_len; unsigned char *resend_buf; };
两个成员分别代表序列化后的待发报文,以及报文长度。
在扫描链表时,对超时等待的处理会有所不同,直接看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 static void ack_wait_scan (struct mqtt_session *session) { struct list_head *cur , *next ; struct ack_wait_entry *entry ; list_for_each_safe(cur, next, &session->wait_ack_list) { entry = list_entry(cur, struct ack_wait_entry, list ); enum type type = entry->type; if (timer_is_expired(&entry->timer)) { switch (type) { case PUBACK: case PUBREC: case PUBREL: case PUBCOMP: try_to_resend(session, entry); break ; default : list_del(cur); timer_destory(&entry->timer); memory_free(entry); } } } }
对于上面提到的四种报文,我们会尝试重新发送,其余的报文如果超时,那么直接删除。try_to_resend 函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 static void try_to_resend (struct mqtt_session *session, struct ack_wait_entry *ack_entry) { DECLARE_TIMER(resend_timer); timer_cutoff(&resend_timer, session->net_timeout); timer_cutoff(&ack_entry->timer, session->net_timeout); mutex_lock(&session->write_lock); memcpy (session->write_buf, ack_entry->resend_buf, ack_entry->resend_len); mqtt_send_packet(session, ack_entry->resend_len, &resend_timer); mutex_unlock(&session->write_lock); timer_destory(&resend_timer); }
主要是修改等待实体的等待时间,获取到等待实体对应的重发报文,最后重新发送即可。
剩下的工作就是在发送上面各个报文时,将待发送的序列化报文备份一份到 ack_wait_entry 即可。因为所有的等待实体都是通过函数 ack_wait_list_add 添加到等待队列的,所以我们要修改此函数,原来的参数列表以及不能满足要求,需要新添加一个参数表示序列化的报文长度,修改后的函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 static int ack_wait_list_add (struct mqtt_session *session, enum type t, unsigned short packet_id, struct msg_wait_entry *msg_entry, unsigned int packet_len) { ... resend_buf = memory_alloc(packet_len); if (!resend_buf) return -1 ; memcpy (resend_buf, session->write_buf, packet_len); ... entry->resend_len = packet_len; entry->resend_buf = resend_buf; list_add(&entry->list , &session->wait_ack_list); ... }
需要注意的是,为了阻止内存泄漏,需要在移除等待实体时,释放申请的空间:
1 2 3 4 ack_entry_free: timer_destory(&ack_entry->timer); memory_free(ack_entry->resend_buf); memory_free(ack_entry);
项目中涉及释放 ack_wait_entry 实体空间时,都需要按照此方式释放。
互斥访问 最后的最后,在调用 ack_wait_list_add 将等待实体加入到队列中时,要确保 该函数在 互斥锁的保护中进行,因为在未修改这个函数之前,它不会访问缓冲区,但是修改后会访问,所以我们需要将其保护起来。下面以 PUBLISH 的 QoS1 和 QoS2 发布为例:
1 2 3 4 5 6 7 8 9 ... if (qos == QoS1) { retval = ack_wait_list_add(session, PUBACK, packet_id, NULL , packet_len); } else if (qos == QoS2) { retval = ack_wait_list_add(session, PUBREC, packet_id, NULL , packet_len); } mutex_unlock(&session->write_lock);
超时重连 当超过一定时间没有收到 或者 发送报文时,session 中的两个计时器会过期,并将 session 的状态设置为 无效状态。同时,线程会循环检测 session 的状态,一旦其断开连接,就尝试重连:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 while (1 ) { while (get_session_state(session) != SESSION_CONNECTED) { printf ("ack_handler thread: mqtt is not connected, reconnecting\n" ); if (try_to_reconnect(session) > 0 ) continue ; thread_stop(session->thread); } handle_ack(session, &thread_timer); debug_dump_ack_wait_list(session); debug_dump_msg_wait_list(session); ack_wait_scan(session); }
为了限制重连的尝试时间,需要在 session 中指定一个新的字段:
1 2 3 4 struct mqtt_session { ... unsigned int try_reconn_timeout; };
尝试重连的函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 static int try_to_reconnect (struct mqtt_session *session) { int retval = 0 ; DECLARE_TIMER(timer); timer_init(&timer); timer_cutoff(&timer, session->try_reconn_timeout); do { retval = mqtt_connect(session); } while (!timer_is_expired(&timer) && get_session_state(session) != SESSION_CONNECTED); if (get_session_state(session) == SESSION_CONNECTED) try_to_resubscribe(session); timer_destory(&timer); return retval; }
重新订阅报文 最后,看看重连后重新订阅报文的函数实现,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static int try_to_resubscribe (struct mqtt_session *session) { int retval = 0 ; struct list_head *cur , *next ; struct msg_wait_entry *entry ; if (list_empty(&session->wait_msg_list)) return retval; list_for_each_safe(cur, next, &session->wait_msg_list) { entry = list_entry(cur, struct msg_wait_entry, list ); msg_handler_t handler = entry->msg_handler; struct topic_filter_t tf = { .filter = entry->topic_filter, .qos = tf.qos }; retval = mqtt_subscribe(session, &tf, handler); } return retval; }
总结 有时间会将整个项目进行一个总结,未完待续……