多线程间交互-wait与notify
本节阅读量:
我们稍微对上节的实例程序进行修改。在main函数中,不停的从std::cin来读取数据,然后将消息放入到队列中,在多个线程里进行处理。那么,我们写出来的程序大概就是长这个样子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>
class ThreadSafeQueue
{
public:
void Push(int v)
{
std::lock_guard<std::mutex> lg(mtx_); // 加锁
q_.push(v); // 将元素添加到队列中
}
int Pop()
{
std::lock_guard<std::mutex> lg(mtx_); // 加锁
if (q_.empty()) {
return 0; // 假设我们队列中的都是整数,因此可以返回0,代表队列为空
}
int tmp = q_.front(); // 获取队头元素
q_.pop(); // 移除队列头部的元素
return tmp;
}
private:
std::mutex mtx_;
std::queue<int> q_;
};
// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;
std::mutex cout_mtx; // 针对cout操作加锁
void work(std::string thread_id)
{
while (true)
{
int res = tsq.Pop();
if (res)
{
std::lock_guard<std::mutex> lg(cout_mtx);
std::cout << thread_id << " Get " << res << "\n";
}
}
}
int main() {
std::thread t1(work, "T1");
std::thread t2(work, "T2");
t1.detach();
t2.detach();
int input;
while (std::cin >> input)
{
// 不停的读取输入数据,放入后台线程进行处理
tsq.Push(input);
}
return 0;
}
|
可能得执行结果是:
1
2
3
4
5
6
7
8
9
10
|
1
T1 Get 1
2
T2 Get 2
3
T2 Get 3
4
T1 Get 4
5
T2 Get 5
|
看起来也是符合预期的。但是这里如果我们看下程序cpu的使用率,就会发现问题(注,这里使用的是linux系统,window用户可以直接看任务管理器)。
1
2
3
|
> ps -aux | grep a.out
%CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
200 0.0 167576 1192 pts/5 Sl+ 00:30 3:42 ./a.out
|
可以发现,示例的程序消耗了2核的cpu。
在work()函数中,每次尝试从tsq中获取数据。如果获取到了就进行处理,不然就再此尝试获取。显而易见,这中间不会有任何的停顿。这种无限执行的指令序列,足以完全消耗单核的cpu。这里我们启动了两个线程,所以消耗完了2个cpu。如果按照这种写法,启动了和cpu个数一样多的线程。那么cpu资源将被完全消耗干净,计算机将会卡顿到难以进行其它任何操作了。
无任务时等待机制
显而易见的一个办法是,当没有消息到达时,当前线程让出cpu一段时间。这里可以调用sleep()函数来达到对应的效果。
下面是改进后的版本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#include <chrono>
void work(std::string thread_id)
{
while (true)
{
int res = tsq.Pop();
if (res)
{
std::lock_guard<std::mutex> lg(cout_mtx);
std::cout << thread_id << " Get " << res << "\n";
}
else
{
std::this_thread::sleep_for(std::chrono::seconds(5)); // 没有任务到达,睡眠5秒,再起来工作
}
}
}
|
一个可能的运行结果如下:
1
2
3
4
5
6
7
8
9
10
|
1
2
3
T2 Get 1
T2 Get 3
T1 Get 2
4
T1 Get 4
5
T1 Get 5
|
当我们查看cpu利用率时,发现已经恢复正常:
1
2
3
|
> ps -aux | grep a.out
%CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
0.0 0.0 167572 1196 pts/5 Sl+ 00:54 0:00 ./a.out
|
这里快速的输入了1,2,3。过了2秒,才打印出来结果。这是因为我们设定的线程休眠的时间是5秒,在线程t1和t2休眠时,数据缓存在了tsq中,不会被处理。等t1和t2结束休眠后,快速的处理了一波数据。
诚然,我们可以调小这里的时间,来提高相应速度。但是当需要这里相应的足够快,就需要将这个休眠时间调整的非常小,可能是需要毫秒,乃至微秒粒度。又或者当我们启动的线程数较多时,大量的线程在不停的结束休眠,检查数据,然后再进行休眠。这些都会消耗不少的cpu资源。
本质上,这里需要的是,无任务时线程进行休眠。有任务时,有一种机制,来唤醒线程进行工作!
等待(wait)与通知(notify)机制
如果一个朋友要来我们家做客。我们可以一直守在门口等着,当然这不是一个好主意。或者可以每隔一个小时去门口看一看,不过这也有点累。最省事的办法就是在家里等着(wait),等朋友到了敲敲门,或者按个门铃来通知下(notify)。
在C++中,可以使用std::condition_variable来完成wait与notify,std::condition_variable需要与std::unique_lock一起使用:
- cv.wait(std::unique_lock lock)。调用完成之后,会导致当前线程阻塞,直到别的线程调用了notify_one()或者本线程被虚假唤醒了。
- cv.notify_one(),用来唤醒调用了wait之后处于阻塞状态的线程。
当线程调用完wait之后,会发生如下的行为:
- 原地调用lock.unlock(),然后进入阻塞状态
- 如果被别的线程notify_one()唤醒,则调用lock.lock(),然后返回至业务的逻辑
基于此,我们可以将上述代码优化为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
class ThreadSafeQueue
{
public:
void Push(int v)
{
std::lock_guard<std::mutex> lg(mtx_); // 加锁
q_.push(v); // 将元素添加到队列中
cv_.notify_one(); // 唤醒处于等待状态的线程
}
int Pop()
{
std::unique_lock<std::mutex> lk(mtx_); // 加锁
while (q_.empty()) { // 如果队列为空,就等待被唤醒
cv_.wait(lk);
}
int tmp = q_.front(); // 获取队头元素
q_.pop(); // 移除队列头部的元素
return tmp;
}
private:
std::mutex mtx_;
std::condition_variable cv_;
std::queue<int> q_;
};
// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;
std::mutex cout_mtx; // 针对cout操作加锁
void work(std::string thread_id)
{
while (true)
{
int res = tsq.Pop(); // 0 现在也是可以处理的元素了
std::lock_guard<std::mutex> lg(cout_mtx);
std::cout << thread_id << " Get " << res << "\n";
}
}
int main() {
std::thread t1(work, "T1");
std::thread t2(work, "T2");
t1.detach();
t2.detach();
int input;
while (std::cin >> input)
{
// 不停的读取输入数据,放入后台线程进行处理
tsq.Push(input);
}
return 0;
}
|
这产生如下的结果:
1
2
3
4
5
6
7
8
9
10
11
12
|
0
T1 Get 0
1
T2 Get 1
2
T1 Get 2
3
T2 Get 3
4
T1 Get 4
5
T2 Get 5
|
相比于之前的程序,这有如下的好处:
- 不再需要额外的逻辑来标记队列为空
- 当有消息到达时,可以直接唤醒线程来进行工作,时效性更好
虚假唤醒(Spurious Wakeup)
你可以对上面Pop()函数中的这个循环表示疑惑:
1
2
3
|
while (q_.empty()) { // 如果队列为空,就等待被唤醒
cv_.wait(lk);
}
|
wait被唤醒了之后,为啥不能直接开始处理?
这是因为可能存在虚假唤醒的可能:
- 底层实现机制的原因,可能没有其它线程调用notify,但是被阻塞的线程也被操作系统唤醒了。也就是说有人误按了门铃。
- 即使不存在一的这种可能,在当前线程被唤醒,执行lock.lock()之前,可能其它线程在这个空挡操作了数据了。这代表门铃确实是朋友按的,但是我们起身太慢,可能朋友又去了外面透气,门口这时来了个快递小哥。
因此,当线程被唤醒之后,第一件要做的事情,就是检查是否满足处理条件。如果不满足,需要再次调用wait函数进行等待。