0%

【计算机网络】mqtt通信协议入门(十二)网络重连和消息重发实战

前面的文章已经完成了 mqtt 协议所规定的所有报文的发送与接收功能,但是为了适应较差的网络环境,mqtt 协议还规定了断开连接后的网络重连功能。此外,为了满足 QoS1 和 QoS2 的通信要求,mqtt 也对消息的超时重发进行了规定。这篇文章,我们的主题就是实现网络重连和消息超时重发功能,也是 mqtt 协议入门系列的最后一篇文章。

消息分发重试

mqtt 协议规定

在 mqtt 协议的规定中,有如下描述:

客户端设置清理会话(CleanSession)标志为 0 重连时,客户端和服务端必须使用原始的报文标识符重发 任何未确认的 PUBLISH 报文(如果 QoS>0)和 PUBREL 报文 。这是唯一要求客户端或 服务端重发消息的情况。

从中我们可知,客户端需要在下面三种情况下进行消息重发操作:

1、QoS1 服务质量的 PUBLISH 报文,在等待 PUBACK 时超时

2、QoS2 服务质量的 PUBLISH 报文,在等待 PUBREC 时超时

3、QoS2 服务质量的 PUBREC 报文,在等待 PUBREL 时超时

4、QoS2 服务质量的 PUBREL 报文,在等待 PUBCOMP 时超时

实现

显然,我们需要做的就是修改这些报文在超时情况下的处理,但是在这之前,我们需要有地方保存需要重发的报文。稍微分析一下可知,我们需要在扫描 ack 等待队列时处理超时情况,那么在等待队列实体 ack_wait_entry 中添加待重发的报文最合适不过了,如下:

1
2
3
4
5
struct ack_wait_entry {
...
unsigned int resend_len;
unsigned char *resend_buf;
};

两个成员分别代表序列化后的待发报文,以及报文长度。

在扫描链表时,对超时等待的处理会有所不同,直接看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void ack_wait_scan(struct mqtt_session *session)
{
struct list_head *cur, *next;
struct ack_wait_entry *entry;

list_for_each_safe(cur, next, &session->wait_ack_list) {
entry = list_entry(cur, struct ack_wait_entry, list);
enum type type = entry->type;

if (timer_is_expired(&entry->timer)) {
switch (type) {
case PUBACK:
case PUBREC:
case PUBREL:
case PUBCOMP:
try_to_resend(session, entry);
break;
default:
list_del(cur);
timer_destory(&entry->timer);
memory_free(entry);
}
}
}
}

对于上面提到的四种报文,我们会尝试重新发送,其余的报文如果超时,那么直接删除。try_to_resend 函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
static void try_to_resend(struct mqtt_session *session, struct ack_wait_entry *ack_entry)
{
DECLARE_TIMER(resend_timer);
timer_cutoff(&resend_timer, session->net_timeout);
timer_cutoff(&ack_entry->timer, session->net_timeout);

mutex_lock(&session->write_lock);
memcpy(session->write_buf, ack_entry->resend_buf, ack_entry->resend_len);

mqtt_send_packet(session, ack_entry->resend_len, &resend_timer);
mutex_unlock(&session->write_lock);
timer_destory(&resend_timer);
}

主要是修改等待实体的等待时间,获取到等待实体对应的重发报文,最后重新发送即可。

剩下的工作就是在发送上面各个报文时,将待发送的序列化报文备份一份到 ack_wait_entry 即可。因为所有的等待实体都是通过函数 ack_wait_list_add 添加到等待队列的,所以我们要修改此函数,原来的参数列表以及不能满足要求,需要新添加一个参数表示序列化的报文长度,修改后的函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static int ack_wait_list_add(struct mqtt_session *session, enum type t, unsigned short packet_id, struct msg_wait_entry *msg_entry, unsigned int packet_len)
{
...
/* 申请新的空间并备份 待发送缓冲区 */
resend_buf = memory_alloc(packet_len);
if (!resend_buf)
return -1;
memcpy(resend_buf, session->write_buf, packet_len);

...
/* 初始化 */
entry->resend_len = packet_len;
entry->resend_buf = resend_buf;
list_add(&entry->list, &session->wait_ack_list);
...
}

需要注意的是,为了阻止内存泄漏,需要在移除等待实体时,释放申请的空间:

1
2
3
4
ack_entry_free:
timer_destory(&ack_entry->timer);
memory_free(ack_entry->resend_buf);
memory_free(ack_entry);

项目中涉及释放 ack_wait_entry 实体空间时,都需要按照此方式释放。

互斥访问

最后的最后,在调用 ack_wait_list_add 将等待实体加入到队列中时,要确保 该函数在 互斥锁的保护中进行,因为在未修改这个函数之前,它不会访问缓冲区,但是修改后会访问,所以我们需要将其保护起来。下面以 PUBLISH 的 QoS1 和 QoS2 发布为例:

1
2
3
4
5
6
7
8
9
...
/* 先添加到等待队列 */
if (qos == QoS1) {
retval = ack_wait_list_add(session, PUBACK, packet_id, NULL, packet_len);
} else if (qos == QoS2) {
retval = ack_wait_list_add(session, PUBREC, packet_id, NULL, packet_len);
}
/* 再释放互斥锁 */
mutex_unlock(&session->write_lock);

超时重连

当超过一定时间没有收到 或者 发送报文时,session 中的两个计时器会过期,并将 session 的状态设置为 无效状态。同时,线程会循环检测 session 的状态,一旦其断开连接,就尝试重连:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
while (1) {

while (get_session_state(session) != SESSION_CONNECTED) {
printf("ack_handler thread: mqtt is not connected, reconnecting\n");

if (try_to_reconnect(session) > 0)
continue;
thread_stop(session->thread);
}

handle_ack(session, &thread_timer);
debug_dump_ack_wait_list(session);
debug_dump_msg_wait_list(session);
ack_wait_scan(session);
}

为了限制重连的尝试时间,需要在 session 中指定一个新的字段:

1
2
3
4
struct mqtt_session {
...
unsigned int try_reconn_timeout;
};

尝试重连的函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int try_to_reconnect(struct mqtt_session *session)
{
int retval = 0;
DECLARE_TIMER(timer);

/* 设置重连计时器 */
timer_init(&timer);
timer_cutoff(&timer, session->try_reconn_timeout);

/* 尝试重连 */
do {
retval = mqtt_connect(session);
} while (!timer_is_expired(&timer) && get_session_state(session) != SESSION_CONNECTED);

/* 重连后重新订阅报文 */
if (get_session_state(session) == SESSION_CONNECTED)
try_to_resubscribe(session);

timer_destory(&timer);
return retval;
}

重新订阅报文

最后,看看重连后重新订阅报文的函数实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int try_to_resubscribe(struct mqtt_session *session)
{
int retval = 0;
struct list_head *cur, *next;
struct msg_wait_entry *entry;

if (list_empty(&session->wait_msg_list))
return retval;

/* 遍历 msg_wait_list 消息处理链表 */
list_for_each_safe(cur, next, &session->wait_msg_list) {
entry = list_entry(cur, struct msg_wait_entry, list);

/* 获取主题过滤器 */
msg_handler_t handler = entry->msg_handler;
struct topic_filter_t tf = {
.filter = entry->topic_filter,
.qos = tf.qos
};

/* 重新订阅主题 */
retval = mqtt_subscribe(session, &tf, handler);
}

return retval;
}

总结

有时间会将整个项目进行一个总结,未完待续……