多线程间交互-wait与notify

本节阅读量:

我们稍微对上节的实例程序进行修改。在main函数中,不停的从std::cin来读取数据,然后将消息放入到队列中,在多个线程里进行处理。那么,我们写出来的程序大概就是长这个样子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>

class ThreadSafeQueue
{
public:
    void Push(int v)
    {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        q_.push(v);  // 将元素添加到队列中
    }

    int Pop()
    {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        if (q_.empty()) {
            return 0;  // 假设我们队列中的都是整数,因此可以返回0,代表队列为空
        }
        int tmp = q_.front();  // 获取队头元素
        q_.pop();  // 移除队列头部的元素
        return tmp;
    }

private:
    std::mutex mtx_;
    std::queue<int> q_;
};

// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;
std::mutex cout_mtx;  // 针对cout操作加锁

void work(std::string thread_id)
{
    while (true)
    {
        int res = tsq.Pop();
        if (res)
        {
            std::lock_guard<std::mutex> lg(cout_mtx);
            std::cout << thread_id << " Get " << res << "\n";
        }
    }
}

int main() {
    std::thread t1(work, "T1");
    std::thread t2(work, "T2");
    t1.detach();
    t2.detach();
    int input;
    while (std::cin >> input)
    {
        // 不停的读取输入数据,放入后台线程进行处理
        tsq.Push(input);
    }
    return 0;
}

可能得执行结果是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
1
T1 Get 1
2
T2 Get 2
3
T2 Get 3
4
T1 Get 4
5
T2 Get 5

看起来也是符合预期的。但是这里如果我们看下程序cpu的使用率,就会发现问题(注,这里使用的是linux系统,window用户可以直接看任务管理器)。

1
2
3
> ps -aux | grep a.out
%CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
200  0.0 167576  1192 pts/5    Sl+  00:30   3:42 ./a.out

可以发现,示例的程序消耗了2核的cpu。

在work()函数中,每次尝试从tsq中获取数据。如果获取到了就进行处理,不然就再此尝试获取。显而易见,这中间不会有任何的停顿。这种无限执行的指令序列,足以完全消耗单核的cpu。这里我们启动了两个线程,所以消耗完了2个cpu。如果按照这种写法,启动了和cpu个数一样多的线程。那么cpu资源将被完全消耗干净,计算机将会卡顿到难以进行其它任何操作了。


无任务时等待机制

显而易见的一个办法是,当没有消息到达时,当前线程让出cpu一段时间。这里可以调用sleep()函数来达到对应的效果。

下面是改进后的版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
#include <chrono>

void work(std::string thread_id)
{
    while (true)
    {
        int res = tsq.Pop();
        if (res)
        {
            std::lock_guard<std::mutex> lg(cout_mtx);
            std::cout << thread_id << " Get " << res << "\n";
        }
        else
        {
            std::this_thread::sleep_for(std::chrono::seconds(5)); // 没有任务到达,睡眠5秒,再起来工作
        }
    }
}

一个可能的运行结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
1
2
3
T2 Get 1
T2 Get 3
T1 Get 2
4
T1 Get 4
5
T1 Get 5

当我们查看cpu利用率时,发现已经恢复正常:

1
2
3
> ps -aux | grep a.out
%CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
0.0  0.0 167572  1196 pts/5    Sl+  00:54   0:00 ./a.out

这里快速的输入了1,2,3。过了2秒,才打印出来结果。这是因为我们设定的线程休眠的时间是5秒,在线程t1和t2休眠时,数据缓存在了tsq中,不会被处理。等t1和t2结束休眠后,快速的处理了一波数据。

诚然,我们可以调小这里的时间,来提高相应速度。但是当需要这里相应的足够快,就需要将这个休眠时间调整的非常小,可能是需要毫秒,乃至微秒粒度。又或者当我们启动的线程数较多时,大量的线程在不停的结束休眠,检查数据,然后再进行休眠。这些都会消耗不少的cpu资源。

本质上,这里需要的是,无任务时线程进行休眠。有任务时,有一种机制,来唤醒线程进行工作!


等待(wait)与通知(notify)机制

如果一个朋友要来我们家做客。我们可以一直守在门口等着,当然这不是一个好主意。或者可以每隔一个小时去门口看一看,不过这也有点累。最省事的办法就是在家里等着(wait),等朋友到了敲敲门,或者按个门铃来通知下(notify)。

在C++中,可以使用std::condition_variable来完成wait与notify,std::condition_variable需要与std::unique_lock一起使用:

  1. cv.wait(std::unique_lock lock)。调用完成之后,会导致当前线程阻塞,直到别的线程调用了notify_one()或者本线程被虚假唤醒了。
  2. cv.notify_one(),用来唤醒调用了wait之后处于阻塞状态的线程。

当线程调用完wait之后,会发生如下的行为:

  1. 原地调用lock.unlock(),然后进入阻塞状态
  2. 如果被别的线程notify_one()唤醒,则调用lock.lock(),然后返回至业务的逻辑

基于此,我们可以将上述代码优化为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>

class ThreadSafeQueue
{
public:
    void Push(int v)
    {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        q_.push(v);  // 将元素添加到队列中
        cv_.notify_one();  // 唤醒处于等待状态的线程
    }

    int Pop()
    {
        std::unique_lock<std::mutex> lk(mtx_);  // 加锁
        while (q_.empty()) {  // 如果队列为空,就等待被唤醒
            cv_.wait(lk);
        }
        int tmp = q_.front();  // 获取队头元素
        q_.pop();  // 移除队列头部的元素
        return tmp;
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<int> q_;
};

// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;
std::mutex cout_mtx;  // 针对cout操作加锁

void work(std::string thread_id)
{
    while (true)
    {
        int res = tsq.Pop();  // 0 现在也是可以处理的元素了
        std::lock_guard<std::mutex> lg(cout_mtx);
        std::cout << thread_id << " Get " << res << "\n";
    }
}

int main() {
    std::thread t1(work, "T1");
    std::thread t2(work, "T2");
    t1.detach();
    t2.detach();
    int input;
    while (std::cin >> input)
    {
        // 不停的读取输入数据,放入后台线程进行处理
        tsq.Push(input);
    }
    return 0;
}

这产生如下的结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
0
T1 Get 0
1
T2 Get 1
2
T1 Get 2
3
T2 Get 3
4
T1 Get 4
5
T2 Get 5

相比于之前的程序,这有如下的好处:

  1. 不再需要额外的逻辑来标记队列为空
  2. 当有消息到达时,可以直接唤醒线程来进行工作,时效性更好

虚假唤醒(Spurious Wakeup)

你可以对上面Pop()函数中的这个循环表示疑惑:

1
2
3
while (q_.empty()) {  // 如果队列为空,就等待被唤醒
    cv_.wait(lk);
}

wait被唤醒了之后,为啥不能直接开始处理?

这是因为可能存在虚假唤醒的可能:

  1. 底层实现机制的原因,可能没有其它线程调用notify,但是被阻塞的线程也被操作系统唤醒了。也就是说有人误按了门铃。
  2. 即使不存在一的这种可能,在当前线程被唤醒,执行lock.lock()之前,可能其它线程在这个空挡操作了数据了。这代表门铃确实是朋友按的,但是我们起身太慢,可能朋友又去了外面透气,门口这时来了个快递小哥。

因此,当线程被唤醒之后,第一件要做的事情,就是检查是否满足处理条件。如果不满足,需要再次调用wait函数进行等待。


0.4 多线程间交互-消息队列

上一节

0.6 原子变量

下一节