历经重重坎坷,我们已经完成了连接报文、心跳报文以及订阅报文和取消订阅报文的分析和实战,仅仅还剩下最后一种报文 —— PUBLISH 报文我们还没学习,这也是最复杂的一种报文,我们需要支持 QoS0-QoS2 三种消息服务级别。这篇文章我们就来看看 PUBLISH 报文的结构,以及完成 QoS0 级别消息的发送与接收实战。
回顾
在开始新的内容之前,在让我们来回顾一下项目的最新架构,如图:
系统接口层
- network:基于 socket API实现网络连接断开,以及数据读写的功能
- timer:基于 timerval 结构为整个系统提供定时功能
- memory:基于 libc 的 malloc/free 函数,实现内存动态申请于释放
- thread:基于 POSIX 线程库实现
- mutex:也是基于 pthread 库中提供的 pthread_mutex_t 实现
报文收发层
- send packet:将缓冲区中的报文发送到网络
- receive packet:将网络中的报文接收到缓冲区
序列化/反序列化模块
异步等待队列
使用异步的方式处理来自服务器 发送的报文,所有待响应的报文都挂在等待队列上,有单独的线程异步处理
ACK handler 异步处理线程
接收来自服务器的响应报文,并处理,同时也会扫描并移除等待队列中超时的报文
mqtt 协议接口
提供 mqtt 协议规定的各种报文的发送与接收,完全遵守协议的规定
PUBLISH 报文结构
PUBLISH 控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
固定报头
这里主要讲一下控制位:
可变报头
PUBLISH 报文的可变报头按顺序包含主题名和报文标识符,下图是一个例子:
- 主题名:用于识别有效载荷数据应该被发布到哪一个信息通道,主题名必须是 PUBLISH 报文可变报头的第一个字段,主题名不能包含通配符
- 报文标识符:只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中
有效载荷
有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度。包含零长度有效载荷的 PUBLISH 报文是合法的。
动作约定
客户端使用 PUBLISH 报文发送应用消息给服务端,目的是分发到其它订阅该主题的客户端。
服务端使用 PUBLISH 报文发送应用消息给每一个匹配订阅主题的客户端。
收到一个 PUBLISH 报文时,接收者的动作取决于上面描述的 QoS 等级。
客户端使用带通配符的主题过滤器请求订阅时,客户端的订阅可能会重复,因此发布的消息可能会匹配多 个过滤器。
假设客户端 A 订阅了 foo/#
(QoS 1)和 foo/bar
(QoS 2),当一条消息发布到 foo/bar
时,服务器会根据 QoS 2 等级将消息发送给客户端 A。
假设:客户端 A 订阅 foo/#
,QoS 1;客户端 B 订阅 foo/bar
,QoS 2;客户端 C 订阅 foo/+
,QoS 0
现在,有一条消息发布到主题 foo/bar
,QoS 1,服务器会这样做
- 先按照最高 QoS 等级发送消息:服务器将这条消息以 QoS 2 的等级发送给所有匹配的订阅者。
- 分发消息副本 :服务器会根据每个匹配订阅的原始 QoS 等级,将消息副本发送给各个订阅者:
- 客户端 A:收到消息,QoS 1
- 客户端 B:收到消息,QoS 2
- 客户端 C:收到消息,QoS 0
收发消息的 QoS 决定
当发布者发布消息时,它会指定一个 QoS 等级(0、1 或 2)。这个 QoS 等级决定了消息在从发布者到服务器的传递过程中需要的服务质量。
订阅者在订阅主题时,也会指定一个 QoS 等级。这个 QoS 等级表示订阅者期望从服务器接收消息时的服务质量。
当服务器收到发布者的消息后,它会将消息分发给所有订阅了该主题的订阅者。消息的最终 QoS 等级是发布者的 QoS 等级和订阅者的 QoS 等级的较低者。
客户端收到消息时,会将消息的 QoS 等级调整为客户端订阅时指定的 QoS 等级和消息本身的 QoS 等级中的较低者。
新的抽象
PUBLISH 报文
PUBLISH 报文作为消息的主要载体,需要对其进行抽象,如下:
1 2 3 4 5 6 7 8 9
| struct message_t { unsigned char retain; enum qos qos; unsigned char dup; char *topic_name; unsigned short packet_id; unsigned int msglen; void *msg; };
|
这些字段和上面分析的 PUBLISH 报文几乎是一一对应的,这里不在赘述。
消息处理
现在有一个新的问题需要思考:消息在客户端是如何被处理的?显然,这应该有用户来决定!那么,用户通过何种方式来决定消息如何被处理呢?
从客户端的较角度来看,消息是通过订阅时的主题过滤器来分类的,订阅这个动作表明客户端想要接收一类消息!所以,客户端应该在订阅时告诉系统其想要的消息处理方式,这里我们需要定义一个消息处理的函数指针,以及修改 subscribe
接口:
1 2 3
| typedef void (*msg_handler_t)(void *msg);
int mqtt_subscribe(struct mqtt_session *session, struct topic_filter_t *tf, msg_handler_t msg_handler);
|
好了,我们已经解决了消息处理方式的定义,新的问题是:当收到携带消息的 PUBLISH 报文时,如何找到其对应的消息处理函数?这个问题的描述似曾相识,考虑考虑 ack 报文是如何处理的。是的,队列!我们同样可以在用户订阅时将消息处理的实体放入等待队列中,后续收到 PUBLISH 报文时,扫描等待队列,通过Topic 匹配合适的消息处理实体,回调消息处理函数。先来看看消息处理等待实体的抽象:
1 2 3 4 5 6
| struct msg_wait_entry { struct list_head list; enum qos qos; char *topic_filter; msg_handler_t msg_handler; };
|
- qos:决定了订阅的报文的 qos 等级
- topic_filter:主题过滤器,用来匹配收到的 PUBLISH 报文
- msg_handler:报文处理函数,有用户传递
有了链表实体,还需要链表头部:
1 2 3 4
| struct mqtt_session { ... struct list_head wait_msg_list; };
|
那么实体何时加入到链表中呢?在 subscribe
函数中?显然太早了,因为我们在调用完该函数后还不能确认订阅成功,需要手动 SUBACK 报文后才说明订阅成功,所以应该在处理 SUBACK 报文时添加。想法是好的,可是实现起来会遇到些困难,在 subscribe
函数中,我们可以轻松的添加实体,但是该函数执行完之后,参数中的消息处理函数就无处可寻了。所以我们需要将它保存起来,就保存在 ack_wait_entry
对象中吧:
1 2 3 4
| struct ack_wait_entry { ... struct msg_wait_entry *msg_wait_entry; };
|
在 subscribe
函数中,创建消息等待实体
1 2 3 4 5 6 7 8
| int mqtt_subscribe(struct mqtt_session *session, struct topic_filter_t *tf, msg_handler_t handler) { ... msg_entry = msg_wait_entry_create(filter->cstring, tf->qos, msg_handler); if (!msg_entry) goto free_filters; ack_wait_list_add(session, SUBACK, packet_id, msg_entry, 0); ... }
|
在成功处理 SUBACK 报文时,添加消息等待实体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| static int suback_handler(struct mqtt_session *session) { ... ack_entry = ack_wait_list_test_and_del(session, SUBACK, packet_id); if (!ack_entry) return -1; msg_entry = ack_entry->msg_wait_entry; if (!msg_entry) { retval = -1; goto ack_entry_free; }
if (return_code == QoSF) { retval = -1; goto msg_entry_free; } msg_wait_list_add(session, msg_entry); goto ack_entry_free; msg_entry_free: memory_free(msg_entry); ack_entry_free: timer_destory(&ack_entry->timer); memory_free(ack_entry->resend_buf); memory_free(ack_entry); return retval; }
|
在成功处理 UNSUBACK 报文时,删除消息等待实体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| static int unsuback_handler(struct mqtt_session *session) { retval = deserialize_unsuback(&packet_id, session->read_buf); if (retval < 0) return -1; ack_entry = ack_wait_list_test_and_del(session, UNSUBACK, packet_id); if (!ack_entry) return -1;
printf("ack_handler thread: received UNSUBACK packet_id is %d\n", packet_id);
msg_entry = ack_entry->msg_wait_entry; if (!msg_entry) { goto ack_entry_free; retval = -1; }
msg_wait_list_del(session, msg_entry); msg_wait_entry_destory(msg_entry); }
|
上面涉及的 msg_wait_list_xxx 函数的实现和 ack_wait_list_xxx 函数十分相似,这里不在赘述。
QoS0 消息收发实战
序列化与反序列化
按照报文进行序列化即可,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| int serialize_publish(unsigned char *buf, unsigned int buflen, struct message_t *message) { unsigned int len = 0; unsigned char *ptr = buf; header_t header = {0}; enum qos qos = message->qos;
header.bits.type = PUBLISH; header.bits.retain = message->retain; header.bits.qos = qos; header.bits.dup = message->dup; if (qos == QoS0) header.bits.dup = 0;
writec(&ptr, header.byte); len = pub_opts_len(message); if (len + packet_len(len) > buflen) return -1;
ptr += write_optslen(ptr, len);
if (message->topic_name != NULL) write_cstring(&ptr, message->topic_name);
if (qos != QoS0) writei(&ptr, message->packet_id);
memcpy(ptr, message->msg, message->msglen); ptr += message->msglen; return ptr - buf; }
|
反序列话也是一样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| int deserialize_publish(unsigned char *retain, enum qos *qos, unsigned char *dup, char **topic_name, unsigned int *topic_len, unsigned int *packet_id, void **msg, unsigned int *msglen, const unsigned char *buf) { unsigned int optlen, strlen; const unsigned char *ptr = buf; header_t header = {0};
readc(&ptr, &header.byte); if (header.bits.type != PUBLISH) return -1;
*retain = header.bits.retain; *qos = header.bits.qos; *dup = header.bits.dup;
ptr += read_optslen(ptr, &optlen); readi(&ptr, &strlen); *topic_len = strlen; *topic_name = (char*)ptr; ptr += strlen;
if (*qos != QoS0) { *packet_id = 0; *msglen = optlen - (2 + strlen + 2); readi(&ptr, packet_id); } else { *msglen = optlen - (strlen + 2); *packet_id = 0; }
*msg = (void*)ptr; ptr += *msglen;
return ptr - buf; }
|
发布报文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| int mqtt_publish(struct mqtt_session *session, struct message_t *message) { int retval = 0; unsigned int packet_len, packet_id = 0; DECLARE_TIMER(timer); enum qos qos = message->qos;
if (!session || !message) return -1;
if (!timer_init(&timer)) return -1; if (qos != QoS0) packet_id = get_next_id(session); message->packet_id = packet_id;
mutex_lock(&session->write_lock); memset(session->write_buf, 0, session->write_buflen); retval = serialize_publish(session->write_buf, session->write_buflen, message); if (retval < 0) { mutex_unlock(&session->write_lock); goto destory_timer; } packet_len = retval; retval = mqtt_send_packet(session, packet_len, &timer); if (retval < 0) { mutex_unlock(&session->write_lock); goto destory_timer; }
if (qos == QoS1) { retval = ack_wait_list_add(session, PUBACK, packet_id, NULL, packet_len); } else if (qos == QoS2) { retval = ack_wait_list_add(session, PUBREC, packet_id, NULL, packet_len); } mutex_unlock(&session->write_lock); printf("main thread: sending a PUBLISH packet id is 0x%-4x\n", message->packet_id);
return 0;
destory_timer: timer_destory(&timer); return retval; }
|
接收报文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| static int publish_handler(struct mqtt_session *session) { int retval = 0; struct message_t message; unsigned int packet_id, packet_len, msglen, topic_len; unsigned char retain, dup; DECLARE_TIMER(timer); char *topic_name; enum qos qos; void *msg;
timer_init(&timer); timer_cutoff(&timer, session->net_timeout);
retval = deserialize_publish(&retain, &qos, &dup, &topic_name, &topic_len, &packet_id, &msg, &msglen, session->read_buf);
if (retval < 0) goto exit;
message.topic_name = memory_alloc(topic_len); if (!message.topic_name) { retval = -1; goto exit; } memcpy(message.topic_name, topic_name, topic_len);
message.msg = memory_alloc(msglen); if (!message.msg) { retval = -1; goto exit; } memcpy(message.msg, msg, msglen); message.retain = retain; message.qos = qos; message.dup = dup; message.packet_id = packet_id; message.msglen = msglen; printf("qos = %d, topic = %s, msg = %s, packet_id is 0x%x\n",qos, message.topic_name, (char*)message.msg, packet_id);
if (qos == QoS0) { message_handler(session, &message); exit: timer_destory(&timer); return retval; }
|
消息处理
1 2 3 4 5 6 7 8 9 10 11 12
| static void message_handler(struct mqtt_session *session, struct message_t *message) { struct list_head *cur; struct msg_wait_entry *entry;
list_for_each(cur, &session->wait_msg_list) { entry = list_entry(cur, struct msg_wait_entry, list);
if (match_topic(entry->topic_filter, message->topic_name)) entry->msg_handler(message->msg); } }
|
消息处理时,需要考虑消息等待实体的主题过滤器和 PUBLISH 报文的主题是否匹配,mqtt 协议使用 #
通配符匹配所有层级内容,+
匹配当前层级内容,函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| static int match_topic(char *filter, char *name) { if (strcmp(filter, name) == 0) return 1;
char *endptr = name + strlen(name);
while (filter && name) { if (*filter != '#' && *filter != '+' && *filter != *name) break;
if (*filter == '/' && *name != '/') break;
if (*filter == '+') { char *nextpos = name + 1; while (nextpos < endptr && *nextpos != '/') { nextpos = ++name + 1; } } else if (*filter == '#') name = endptr - 1;
filter++; name++; }
return (name == endptr) && (*filter == '\0'); }
|
测试程序
1 2 3 4 5 6 7 8
| ... mqtt_subscribe(session, &tf_params, NULL); // mqtt_unsubscribe(session, &tf_params);
while (1){ mqtt_publish(session, &message); sleep(15); }
|
这个程序首先会订阅一个报文,然后向该主题发送消息,正常情况下,我们会收到自身发送的消息(因为我们订阅了该主题)。编译运行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| connect to server successful! ack_handler thread: thread started! main thread: sending a SUBSCRIBE packet id is 1 main thread: sending a PUBLISH packet id is 0x0 ack_handler thread: received packet type is SUBACK ack_handler thread: keep alive timer remain 30929 and 30999 [debug ack_waiter]: [debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] -> ack_handler thread: received packet type is PUBLISH qos = 0, topic = /k1jpiNpomjE/test/user/abc, msg = 12345, packet_id is 0x0 DEFAULT MSG HANDLER: received msg 12345 ack_handler thread: keep alive timer remain 30850 and 30999 [debug ack_waiter]: [debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] -> ack_handler thread: keep alive timer remain 26828 and 26978 [debug ack_waiter]: [debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] -> ack_handler thread: keep alive timer remain 22748 and 22898 [debug ack_waiter]: [debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] -> ack_handler thread: keep alive timer remain 18668 and 18818 [debug ack_waiter]: [debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] -> main thread: sending a PUBLISH packet id is 0x0 ack_handler thread: received packet type is PUBLISH qos = 0, topic = /k1jpiNpomjE/test/user/abc, msg = 12345, packet_id is 0x0 DEFAULT MSG HANDLER: received msg 12345 ack_handler thread: keep alive timer remain 30861 and 30999 [debug ack_waiter]: [debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] -> ack_handler thread: keep alive timer remain 26788 and 26927 [debug ack_waiter]: [debug msg_waiter]: [filter = /k1jpiNpomjE/test/user/abc, qos = 0] ->
|
稍微分析一下,发现结果符号我们的预取,我们也可以像以前一样使用 tcpdump
分析发送的报文,这里我已经分析过,感兴趣的读者可以试下。收工!