多线程间交互-消息队列
本节阅读量:
在更多的场景下,多个线程并不是完全对称的,也就是说,它们并不是完全执行相同的处理逻辑。
与之相反的是,少部分线程负责与外部进行通信,收发任务,然后将任务分配给其余的线程,其余的线程来负责实际的计算任务。
因此需要一个中间的介质,来做消息的传递。
std::queue
一般情况下,我们都是希望先到达的消息先处理(先入先出)。描述这种性质的数据结构叫队列(queue)。在标准库中提供了该功能,位于queue头文件。
这里我们暂时只需要关注四个函数:
- push 往队列尾部插入一条数据
- front 查看队列头部的元素
- pop 移除队列头部的元素
- empty 判断队列是否为空
下面是一个简单的使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
#include <iostream>
#include <queue>
int main() {
std::queue<int> q;
q.push(1); // q = 1
q.push(2); // q = 1, 2
q.push(3); // q = 1, 2, 3
std::cout << "front: " << q.front() << std::endl;
q.pop(); // q = 2, 3
std::cout << "front: " << q.front() << std::endl;
q.pop(); // q = 3
q.push(4); // q = 3, 4
q.push(5); // q = 3, 4, 5
std::cout << "Remain:" << std::endl;
// 打印并pop剩下的所有元素
while (!q.empty())
{
std::cout << q.front() << std::endl;
q.pop();
}
return 0;
}
|
输出:
1
2
3
4
5
6
|
front: 1
front: 2
Remain:
3
4
5
|
这里按顺序插入了1到5到队列的尾部。同时front()与pop()交替进行,查看并删除队列头部的元素。
因为队列是先入先出,所以打印的顺序,与插入的顺序一致。
此外,需要额外注意的是,调用front()与pop()函数之前,一定要确保队列中存在数据,否则会产生未定义的行为。
如下:
1
2
3
4
5
6
7
8
|
#include <iostream>
#include <queue>
int main() {
std::queue<int> q;
q.pop(); // 这里程序可能会直接崩溃
return 0;
}
|
执行到pop()函数时,程序可能会直接崩溃,毕竟谁也无法从“空”中变出任何玩意。如果无法明确保障队列中存在数据,使用empty()或者size()函数进行判断。
线程安全的队列
如果要实现线程安全的队列,你可能会想。这很简单,包装一个类,然后将上述的四个函数,使用锁保护起来就可以了。
因此可能会写出下面的代码(以int队列为例):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>
class ThreadSafeQueue {
public:
void Push(int v) {
std::lock_guard<std::mutex> lg(mtx_); // 加锁
q_.push(v); // 将元素添加到队列中
}
void Pop() {
std::lock_guard<std::mutex> lg(mtx_); // 加锁
q_.pop(); // 移除队列头部的元素
}
int Front() {
std::lock_guard<std::mutex> lg(mtx_); // 加锁
return q_.front(); // 查看队列头部的元素
}
bool Empty() {
std::lock_guard<std::mutex> lg(mtx_); // 加锁
return q_.empty(); // 判断队列是否为空
}
private:
std::mutex mtx_;
std::queue<int> q_;
};
// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;
void thread1() {
if (!tsq.Empty()) {
// 设置一个足够的时间,让多线程之间操作tsq的问题暴露
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "T1 See Empty Status " << tsq.Empty() << "\n";
std::cout << "T1 Get " << tsq.Front() << "\n";
tsq.Pop();
std::cout << "T1 Pop " << "\n";
}
}
void thread2() {
if (!tsq.Empty()) {
// 设置一个足够的时间,让多线程之间操作tsq的问题暴露
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "T2 See Empty Status " << tsq.Empty() << "\n";
std::cout << "T2 Get " << tsq.Front() << "\n";
tsq.Pop();
std::cout << "T2 Pop " << "\n";
}
}
int main() {
tsq.Push(1);
std::thread t1(thread1);
std::thread t2(thread2);
t1.join();
t2.join();
return 0;
}
|
对于q_的所有操作都被保护了起来,看起来足够的安全。事实真的如此么?
可能的执行结果是:
1
2
3
4
5
|
T1 See Empty Status 0
T1 Get 1
T1 Pop
T2 See Empty Status 1
T2 Get
|
然后程序直接崩溃。
这里thread1先执行,这时tsq不为空,等待500毫秒,然后将tsq中的数据取出。thread2与thread1同时执行,这时候tsq中仍然存在一条数据,因此进入到if语句内。然后thread2等待1秒,这时候tsq中的数据已经被取出,因此tsq.Empty()返回true。接下来,从tsq中取数据,这产生了未定义的行为。
因为这里存在三个操作「检查为空」-「查看队头数据」- 「弹出队头数据」。在多线程情况下,即使单个操作是原子的,并不代表这三个操作合并在一起是原子的。这三个操作应该放在同一个临界区中,对外暴露一个接口。
因此,我们有了改进后的版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>
class ThreadSafeQueue {
public:
void Push(int v) {
std::lock_guard<std::mutex> lg(mtx_); // 加锁
q_.push(v); // 将元素添加到队列中
}
int Pop() {
std::lock_guard<std::mutex> lg(mtx_); // 加锁
if (q_.empty()) {
return 0; // 假设我们队列中的都是整数,因此可以返回0,代表队列为空
}
int tmp = q_.front(); // 获取队头元素
q_.pop(); // 移除队列头部的元素
return tmp;
}
private:
std::mutex mtx_;
std::queue<int> q_;
};
// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;
void thread1() {
int res = tsq.Pop();
if (res) {
std::cout << "T1 Get " << res << "\n";
} else {
std::cout << "T1 Get Empty " << "\n";
}
}
void thread2() {
int res = tsq.Pop();
if (res) {
std::cout << "T2 Get " << res << "\n";
} else {
std::cout << "T2 Get Empty " << "\n";
}
}
int main() {
tsq.Push(1);
std::thread t1(thread1);
std::thread t2(thread2);
t1.join();
t2.join();
return 0;
}
|
可能得执行结果是:
1
2
|
T1 Get T2 Get Empty
1
|
发生了什么?似乎thread1与thread2打印的结果发生了错乱。这是因为「std::cout « “T1 Get " « res « “\n”;」,实际上调用了「«」三次。thread1与thread2,都同时对std::cout进行多次操作。所以实际打印的顺序是不确定的。所以想有序的进行打印,需要对打印语句进行加锁。
下面是再次改进后的版本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>
class ThreadSafeQueue {
public:
void Push(int v) {
std::lock_guard<std::mutex> lg(mtx_); // 加锁
q_.push(v); // 将元素添加到队列中
}
int Pop() {
std::lock_guard<std::mutex> lg(mtx_); // 加锁
if (q_.empty()) {
return 0; // 假设我们队列中的都是整数,因此可以返回0,代表队列为空
}
int tmp = q_.front(); // 获取队头元素
q_.pop(); // 移除队列头部的元素
return tmp;
}
private:
std::mutex mtx_;
std::queue<int> q_;
};
// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;
std::mutex cout_mtx; // 针对cout操作加锁
void thread1() {
int res = tsq.Pop();
if (res) {
std::lock_guard<std::mutex> lg(cout_mtx);
std::cout << "T1 Get " << res << "\n";
} else {
std::lock_guard<std::mutex> lg(cout_mtx);
std::cout << "T1 Get Empty " << "\n";
}
}
void thread2() {
int res = tsq.Pop();
if (res) {
std::lock_guard<std::mutex> lg(cout_mtx);
std::cout << "T2 Get " << res << "\n";
} else {
std::lock_guard<std::mutex> lg(cout_mtx);
std::cout << "T2 Get Empty " << "\n";
}
}
int main() {
tsq.Push(1);
std::thread t1(thread1);
std::thread t2(thread2);
t1.join();
t2.join();
return 0;
}
|
可能得执行结果是:
1
2
|
T1 Get 1
T2 Get Empty
|
或者
1
2
|
T2 Get 1
T1 Get Empty
|
打印顺序随t1和t2的启动与执行顺序而定。但最终执行结果符合我们的预期。
最佳实践
多条语句,当多线程可能竞争执行时,如果需要试作一条原子命令运行,它们需要整体放到同一临界区中进行保护。
0.5 多线程间交互-wait与notify(施工中)
下一节