多线程间交互-消息队列

本节阅读量:

在更多的场景下,多个线程并不是完全对称的,也就是说,它们并不是完全执行相同的处理逻辑。

与之相反的是,少部分线程负责与外部进行通信,收发任务,然后将任务分配给其余的线程,其余的线程来负责实际的计算任务。

因此需要一个中间的介质,来做消息的传递。


std::queue

一般情况下,我们都是希望先到达的消息先处理(先入先出)。描述这种性质的数据结构叫队列(queue)。在标准库中提供了该功能,位于queue头文件。

这里我们暂时只需要关注四个函数:

  1. push 往队列尾部插入一条数据
  2. front 查看队列头部的元素
  3. pop 移除队列头部的元素
  4. empty 判断队列是否为空

下面是一个简单的使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <queue>

int main() {
    std::queue<int> q;
    q.push(1);  // q = 1
    q.push(2);  // q = 1, 2
    q.push(3);  // q = 1, 2, 3
    std::cout << "front: " << q.front() << std::endl;
    q.pop();  // q = 2, 3
    std::cout << "front: " << q.front() << std::endl;
    q.pop();  // q = 3
    q.push(4);  // q = 3, 4
    q.push(5);  // q = 3, 4, 5
    std::cout << "Remain:" << std::endl;
    // 打印并pop剩下的所有元素
    while (!q.empty())
    {
        std::cout << q.front() << std::endl;
        q.pop();
    }
    return 0;
}

输出:

1
2
3
4
5
6
front: 1
front: 2
Remain:
3
4
5

这里按顺序插入了1到5到队列的尾部。同时front()与pop()交替进行,查看并删除队列头部的元素。

因为队列是先入先出,所以打印的顺序,与插入的顺序一致。

此外,需要额外注意的是,调用front()与pop()函数之前,一定要确保队列中存在数据,否则会产生未定义的行为。

如下:

1
2
3
4
5
6
7
8
#include <iostream>
#include <queue>

int main() {
    std::queue<int> q;
    q.pop();  // 这里程序可能会直接崩溃
    return 0;
}

执行到pop()函数时,程序可能会直接崩溃,毕竟谁也无法从“空”中变出任何玩意。如果无法明确保障队列中存在数据,使用empty()或者size()函数进行判断。


线程安全的队列

如果要实现线程安全的队列,你可能会想。这很简单,包装一个类,然后将上述的四个函数,使用锁保护起来就可以了。

因此可能会写出下面的代码(以int队列为例):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>

class ThreadSafeQueue {
public:
    void Push(int v) {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        q_.push(v);  // 将元素添加到队列中
    }

    void Pop() {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        q_.pop();  // 移除队列头部的元素
    }

    int Front() {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        return q_.front();  // 查看队列头部的元素
    }

    bool Empty() {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        return q_.empty();  // 判断队列是否为空
    }

private:
    std::mutex mtx_;
    std::queue<int> q_;
};

// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;

void thread1() {
    if (!tsq.Empty()) {
        // 设置一个足够的时间,让多线程之间操作tsq的问题暴露
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        std::cout << "T1 See Empty Status " << tsq.Empty() << "\n";
        std::cout << "T1 Get " << tsq.Front() << "\n";
        tsq.Pop();
        std::cout << "T1 Pop " << "\n";
    }
}

void thread2() {
    if (!tsq.Empty()) {
        // 设置一个足够的时间,让多线程之间操作tsq的问题暴露
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        std::cout << "T2 See Empty Status " << tsq.Empty() << "\n";
        std::cout << "T2 Get " << tsq.Front() << "\n";
        tsq.Pop();
        std::cout << "T2 Pop " << "\n";
    }
}

int main() {
    tsq.Push(1);
    std::thread t1(thread1);
    std::thread t2(thread2);

    t1.join();
    t2.join();

    return 0;
}

对于q_的所有操作都被保护了起来,看起来足够的安全。事实真的如此么?

可能的执行结果是:

1
2
3
4
5
T1 See Empty Status 0
T1 Get 1
T1 Pop 
T2 See Empty Status 1
T2 Get

然后程序直接崩溃。

这里thread1先执行,这时tsq不为空,等待500毫秒,然后将tsq中的数据取出。thread2与thread1同时执行,这时候tsq中仍然存在一条数据,因此进入到if语句内。然后thread2等待1秒,这时候tsq中的数据已经被取出,因此tsq.Empty()返回true。接下来,从tsq中取数据,这产生了未定义的行为。

因为这里存在三个操作「检查为空」-「查看队头数据」- 「弹出队头数据」。在多线程情况下,即使单个操作是原子的,并不代表这三个操作合并在一起是原子的。这三个操作应该放在同一个临界区中,对外暴露一个接口。

因此,我们有了改进后的版本。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>

class ThreadSafeQueue {
public:
    void Push(int v) {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        q_.push(v);  // 将元素添加到队列中
    }

    int Pop() {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        if (q_.empty()) {
            return 0;  // 假设我们队列中的都是整数,因此可以返回0,代表队列为空
        }
        int tmp = q_.front();  // 获取队头元素
        q_.pop();  // 移除队列头部的元素
        return tmp;
    }

private:
    std::mutex mtx_;
    std::queue<int> q_;
};

// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;

void thread1() {
    int res = tsq.Pop();
    if (res) {
        std::cout << "T1 Get " << res << "\n";
    } else {
        std::cout << "T1 Get Empty " << "\n";
    }
}

void thread2() {
    int res = tsq.Pop();
    if (res) {
        std::cout << "T2 Get " << res << "\n";
    } else {
        std::cout << "T2 Get Empty " << "\n";
    }
}

int main() {
    tsq.Push(1);
    std::thread t1(thread1);
    std::thread t2(thread2);

    t1.join();
    t2.join();

    return 0;
}

可能得执行结果是:

1
2
T1 Get T2 Get Empty 
1

发生了什么?似乎thread1与thread2打印的结果发生了错乱。这是因为「std::cout « “T1 Get " « res « “\n”;」,实际上调用了「«」三次。thread1与thread2,都同时对std::cout进行多次操作。所以实际打印的顺序是不确定的。所以想有序的进行打印,需要对打印语句进行加锁。

下面是再次改进后的版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>

class ThreadSafeQueue {
public:
    void Push(int v) {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        q_.push(v);  // 将元素添加到队列中
    }

    int Pop() {
        std::lock_guard<std::mutex> lg(mtx_); // 加锁
        if (q_.empty()) {
            return 0;  // 假设我们队列中的都是整数,因此可以返回0,代表队列为空
        }
        int tmp = q_.front();  // 获取队头元素
        q_.pop();  // 移除队列头部的元素
        return tmp;
    }

private:
    std::mutex mtx_;
    std::queue<int> q_;
};

// 简单示意,这里直接使用全局变量
ThreadSafeQueue tsq;
std::mutex cout_mtx;  // 针对cout操作加锁

void thread1() {
    int res = tsq.Pop();
    if (res) {
        std::lock_guard<std::mutex> lg(cout_mtx);
        std::cout << "T1 Get " << res << "\n";
    } else {
        std::lock_guard<std::mutex> lg(cout_mtx);
        std::cout << "T1 Get Empty " << "\n";
    }
}

void thread2() {
    int res = tsq.Pop();
    if (res) {
        std::lock_guard<std::mutex> lg(cout_mtx);
        std::cout << "T2 Get " << res << "\n";
    } else {
        std::lock_guard<std::mutex> lg(cout_mtx);
        std::cout << "T2 Get Empty " << "\n";
    }
}

int main() {
    tsq.Push(1);
    std::thread t1(thread1);
    std::thread t2(thread2);

    t1.join();
    t2.join();

    return 0;
}

可能得执行结果是:

1
2
T1 Get 1
T2 Get Empty

或者

1
2
T2 Get 1
T1 Get Empty

打印顺序随t1和t2的启动与执行顺序而定。但最终执行结果符合我们的预期。


0.3 死锁,以及如何避免

上一节

0.5 多线程间交互-wait与notify(施工中)

下一节