多线程间交互-wait与notify(施工中)

本节阅读量:

我们稍微对上节的实例程序进行修改。在main函数中,不停的从std::cin来读取数据,然后将消息放入到队列中,在多个线程里进行处理。那么,我们写出来的程序大概就是长这个样子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>

class ThreadSafeQueue
{
public:
    void Push(int v)
    {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        q_.push(v);  // 将元素添加到队列中
    }

    int Pop()
    {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        if (q_.empty()) {
            return 0;  // 假设我们队列中的都是整数,因此可以返回0,代表队列为空
        }
        int tmp = q_.front();  // 获取队头元素
        q_.pop();  // 移除队列头部的元素
        return tmp;
    }

private:
    std::mutex mtx_;
    std::queue<int> q_;
};

// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;
std::mutex cout_mtx;  // 针对cout操作加锁

void work(std::string thread_id)
{
    while (true)
    {
        int res = tsq.Pop();
        if (res)
        {
            std::lock_guard<std::mutex> lg(cout_mtx);
            std::cout << thread_id << " Get " << res << "\n";
        }
    }
}

int main() {
    std::thread t1(work, "T1");
    std::thread t2(work, "T2");
    t1.detach();
    t2.detach();
    int input;
    while (std::cin >> input)
    {
        // 不停的读取输入数据,放入后台线程进行处理
        tsq.Push(input);
    }
    return 0;
}

可能得执行结果是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
1
T1 Get 1
2
T2 Get 2
3
T2 Get 3
4
T1 Get 4
5
T2 Get 5

看起来也是符合预期的。但是这里如果我们看下程序cpu的使用率,就会发现问题(注,这里使用的是linux系统,window用户可以直接看任务管理器)。

1
2
3
> ps -aux | grep a.out
%CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
200  0.0 167576  1192 pts/5    Sl+  00:30   3:42 ./a.out

可以发现,示例的程序消耗了2核的cpu。

在work()函数中,每次尝试从tsq中获取数据。如果获取到了就进行处理,不然就再此尝试获取。显而易见,这中间不会有任何的停顿。这种无限执行的指令序列,足以完全消耗单核的cpu。这里我们启动了两个线程,所以消耗完了2个cpu。如果按照这种写法,启动了和cpu个数一样多的线程。那么cpu资源将被完全消耗干净,计算机将会卡顿到难以进行其它任何操作了。


无任务时等待机制

显而易见的一个办法是,当没有消息到达时,当前线程让出cpu一段时间。这里可以调用sleep()函数来达到对应的效果。

下面是改进后的版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
#include <chrono>

void work(std::string thread_id)
{
    while (true)
    {
        int res = tsq.Pop();
        if (res)
        {
            std::lock_guard<std::mutex> lg(cout_mtx);
            std::cout << thread_id << " Get " << res << "\n";
        }
        else
        {
            std::this_thread::sleep_for(std::chrono::seconds(5)); // 没有任务到达,睡眠5秒,再起来工作
        }
    }
}

一个可能的运行结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
1
2
3
T2 Get 1
T2 Get 3
T1 Get 2
4
T1 Get 4
5
T1 Get 5

当我们查看cpu利用率时,发现已经恢复正常:

1
2
3
> ps -aux | grep a.out
%CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
0.0  0.0 167572  1196 pts/5    Sl+  00:54   0:00 ./a.out

这里快速的输入了1,2,3。过了2秒,才打印出来结果。这是因为我们设定的线程休眠的时间是5秒,在线程t1和t2休眠时,数据缓存在了tsq中,不会被处理。等t1和t2结束休眠后,快速的处理了一波数据。

诚然,我们可以调小这里的时间,来提高相应速度。但是当需要这里相应的足够快,就需要将这个休眠时间调整的非常小,可能是需要毫秒,乃至微秒粒度。又或者当我们启动的线程数较多时,大量的线程在不停的结束休眠,检查数据,然后再进行休眠。这些都会消耗不少的cpu资源。

本质上,这里需要的是,无任务时线程进行休眠。有任务时,有一种机制,来唤醒线程进行工作!


等待(wait)与通知(notify)机制

如果一个朋友要来我们家做客。我们可以一直守在门口等着,当然这不是一个好主意。或者可以每隔一个小时去门口看一看,不过这也有点累。最省事的办法就是在家里等着(wait),等朋友到了敲敲门,或者按个门铃来通知下(notify)。

在C++中,可以使用std::condition_variable来完成wait与notify。


0.4 多线程间交互-消息队列

上一节

0.6 原子变量

下一节