0%

【计算机网络】mqtt通信协议入门(七)异步等待的原理与实现

在实现服务器连接函数时,有提到同步和异步等待 ack 的方法,因为在连接没建立起来之前,我们无其他事情可做,所以在等待 CONNACK 时,采取了同步等待的策略。但是对于 PINGRESP 报文的等待,还可以使用同步的方式吗?哪种方式更好呢?这篇文章我们就来看一下吧。

同步和异步

同步和异步的关注点在于消息通信机制:

  • 同步调用的执行是一个有序的线性序列,当 A 调用 B 后,A 会主动等待 B 执行完成后再继续。比如 A 先后两次调用 B,记为 B(1),B(2),执行顺序一定是:“A call B(1) -> B(1) run -> B(1) done, return A -> A call B(2) -> B(2) run -> B(2) done, return A -> A continue”只有按照这种线性序列去执行程序,才能保证调用双方的状态同步。
  • 异步调用是指当 A 调用 B 的时候,不等 B 执行结束,这个调用就会立即返回。当 B 执行完成后,B 会通过回调函数或通知 的方式告诉 A。异步调用并不是 FIFO 的,即 A 发起的多个调用并不是按照顺序收到通知的,有可能 A 后发出的调用却先收到了 B 的回应。

同步方式等待 ack

根据上面的描述,我们把保持连接这个任务抽象成如下伪代码:

1
2
3
4
5
6
7
8
9
10
11
mqtt_keep_alive()
{
...
mqtt_pingreq();

...
mqtt_wait_pingresp();

...
// 返回做其他事
}

在同步实现方式中,mqtt_wait_pingresp() 可以像 wait_connack 一样,循环等待 PINGRESP:

1
2
3
4
5
6
mqtt_wait_pingresp()
{
do {
mqtt_recv_packet(&type);
} while(type == PINGRESP);
}

在这样的实现中,执行一定是按照 mqtt_pingreq()->mqtt_wait_pingresp()->返回做其他事 这个顺序进行的。在等到 PINGRESP 报文之前,我们什么也做不了。

这种同步的实现方式是可以正常工作的,但是,假设我们在网络较差的环境时,可能会迟迟收不到 PINGRESP 报文,这样我们就会一直等待下去(即使有定时器也要等很久),其他事情(比如 处理收到的订阅的消息)都会跟着等待,显然这不利于充分利用 CPU 的资源(等待期间都在忙等)。

异步方式等待 ack

铺垫了那么多,我最终想说的还是,使用异步的方式去处理 ACK 报文,是一种更好的解决方式,这样会带来更高的并发性,即更好的性能。那么异步等待如何实现呢?

  • 首先,异步等待时, mqtt_wait_pingresp()返回做其他事 一定是并行执行的,所以需要使用多线程编程技术来实现。

  • 其次,异步需要实现一种通知方式,用来通知主线程 “PINGRESP已经收到了/没收到” 这一事件。

第一个问题比较容易解决,我们可以使用 linux 下的 POSIX 线程库来进行多线程编程。

对于第二个问题,我们可以回想一下,对于每一个 session ,我们为其维护了一个 state 状态成员,那么很自然的,我们可以通过在mqtt_wait_pingresp 中设置 session 的状态来通知主线程!这样的话,主线程做其他事情 的时候,也要时刻检查一下 session 的状态,以防止断开连接。

多线程带来的问题

熟悉多线程编程的读者一定意识到了,上面的描述中,session 对象的 state 成员同时被两个线程访问,这是一个典型的竞态(race condition),所以我们需要一些保护共享变量的手段,在这里我们使用互斥量 mutex

除了 state 成员外,我们在读取写入报文时还需要访问 session 的读写缓冲区,其中读缓冲区可以并发访问,但是写缓冲区不行,幸运的是,到目前为止,我们的新线程还没有访问写缓冲区的需要,所暂时就不进行访问限制了,当然这里提到这一点也是因为后续可能会需要(QoS2 需要对 ACK 回复)。

新的层次结构

线程和互斥量的引入改变了我们的层次结构,主要就是需要在底层提供对这两个的支持,新的层次结构图如下:

image-20240718093229735

thread 模块实现

接口

接口设计的比较简单,就是参考 Posix 线程库进行封装的。

1
2
3
4
struct thread_t *thread_create(void *(*entry)(void*), void *arg);
void thread_destory(struct thread_t *thread);
void thread_start(struct thread_t *thread);
void thread_stop(struct thread_t *thread);

实现

先看看 linux 下抽象出来的的线程对象:

1
2
3
4
5
struct thread_t {
pthread_t thread;
pthread_cond_t cond;
pthread_mutex_t mutex;
};

首先是对 pthread_t 的封装,cond 是条件变量,用来控制线程的开始于结束,mutex 用来保护条件变量。

线程创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct thread_t *thread_create(void *(*entry)(void*), void *arg)
{
int retval = 0;
struct thread_t *thread;

thread = memory_alloc(sizeof(struct thread_t));
if (!thread)
return NULL;

retval = pthread_create(&thread->thread, NULL, entry, arg);
if (retval != 0)
goto error;

retval = pthread_cond_init(&thread->cond, NULL);
if (retval != 0)
goto error;

retval = pthread_mutex_init(&thread->mutex, NULL);
if (retval != 0)
goto error;

return thread;
error:
memory_free(thread);
return NULL;
}

线程销毁:

1
2
3
4
5
void thread_destory(struct thread_t *thread)
{
if (thread)
pthread_detach(thread->thread);
}

线程的开始、结束

1
2
3
4
5
6
7
8
9
10
11
12
void thread_start(struct thread_t *thread)
{
pthread_mutex_lock(&thread->mutex);
pthread_cond_signal(&thread->cond);
pthread_mutex_unlock(&thread->mutex);
}
void thread_stop(struct thread_t *thread)
{
pthread_mutex_lock(&thread->mutex);
pthread_cond_wait(&thread->cond, &thread->mutex);
pthread_mutex_unlock(&thread->mutex);
}

mutex 模块实现

接口

1
2
3
4
5
6
7
struct mutex_t {
pthread_mutex_t lock;
};

void mutex_init(struct mutex_t *mutex);
void mutex_lock(struct mutex_t *mutex);
void mutex_unlock(struct mutex_t *mutex);

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void mutex_init(struct mutex_t *mutex)
{
pthread_mutex_init(&mutex->lock, NULL);
}

void mutex_lock(struct mutex_t *mutex)
{
pthread_mutex_lock(&mutex->lock);
}

void mutex_unlock(struct mutex_t *mutex)
{
pthread_mutex_unlock(&mutex->lock);
}

对于 pthread_mutex_t 的简单封装,不在赘述。

异步等待 PINGRESP 的实现

添加 mutex 对象

1
struct mutex_t state_lock;

封装修改 state 成员的函数

1
2
3
4
5
6
7
8
9
10
11
static void set_session_state(struct mqtt_session *session, enum session_state new_state)
{
mutex_lock(&session->state_lock);
session->state = new_state;
mutex_unlock(&session->state_lock);
}

static enum session_state get_session_state(struct mqtt_session *session)
{
return session->state;
}

在 session 初始化时,mqtt_init 函数中对其初始化

1
2
mutex_init(&session->state_lock);
set_session_state(session, SESSION_INITIALIZE);

异步通知

前面我们提到了,等待 CONNACK 的线程可以通过 session 的状态来通知主线程,例如,在长时间没有收到 PINGRESP 时将 session 设置为断开连接状态,但是这个 “长时间未收到” 如何界定呢?使用定时器可以实现,但是这样太复杂了,我们可以通过一个简单的标志位来判断!下面解释如何使用一个标志,界定 “长时间未收到” :

  • 在 session 中设置一个标志,如 pingresp_waiting 表示正在等待 PINGRESP 响应
  • session->last_send_timer 或者 session->last_recv_timer 计时器过期时(表示我们应该发送心跳包了),首先检查 pingresp_waiting 标志:
    • 如果该标志为 1,表示还在等待上一个PINGREQ对应的 PINGRESP 报文,这时我们一定”长时间未收到” 报文了!
    • 如果该标志为 0,表示上一个PINGREQ被正常响应,我们发送 PINGREQ 报文,并将标志位置 1
  • 同时,在收到 PINGRESP 报文时,将该标志位置 0

下面的伪代码也描述了上述过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
while(1)
{
mqtt_recv_packet(&type);
if (type == PINGRESP)
session->pingresp_waiting = 0;

if (timer_expire(last_send_timer) || timer_expire(last_recv_timer)) {
/* 我们该发送 PINGREQ 了 */
if (session->pingresp_waiting) {
/* 上次的 PINGRESP 未收到,通知主线程出错 */
set_session_state(session, SESSION_DISCONNECT)
error();
} else {
/* 发送 PINGREQ 报文 */
mqtt_pingreq();
/* 等待对应 PINGRESP */
session->pingresp_waiting = 1;
}
}
}

在 session 对象中添加 pingresp_waiting,并在 mqtt_init 中初始化,如下:

1
unsigned char pingresp_waiting;
1
session->pingresp_waiting = 0;

创建线程

在连接完成后,我们需要创建新线程,用来监听所有的 ack 报文,在 mqtt_connect 函数中添加如下代码:

1
2
3
4
5
if (NULL == session->thread) {
session->thread = thread_create((void *(*)(void*))mqtt_ack_handler, (void*)session);
if (NULL != session->thread)
thread_start(session->thread);
}

其中 mqtt_ack_handler 为该线程的主函数,就如同我们上面介绍的那样,该线程会尝试循环读取 PINGRESP 报文,并在合适的时间发送 PINGREQ 报文,先来看看该函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void mqtt_ack_handler(void *arg )
{
struct timer_t thread_timer;
struct mqtt_session *session;
session = (struct mqtt_session*)arg;

printf("ack_handler thread: thread started!\n");
timer_init(&thread_timer);

if (get_session_state(session) != SESSION_CONNECTED) {
printf("ack_handler thread: mqtt is not connected\n");
thread_stop(session->thread);
}

/* 循环检查是否收到 PINGRESP */
while (1) {
if (handle_ack(session, &thread_timer) == PINGRESP)
printf("ack_handler thread: received PINGRESP successful!\n");
}
}

核心的读取 PINGRESP 和发送 PINGREQ 报文是在 handle_ack 函数中实现的,详细解释可以看下文注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static int handle_ack(struct mqtt_session *session, struct timer_t *timer)
{
int retval;
enum type type = -1;

timer_init(timer);
timer_cutoff(timer, session->net_timeout);

/* 阻塞读取报文 超时直接返回 */
retval = mqtt_recv_packet(session, &type, timer);
if (retval != 0)
retval = -1;

printf("ack_handler thread: try to recv packet type is %d\n", type);

/* 读到 PINGRESP 标志位清零 */
if (type == PINGRESP)
session->pingresp_waiting = 0;

/* 判断是否该发送 PINGREQ 报文 */
retval = mqtt_keep_alive(session);
if (retval != 0) {
/* 通知主线程连接已经断开 */
set_session_state(session, SESSION_INVALID);
retval = -1;
}

if (retval == 0)
return type;

return -1;
}

下面看 mqtt_keep_alive 和我们之前描述的那样实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int mqtt_keep_alive(struct mqtt_session *session)
{
int retval = 0;

printf("ack_handler thread: keep alive timer remained %d and %d\n", \
timer_remain(&session->last_send_timer), timer_remain(&session->last_recv_timer));

/* 两个计时器任一到期,就发送 PINGREQ */
if (timer_is_expired(&session->last_send_timer) || \
timer_is_expired(&session->last_recv_timer)) {
/* 上一个 PINGRESP 未被响应 错误 */
if (session->pingresp_waiting) {
retval = -1;
} else {
/* 发送 PINGREQ */
if ((retval = mqtt_pingreq(session)) == 0)
session->pingresp_waiting = 1;
}
}

return retval;
}

编译运行

测序程序

稍微修改了一下测试程序,我的 net_timeout 设为了 4000, keep_alive_interval 设为了 31:

1
2
3
4
5
6
7
8
mqtt_init(session, &init_params);
mqtt_connect(session);

while (1) {

printf("main thread: thread running session_state is %d\n", session->state);
sleep(5);
}

Makefile

因为使用到了 pthread 库,Makefile 需要添加下面的内容

1
LDFLAGS = -lpthread

运行

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
recv_packet: type is 2
connect to server successful!
main thread: thread running session_state is 1
ack_handler thread: thread started!
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 26869 and 26972
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 22789 and 22892
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 18710 and 18812
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 14630 and 14732
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 10549 and 10652
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 6469 and 6572
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 2390 and 2493
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 0 and 0
sent pingreq to server successful!
recv_packet: type is 13
ack_handler thread: try to recv packet type is 13
ack_handler thread: keep alive timer remain 30911 and 30999
ack_handler thread: received PINGRESP successful!
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 26840 and 26928
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 22760 and 22848
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 18680 and 18768
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 14600 and 14688
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 10520 and 10608
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 6440 and 6528
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 2360 and 2448
main thread: thread running session_state is 1
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 0 and 0
sent pingreq to server successful!
recv_packet: type is 13
ack_handler thread: try to recv packet type is 13
ack_handler thread: keep alive timer remain 30908 and 30999
ack_handler thread: received PINGRESP successful!
ack_handler thread: try to recv packet type is -1
ack_handler thread: keep alive timer remain 26840 and 26931
main thread: thread running session_state is 1

可以看到,我们的程序中的两个线程都能正常的工作,这样我们就实现了具有更高并发度的异步等待!以上。