如何充分发挥多线程的性能
本节阅读量:
在前面的章节,我们主要讲了两件事情:
- 线程的生命周期:线程如何创建,怎样给线程传递参数,线程的分离与销毁。
- 线程间的通信:std::mutex,原子变量,以及如何使用它们在多线程间进行同步与互斥。
接下来,我们研究下如何发挥多线程的性能。
首先,我们先给出一段单线程计算的代码,作为多线程处理比较的基准:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#include <iostream>
#include <chrono>
int main()
{
int i = 0;
auto start = std::chrono::high_resolution_clock::now();
while (i < 1 * 10000 * 10000)
{
i++;
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start); // 计算耗时,转换为纳秒
std::cout << "Cost " << duration.count() << std::endl;
}
|
这段代码很简单,从0计数到1亿,在作者的机器上,打印结果如下:
也就是说,花费了0.216秒。
避免多线程间过度同步
我们将上面的程序改成两个线程同时对i进行计数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
int i = 0;
std::mutex mtx;
void th_func(int j) {
while (true) {
std::lock_guard<std::mutex> lg(mtx);
i++;
if (i > 1 * 10000 * 10000) {
break;
}
}
}
int main()
{
auto start = std::chrono::high_resolution_clock::now();
std::thread t0(th_func, 0);
std::thread t1(th_func, 1);
t0.join();
t1.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start); // 计算耗时,转换为纳秒
std::cout << "Cost " << duration.count() << std::endl;
}
|
在作者的机器上,打印结果如下:
花费了4.16秒。
在前面,我们学过,计数场景下atomic变量更有性能优势。让我们将i,替换成atomic,不再使用mutex,再试一次。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
#include <atomic>
std::atomic<int> i{0};
void th_func(int j) {
while (i.fetch_add(1) < 1 * 10000 * 10000) {
}
}
int main()
{
auto start = std::chrono::high_resolution_clock::now();
std::thread t0(th_func, 0);
std::thread t1(th_func, 1);
t0.join();
t1.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start); // 计算耗时,转换为纳秒
std::cout << "Cost " << duration.count() << std::endl;
}
|
在作者的机器上,打印结果如下:
花费了1.05秒。
为什么多线程的性能反而不如单线程?
因为我们的多线程的逻辑,是一直对同一个变量进行操作。而因为同步的原因,同时只能有一个线程实际能进行处理。
也就是说,整体处理时间 = 线程间通信的时间 + 有效的工作量。
这里每一步对i的操作,都需要线程间通信,这个时间甚至大于 “i++” 的耗时。所以整体耗时,远大于之前单线程程序。
所以要充分发挥多线程的性能,第一要点就是,「避免线程间不必要的同步」。
设置合理的线程数
如果我们让两个线程分别计数,一个从0开始,一个从5千万开始,每个线程只处理5千万个数,这样就不会有线程间同步的问题了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
#include <atomic>
void th_func(int s, int e) {
while (s < e) {
s++;
}
}
int main()
{
auto start = std::chrono::high_resolution_clock::now();
std::thread t0(th_func, 0, 5000 * 10000);
std::thread t1(th_func, 5000 * 10000, 10000 * 10000);
t0.join();
t1.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start); // 计算耗时,转换为纳秒
std::cout << "Cost " << duration.count() << std::endl;
}
|
在作者的机器上,打印结果如下:
花费了109毫秒,速度提高了10倍。这里只有线程创建的时候,还需要线程间通信,之后的处理速度,就和单线程一样了。所以整体的处理速度,和线程数是正相关的。
让我们逐步的提高线程数,看看效果:
| 线程数 |
耗时 |
| 1 |
220.1 ms |
| 4 |
55.5 ms |
| 8 |
27.8 ms |
| 12 |
20.4 ms |
| 16 |
15.2 ms |
| 20 |
22.7 ms |
| 24 |
19.4 ms |
可以看到,当线程数少于16时,性能随着线程数的增加成比率的增加,当线程数超过16个时,性能反而下降了。作者的机器上,是支持8核16线程的,所以实际当底层跑满16个线程时,已经到达处理能力上限。
线程过多会增加上下文切换的开销,降低程序性能;线程过少则无法充分利用多核处理器的优势。要根据程序的任务类型、系统资源以及目标硬件平台等因素,合理确定线程数量。例如,对于CPU密集型任务,线程数量可以接近处理器核心数;对于I/O密集型任务,线程数量可以适当增加。
为什么这里线程数到24时,性能反而好于20呢?原因在于,超过16个线程的时候,24 - 16 = 8,这8个线程,会被调度器分配到8个核心上,而20 - 16 = 4,这4个线程,会被调度器分配到4个核心上。所以24个线程,会比20个线程,多占用4个核心。同时每个线程分到的任务也会更少,所以整体的处理速度,会更快。
避免共享CacheLine
什么是伪共享(False Sharing)
现代CPU为了以低价格获得高性能,大量使用了cache,并把cache分了多级。一般使用L1,L2,L3这样的三级cache。其中L1和L2 cache为每个核心独有,L3则所有核心共享。一个核心写入自己的L1 cache是极快的(4 cycles, ~2ns)。
Cache Line 是 CPU 缓存的最小单位,通常为 64 字节。当多个线程频繁读写位于同一个 Cache Line 中的不同变量时,即使这些变量逻辑上不相关,也会导致 Cache Line 在不同核心之间频繁传输,这种现象称为伪共享。
当另一个核心读或写同一处内存时,它得确认看到其他核心中对应的cacheline。对于软件来说,这个过程是原子的,不能在中间穿插其他代码,只能等待CPU完成一致性同步,这个复杂的硬件算法使得原子操作会变得很慢,在E5-2620上竞争激烈时fetch_add会耗费700纳秒左右。访问被多个线程频繁共享的内存往往是比较慢的。
示例:伪共享导致性能下降
下面我们通过一个示例来说明伪共享的性能影响:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>
#include <vector>
struct Counter {
std::atomic<int> value;
};
// 两个计数器,可能位于同一个 Cache Line 中
Counter c1;
Counter c2;
void thread1_func() {
for (int i = 0; i < 10000000; i++) {
c1.value.fetch_add(1, std::memory_order_relaxed);
}
}
void thread2_func() {
for (int i = 0; i < 10000000; i++) {
c2.value.fetch_add(1, std::memory_order_relaxed);
}
}
int main()
{
auto start = std::chrono::high_resolution_clock::now();
std::thread t1(thread1_func);
std::thread t2(thread2_func);
t1.join();
t2.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
std::cout << "c1: " << c1.value << ", c2: " << c2.value << std::endl;
std::cout << "Cost: " << duration.count() << " us" << std::endl;
}
|
在作者的机器上,上述代码的执行时间约为:
c1: 10000000, c2: 10000000
Cost: 201175 us
解决方案:使用缓存行对齐
为了避免伪共享,我们可以将变量对齐到不同的 Cache Line:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>
#include <vector>
// 缓存行大小,通常为 64 字节
constexpr size_t CACHE_LINE_SIZE = 64;
struct PaddedCounter {
std::atomic<int> value;
// 填充字段,确保不同实例位于不同的 Cache Line
char padding[CACHE_LINE_SIZE - sizeof(std::atomic<int>)];
};
// 两个计数器,现在位于不同的 Cache Line 中
PaddedCounter c1;
PaddedCounter c2;
void thread1_func() {
for (int i = 0; i < 10000000; i++) {
c1.value.fetch_add(1, std::memory_order_relaxed);
}
}
void thread2_func() {
for (int i = 0; i < 10000000; i++) {
c2.value.fetch_add(1, std::memory_order_relaxed);
}
}
int main()
{
auto start = std::chrono::high_resolution_clock::now();
std::thread t1(thread1_func);
std::thread t2(thread2_func);
t1.join();
t2.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
std::cout << "c1: " << c1.value << ", c2: " << c2.value << std::endl;
std::cout << "Cost: " << duration.count() << " us" << std::endl;
}
|
使用缓存行对齐后,在作者的机器上,执行时间约为:
c1: 10000000, c2: 10000000
Cost: 60261 us
可以看到,通过避免伪共享,性能提升了约 3.3 倍(从 201ms 降至 60ms)!
总结
避免伪共享的关键点:
- 了解你的缓存行大小:大多数现代处理器的缓存行大小为 64 字节,但有些架构可能不同。
- 使用填充:在结构体中添加适当的填充字段,确保不同实例不会共享同一个缓存行。
- 使用工具检测:性能分析工具(如 perf、VTune)可以帮助识别伪共享问题。
- 平衡空间和时间:缓存行对齐会增加内存使用,需要在性能和内存占用之间找到平衡。
在高性能多线程编程中,避免伪共享是提升性能的重要技术之一,特别是在计数器、统计信息等高频访问的场景下。
如果充分发挥多线程性能
整体上需要注意以下几点:
- 设计合适的线程模型,使用合理的线程数量
- 优化线程任务。合理分解任务,将大任务分解为多个小任务,并分配给不同的线程执行,尽量让每个线程的工作量相近,避免部分线程过载而其他线程空闲
- 减少线程间的竞争,减少共享资源访问,使用高效的同步机制
- 避免伪共享