原子变量

本节阅读量:

多线程间计数

在前面我们学过,多线程间使用普通的变量进行通讯,会出现竞态条件(race condition)。实际执行的结果,不会符合我们的预期。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>

int count = 0;

void incr()
{
    // 循环10万次,进行计数
    for (int i = 0; i < 10 * 10000; i++) {
        count++;  // 这里多线程会同时执行到
    }
}

int main()
{
	std::thread t1(incr);
    std::thread t2(incr);
	t1.join();
    t2.join();
	std::cout << "Total Count Is " << count << std::endl;
	return 0;
}

在作者的机器上,这打印:

1
Total Count Is 110253

在多线程的情况下,如果多个线程同时对一个对象进行读写,不进行合适的同步的话,即使是简单的计数都无法正确进行。

当然,我们可以在计数时进行加锁操作,确保同时只能有一个线程对「count」进行操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <thread>
#include <mutex>

int count = 0;
std::mutex mtx;

void incr()
{
    // 循环10万次,进行计数
    for (int i = 0; i < 10 * 10000; i++) {
        std::lock_guard<std::mutex> lg(mtx);
        count++;  // 加锁了,所以同时只能有一个线程执行到这里
    }
}

int main()
{
	std::thread t1(incr);
    std::thread t2(incr);
	t1.join();
    t2.join();
	std::cout << "Total Count Is " << count << std::endl;
	return 0;
}

在作者的机器上,这打印:

1
Total Count Is 200000

无锁计数

使用锁确实可以解决多线程间对象同步的问题。但是,操作很简单的情况下,锁要进行更多的操作,带来一定的额外负载。还有一种可能的情况是,当一个线程获得锁之后,如果其它被操作系统调度到了后台。即使这个锁保护的代码段很短,其它线程,也必须等待这个休眠的线程回来执行完锁保护的代码,才能获得锁。

因此在C++中,提供原子变量,用于实现多线程无锁编程。它们通过 std::atomic 模板类提供,确保对变量的操作是 「不可分割」(atomic),从而避免数据竞争(data race)。

要使用原子变量,需要引入 atomic 头文件。我们建议只在内置的基础类型,例如 bool,int,float,double等上进行原子操作。

原子变量支持以下操作:

操作 说明 示例
load 原子读取 int x = counter.load();
store 原子写入 counter.store(42);
++,– 自增自减 counter++;
compare and exchange 比较并交换 见后续的示例
fetch_add, fetch_sub 原子加减并返回旧值 int old = counter.fetch_add(1);

这里我们使用原子变量改写上述的程序:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> count{0};

void incr()
{
    // 循环10万次,进行计数
    for (int i = 0; i < 10 * 10000; i++) {
        count++;  // 使用原子变量,多线程执行这一行,每次操作都是原子的,所以无需加锁
    }
}

int main()
{
	std::thread t1(incr);
    std::thread t2(incr);
	t1.join();
    t2.join();
	std::cout << "Total Count Is " << count << std::endl;
	return 0;
}

这产生了预期的结果:

1
Total Count Is 200000

基于原子变量的自旋锁

锁实际完成了什么事情?

  1. 如果未加锁,则加锁。这一步的操作必须是原子的。加锁成功之后到释放之前,其它线程无法获得锁。
  2. 如果锁已经被占用,则进行等待。

这就类似于如下的逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
bool lock_flag = true;

void lock() {
    while (!lock_flag) {
        wait();
    }
    lock_flag = true;
}

void unlock() {
    lock_flag = false;
}

unlock()函数,看起来平平无齐。而对于lock()函数,这里的关键在于,对于lock_flag的检查,以及赋值必须是一条不能被分割的指令中完成。这种操作,就是比较并交换(compare and exchange)。

这里我们介绍下compare_exchange_weak()函数。它的建议版的签名如下:

1
2
template<typename T>
bool std::atomic<T>::compare_exchange_weak(T& expected, T desired)

其效果为:

  1. 先读取当前原子变量的值,记为old
  2. 将old与expected进行比较
  3. 如果相等,将desired赋值给原子变量,返回true
  4. 如果不相等,将原子变量的值拷贝给expected,返回false

expected既可以当做输入(比较成功时),也可以当做输出(比较失败时)。

因此,基于该操作,我们可以自己实现一个锁:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class SpinLock {
public:
    void lock() {
        bool expect = false;  // 期望 flag_ 为 false
        while (!flag_.compare_exchange_weak(expect, true)) {
            // 检查失败
            expect = false;  // compare_exchange_weak 会把 true 写回 expect
            // 然后这里什么也不干,死循环等待
        }
    }

    void unlock() {
        flag_.store(false);
    }
    
private:
    std::atomic<bool> flag_{false};  // 初始未加锁
};

在lock()函数中:

  1. 如果flag_之前为false,那么比较成功,并且compare_exchange_weak会将true直接写到flag_里,然后执行结果是true,退出循环与函数,加锁成功
  2. 如果flag_之前为true,那么比较失败,进入循环体中。比较失败,会把flag_中存储的true写到expect里,因此在循环体内,要将expect再次赋值为false。这时加锁失败,需要重试。

至于unlock()函数,则只需要简单的将flag_置为false即可。

需要注意的是,基本上任何情况下,你都不需要使用如上的SpinLock,直接使用c++提供的std::mutex即可。

在SpinLock中,如果一个线程调用lock()成功,但是在调用unlock()函数之前,它被操作系统调度到了后台暂时不执行。那么所有在尝试获取这个自旋锁的线程,都会处于无限重试的循环,对应的线程会完全消耗1个底层的cpu资源,如果有多个这样的线程,整台机器都将处于忙碌状态,无法对外提供服务。


0.5 多线程间交互-wait与notify(施工中)

上一节

常用库

下一节