0%

【计算机网络】mqtt通信协议入门(十)PUBLISH 报文结构与 QoS0 消息收发实战

历经重重坎坷,我们已经完成了连接报文、心跳报文以及订阅报文和取消订阅报文的分析和实战,仅仅还剩下最后一种报文 —— PUBLISH 报文我们还没学习,这也是最复杂的一种报文,我们需要支持 QoS0-QoS2 三种消息服务级别。这篇文章我们就来看看 PUBLISH 报文的结构,以及完成 QoS0 级别消息的发送与接收实战。

回顾

在开始新的内容之前,在让我们来回顾一下项目的最新架构,如图:

image-20240720091843461

系统接口层

  • network:基于 socket API实现网络连接断开,以及数据读写的功能
  • timer:基于 timerval 结构为整个系统提供定时功能
  • memory:基于 libc 的 malloc/free 函数,实现内存动态申请于释放
  • thread:基于 POSIX 线程库实现
  • mutex:也是基于 pthread 库中提供的 pthread_mutex_t 实现

报文收发层

  • send packet:将缓冲区中的报文发送到网络
  • receive packet:将网络中的报文接收到缓冲区

序列化/反序列化模块

  • 实现报文的序列化和反序列化

异步等待队列

使用异步的方式处理来自服务器 发送的报文,所有待响应的报文都挂在等待队列上,有单独的线程异步处理

ACK handler 异步处理线程

接收来自服务器的响应报文,并处理,同时也会扫描并移除等待队列中超时的报文

mqtt 协议接口

提供 mqtt 协议规定的各种报文的发送与接收,完全遵守协议的规定

PUBLISH 报文结构

PUBLISH 控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。

固定报头

image-20240720093417787

这里主要讲一下控制位:

  • DUP:重发标志。如果 DUP 标志被设置为 0,表示这是客户端或服务端第一次请求发送这个 PUBLISH 报文。如果 DUP 标志 被设置为 1,表示这可能是一个早前报文请求的重发。

    • 客户端或服务端请求重发一个 PUBLISH 报文时,必须将 DUP 标志设置为 1。
    • 对于 QoS0 的消息,DUP 标志必须设置为 0.
  • QoS:服务质量等级。不同的服务质量等级对通信双方的要求不同:

    • 0x00:QoS0 的服务质量,发送方最多发送一次,接收方无需回复
    • 0x01:QoS1 的服务质量,发送方最少发送一次,接收方需回复一次,报文可能重复
    • 0x02:QoS2 的服务质量,在 QoS1 的基础上,发送方还需要对接收方的回复进行回复,接收方收到回复后还需回复一次,即发送方发送一次回复一次,接收方回复两次。这样能够确保报文被接受且不会重复
    • PUBLISH 报文不能将 QoS 所有的位设置为 1。如果服务端或客户端收到 QoS 所有位都为 1 的 PUBLISH 报文,它必须关闭网络连接
  • RETAIN:保留标志

    • 如果客户端发给服务端的 PUBLISH 报文的保留(RETAIN)标志被设置为 1,服务端必须存储这个应用消息和它的服务质量等级(QoS),以便它可以被分发给未来的主题名匹配的订阅者

    • 一个新的订阅建立时,对每个匹配的主题名,如果存在最近保留的消息,它必须被发送给这个订阅者。

    • 如果服务端收到一条保留(RETAIN)标志为 1 的 QoS0 消息,它必须丢弃之前为那个主题保留的任何消息。它应该将这个新的 QoS0 消息当作那个主题的新保留消息,但是任何时候都可以选择丢弃它 — 如果这种情况发生了,那个主题将没有保留消息。

    • 如果客户端发给服务端的 PUBLISH 报文的保留标志位 0,服务端不能存储这个消息也不能移除或替换任何 现存的保留消息

    • 客户端刚刚成功订阅某个主题后,如果该主题有保留消息,服务端会使用 PUBLISH 报文向客户端发布该消息,并且 RETAIN 位置 1

    • 对于客户端已经订阅的主题,该主题的发布者发布新的消息时,服务端会使用 PUBLISH 报文向客户端发布该消息,并且 RETAIN 位置 0(不论该主题发布者发布该消息时 RETAIN 置为 0 还是 1)

可变报头

PUBLISH 报文的可变报头按顺序包含主题名和报文标识符,下图是一个例子:

image-20240720100527898

  • 主题名:用于识别有效载荷数据应该被发布到哪一个信息通道,主题名必须是 PUBLISH 报文可变报头的第一个字段,主题名不能包含通配符
  • 报文标识符:只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中

有效载荷

有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度。包含零长度有效载荷的 PUBLISH 报文是合法的。

动作约定

  • 客户端使用 PUBLISH 报文发送应用消息给服务端,目的是分发到其它订阅该主题的客户端。

  • 服务端使用 PUBLISH 报文发送应用消息给每一个匹配订阅主题的客户端。

  • 收到一个 PUBLISH 报文时,接收者的动作取决于上面描述的 QoS 等级。

  • 客户端使用带通配符的主题过滤器请求订阅时,客户端的订阅可能会重复,因此发布的消息可能会匹配多 个过滤器。

    • 假设客户端 A 订阅了 foo/#(QoS 1)和 foo/bar(QoS 2),当一条消息发布到 foo/bar 时,服务器会根据 QoS 2 等级将消息发送给客户端 A。

    • 假设:客户端 A 订阅 foo/#,QoS 1;客户端 B 订阅 foo/bar,QoS 2;客户端 C 订阅 foo/+,QoS 0

      现在,有一条消息发布到主题 foo/bar,QoS 1,服务器会这样做

      • 先按照最高 QoS 等级发送消息:服务器将这条消息以 QoS 2 的等级发送给所有匹配的订阅者。
      • 分发消息副本 :服务器会根据每个匹配订阅的原始 QoS 等级,将消息副本发送给各个订阅者:
        • 客户端 A:收到消息,QoS 1
        • 客户端 B:收到消息,QoS 2
        • 客户端 C:收到消息,QoS 0

收发消息的 QoS 决定

  • 当发布者发布消息时,它会指定一个 QoS 等级(0、1 或 2)。这个 QoS 等级决定了消息在从发布者到服务器的传递过程中需要的服务质量。

  • 订阅者在订阅主题时,也会指定一个 QoS 等级。这个 QoS 等级表示订阅者期望从服务器接收消息时的服务质量。

  • 当服务器收到发布者的消息后,它会将消息分发给所有订阅了该主题的订阅者。消息的最终 QoS 等级是发布者的 QoS 等级和订阅者的 QoS 等级的较低者

  • 客户端收到消息时,会将消息的 QoS 等级调整为客户端订阅时指定的 QoS 等级和消息本身的 QoS 等级中的较低者

新的抽象

PUBLISH 报文

PUBLISH 报文作为消息的主要载体,需要对其进行抽象,如下:

1
2
3
4
5
6
7
8
9
struct message_t {
unsigned char retain;
enum qos qos;
unsigned char dup;
char *topic_name;
unsigned short packet_id;
unsigned int msglen;
void *msg;
};

这些字段和上面分析的 PUBLISH 报文几乎是一一对应的,这里不在赘述。

消息处理

现在有一个新的问题需要思考:消息在客户端是如何被处理的?显然,这应该有用户来决定!那么,用户通过何种方式来决定消息如何被处理呢?

从客户端的较角度来看,消息是通过订阅时的主题过滤器来分类的,订阅这个动作表明客户端想要接收一类消息!所以,客户端应该在订阅时告诉系统其想要的消息处理方式,这里我们需要定义一个消息处理的函数指针,以及修改 subscribe 接口:

1
2
3
typedef void (*msg_handler_t)(void *msg);

int mqtt_subscribe(struct mqtt_session *session, struct topic_filter_t *tf, msg_handler_t msg_handler);

好了,我们已经解决了消息处理方式的定义,新的问题是:当收到携带消息的 PUBLISH 报文时,如何找到其对应的消息处理函数?这个问题的描述似曾相识,考虑考虑 ack 报文是如何处理的。是的,队列!我们同样可以在用户订阅时将消息处理的实体放入等待队列中,后续收到 PUBLISH 报文时,扫描等待队列,通过Topic 匹配合适的消息处理实体,回调消息处理函数。先来看看消息处理等待实体的抽象:

1
2
3
4
5
6
struct msg_wait_entry {
struct list_head list;
enum qos qos;
char *topic_filter;
msg_handler_t msg_handler;
};
  • qos:决定了订阅的报文的 qos 等级
  • topic_filter:主题过滤器,用来匹配收到的 PUBLISH 报文
  • msg_handler:报文处理函数,有用户传递

有了链表实体,还需要链表头部:

1
2
3
4
struct mqtt_session {
...
struct list_head wait_msg_list;
};

那么实体何时加入到链表中呢?在 subscribe 函数中?显然太早了,因为我们在调用完该函数后还不能确认订阅成功,需要手动 SUBACK 报文后才说明订阅成功,所以应该在处理 SUBACK 报文时添加。想法是好的,可是实现起来会遇到些困难,在 subscribe 函数中,我们可以轻松的添加实体,但是该函数执行完之后,参数中的消息处理函数就无处可寻了。所以我们需要将它保存起来,就保存在 ack_wait_entry 对象中吧:

1
2
3
4
struct ack_wait_entry {
...
struct msg_wait_entry *msg_wait_entry;
};

subscribe 函数中,创建消息等待实体

1
2
3
4
5
6
7
8
int mqtt_subscribe(struct mqtt_session *session, struct topic_filter_t *tf, msg_handler_t handler) {
...
msg_entry = msg_wait_entry_create(filter->cstring, tf->qos, msg_handler);
if (!msg_entry)
goto free_filters;
ack_wait_list_add(session, SUBACK, packet_id, msg_entry, 0);
...
}

在成功处理 SUBACK 报文时,添加消息等待实体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static int suback_handler(struct mqtt_session *session) 
{
...
ack_entry = ack_wait_list_test_and_del(session, SUBACK, packet_id);
if (!ack_entry)
return -1;
msg_entry = ack_entry->msg_wait_entry;
if (!msg_entry) {
retval = -1;
goto ack_entry_free;
}

if (return_code == QoSF) {
retval = -1;
goto msg_entry_free;
}

msg_wait_list_add(session, msg_entry);
goto ack_entry_free;

msg_entry_free:
memory_free(msg_entry);
ack_entry_free:
timer_destory(&ack_entry->timer);
memory_free(ack_entry->resend_buf);
memory_free(ack_entry);
return retval;
}

在成功处理 UNSUBACK 报文时,删除消息等待实体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int unsuback_handler(struct mqtt_session *session)
{
retval = deserialize_unsuback(&packet_id, session->read_buf);
if (retval < 0)
return -1;
ack_entry = ack_wait_list_test_and_del(session, UNSUBACK, packet_id);
if (!ack_entry)
return -1;


printf("ack_handler thread: received UNSUBACK packet_id is %d\n", packet_id);

msg_entry = ack_entry->msg_wait_entry;
if (!msg_entry) {
goto ack_entry_free;
retval = -1;
}

msg_wait_list_del(session, msg_entry);
msg_wait_entry_destory(msg_entry);
}

上面涉及的 msg_wait_list_xxx 函数的实现和 ack_wait_list_xxx 函数十分相似,这里不在赘述。

QoS0 消息收发实战

序列化与反序列化

按照报文进行序列化即可,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int serialize_publish(unsigned char *buf, unsigned int buflen, struct message_t *message)
{
unsigned int len = 0;
unsigned char *ptr = buf;
header_t header = {0};
enum qos qos = message->qos;

header.bits.type = PUBLISH;
header.bits.retain = message->retain;
header.bits.qos = qos;
header.bits.dup = message->dup;
if (qos == QoS0)
header.bits.dup = 0;

writec(&ptr, header.byte);
len = pub_opts_len(message);
if (len + packet_len(len) > buflen)
return -1;

ptr += write_optslen(ptr, len);

if (message->topic_name != NULL)
write_cstring(&ptr, message->topic_name);

if (qos != QoS0)
writei(&ptr, message->packet_id);

memcpy(ptr, message->msg, message->msglen);
ptr += message->msglen;

return ptr - buf;
}

反序列话也是一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int deserialize_publish(unsigned char *retain, enum qos *qos, unsigned char *dup, char **topic_name, unsigned int *topic_len, unsigned int *packet_id, void **msg, unsigned int *msglen, const unsigned char *buf)
{
unsigned int optlen, strlen;
const unsigned char *ptr = buf;
header_t header = {0};

readc(&ptr, &header.byte);
if (header.bits.type != PUBLISH)
return -1;

*retain = header.bits.retain;
*qos = header.bits.qos;
*dup = header.bits.dup;

ptr += read_optslen(ptr, &optlen);
readi(&ptr, &strlen);
*topic_len = strlen;
*topic_name = (char*)ptr;
ptr += strlen;

if (*qos != QoS0) {
/* QoS1 || QoS2 */
*packet_id = 0;
*msglen = optlen - (2 + strlen + 2);
readi(&ptr, packet_id);
}
else {
*msglen = optlen - (strlen + 2);
*packet_id = 0;
}

*msg = (void*)ptr;
ptr += *msglen;

return ptr - buf;
}

发布报文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
int mqtt_publish(struct mqtt_session *session, struct message_t *message)
{
int retval = 0;
unsigned int packet_len, packet_id = 0;
DECLARE_TIMER(timer);
enum qos qos = message->qos;

if (!session || !message)
return -1;

if (!timer_init(&timer))
return -1;

if (qos != QoS0)
packet_id = get_next_id(session);
message->packet_id = packet_id;

mutex_lock(&session->write_lock);
memset(session->write_buf, 0, session->write_buflen);
retval = serialize_publish(session->write_buf, session->write_buflen, message);
if (retval < 0) {
mutex_unlock(&session->write_lock);
goto destory_timer;
}
packet_len = retval;
retval = mqtt_send_packet(session, packet_len, &timer);
if (retval < 0) {
mutex_unlock(&session->write_lock);
goto destory_timer;
}

if (qos == QoS1) {
retval = ack_wait_list_add(session, PUBACK, packet_id, NULL, packet_len);
} else if (qos == QoS2) {
retval = ack_wait_list_add(session, PUBREC, packet_id, NULL, packet_len);
}
mutex_unlock(&session->write_lock);

printf("main thread: sending a PUBLISH packet id is 0x%-4x\n", message->packet_id);

return 0;

destory_timer:
timer_destory(&timer);
return retval;
}

接收报文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static int publish_handler(struct mqtt_session *session)
{
int retval = 0;
struct message_t message;
unsigned int packet_id, packet_len, msglen, topic_len;
unsigned char retain, dup;
DECLARE_TIMER(timer);
char *topic_name;
enum qos qos;
void *msg;

timer_init(&timer);
timer_cutoff(&timer, session->net_timeout);

retval = deserialize_publish(&retain, &qos, &dup, &topic_name, &topic_len, &packet_id, &msg, &msglen, session->read_buf);

if (retval < 0)
goto exit;

message.topic_name = memory_alloc(topic_len);
if (!message.topic_name) {
retval = -1;
goto exit;
}
memcpy(message.topic_name, topic_name, topic_len);

message.msg = memory_alloc(msglen);
if (!message.msg) {
retval = -1;
goto exit;
}
memcpy(message.msg, msg, msglen);

message.retain = retain;
message.qos = qos;
message.dup = dup;
message.packet_id = packet_id;
message.msglen = msglen;
printf("qos = %d, topic = %s, msg = %s, packet_id is 0x%x\n",qos, message.topic_name, (char*)message.msg, packet_id);

if (qos == QoS0) {
message_handler(session, &message);
exit:
timer_destory(&timer);
return retval;
}

消息处理

1
2
3
4
5
6
7
8
9
10
11
12
static void message_handler(struct mqtt_session *session, struct message_t *message)
{
struct list_head *cur;
struct msg_wait_entry *entry;

list_for_each(cur, &session->wait_msg_list) {
entry = list_entry(cur, struct msg_wait_entry, list);

if (match_topic(entry->topic_filter, message->topic_name))
entry->msg_handler(message->msg);
}
}

消息处理时,需要考虑消息等待实体的主题过滤器和 PUBLISH 报文的主题是否匹配,mqtt 协议使用 # 通配符匹配所有层级内容,+ 匹配当前层级内容,函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static int match_topic(char *filter, char *name)
{
if (strcmp(filter, name) == 0)
return 1;

char *endptr = name + strlen(name);

while (filter && name) {
if (*filter != '#' && *filter != '+' && *filter != *name)
break;

if (*filter == '/' && *name != '/')
break;

if (*filter == '+') {
char *nextpos = name + 1;
while (nextpos < endptr && *nextpos != '/') {
nextpos = ++name + 1;
}
} else if (*filter == '#')
name = endptr - 1;

filter++;
name++;
}

return (name == endptr) && (*filter == '\0');
}

测试程序

1
2
3
4
5
6
7
8
...
mqtt_subscribe(session, &tf_params, NULL);
// mqtt_unsubscribe(session, &tf_params);

while (1){
mqtt_publish(session, &message);
sleep(15);
}

这个程序首先会订阅一个报文,然后向该主题发送消息,正常情况下,我们会收到自身发送的消息(因为我们订阅了该主题)。编译运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
connect to server successful!
ack_handler thread: thread started!
main thread: sending a SUBSCRIBE packet id is 1
main thread: sending a PUBLISH packet id is 0x0
ack_handler thread: received packet type is SUBACK
ack_handler thread: keep alive timer remain 30929 and 30999
[debug ack_waiter]:
[debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] ->
ack_handler thread: received packet type is PUBLISH
qos = 0, topic = /k1jpiNpomjE/test/user/abc, msg = 12345, packet_id is 0x0
DEFAULT MSG HANDLER: received msg 12345
ack_handler thread: keep alive timer remain 30850 and 30999
[debug ack_waiter]:
[debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] ->
ack_handler thread: keep alive timer remain 26828 and 26978
[debug ack_waiter]:
[debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] ->
ack_handler thread: keep alive timer remain 22748 and 22898
[debug ack_waiter]:
[debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] ->
ack_handler thread: keep alive timer remain 18668 and 18818
[debug ack_waiter]:
[debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] ->
main thread: sending a PUBLISH packet id is 0x0
ack_handler thread: received packet type is PUBLISH
qos = 0, topic = /k1jpiNpomjE/test/user/abc, msg = 12345, packet_id is 0x0
DEFAULT MSG HANDLER: received msg 12345
ack_handler thread: keep alive timer remain 30861 and 30999
[debug ack_waiter]:
[debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] ->
ack_handler thread: keep alive timer remain 26788 and 26927
[debug ack_waiter]:
[debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] ->

稍微分析一下,发现结果符号我们的预取,我们也可以像以前一样使用 tcpdump 分析发送的报文,这里我已经分析过,感兴趣的读者可以试下。收工!