原子变量
本节阅读量:
多线程间计数
在前面我们学过,多线程间使用普通的变量进行通讯,会出现竞态条件(race condition)。实际执行的结果,不会符合我们的预期。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#include <iostream>
#include <thread>
int count = 0;
void incr()
{
// 循环10万次,进行计数
for (int i = 0; i < 10 * 10000; i++) {
count++; // 这里多线程会同时执行到
}
}
int main()
{
std::thread t1(incr);
std::thread t2(incr);
t1.join();
t2.join();
std::cout << "Total Count Is " << count << std::endl;
return 0;
}
|
在作者的机器上,这打印:
在多线程的情况下,如果多个线程同时对一个对象进行读写,不进行合适的同步的话,即使是简单的计数都无法正确进行。
当然,我们可以在计数时进行加锁操作,确保同时只能有一个线程对「count」进行操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#include <iostream>
#include <thread>
#include <mutex>
int count = 0;
std::mutex mtx;
void incr()
{
// 循环10万次,进行计数
for (int i = 0; i < 10 * 10000; i++) {
std::lock_guard<std::mutex> lg(mtx);
count++; // 加锁了,所以同时只能有一个线程执行到这里
}
}
int main()
{
std::thread t1(incr);
std::thread t2(incr);
t1.join();
t2.join();
std::cout << "Total Count Is " << count << std::endl;
return 0;
}
|
在作者的机器上,这打印:
无锁计数
使用锁确实可以解决多线程间对象同步的问题。但是,操作很简单的情况下,锁要进行更多的操作,带来一定的额外负载。还有一种可能的情况是,当一个线程获得锁之后,如果其它被操作系统调度到了后台。即使这个锁保护的代码段很短,其它线程,也必须等待这个休眠的线程回来执行完锁保护的代码,才能获得锁。
因此在C++中,提供原子变量,用于实现多线程无锁编程。它们通过 std::atomic 模板类提供,确保对变量的操作是 「不可分割」(atomic),从而避免数据竞争(data race)。
要使用原子变量,需要引入 atomic 头文件。我们建议只在内置的基础类型,例如 bool,int,float,double等上进行原子操作。
原子变量支持以下操作:
操作 |
说明 |
示例 |
load |
原子读取 |
int x = counter.load(); |
store |
原子写入 |
counter.store(42); |
++,– |
自增自减 |
counter++; |
compare and exchange |
比较并交换 |
见后续的示例 |
fetch_add, fetch_sub |
原子加减并返回旧值 |
int old = counter.fetch_add(1); |
这里我们使用原子变量改写上述的程序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<int> count{0};
void incr()
{
// 循环10万次,进行计数
for (int i = 0; i < 10 * 10000; i++) {
count++; // 使用原子变量,多线程执行这一行,每次操作都是原子的,所以无需加锁
}
}
int main()
{
std::thread t1(incr);
std::thread t2(incr);
t1.join();
t2.join();
std::cout << "Total Count Is " << count << std::endl;
return 0;
}
|
这产生了预期的结果:
基于原子变量的自旋锁
锁实际完成了什么事情?
- 如果未加锁,则加锁。这一步的操作必须是原子的。加锁成功之后到释放之前,其它线程无法获得锁。
- 如果锁已经被占用,则进行等待。
这就类似于如下的逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
|
bool lock_flag = true;
void lock() {
while (!lock_flag) {
wait();
}
lock_flag = true;
}
void unlock() {
lock_flag = false;
}
|
unlock()函数,看起来平平无齐。而对于lock()函数,这里的关键在于,对于lock_flag的检查,以及赋值必须是一条不能被分割的指令中完成。这种操作,就是比较并交换(compare and exchange)。
这里我们介绍下compare_exchange_weak()函数。它的建议版的签名如下:
1
2
|
template<typename T>
bool std::atomic<T>::compare_exchange_weak(T& expected, T desired)
|
其效果为:
- 先读取当前原子变量的值,记为old
- 将old与expected进行比较
- 如果相等,将desired赋值给原子变量,返回true
- 如果不相等,将原子变量的值拷贝给expected,返回false
expected既可以当做输入(比较成功时),也可以当做输出(比较失败时)。
因此,基于该操作,我们可以自己实现一个锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
class SpinLock {
public:
void lock() {
bool expect = false; // 期望 flag_ 为 false
while (!flag_.compare_exchange_weak(expect, true)) {
// 检查失败
expect = false; // compare_exchange_weak 会把 true 写回 expect
// 然后这里什么也不干,死循环等待
}
}
void unlock() {
flag_.store(false);
}
private:
std::atomic<bool> flag_{false}; // 初始未加锁
};
|
在lock()函数中:
- 如果flag_之前为false,那么比较成功,并且compare_exchange_weak会将true直接写到flag_里,然后执行结果是true,退出循环与函数,加锁成功
- 如果flag_之前为true,那么比较失败,进入循环体中。比较失败,会把flag_中存储的true写到expect里,因此在循环体内,要将expect再次赋值为false。这时加锁失败,需要重试。
至于unlock()函数,则只需要简单的将flag_置为false即可。
需要注意的是,基本上任何情况下,你都不需要使用如上的SpinLock,直接使用c++提供的std::mutex即可。
在SpinLock中,如果一个线程调用lock()成功,但是在调用unlock()函数之前,它被操作系统调度到了后台暂时不执行。那么所有在尝试获取这个自旋锁的线程,都会处于无限重试的循环,对应的线程会完全消耗1个底层的cpu资源,如果有多个这样的线程,整台机器都将处于忙碌状态,无法对外提供服务。
0.5 多线程间交互-wait与notify(施工中)
上一节