0%

【计算机网络】mqtt通信协议入门(九)订阅报文实战

经过前面的努力,我们的程序已经能够正常连接服务器,并且可以实现保活功能。接下来我们继续深入学习。这篇文章的主题是 Topic 和订阅与取消订阅,对应 SUBSCRIBE 与 UNSUBSCRIEB 报文,同时还需要接收服务端响应的 SUBACK 和 UNSUBACK 报文,这些报文的结构在上一篇文章已经介绍了,本文我们研究一下如何实现。

SUBSCRIBE 订阅

序列化/反序列化

SUBSCRIBE 报文的序列化

有了 CONNECT ,PINGREQ 等报文序列化的基础,很容易就能把 SUBSCRIBE 报文的序列化方法实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int serialize_sub(unsigned char *buf, unsigned int buflen, unsigned short packet_id, \
struct mqtt_string filters[], enum qos qoss[], unsigned int count)
{
unsigned char *ptr = buf;
header_t header = {0};
unsigned int len = 0;

/* 固定头部 */
header.divide.type = SUBSCRIBE;
header.divide.reserved = 0x2;
writec(&ptr, header.byte);

/* 剩余长度部分 */
len = sub_opts_len(filters, count);
if (packet_len(len) + len > buflen)
return -1;
ptr += write_optslen(ptr, len);

/* 可变头部 */
writei(&ptr, packet_id);

/* 有效负载 */
for (int i = 0; i < count; i++) {
write_mqttstring(&ptr, filters[i]);
writei(&ptr, qoss[i]);
}

return ptr - buf;
}

其中剩余长度的计算如下:

1
2
3
4
5
6
7
8
9
10
11
static unsigned int sub_opts_len(struct mqtt_string filters[], unsigned int count)
{
const unsigned int packet_id_len = 2, qos_len = 1;
unsigned int len = 0;

len += packet_id_len;
for (int i = 0; i < count; i++)
len += 2 + mqtt_string_len(filters[i]) + qos_len;

return len;
}

SUBACK 报文的反序列化

SUBACK 报文会携带 packet_id 以及 返回码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int deserialize_suback(unsigned int *packet_id, unsigned char *rc, unsigned int *count, \
const unsigned char *buf)
{
const unsigned char *ptr = buf;
header_t header = {0};
unsigned int len;

readc(&ptr, &header.byte);
if (header.divide.type != SUBACK)
return -1;

ptr += read_optslen(ptr, &len);
readi(&ptr, packet_id);

*count = len - 2;
for (int i = 0; i < *count; i++)
readc(&ptr, rc + i);

return ptr - buf;
}

协议接口层

在实现协议层的 subscribe 前,先来考虑这样一个问题:应该以什么方式等待 SUBACK 响应报文?

在之前的文章中,我们已经展示了异步等待的好处,也实现了 PINGRESP 报文的异步等待。但是,在之前的实现中,PINGREQ 报文是系统根据定时器来确定何时发送的,也就是说,PINGREQ 报文总是会在合适的时间由系统发送,除了这一点,PINGRESP 与 SUBACK 对于 PINGREQ 和 subscribe 报文的响应最大的不太在于:

SUBACK 会对拥有特定 packet_id 的 SUBSCRIBE 报文响应,是一对一的

PINGRESP 会对所有的 PINGRESP 进行响应,没有一一对应的关系

举一个实际的例子,假设我们很短时间内连续发送了两个 PINGREQ 报文,只收到了一条 PINGRESP 报文,但是只要 PINGRESP 报文是在 keep_alive_interval 之内回复的,那我们就认为保活成功了。换句话说,服务器使用一条 PINGRESP 报文,回复了客户端两条 PINGREQ 报文!

同样的例子对于 subscribe 报文和 SUBACK 报文是不适用的!因为二者使用了 “packet_id” 做完一一对应的标识

所以,在等待 SUBACK 报文时,不能像等待 PINGRESP 那样处理,我们还需要将所有发送的 subscribe 报文保存下来 ,等收到 SUBACK 时,再扫描这些报文找到 “packet_id” 匹配的那一个。很明显,我们需要在 session 对象中维护一个等待响应队列 结构,可以使用链表实现。下面的伪代码描述了订阅的过程:

1
2
3
4
5
6
subscribe()
{
... /* 序列化和发送到网络 */

list_add(); /* 加入到等待响应队列 */
}

在等待 SUBACK 时,我们仍可使用异步的方式进行,这里直接扩充 ack_handler 线程的工作内容即可,下面的伪代码描述了扩充后的异步线程功能,(这是在之前的框架上修改的,可以参考异步等待的原理与实现):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
while(1)
{
// mqtt_recv_packet(&type);
// if (type == PINGRESP)
// session->pingresp_waiting = 0;
else if (type == SUBACK) {
list_remove(); /* 订阅成功,从等待响应队列移除 */
}

/* 遍历等待队列 移除超时对象 */
for_each_entry_in_waitlist() {
if (expired(entry->timer)) {
list_remove(); /* 订阅失败,从等待响应队列移除 */
printf("xxx entry subscribe timeout\n");
}
}

// if (timer_expire(last_send_timer) || timer_expire(last_recv_timer)) {
// /* 我们该发送 PINGREQ 了 */
// if (session->pingresp_waiting) {
// /* 上次的 PINGRESP 未收到,通知主线程出错 */
// set_session_state(session, SESSION_DISCONNECT)
// error();
// } else {
// /* 发送 PINGREQ 报文 */
// mqtt_pingreq();
// /* 等待对应 PINGRESP */
// session->pingresp_waiting = 1;
// }
// }

}

list_head 结构

由于等待队列的出现,我们需要相应的数据结构来实现它,这里我们封装一个链表数据结构,并提供一下简单的使用接口,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
struct list_head {
struct list_head *prev, *next;
};

static inline void list_init(struct list_head *list) {
list->next = list;
list->prev = list;
}

static inline void __list_add(struct list_head *new,
struct list_head *prev,
struct list_head *next)
{
next->prev = new;
new->next = next;
new->prev = prev;
prev->next = new;
}

static inline void list_add(struct list_head *new, struct list_head *head)
{
__list_add(new, head, head->next);
}

static inline void list_add_tail(struct list_head *new, struct list_head *head)
{
__list_add(new, head->prev, head);
}

static inline void __list_del(struct list_head * prev, struct list_head * next)
{
next->prev = prev;
prev->next = next;
}

static inline void list_del(struct list_head *entry)
{
__list_del(entry->prev, entry->next);
}

static inline int list_empty(const struct list_head *head)
{
return head->next == head;
}

#define offset_of(type, number) \
((size_t)&(((type *)0)->number))

#define container_of(ptr, type, number) \
((type *)((unsigned char *)ptr - offset_of(type, number)))

#define list_entry(list, type, number) \
container_of(list, type, number)

#define list_for_each(curr, list) \
for (curr = (list)->next; curr != (list); curr = curr->next)

#define list_for_each_safe(curr, next, list) \
for (curr = (list)->next, next = curr->next; curr != (list); \
curr = next, next = curr->next)

将 list_head 加入 session 作为等待队列的头结点,并在 session_init 函数中初始化:

1
2
3
4
5
struct mqtt_session {
...
struct list_head wait_ack_list;
...
}
1
2
3
4
5
6
session_init()
{
...
list_init(&session->wait_ack_list);
...
}

packet_id 的维护

发送 SUBSCRIBE 和 UNSUBSCRIBE 报文都需要为报文提供 packet_id 这一字段,mqtt 对其要求为 16 位的非零数字,在收到回复前不能复用,我们需要在 session 对象中维护当前可用 packet_id:

1
2
3
4
struct mqtt_session {
...
unsigned short packet_id;
}

通过下面的函数获取:

1
2
3
4
5
6
7
static unsigned int get_next_id(struct mqtt_session *session)
{
if (session->packet_id == MQTT_MAX_PACKET_ID)
session->packet_id = MQTT_MIN_PACKET_ID;
session->packet_id++;
return session->packet_id;
}

订阅报文接口

正如 mqtt 协议规定的那样,SUBSCRIBE 报文需要携带其想要订阅的主题,即携带一个主题过滤器,我们是这样规定的:

1
2
3
4
5
struct topic_filter_t {
char **filters;
enum qos *qoss;
unsigned int count;
};

接下来是 mqtt_subscribe 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
int mqtt_subscribe(struct mqtt_session *session, struct topic_filter_t *tf)
{
int retval = 0;
struct mqtt_string *filters;
unsigned int count, packet_len;
struct timer_t timer = {0};
unsigned short packet_id;

if (!session || !session->write_buf)
return -1;

if (!tf)
return -1;

if (!timer_init(&timer))
return -1;

count = tf->count;
filters = (struct mqtt_string*)memory_alloc(sizeof(struct mqtt_string) * count);
if (!filters) {
retval = -1;
goto destory_timer;
}

for (int i = 0; i < count; i++)
(filters + i)->cstring = tf->filters[i];

/* 获取 packet_id 以及序列化报文 */
packet_id = get_next_id(session);
retval = serialize_sub(session->write_buf, session->write_buflen, packet_id, filters, tf->qoss, count);
if (retval < 0)
goto free_filters;
else
packet_len = retval;

/* 发送报文 */
retval = mqtt_send_packet(session, packet_len, &timer);
if (retval < 0)
goto free_filters;

/* 加入到等待 SUBACK 的队列中 */
ack_wait_list_add(session, SUBACK, packet_id);

printf("main thread: sending a SUBSCRIBE packet id is %d\n", packet_id);

free_filters:
memory_free(filters);
destory_timer:
timer_destory(&timer);
return retval;

}

和以前的报文发送函数相比,发送 SUBSCRIBE 报文没有什么不同,唯一的不同点在于:它使用了 ack_wait_list_add 函数,在等待队列上创建了一个实体,该实体包含下面的元素:

1
2
3
4
5
6
struct ack_wait_entry {
struct list_head list;
enum type type;
unsigned short packet_id;
struct timer_t timer;
};
  • list 成员用来将所有的 ack_wait_entry 连成一个链表
  • type 成员代表了想要等待的报文类型,对于 SUBSCRIBE 报文而言,其想要等待的是 SUBACK 类型的报文
  • packet_id 成员代表了 SUBSCRIBE 报文和 SUBACK 报文的报文标识符
  • timer 成员用来定时,当想要的 ack 报文迟迟没有等到时,需要为这个等待实体设置一个超时时间,在超时后需要将其从等待队列删除

下面是 ack_wait_list_add 函数的实现,该函数创建一个 ack_wait_entry 实体,并把它加入到 session 的等待队列中,在这一切之前需要判断队列中是否已经存在一个相同的实体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static int ack_wait_list_add(struct mqtt_session *session, enum type t, unsigned short packet_id)
{
int retval = 0;
struct ack_wait_entry *entry;

if (ack_wait_entry_exist(&session->wait_ack_list, t, packet_id)) {
return -1;
}

/* 创建和初始化 ack_wait_entry 并将其加入等待队列 */
entry = memory_alloc(sizeof(struct ack_wait_entry));
if (!entry)
return -1;
memset(entry, 0, sizeof(struct ack_wait_entry));

if (!timer_init(&entry->timer)) {
retval = -1;
goto free_entry;
}
timer_cutoff(&entry->timer, session->net_timeout);

entry->packet_id = packet_id;
entry->type = t;
list_add(&entry->list, &session->wait_ack_list);

return 0;

free_entry:
memory_free(entry);
return retval;

}

用来判断等待队列是否存在相同实体的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static int ack_wait_entry_exist(struct list_head *head, enum type t, unsigned short packet_id)
{
int bool = 0;
struct ack_wait_entry *entry;
struct list_head *cur;

/* 遍历队列 根据实体的类型和报文标识符来判断是否已存在 */
list_for_each(cur, head) {
entry = list_entry(cur, struct ack_wait_entry, list);
if (entry->type == t && entry->packet_id == packet_id) {
bool = 1;
break;
}
}

return bool;
}

在辅助线程的主函数中,我们出来处理 ack 报文,还需要扫描等待队列,移除超时的报文:

1
2
3
4
5
6
7
8
9
static void mqtt_ack_handler(void *arg )
{
...
while (1) {
handle_ack(session, &thread_timer);
ack_wait_scan(session);
}
}

ack_wait_scan 函数实现如下,它辅助扫描等待队列,移除定时器过期的实体,并释放实体的内存空间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void ack_wait_scan(struct mqtt_session *session)
{
struct list_head *cur, *next;
struct ack_wait_entry *entry;

list_for_each_safe(cur, next, &session->wait_ack_list) {
entry = list_entry(cur, struct ack_wait_entry, list);
/* 找到过期的实体,将其删除 */
if ((entry->type == SUBACK || entry->type == UNSUBACK) && timer_is_expired(&entry->timer)) {
printf("ack_handler thread: SUB or UNSUB failed packet id is %d\n", entry->packet_id);

list_del(cur);
timer_destory(&entry->timer);
memory_free(entry);
}
}
}

handle_ack 中还要加入对 SUBACK 和 UNSUBACK 的响应:

1
2
3
4
5
6
7
8
9
10
11
static int handle_ack(struct mqtt_session *session, struct timer_t *timer)
{
...
if (type == PINGRESP)
session->pingresp_waiting = 0;
else if (type == SUBACK)
retval = suback_handler(session);
else if (type == UNSUBACK)
retval = unsuback_handler(session);
...
}

我们继续看,这两个函数的处理是类似的,它们会反序列化对应报文,并删除等待队列中的对应实体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int suback_handler(struct mqtt_session *session)
{
int retval = 0;
unsigned int packet_id, count;
unsigned char return_code;

/* 反序列化 & 删除实体 */
retval = deserialize_suback(&packet_id, &return_code, &count, session->read_buf);
if (retval < 0)
return -1;
ack_wait_list_del(session, SUBACK, packet_id);

if (return_code != QoSF)
printf("ack_handler thread: received SUBACK packet is %d\n", packet_id);
else
printf("ack_handler thread: received SUBACK[ack = 0x80] packet is %d\n", packet_id);

return retval;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int unsuback_handler(struct mqtt_session *session)
{
int retval = 0;
unsigned int packet_id;

/* 反序列化 & 删除实体 */
retval = deserialize_unsuback(&packet_id, session->read_buf);
if (retval < 0)
return -1;
ack_wait_list_del(session, UNSUBACK, packet_id);

printf("ack_handler thread: received UNSUBACK packet is %d\n", packet_id);

return retval;
}

具体删除等待队列中指定实体的任务由 ack_wait_list_del 函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int ack_wait_list_del(struct mqtt_session *session, enum type t, unsigned short packet_id)
{
struct ack_wait_entry *entry = NULL;
struct list_head *cur;

/* 遍历等待队列,找到指定实体 */
list_for_each(cur, &session->wait_ack_list) {
entry = list_entry(cur, struct ack_wait_entry, list);
if (entry->type == t && entry->packet_id == packet_id) {
list_del(cur);
break;
}
}
/* 释放实体内存空间 */
if (!entry) {
timer_destory(&entry->timer);
memory_free(entry);
}


return 0;
}

UNSUBSCRIBE 取消订阅

序列化/反序列化

UNSUBSCRIBE 报文的序列化

和 SUBSCRIBE 报文的序列化类似,不同的地方在于其有效负载中无 QoS 信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int serialize_unsub(unsigned char *buf, unsigned int buflen, unsigned short packet_id, \
struct mqtt_string filters[], unsigned int count)
{
unsigned char *ptr = buf;
header_t header = {0};
unsigned int len = 0;

/* SUBSCRIBE 报文固定头部 */
header.divide.type = UNSUBSCRIBE;
header.divide.reserved = 0x2;
writec(&ptr, header.byte);

/* 剩余长度部分 */
len = unsub_opts_len(filters, count);
if (packet_len(len) + len > buflen)
return -1;
ptr += write_optslen(ptr, len);

/* 可变头部 */
writei(&ptr, packet_id);

/* 有效负载 */
for (int i = 0; i < count; i++)
write_mqttstring(&ptr, filters[i]);

return ptr - buf;
}

计算剩余长度

1
2
3
4
5
6
7
8
9
10
11
static unsigned int unsub_opts_len(struct mqtt_string filters[], unsigned int count)
{
const unsigned int packet_id_len = 2;
unsigned int len = 0;

len += packet_id_len;
for (int i = 0; i < count; i++)
len += 2 + mqtt_string_len(filters[i]);

return len;
}

UNSUBACK 报文的反序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int deserialize_unsuback(unsigned int *packet_id, const unsigned char *buf)
{
const unsigned char *ptr = buf;
header_t header = {0};
unsigned int len;

readc(&ptr, &header.byte);
if (header.divide.type != UNSUBACK)
return -1;

ptr += read_optslen(ptr, &len);
readi(&ptr, packet_id);

return ptr - buf;
}

协议接口层

取消订阅接口

取消订阅的实现,几乎和订阅的实现一致,这里不在赘述,看代码即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
int mqtt_unsubscribe(struct mqtt_session *session, struct topic_filter_t *tf)
{
int retval = 0;
struct mqtt_string *filters;
unsigned int count, packet_len;
struct timer_t timer = {0};
unsigned short packet_id;

if (!session || !session->write_buf)
return -1;

if (!tf)
return -1;

if (!timer_init(&timer))
return -1;

count = tf->count;
filters = (struct mqtt_string*)memory_alloc(sizeof(struct mqtt_string) * count);
if (!filters) {
retval = -1;
goto destory_timer;
}

for (int i = 0; i < count; i++)
(filters + i)->cstring = tf->filters[i];

packet_id = get_next_id(session);

retval = serialize_unsub(session->write_buf, session->write_buflen, packet_id, filters, count);

if (retval < 0)
goto free_filters;
else
packet_len = retval;

retval = mqtt_send_packet(session, packet_len, &timer);

if (retval < 0)
goto free_filters;

ack_wait_list_add(session, UNSUBACK, packet_id);

printf("main thread: sending a UNSUB packet id is %d\n", packet_id);

free_filters:
memory_free(filters);
destory_timer:
timer_destory(&timer);
return retval;
}