性能优化建议
本节阅读量:
本文翻译自 Abseil 官网的 Performance Hints。
多年来,Jeff 和 Sanjay 在各种代码的性能调优方面进行了大量深入研究,自谷歌创立之初,提升软件的性能就一直是他们的重点工作,因为这能让谷歌为更多用户提供更多服务。他们撰写了本文,旨在总结进行此类工作时所遵循的一些通用原则和具体技巧。下面挑选出具有说明性的源代码更改(change lists,更改列表,下面简称 CLs),以举例说明各种方法和技巧。下文中的大部分具体建议都涉及 C++,但这些通用原则同样适用于其他语言。本文重点关注单个二进制文件的通用性能调优,并不涉及分布式系统或机器学习(ML)硬件性能调优(这些领域本身就非常庞大)。我们希望本文能对其他人有所帮助。
本文档中的许多示例都包含代码片段,用于演示这些技术。请注意,其中一些代码片段提到了各种内部Google代码库抽象概念。尽管如此,我们还是将其包含在内,因为我们认为这些示例足够独立,即使不熟悉这些抽象概念细节的人也能理解。
编写代码时考虑性能很重要
高德纳(Knuth)有一句话经常被断章取义地引用:“过早优化是万恶之源”。完整语句如下:“我们应该忘记大约97%时间里的小效率提升:过早优化是万恶之源。然而,我们也不应错过那关键的3%的机会。”本文讨论的就是这关键的3%,而高德纳还有一句更引人注目的名言,同样出自他的著作:
“从示例2到示例2a的速度提升仅约12%,许多人会认为这微不足道。当今许多软件工程师普遍持有的传统观念是,不必在意小的效率提升;但我认为,这只是对那些因小失大的程序员所表现出的滥用行为的过度反应,这些程序员无法调试或维护他们“优化”后的程序。在成熟的工程学科中,轻松获得的12%提升,从来不会被视为微不足道;我认为,在软件工程领域也应秉持同样的观点。当然,对于一次性任务,我不会费心进行这样的优化,但当涉及到编写高质量程序时,我不想将自己局限于那些剥夺我这种效率的工具。”
很多人会说:“先把代码写得尽可能简单,等能进行性能分析时再优化性能。”但这种方法通常并不正确:
- 如果在开发大型系统时完全忽视所有性能问题,最终将导致性能分布均匀、无明显热点,因为性能损失分散在各处,难以确定优化的切入点。
- 如果你正在开发一个将被他人使用的库,最终遇到性能问题的用户往往难以自行优化——他们需要理解其他团队或人员编写的代码细节,并与相关方协商性能优化的重要性。
- 当系统被大量使用时,进行重大改动将变得更为困难。
- 因此,难以判断哪些性能问题能通过简单方式解决,最终可能采取代价高昂的方案,例如为应对负载问题而过度复制数据,或对服务进行过度分配资源。
因此,我们建议在编写代码时,只要不会显著影响代码的可读性或复杂度,应尽量选择性能更优的实现方式。
预估
在编写代码时,若能预判当前代码对性能的影响程度,便能做出更明智的决策(例如:为提升性能值得增加多少代码复杂度)。以下是在编码过程中估算性能的实用建议:
- 是否为测试代码?若是,主要需关注算法与数据结构的渐近复杂度(注:开发与验证的循环周期时间同样重要,避免编写运行时间过长的测试)。
- 是否为应用专属代码?若是,此时需判断该段代码的性能敏感度——通常无需过度复杂化:只需区分是初始化/配置代码(如服务启动流程),还是将频繁执行的热点路径代码(如处理每个请求的逻辑),即可基本确定优化优先级。
- 是否为多应用共用的库代码?此类代码的性能敏感度难以预判,因此更需遵循本文所述的简单优化原则。例如:当需存储通常元素较少的vector时,应使用
absl::InlinedVector 而非 std::vector。此类优化手段易于实现,且不会增加系统全局复杂度。与其后续发现代码实际消耗大量资源,不如从一开始就具备更高性能,这样在性能分析时也更易定位下一轮优化点。
在选择性能特性可能存在差异的方案时,可借助粗略估算进行更深入的分析。此类估算能快速提供不同方案性能的粗略对比,从而在无需实际实现的情况下,直接排除部分备选方案。
以下是此类估算的具体操作步骤:
- 估算各类底层操作的执行次数。例如:磁盘寻道次数、网络往返次数、传输字节数等。
- 计算每类高成本操作的估算代价并求和。将各类操作的估算成本(如每磁盘寻道耗时、每网络往返延迟)相乘后累加,得出系统资源消耗的总成本。
- 前面两点所述的消耗,指的是资源消耗。如果你更关心延时,并且系统存在并发处理的情况,一些操作可能会重叠执行(实际延迟可能低于简单累加值),此时需进行更精细的分析。
下面的表格,列出了每种操作的大概消耗,可能对你有所参考:
L1 cache 访问 0.5 ns
L2 cache 访问 3 ns
分支预测失败 5 ns
加锁/解锁 (无竞争) 15 ns
内存访问 50 ns
使用 Snappy 压缩 1 KB 数据 1,000 ns
从 SSD 读取 4 KB 数据 20,000 ns
同一数据中心内往返(延迟 50,000 ns
从内存顺序读 1MB 数据 64,000 ns
从 100 Gbps 网络读取 1MB 数据 100,000 ns
从 SSD 读取 1MB 数据 1,000,000 ns
磁盘寻道 5,000,000 ns
从磁盘顺序读 1MB 数据 10,000,000 ns
跨洲际网络传输 150,000,000 ns
前表列出了部分基础底层操作的粗略成本。您可能还需要跟踪与您的系统相关的高层操作的预估成本。例如,您可能需要了解从SQL数据库中读取单条数据的粗略成本、与云服务交互的延迟时间,或渲染简单HTML页面所需时间。如果您不了解不同操作的相关成本,就无法进行合理的粗略估算!
样例: 对 10 亿 4 字节数据进行快速排序的耗时预估
一个高效的快速排序算法需对规模为N的数组进行约log(N)次遍历。每次遍历中,数组内容将从内存流式传输至处理器缓存,分区代码会将每个元素与基准元素比较一次。我们汇总主要成本如下:
- 内存带宽:数组占用4 GB(4字节/元素 × 10亿元素)。假设 CPU 单核内存带宽约16 GB/s,单次遍历耗时约0.25秒。当 N 为 2^30 时,需约 30 次遍历,总内存传输成本约为 7.5 秒。
- 分支预测错误成本:总比较次数为 N × log(N) = 300 亿次,假设其中 50%(150亿次)预测错误。每次错误预测成本5纳秒,总错误成本 = 150亿 × 5纳秒 = 75秒(注:本分析假设正确预测分支成本为零)。
- 总估算成本: 7.5秒(内存访问) + 75秒(分支预测错误) = 约82.5秒
若需进一步优化分析以纳入处理器缓存影响(尽管基于前述分析,分支预测错误成本已占主导,此优化实际必要性较低),仍可作为示例补充说明:假设系统配备32MB L3缓存,且数据从L3缓存传输至处理器的开销可忽略不计。L3缓存可容纳2^23个元素(约838万元素),因此最后22次遍历可直接在缓存中操作数据(倒数第23次遍历将数据载入L3缓存,后续遍历均基于缓存数据执行)。由此内存传输成本从7.5秒(30次传输)降至2.5秒(10次传输:4GB × 10 ÷ 16GB/s)。
样例: 加载有 30 张图片的网页
对比两种设计方案(假设原始图像存储于磁盘,单张图像约1MB):
- 串行读取: 每张图像需执行1次寻道+1次传输,总耗时5ms(寻道)+ 10ms(传输)= 15ms/张。30张图像总耗时:30 × 15ms = 450ms
- 并行读取(图像均匀分布于K个磁盘):资源使用估算不变,但延迟将降低约K倍(忽略方差影响,例如某磁盘可能承载多于1/K的图像)。若在数百个磁盘的分布式文件系统上运行,预期延迟降至约15ms
- 单SSD方案(所有图像存于单块SSD):顺序读取性能优化为20μs(寻道)+ 1ms(传输)/张,总耗时 约为 30ms(30 × 1.02ms)
测量
前一节已给出一些关于编写代码时如何思考性能的建议。然而,在实际开始优化或遇到涉及性能、简洁性等多方面权衡的情况之前,您需要评估潜在的性能收益。有效的衡量性能是进行相关工作时最需要掌握的首要工具。
顺便提一下,对不熟悉的代码进行性能分析,也是了解代码库结构及运行方式的有效途径。检查程序动态调用图中频繁调用的函数及对应代码,能够帮助您对代码运行时的整体流程形成高层次理解,从而增强您在稍不熟悉的代码中实施性能优化的信心。
性能分析工具与实用技巧
有许多可用的性能分析工具。首先,使用 pprof,它提供良好的高层次性能信息,并且易于在本地和生产代码中使用。此外,如果需要更详细的性能信息,请尝试 perf。
这里有一些使用 pprof 的实用技巧:
- 使用合适的调试和优化 flags 来构建生产二进制文件。
- 如果可能,请编写一个覆盖你正在优化的代码的「微型基准测试」。这能够缩短性能改进的反馈时间,帮助验证性能改进的效果,并有助于防止未来的性能退化。然而,微型基准测试可能存在「陷阱」,导致其无法代表整个系统的实际性能。可以搜索「cpp benchmarks」这个库来辅助编写微型基准测试。
- 使用基准测试库来「输出性能计数器读数」,不仅能提高精度,还能更深入地了解程序行为。
- 锁竞争常常会降低CPU使用率。某些互斥锁的实现支持锁竞争分析。
- 可以使用「xprof」进行机器学习性能分析。
当性能分析没有发现热点怎么办
你经常会遇到一些情况,此时你的CPU性能分析图显示为平缓状态(没有明显的性能瓶颈)。这通常代表所有容易获取的优化点都已经处理完毕。如果你发现自己处于这种状况,可以考虑以下一些建议:
- 不要低估许多小优化的价值!在某个子系统中实现二十次单独的1%改进非常有可能,整体上意味着相当可观的提升(这类工作的开展通常依赖于稳定且高质量的微基准测试)。
- 在调用栈的顶部附近查找循环(火焰图可帮助分析)。潜在地,该循环或其调用的代码可能通过重构来提高效率。例如,你可能在循环里遍历输入的节点和边,逐步构建复杂图结构,这可以修改为一次性传递整个输入来构建图结构。这可以消除初始化图的代码中每条边初始化时大量内部检查。
- 后退一步,关注调用栈高层的结构性变化,而不是纠结于微小的优化。下面列出的算法改进技术在进行此类分析时可能会有所帮助。
- 查找过于通用的代码,并用定制化或更底层的实现方式替换。例如,如果某个应用程序反复使用正则表达式进行匹配,而简单的前缀匹配就已足够,则应考虑停止使用正则表达式。
- 尝试减少内存分配次数,并优先处理对分配次数贡献最大的部分。这将产生两个效果:(1) 它会直接减少在分配器中花费的时间(以及GC语言中的垃圾回收时间);(2) 通常还会减少缓存未命中,因为在使用tcmalloc的长期运行程序中,每次分配往往会指向不同的缓存行。
- 收集其他类型的性能剖析结果,特别是基于硬件性能计数器的数据。此类数据可能会指出遇到高缓存缺失率的情形。可以参考性能剖析工具和技巧部分描述的技术。
API 考量
以下一些技术可能需要更改数据结构和函数签名,这可能会影响调用者。请尝试组织代码,使得性能改进可以在封装边界内进行,而不会影响公共接口。如果您的模块具有深度(通过少量接口访问重要功能),这将更容易实现。
广泛使用的API需要添加新功能,非常困难。在添加新功能时要小心,因为这些会限制未来的实现,并且对于不需要新功能的用户来说,会导致不必要的成本增加。例如,许多C++标准库容器承诺迭代器稳定性,在标准库的实现中,这会显著增加内存分配次数,尽管许多用户并不需要这种稳定性。
以下列出了一些具体的技巧。请仔细考虑由此引入的API可用性问题与性能优势之间的权衡。
批量API
提供批量操作以减少昂贵的API调用或利用算法进行改进。
样例一:添加 MemoryManager::LookupMany 批量操作接口
添加批量接口,这同时简化了新批量接口的函数签名:客户端只需要知道所有键是否都被找到,因此我们可以返回一个bool值而非状态对象
老代码:
1
2
3
4
|
class MemoryManager {
public:
...
util::StatusOr<LiveTensor> Lookup(const TensorIdProto& id);
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
|
class MemoryManager {
public:
...
util::StatusOr<LiveTensor> Lookup(const TensorIdProto& id);
// 查找指定的tensor
struct LookupKey {
ClientHandle client;
uint64 local_id;
};
bool LookupMany(absl::Span<const LookupKey> keys,
absl::Span<tensorflow::Tensor> tensors);
|
样例二:添加批量 ObjectStore::DeleteRefs API 以摊销锁的开销
老代码:
1
2
3
4
5
|
template <typename T>
class ObjectStore {
public:
...
absl::Status DeleteRef(Ref);
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
template <typename T>
class ObjectStore {
public:
...
absl::Status DeleteRef(Ref);
// 删除多个引用。对于每个引用,如果没有任何其他引用指向同一个对象,则该对象将被删除。任何错误时返回非OK状态。
absl::Status DeleteRefs(absl::Span<const Ref> refs);
...
template <typename T>
absl::Status ObjectStore<T>::DeleteRefs(absl::Span<const Ref> refs) {
util::Status result;
absl::MutexLock l(&mu_);
for (auto ref : refs) {
result.Update(DeleteRefLocked(ref));
}
return result;
}
|
之前使用的方式:
1
2
3
4
5
6
7
8
9
10
11
|
void HandleBatch(int, const plaque::Batch& input) override {
for (const auto& t : input) {
auto in = In(t);
PLAQUE_OP_ASSIGN_OR_RETURN(const auto& handles, in.handles());
for (const auto handle : handles.value->handles()) {
PLAQUE_OP_RETURN_IF_ERROR(in_buffer_store_
? bstore_->DeleteRef(handle)
: tstore_->DeleteRef(handle));
}
}
}
|
修改使用DeleteRefs后的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
void HandleBatch(int, const plaque::Batch& input) override {
for (const auto& t : input) {
auto in = In(t);
PLAQUE_OP_ASSIGN_OR_RETURN(const auto& handles, in.handles());
if (in_buffer_store_) {
PLAQUE_OP_RETURN_IF_ERROR(
bstore_->DeleteRefs(handles.value->handles()));
} else {
PLAQUE_OP_RETURN_IF_ERROR(
tstore_->DeleteRefs(handles.value->handles()));
}
}
}
|
堆的批量初始化可以在 O(N) 时间内完成;而逐个添加元素,并在每次添加后更新堆性质,则需要 O(N lg(N)) 时间。
有时很难让调用方直接改用新的批量 API。在这种情况下,可以在内部使用批量 API,并缓存结果,供后续非批量 API 调用复用:
示例:缓存块解码结果,供后续调用使用。
每次查找都需要解码包含 K 个条目的整个块。可以把解码后的条目存入缓存,并在后续查找时查询缓存。
lexicon.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
|
void GetTokenString(int pos, std::string* out) const {
...
absl::FixedArray<LexiconEntry, 32> entries(pos + 1);
// 解码直到 pos 为止(包含 pos)的所有词典条目。
for (int i = 0; i <= pos; ++i) {
p = util::coding::TwoValuesVarint::Decode32(p, &entries[i].remaining,
&entries[i].shared);
entries[i].remaining_str = p;
p += entries[i].remaining; // remaining 字节跟在每个条目之后。
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
mutable std::vector<absl::InlinedVector<std::string, 16>> cache_;
...
void GetTokenString(int pos, std::string* out) const {
...
DCHECK_LT(skentry, cache_.size());
if (!cache_[skentry].empty()) {
*out = cache_[skentry][pos];
return;
}
...
// 初始化缓存。
...
const char* prev = p;
for (int i = 0; i < block_sz; ++i) {
uint32 shared, remaining;
p = TwoValuesVarint::Decode32(p, &remaining, &shared);
auto& cur = cache_[skentry].emplace_back();
gtl::STLStringResizeUninitialized(&cur, remaining + shared);
std::memcpy(cur.data(), prev, shared);
std::memcpy(cur.data() + shared, p, remaining);
prev = cur.data();
p += remaining;
}
*out = cache_[skentry][pos];
|
视图类型
函数参数应优先使用视图类型(例如 std::string_view、std::Span<T>、
absl::FunctionRef<R(Args...)>),除非需要转移数据所有权。这些类型可以减少复制,并允许调用方自行选择容器类型(例如,一个调用方可能使用 std::vector,另一个调用方可能使用 absl::InlinedVector)。
预分配/预计算的参数
对于频繁调用的例程,有时允许上层调用方传入自己持有的数据结构,或者传入调用方已经拥有、而被调例程又需要的信息,会很有帮助。这样可以避免底层例程被迫分配自己的临时数据结构,或重新计算已经可用的信息。
示例:为 RPC_Stats::RecordRPC 增加一个变体,允许客户端传入已有的 WallTime 值。
rpc-stats.h
修改前:
1
|
static void RecordRPC(const Name &name, const RPC_Stats_Measurement& m);
|
修改后:
1
2
|
static void RecordRPC(const Name &name, const RPC_Stats_Measurement& m,
WallTime now);
|
clientchannel.cc
修改前:
1
2
3
|
const WallTime now = WallTime_Now();
...
RPC_Stats::RecordRPC(stats_name, m);
|
修改后:
1
2
3
|
const WallTime now = WallTime_Now();
...
RPC_Stats::RecordRPC(stats_name, m, now);
|
线程兼容类型与线程安全类型
一个类型可以是线程兼容的(由外部同步),也可以是线程安全的(由内部同步)。大多数通用类型应该设计为线程兼容。这样,不需要线程安全的调用方就不必为它付出成本。
示例:由于调用方已经自行同步,将类改为线程兼容。
hitless-transfer-phase.cc
修改前:
1
2
3
4
5
|
TransferPhase HitlessTransferPhase::get() const {
static CallsiteMetrics cm("HitlessTransferPhase::get");
MonitoredMutexLock l(&cm, &mutex_);
return phase_;
}
|
修改后:
1
|
TransferPhase HitlessTransferPhase::get() const { return phase_; }
|
hitless-transfer-phase.cc
修改前:
1
2
3
4
5
|
bool HitlessTransferPhase::AllowAllocate() const {
static CallsiteMetrics cm("HitlessTransferPhase::AllowAllocate");
MonitoredMutexLock l(&cm, &mutex_);
return phase_ == TransferPhase::kNormal || phase_ == TransferPhase::kBrownout;
}
|
修改后:
1
2
3
|
bool HitlessTransferPhase::AllowAllocate() const {
return phase_ == TransferPhase::kNormal || phase_ == TransferPhase::kBrownout;
}
|
不过,如果某个类型的典型用法确实需要同步,应优先把同步放到类型内部。这样就可以在不影响调用方的前提下,按需调整同步机制来提升性能(例如通过分片减少竞争)。
算法改进
最关键的性能改进机会通常来自算法改进,例如把 O(N²) 算法变成 O(N lg(N)) 或 O(N),避免潜在的指数级行为,等等。这类机会在稳定代码中并不常见,但在编写新代码时值得特别留意。下面是一些对既有代码做出此类改进的例子:
示例:按逆后序将节点加入环检测结构。
之前我们逐个把图节点和边加入环检测数据结构,这会在每条边上产生昂贵开销。现在我们按逆后序加入整张图,使环检测变得非常简单。
graphcycles.h
修改前:
1
2
3
4
5
6
|
class GraphCycles : public util_graph::Graph {
public:
GraphCycles();
~GraphCycles() override;
using Node = util_graph::Node;
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
|
class GraphCycles : public util_graph::Graph {
public:
GraphCycles();
~GraphCycles() override;
using Node = util_graph::Node;
// InitFrom 添加 src 中的所有节点和边;如果成功则返回 true,
// 如果遇到环则返回 false。
// 要求:此前还没有向 GraphCycles 添加任何节点和边。
bool InitFrom(const util_graph::Graph& src);
|
graphcycles.cc
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
bool GraphCycles::InitFrom(const util_graph::Graph& src) {
...
// 按拓扑顺序分配 rank,这样初始化期间不需要任何重排。
// 对于无环图,DFS 会按逆拓扑顺序离开节点,
// 因此我们在离开节点时为它们分配递减的 rank。
Rank last_rank = n;
auto leave = [&](util_graph::Node node) {
DCHECK(r->rank[node] == kMissingNodeRank);
NodeInfo* nn = &r->nodes[node];
nn->in = kNil;
nn->out = kNil;
r->rank[node] = --last_rank;
};
util_graph::DFSAll(src, std::nullopt, leave);
// 添加所有边(同时检测环)。
bool have_cycle = false;
util_graph::PerEdge(src, [&](util_graph::Edge e) {
DCHECK_NE(r->rank[e.src], kMissingNodeRank);
DCHECK_NE(r->rank[e.dst], kMissingNodeRank);
if (r->rank[e.src] >= r->rank[e.dst]) {
have_cycle = true;
} else if (!HasEdge(e.src, e.dst)) {
EdgeListAddNode(r, &r->nodes[e.src].out, e.dst);
EdgeListAddNode(r, &r->nodes[e.dst].in, e.src);
}
});
if (have_cycle) {
return false;
} else {
DCHECK(CheckInvariants());
return true;
}
}
|
graph_partitioner.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
absl::Status MergeGraph::Init() {
const Graph& graph = *compiler_->graph();
clusters_.resize(graph.NodeLimit());
graph.PerNode([&](Node node) {
graph_->AddNode(node);
NodeList* n = new NodeList;
n->push_back(node);
clusters_[node] = n;
});
absl::Status s;
PerEdge(graph, [&](Edge e) {
if (!s.ok()) return;
if (graph_->HasEdge(e.src, e.dst)) return; // 已添加
if (!graph_->InsertEdge(e.src, e.dst)) {
s = absl::InvalidArgumentError("cycle in the original graph");
}
});
return s;
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
absl::Status MergeGraph::Init() {
const Graph& graph = *compiler_->graph();
if (!graph_->InitFrom(graph)) {
return absl::InvalidArgumentError("cycle in the original graph");
}
clusters_.resize(graph.NodeLimit());
graph.PerNode([&](Node node) {
NodeList* n = new NodeList;
n->push_back(node);
clusters_[node] = n;
});
return absl::OkStatus();
}
|
示例:用更好的算法替换互斥锁实现内置的死锁检测系统。
将死锁检测算法替换为一个快约 50 倍的新算法,并且可以轻松扩展到数百万个互斥锁(旧算法依赖 2K 限制来避免性能断崖)。新代码基于论文:A dynamic topological sort algorithm for directed acyclic graphs,作者 David J. Pearce、Paul H. J. Kelly,发表于 Journal of Experimental Algorithmics (JEA),第 11 卷,2006 年,文章编号 1.7。
新算法占用 O(|V|+|E|) 空间(旧算法需要 O(|V|^2) 位)。锁获取顺序图非常稀疏,因此空间占用小得多。这个算法也很简单:核心大约 100 行 C++。由于代码现在可以扩展到更多 Mutex,我们得以放宽人为设置的 2K 限制,并由此发现了一些真实程序中潜伏的死锁。
基准测试结果:这些测试在 DEBUG 模式下运行,因为死锁检测主要在 debug 模式中启用。基准测试参数(如 /2k)表示被跟踪的节点数量。在旧算法默认的 2k 限制下,新算法每次 InsertEdge 只需 0.5 微秒,而旧算法需要 22 微秒。新算法也能轻松扩展到更大的图,而旧算法很快就会撑不住。
1
2
3
4
5
|
DEBUG: Benchmark Time(ns) CPU(ns) Iterations
----------------------------------------------------------
DEBUG: BM_StressTest/2k 23553 23566 29086
DEBUG: BM_StressTest/4k 45879 45909 15287
DEBUG: BM_StressTest/16k 776938 777472 817
|
1
2
3
4
5
|
DEBUG: BM_StressTest/2k 392 393 10485760
DEBUG: BM_StressTest/4k 392 393 10485760
DEBUG: BM_StressTest/32k 407 407 10485760
DEBUG: BM_StressTest/256k 456 456 10485760
DEBUG: BM_StressTest/1M 534 534 10485760
|
示例:用哈希表(O(1) 查找)替换 IntervalMap(O(lg N) 查找)。
初始代码使用 IntervalMap,因为它看起来适合支持相邻块合并。但实际上哈希表就足够了,因为相邻块可以通过哈希表查找找到。这个改动(加上该 CL 中的其他改动)让 tpu::BestFitAllocator 的性能提升约 4 倍。
best_fit_allocator.h
修改前:
1
2
3
4
5
6
7
8
9
10
11
|
using Block = gtl::IntervalMap<int64, BlockState>::Entry;
...
// pair(地址范围,BlockState)的映射,每个分配有一个条目,
// 覆盖范围 [0, allocatable_range_end_)。相邻的 kFree 和
// kReserved 块会被合并。相邻的 kAllocated 块不会
// 被合并。
gtl::IntervalMap<int64, BlockState> block_list_;
// 所有空闲块的集合,按分配策略排序。相邻
// 空闲块会被合并。
std::set<Block, BlockSelector> free_list_;
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// BlockTable 中 offset 的更快哈希函数。
struct OffsetHash {
ABSL_ATTRIBUTE_ALWAYS_INLINE size_t operator()(int64 value) const {
uint64 m = value;
m *= uint64_t{0x9ddfea08eb382d69};
return static_cast<uint64_t>(m ^ (m >> 32));
}
};
// 哈希表将块起始地址映射到块信息。
// 我们在该信息中包含前一个块的长度,
// 这样就能找到可合并的前序块。
struct HashTableEntry {
BlockState state;
int64 my_length;
int64 prev_length; // 如果没有前一个块,则为 0。
};
using BlockTable = absl::flat_hash_map<int64, HashTableEntry, OffsetHash>;
|
示例:用哈希表查找(O(N))替换有序列表求交(O(N log N))。
旧代码为了检测两个节点是否共享同一个来源,会按排序顺序获取每个节点的来源,然后对两个有序列表求交。新代码把一个节点的来源放进哈希表,再遍历另一个节点的来源并查询哈希表。
1
2
|
name old time/op new time/op delta
BM_CompileLarge 28.5s ± 2% 22.4s ± 2% -21.61% (p=0.008 n=5+5)
|
示例:实现良好的哈希函数,让操作从 O(N) 变为 O(1)。
location.h
修改前:
1
2
3
4
5
6
|
// Location 对象的哈希器。
struct LocationHash {
size_t operator()(const Location* key) const {
return key != nullptr ? util_hash::Hash(key->address()) : 0;
}
};
|
修改后:
1
2
3
4
5
6
7
|
size_t HashLocation(const Location& loc);
...
struct LocationHash {
size_t operator()(const Location* key) const {
return key != nullptr ? HashLocation(*key) : 0;
}
};
|
location.cc
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
size_t HashLocation(const Location& loc) {
util_hash::MurmurCat m;
// 将一些较简单的特征编码到单个值中。
m.AppendAligned((loc.dynamic() ? 1 : 0) //
| (loc.append_shard_to_address() ? 2 : 0) //
| (loc.is_any() ? 4 : 0) //
| (!loc.any_of().empty() ? 8 : 0) //
| (loc.has_shardmap() ? 16 : 0) //
| (loc.has_sharding() ? 32 : 0));
if (loc.has_shardmap()) {
m.AppendAligned(loc.shardmap().output() |
static_cast<uint64_t>(loc.shardmap().stmt()) << 20);
}
if (loc.has_sharding()) {
uint64_t num = 0;
switch (loc.sharding().type_case()) {
case Sharding::kModShard:
num = loc.sharding().mod_shard();
break;
case Sharding::kRangeSplit:
num = loc.sharding().range_split();
break;
case Sharding::kNumShards:
num = loc.sharding().num_shards();
break;
default:
num = 0;
break;
}
m.AppendAligned(static_cast<uint64_t>(loc.sharding().type_case()) |
(num << 3));
}
auto add_string = [&m](absl::string_view s) {
if (!s.empty()) {
m.Append(s.data(), s.size());
}
};
add_string(loc.address());
add_string(loc.lb_policy());
// 我们不包含 any_of,因为计算一个不受顺序和重复影响的
// 哈希值比较复杂。
return m.GetHash();
}
|
更好的内存表示
仔细考虑关键数据结构的内存占用和缓存占用,通常能带来很大的收益。下面这些数据结构侧重于让常见操作触碰更少的缓存行。这里的细致设计可以:(a)避免昂贵的缓存未命中;(b)减少内存总线流量,从而加速当前程序以及同一台机器上运行的其他程序。它们依赖一些常见技巧,你在设计自己的数据结构时也可能会用到。
紧凑的数据结构
对于经常访问的数据,或占据应用大量内存的数据,应使用紧凑表示。紧凑表示可以通过减少触碰的缓存行、降低内存总线带宽占用,显著减少内存使用并提升性能。不过也要注意缓存行竞争。
内存布局
对于内存占用或缓存占用很大的类型,应仔细考虑其内存布局。
- 重新排列字段,减少对齐要求不同的字段之间产生的填充字节(参见类布局讨论)。
- 如果存储的数据可以放进更小的数值类型,就使用更小的数值类型。
- 如果不小心处理,枚举值有时会占用整个机器字。可以考虑使用更小的表示形式(例如使用
enum class OpType : uint8_t { ... },而不是 enum class OpType { ... })。
- 调整字段顺序,让经常一起访问的字段彼此更近,这可以减少常见操作触碰的缓存行数量。
- 将热点只读字段和热点可变字段分开,避免写入可变字段时把只读字段从附近缓存中驱逐出去。
- 移动冷数据,让它不要和热数据挨在一起。可以把冷数据放到结构体末尾、放到一层间接引用之后,或放到单独数组中。
- 考虑用位级或字节级编码把数据压缩到更少字节中。这可能会变复杂,因此只有当相关数据被封装在经过充分测试的模块中,并且总体内存使用减少显著时才这样做。此外,还要注意副作用,例如常用数据对齐不足,或访问压缩表示时代码成本更高。此类改动应使用基准测试验证。
用索引代替指针
在现代 64 位机器上,指针占 64 位。如果数据结构里有大量指针,T* 形式的间接引用很容易吃掉大量内存。可以考虑改用指向数组 T[] 或其他数据结构的整数索引。这样不仅引用本身会更小(如果索引数量足够少,可以放进 32 位或更少位数),所有 T[] 元素也会连续存储,通常能带来更好的缓存局部性。
批量存储
避免使用那种每存一个元素就单独分配一个对象的数据结构(例如 C++ 中的 std::map、std::unordered_map)。可以考虑使用分块或扁平表示的类型,让多个元素在内存中紧密存放(例如 C++ 中的 std::vector、absl::flat_hash_{map,set})。这类类型往往有更好的缓存行为,并且分配器开销也更少。
一种有用的技巧是把元素划分到多个块中,每个块容纳固定数量的元素。这种技巧可以在保持良好渐近行为的同时,显著降低数据结构的缓存占用。
对于某些数据结构,一个块就足以容纳所有元素(例如字符串和 vector)。其他类型(例如 absl::flat_hash_map)也使用了这种技巧。
内联存储
有些容器类型针对存储少量元素做了优化。它们会在对象本身内部为少量元素提供空间,并在元素数量较小时完全避免分配。当这类类型的实例经常被构造(例如作为频繁执行代码中的栈变量),或者同时存活的实例很多时,这会非常有帮助。如果某个容器通常只包含少量元素,可以考虑使用带内联存储的类型,例如 InlinedVector。
注意:如果 sizeof(T) 很大,内联存储容器可能不是最佳选择,因为内联的后备存储本身也会很大。
不必要的嵌套 map
有时,嵌套 map 数据结构可以替换为带复合 key 的单层 map。这可以显著降低查找和插入成本。
示例:将嵌套 btree 转为以复合键为 key 的 btree,以减少分配并改善缓存占用。
graph_splitter.cc
修改前:
1
|
absl::btree_map<std::string, absl::btree_map<std::string, OpDef>> ops;
|
修改后:
1
2
3
4
|
// btree 从 {package_name, op_name} 映射到对应的 const Opdef*。
absl::btree_map<std::pair<absl::string_view, absl::string_view>,
const OpDef*>
ops;
|
注意:如果第一层 map 的 key 很大,继续使用嵌套 map 可能更好:
示例:改用嵌套 map,使微基准性能提升 76%。
之前我们使用单层哈希表,key 由一个字符串路径和其他数字子 key 组成。平均每个路径会出现在约 1000 个 key 中。我们把哈希表拆成两层:第一层以路径为 key,每个第二层哈希表只保存某个路径下的子 key 到数据的映射。这样把路径存储的内存使用降低了 1000 倍,同时也加速了同一路径下许多子 key 被一起访问的场景。
Arena
Arena 可以帮助降低内存分配成本,同时还能把独立分配的对象紧密排列在一起,通常占用更少缓存行,并消除大部分析构成本。对于包含许多子对象的复杂数据结构,它们可能最有效。可以考虑为 arena 提供合适的初始大小,这有助于减少分配。
注意:Arena 很容易被误用,例如把太多短生命周期对象放进长生命周期 arena 中,这可能会不必要地膨胀内存占用。
用数组代替 map
如果 map 的取值域可以用小整数表示,或者本身是枚举;又或者 map 的元素非常少,那么有时可以用数组或某种 vector 替代 map。
示例:用数组代替 flat_map。
rtp_controller.h
修改前:
1
|
const gtl::flat_map<int, int> payload_type_to_clock_frequency_;
|
修改后:
1
2
3
4
5
6
7
|
// 一个按 payload_type 索引到 clock freq 的 map(用简单数组实现),
// 表示该 payload type 的 clock freq(或 0)。
struct PayloadTypeToClockRateMap {
int map[128];
};
...
const PayloadTypeToClockRateMap payload_type_to_clock_frequency_;
|
用位向量代替 set
如果 set 的取值域可以用小整数表示,那么可以用位向量替代 set(InlinedBitVector 通常是不错的选择)。在这种表示上,集合操作也可以通过按位布尔运算高效实现(OR 表示并集,AND 表示交集,等等)。
示例:Spanner placement 系统:用每个 zone 一位的位向量替换 dense_hash_set。
zone_set.h
修改前:
1
2
3
4
5
6
|
class ZoneSet: public dense_hash_set<ZoneId> {
public:
...
bool Contains(ZoneId zone) const {
return count(zone) > 0;
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
|
class ZoneSet {
...
// 当且仅当集合包含 “zone” 时返回 true。
bool ContainsZone(ZoneId zone) const {
return zone < b_.size() && b_.get_bit(zone);
}
...
private:
int size_; // 已插入 zone 的数量。
util::bitmap::InlinedBitVector<256> b_;
|
基准测试结果:
1
2
3
4
5
6
7
8
9
10
11
|
CPU: AMD Opteron (4 cores) dL1:64KB dL2:1024KB
Benchmark Base (ns) New (ns) Improvement
------------------------------------------------------------------
BM_Evaluate/1 960 676 +29.6%
BM_Evaluate/2 1661 1138 +31.5%
BM_Evaluate/3 2305 1640 +28.9%
BM_Evaluate/4 3053 2135 +30.1%
BM_Evaluate/5 3780 2665 +29.5%
BM_Evaluate/10 7819 5739 +26.6%
BM_Evaluate/20 17922 12338 +31.2%
BM_Evaluate/40 36836 26430 +28.2%
|
示例:用位矩阵跟踪操作数之间的可达性属性,而不是使用哈希表。
hlo_computation.h
修改前:
1
2
3
|
using TransitiveOperandMap =
std::unordered_map<const HloInstruction*,
std::unordered_set<const HloInstruction*>>;
|
修改后:
1
2
3
4
5
6
7
|
class HloComputation::ReachabilityMap {
...
// 从 HloInstruction* 到数字的密集 id 分配。
tensorflow::gtl::FlatMap<const HloInstruction*, int> ids_;
// 当且仅当 b 可从 a 到达时,matrix_(a,b) 为 true。
tensorflow::core::Bitmap matrix_;
};
|
减少内存分配
内存分配会带来成本:
- 它会增加分配器中消耗的时间。
- 新分配的对象可能需要昂贵的初始化;当不再需要时,有时还需要对应的昂贵析构。
- 每次分配往往都会落在新的缓存行上,因此分散在许多独立分配中的数据,会比分散在更少分配中的数据拥有更大的缓存占用。
带垃圾回收的运行时有时会通过把连续分配顺序放在内存中,来缓解第 3 个问题。
避免不必要的分配
示例:减少分配使基准测试吞吐量提升 21%。
memory_manager.cc
修改前:
1
2
3
4
5
|
LiveTensor::LiveTensor(tf::Tensor t, std::shared_ptr<const DeviceInfo> dinfo,
bool is_batched)
: tensor(std::move(t)),
device_info(dinfo ? std::move(dinfo) : std::make_shared<DeviceInfo>()),
is_batched(is_batched) {
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
static const std::shared_ptr<DeviceInfo>& empty_device_info() {
static std::shared_ptr<DeviceInfo>* result =
new std::shared_ptr<DeviceInfo>(new DeviceInfo);
return *result;
}
LiveTensor::LiveTensor(tf::Tensor t, std::shared_ptr<const DeviceInfo> dinfo,
bool is_batched)
: tensor(std::move(t)), is_batched(is_batched) {
if (dinfo) {
device_info = std::move(dinfo);
} else {
device_info = empty_device_info();
}
|
示例:尽可能使用静态分配的零向量,而不是分配一个 vector 再填充零。
embedding_executor_8bit.cc
1
2
3
4
5
6
7
8
|
// EmbeddingLookUpT 的实际实现使用模板参数,
// 而不是对象成员,以提升性能。
template <bool Mean, bool SymmetricInputRange>
static tensorflow::Status EmbeddingLookUpT(...) {
...
std::unique_ptr<tensorflow::quint8[]> zero_data(
new tensorflow::quint8[max_embedding_width]);
memset(zero_data.get(), 0, sizeof(tensorflow::quint8) * max_embedding_width);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// 足够处理大多数 embedding 宽度的大小。
static const int kTypicalMaxEmbedding = 256;
static tensorflow::quint8 static_zero_data[kTypicalMaxEmbedding]; // 全为零。
...
// EmbeddingLookUpT 的实际实现使用模板参数,
// 而不是对象成员,以提升性能。
template <bool Mean, bool SymmetricInputRange>
static tensorflow::Status EmbeddingLookUpT(...) {
...
std::unique_ptr<tensorflow::quint8[]> zero_data_backing(nullptr);
// 获取指向一段内存区域的指针,该区域至少包含
// “max_embedding_width” 个 quint8 零值。
tensorflow::quint8* zero_data;
if (max_embedding_width <= ARRAYSIZE(static_zero_data)) {
// static_zero_data 足够大,因此不需要分配零数据。
zero_data = &static_zero_data[0];
} else {
// static_zero_data 不够大:需要分配零数据。
zero_data_backing =
absl::make_unique<tensorflow::quint8[]>(max_embedding_width);
memset(zero_data_backing.get(), 0,
sizeof(tensorflow::quint8) * max_embedding_width);
zero_data = zero_data_backing.get();
}
|
此外,当对象生命周期受作用域限制时,应优先使用栈分配而不是堆分配(不过对于大对象,要小心栈帧大小)。
调整容器大小或预留容量
当 vector(或其他容器类型)的最大大小或预期最大大小提前已知时,应预先设置容器的后备存储大小(例如在 C++ 中使用 resize 或 reserve)。
示例:预先设置 vector 大小并填充,而不是执行 N 次 push_back。
indexblockdecoder.cc
修改前:
1
2
3
4
5
6
7
|
for (int i = 0; i < ndocs-1; i++) {
uint32 delta;
ERRORCHECK(b->GetRice(rice_base, &delta));
docs_.push_back(DocId(my_shard_ + (base + delta) * num_shards_));
base = base + delta + 1;
}
docs_.push_back(last_docid_);
|
修改后:
1
2
3
4
5
6
7
8
9
10
|
docs_.resize(ndocs);
DocId* docptr = &docs_[0];
for (int i = 0; i < ndocs-1; i++) {
uint32 delta;
ERRORCHECK(b.GetRice(rice_base, &delta));
*docptr = DocId(my_shard_ + (base + delta) * num_shards_);
docptr++;
base = base + delta + 1;
}
*docptr = last_docid_;
|
注意:不要用 resize 或 reserve 每次只增长一个元素,因为这可能导致二次复杂度行为。另外,如果元素构造成本较高,优先使用一次初始 reserve,然后多次 push_back 或 emplace_back,而不是一开始就 resize,因为后者会让构造调用次数翻倍。
尽可能避免复制
- 在可能时,优先移动数据结构,而不是复制。
- 如果生命周期不是问题,在临时数据结构中存储指针或索引,而不是对象副本。例如,如果本地 map 用于从传入的 proto 列表中选择一组 proto,可以让 map 只保存指向传入 proto 的指针,而不是复制可能深度嵌套的数据。另一个常见例子是排序索引 vector,而不是直接排序大型对象 vector,因为后者会产生大量复制/移动成本。
示例:通过 gRPC 接收 tensor 时避免一次额外复制。
发送约 400KB tensor 的基准测试加速了约 10-15%:
1
2
3
|
Benchmark Time(ns) CPU(ns) Iterations
-----------------------------------------------------
BM_RPC/30/98k_mean 148764691 1369998944 1000
|
1
2
3
|
Benchmark Time(ns) CPU(ns) Iterations
-----------------------------------------------------
BM_RPC/30/98k_mean 131595940 1216998084 1000
|
示例:移动大型 options 结构,而不是复制。
index.cc
修改前:
1
|
return search_iterators::DocPLIteratorFactory::Create(opts);
|
修改后:
1
|
return search_iterators::DocPLIteratorFactory::Create(std::move(opts));
|
示例:使用 std::sort 而不是 std::stable_sort,从而避免稳定排序实现中的内部复制。
encoded-vector-hits.h
修改前:
1
2
|
std::stable_sort(hits_.begin(), hits_.end(),
gtl::OrderByField(&HitWithPayloadOffset::docid));
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
struct HitWithPayloadOffset {
search_iterators::LocalDocId64 docid;
int first_payload_offset; // payload vector 中的 offset。
int num_payloads;
bool operator<(const HitWithPayloadOffset& other) const {
return (docid < other.docid) ||
(docid == other.docid &&
first_payload_offset < other.first_payload_offset);
}
};
...
std::sort(hits_.begin(), hits_.end());
|
复用临时对象
在循环内部声明的容器或对象,会在每次循环迭代时重新创建。这可能导致昂贵的构造、析构和扩容。把声明提升到循环外可以实现复用,并可能带来显著性能提升。(由于语言语义限制,或无法确保程序等价,编译器通常不能自行完成这种提升。)
示例:把变量定义提升到循环迭代之外。
autofdo_profile_utils.h
修改前:
1
2
3
4
5
6
7
8
9
10
|
auto iterator = absl::WrapUnique(sstable->GetIterator());
while (!iterator->done()) {
T profile;
if (!profile.ParseFromString(iterator->value_view())) {
return absl::InternalError(
"Failed to parse mem_block to specified profile type.");
}
...
iterator->Next();
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
|
auto iterator = absl::WrapUnique(sstable->GetIterator());
T profile;
while (!iterator->done()) {
if (!profile.ParseFromString(iterator->value_view())) {
return absl::InternalError(
"Failed to parse mem_block to specified profile type.");
}
...
iterator->Next();
}
|
示例:在循环外定义 protobuf 变量,让其已分配存储可以在多次迭代间复用。
stats-router.cc
修改前:
1
2
3
4
5
6
7
8
9
|
for (auto& r : routers_to_update) {
...
ResourceRecord record;
{
MutexLock agg_lock(r.agg->mutex());
r.agg->AddResourceRecordUsages(measure_indices, &record);
}
...
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
|
ResourceRecord record;
for (auto& r : routers_to_update) {
...
record.Clear();
{
MutexLock agg_lock(r.agg->mutex());
r.agg->AddResourceRecordUsages(measure_indices, &record);
}
...
}
|
示例:反复序列化到同一个 std::string。
program_rep.cc
修改前:
1
2
3
4
5
6
7
8
|
std::string DeterministicSerialization(const proto2::Message& m) {
std::string result;
proto2::io::StringOutputStream sink(&result);
proto2::io::CodedOutputStream out(&sink);
out.SetSerializationDeterministic(true);
m.SerializePartialToCodedStream(&out);
return result;
}
|
修改后:
1
2
3
4
5
6
7
8
9
|
absl::string_view DeterministicSerializationTo(const proto2::Message& m,
std::string* scratch) {
scratch->clear();
proto2::io::StringOutputStream sink(scratch);
proto2::io::CodedOutputStream out(&sink);
out.SetSerializationDeterministic(true);
m.SerializePartialToCodedStream(&out);
return absl::string_view(*scratch);
}
|
注意:protobuf、string、vector、容器等往往会增长到曾经存储过的最大值大小。因此,周期性地重建它们(例如每使用 N 次后)可以帮助降低内存需求和重新初始化成本。
避免不必要的工作
提升性能最有效的一类方法,也许就是避免做不必做的工作。它可以有很多形式:为常见情况创建专门路径以避开更通用也更昂贵的计算、预计算、把工作延迟到真正需要时再做、把工作提升到执行频率更低的代码位置,等等。下面按几个代表性类别列出许多相关示例。
为常见情况提供快速路径
代码通常会写成覆盖所有情况,但其中某些子情况比其他情况简单得多,也常见得多。例如,vector::push_back 通常有足够空间容纳新元素,但它也包含在空间不足时扩容底层存储的代码。适当关注代码结构,可以让常见的简单情况更快,同时又不显著伤害非常见情况的性能。
示例:让快速路径覆盖更多常见情况。
增加对尾部单个 ASCII 字节的处理,而不是只让该例程处理 4 字节倍数的情况。这样可以避免对例如 5 字节的全 ASCII 字符串调用较慢的通用例程。
utf8statetable.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// 基于状态表扫描 UTF-8 stringpiece。
// 始终扫描完整 UTF-8 字符。
// 设置已扫描字节数,并返回退出原因。
// 针对 7-bit ASCII 0000..007f 全部有效的情况进行优化。
int UTF8GenericScanFastAscii(const UTF8ScanObj* st, absl::string_view str,
int* bytes_consumed) {
...
int exit_reason;
do {
// 一次跳过 8 个 ASCII 字节;不存在字节序问题。
while ((src_limit - src >= 8) &&
(((UNALIGNED_LOAD32(src + 0) | UNALIGNED_LOAD32(src + 4)) &
0x80808080) == 0)) {
src += 8;
}
// 对剩余部分运行状态表。
int rest_consumed;
exit_reason = UTF8GenericScan(
st, absl::ClippedSubstr(str, src - initial_src), &rest_consumed);
src += rest_consumed;
} while (exit_reason == kExitDoAgain);
*bytes_consumed = src - initial_src;
return exit_reason;
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
// 基于状态表扫描 UTF-8 stringpiece。
// 始终扫描完整 UTF-8 字符。
// 设置已扫描字节数,并返回退出原因。
// 针对 7-bit ASCII 0000..007f 全部有效的情况进行优化。
int UTF8GenericScanFastAscii(const UTF8ScanObj* st, absl::string_view str,
int* bytes_consumed) {
...
int exit_reason = kExitOK;
do {
// 一次跳过 8 个 ASCII 字节;不存在字节序问题。
while ((src_limit - src >= 8) &&
(((UNALIGNED_LOAD32(src + 0) | UNALIGNED_LOAD32(src + 4)) &
0x80808080) == 0)) {
src += 8;
}
while (src < src_limit && Is7BitAscii(*src)) { // 跳过 ASCII 字节。
src++;
}
if (src < src_limit) {
// 对剩余部分运行状态表。
int rest_consumed;
exit_reason = UTF8GenericScan(
st, absl::ClippedSubstr(str, src - initial_src), &rest_consumed);
src += rest_consumed;
}
} while (exit_reason == kExitDoAgain);
*bytes_consumed = src - initial_src;
return exit_reason;
}
|
示例:为 InlinedVector 提供更简单的快速路径。
inlined_vector.h
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
auto Storage<T, N, A>::Resize(ValueAdapter values, size_type new_size) -> void {
StorageView storage_view = MakeStorageView();
IteratorValueAdapter<MoveIterator> move_values(
MoveIterator(storage_view.data));
AllocationTransaction allocation_tx(GetAllocPtr());
ConstructionTransaction construction_tx(GetAllocPtr());
absl::Span<value_type> construct_loop;
absl::Span<value_type> move_construct_loop;
absl::Span<value_type> destroy_loop;
if (new_size > storage_view.capacity) {
...
} else if (new_size > storage_view.size) {
construct_loop = {storage_view.data + storage_view.size,
new_size - storage_view.size};
} else {
destroy_loop = {storage_view.data + new_size, storage_view.size - new_size};
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
auto Storage<T, N, A>::Resize(ValueAdapter values, size_type new_size) -> void {
StorageView storage_view = MakeStorageView();
auto* const base = storage_view.data;
const size_type size = storage_view.size;
auto* alloc = GetAllocPtr();
if (new_size <= size) {
// 销毁多余的旧元素。
inlined_vector_internal::DestroyElements(alloc, base + new_size,
size - new_size);
} else if (new_size <= storage_view.capacity) {
// 原地构造新元素。
inlined_vector_internal::ConstructElements(alloc, base + size, &values,
new_size - size);
} else {
...
}
|
示例:为初始化 1-D 到 4-D tensor 的常见情况提供快速路径。
tensor_shape.cc
修改前:
1
2
3
4
5
6
7
8
9
10
|
template <class Shape>
TensorShapeBase<Shape>::TensorShapeBase(gtl::ArraySlice<int64> dim_sizes) {
set_tag(REP16);
set_data_type(DT_INVALID);
set_ndims_byte(0);
set_num_elements(1);
for (int64 s : dim_sizes) {
AddDim(internal::SubtleMustCopy(s));
}
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
template <class Shape>
void TensorShapeBase<Shape>::InitDims(gtl::ArraySlice<int64> dim_sizes) {
DCHECK_EQ(tag(), REP16);
// 允许小于 kint64max^0.25 的大小,这样下面的四路乘法
// 不会溢出。
static const uint64 kMaxSmall = 0xd744;
static_assert(kMaxSmall * kMaxSmall * kMaxSmall * kMaxSmall <= kint64max,
"bad overflow check");
bool large_size = false;
for (auto s : dim_sizes) {
if (s > kMaxSmall) {
large_size = true;
break;
}
}
if (!large_size) {
// 每个 size 都适合 16 位;对 dims 属于 {1,2,3,4} 的情况使用快速路径。
uint16* dst = as16()->dims_;
switch (dim_sizes.size()) {
case 1: {
set_ndims_byte(1);
const int64 size = dim_sizes[0];
const bool neg = Set16(kIsPartial, dst, 0, size);
set_num_elements(neg ? -1 : size);
return;
}
case 2: {
set_ndims_byte(2);
const int64 size0 = dim_sizes[0];
const int64 size1 = dim_sizes[1];
bool neg = Set16(kIsPartial, dst, 0, size0);
neg |= Set16(kIsPartial, dst, 1, size1);
set_num_elements(neg ? -1 : (size0 * size1));
return;
}
case 3: {
...
}
case 4: {
...
}
}
}
set_ndims_byte(0);
set_num_elements(1);
for (int64 s : dim_sizes) {
AddDim(internal::SubtleMustCopy(s));
}
}
|
示例:让 varint 解析器快速路径只覆盖 1 字节情况,而不是同时覆盖 1 字节和 2 字节情况。
减小(内联)快速路径的大小,可以减少代码体积和指令缓存压力,从而提升性能。
parse_context.h
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
template <typename T>
PROTOBUF_NODISCARD const char* VarintParse(const char* p, T* out) {
auto ptr = reinterpret_cast<const uint8_t*>(p);
uint32_t res = ptr[0];
if (!(res & 0x80)) {
*out = res;
return p + 1;
}
uint32_t byte = ptr[1];
res += (byte - 1) << 7;
if (!(byte & 0x80)) {
*out = res;
return p + 2;
}
return VarintParseSlow(p, res, out);
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
|
template <typename T>
PROTOBUF_NODISCARD const char* VarintParse(const char* p, T* out) {
auto ptr = reinterpret_cast<const uint8_t*>(p);
uint32_t res = ptr[0];
if (!(res & 0x80)) {
*out = res;
return p + 1;
}
return VarintParseSlow(p, res, out);
}
|
parse_context.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
|
std::pair<const char*, uint32_t> VarintParseSlow32(const char* p,
uint32_t res) {
for (std::uint32_t i = 2; i < 5; i++) {
...
}
...
std::pair<const char*, uint64_t> VarintParseSlow64(const char* p,
uint32_t res32) {
uint64_t res = res32;
for (std::uint32_t i = 2; i < 10; i++) {
...
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
|
std::pair<const char*, uint32_t> VarintParseSlow32(const char* p,
uint32_t res) {
for (std::uint32_t i = 1; i < 5; i++) {
...
}
...
std::pair<const char*, uint64_t> VarintParseSlow64(const char* p,
uint32_t res32) {
uint64_t res = res32;
for (std::uint32_t i = 1; i < 10; i++) {
...
}
|
示例:如果没有发生错误,在 RPC_Stats_Measurement 加法中跳过大量工作。
rpc-stats.h
修改前:
1
2
3
|
struct RPC_Stats_Measurement {
...
double errors[RPC::NUM_ERRORS];
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
struct RPC_Stats_Measurement {
...
double get_errors(int index) const { return errors[index]; }
void set_errors(int index, double value) {
errors[index] = value;
any_errors_set = true;
}
private:
...
// 将其设为 private,以便跟踪这些值中是否有任何一个
// 被设置为非零值。
double errors[RPC::NUM_ERRORS];
bool any_errors_set; // 当且仅当任意 errors[i] 值非零时为 true。
|
rpc-stats.cc
修改前:
1
2
3
4
5
6
|
void RPC_Stats_Measurement::operator+=(const RPC_Stats_Measurement& x) {
...
for (int i = 0; i < RPC::NUM_ERRORS; ++i) {
errors[i] += x.errors[i];
}
}
|
修改后:
1
2
3
4
5
6
7
8
9
|
void RPC_Stats_Measurement::operator+=(const RPC_Stats_Measurement& x) {
...
if (x.any_errors_set) {
for (int i = 0; i < RPC::NUM_ERRORS; ++i) {
errors[i] += x.errors[i];
}
any_errors_set = true;
}
}
|
示例:根据字符串首字节做数组查找,通常可以避免对整个字符串计算 fingerprint。
soft-tokens-helper.cc
修改前:
1
2
3
4
|
bool SoftTokensHelper::IsSoftToken(const StringPiece& token) const {
return soft_tokens_.find(Fingerprint(token.data(), token.size())) !=
soft_tokens_.end();
}
|
soft-tokens-helper.h
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
class SoftTokensHelper {
...
private:
...
// 由于 soft token 大多与标点相关,出于性能考虑,
// 我们维护数组 filter_。当且仅当任何 soft token
// 以字节值 “i” 开头时,filter_[i] 为 true。这样可避免
// 在常见情况下对 term 计算 fingerprint,因为只需基于首字节做数组
// 查找;如果 filter_[b] 为 false,
// 就可以立即返回 false。
bool filter_[256];
...
};
inline bool SoftTokensHelper::IsSoftToken(const StringPiece& token) const {
if (token.size() >= 1) {
char first_char = token.data()[0];
if (!filter_[first_char]) {
return false;
}
}
return IsSoftTokenFallback(token);
}
|
soft-tokens-helper.cc
修改后:
1
2
3
4
|
bool SoftTokensHelper::IsSoftTokenFallback(const StringPiece& token) const {
return soft_tokens_.find(Fingerprint(token.data(), token.size())) !=
soft_tokens_.end();
}
|
一次性预计算昂贵信息
示例:预计算 TensorFlow 图执行节点属性,以便快速排除某些不常见情况。
executor.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
struct NodeItem {
...
bool kernel_is_expensive = false; // 当且仅当 kernel->IsExpensive() 时为 true。
bool kernel_is_async = false; // 当且仅当 kernel->AsAsync() != nullptr 时为 true。
bool is_merge = false; // 当且仅当 IsMerge(node) 时为 true。
...
if (IsEnter(node)) {
...
} else if (IsExit(node)) {
...
} else if (IsNextIteration(node)) {
...
} else {
// 大多数节点的普通路径。
...
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
struct NodeItem {
...
bool kernel_is_expensive : 1; // 当且仅当 kernel->IsExpensive() 时为 true。
bool kernel_is_async : 1; // 当且仅当 kernel->AsAsync() != nullptr 时为 true。
bool is_merge : 1; // 当且仅当 IsMerge(node) 时为 true。
bool is_enter : 1; // 当且仅当 IsEnter(node) 时为 true。
bool is_exit : 1; // 当且仅当 IsExit(node) 时为 true。
bool is_control_trigger : 1; // 当且仅当 IsControlTrigger(node) 时为 true。
bool is_sink : 1; // 当且仅当 IsSink(node) 时为 true。
// 当且仅当 IsEnter(node) || IsExit(node) || IsNextIteration(node) 时为 true。
bool is_enter_exit_or_next_iter : 1;
...
if (!item->is_enter_exit_or_next_iter) {
// 不需要特殊处理的节点类型的快速路径。
DCHECK_EQ(input_frame, output_frame);
...
} else if (item->is_enter) {
...
} else if (item->is_exit) {
...
} else {
DCHECK(IsNextIteration(node));
...
}
|
示例:预计算 256 元素数组,并在 trigram 初始化时使用。
byte_trigram_classifier.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
|
void ByteTrigramClassifier::VerifyModel(void) const {
ProbT class_sums[num_classes_];
for (int cls = 0; cls < num_classes_; cls++) {
class_sums[cls] = 0;
}
for (ByteNgramId id = 0; id < trigrams_.num_trigrams(); id++) {
for (int cls = 0; cls < num_classes_; ++cls) {
class_sums[cls] += Prob(trigram_probs_[id].log_probs[cls]);
}
}
...
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
void ByteTrigramClassifier::VerifyModel(void) const {
CHECK_EQ(sizeof(ByteLogProbT), 1);
ProbT fast_prob[256];
for (int b = 0; b < 256; b++) {
fast_prob[b] = Prob(static_cast<ByteLogProbT>(b));
}
ProbT class_sums[num_classes_];
for (int cls = 0; cls < num_classes_; cls++) {
class_sums[cls] = 0;
}
for (ByteNgramId id = 0; id < trigrams_.num_trigrams(); id++) {
for (int cls = 0; cls < num_classes_; ++cls) {
class_sums[cls] += fast_prob[trigram_probs_[id].log_probs[cls]];
}
}
...
}
|
一般建议:在模块边界检查畸形输入,而不是在内部反复检查。
将昂贵计算移到循环外
示例:将边界计算移到循环外。
literal_linearizer.cc
修改前:
1
2
|
for (int64 i = 0; i < src_shape.dimensions(dimension_numbers.front());
++i) {
|
修改后:
1
2
3
4
|
int64 dim_front = src_shape.dimensions(dimension_numbers.front());
const uint8* src_buffer_data = src_buffer.data();
uint8* dst_buffer_data = dst_buffer.data();
for (int64 i = 0; i < dim_front; ++i) {
|
延迟昂贵计算
示例:将 GetSubSharding 调用延迟到需要时再执行,把 CPU 时间从 43 秒减少到 2 秒。
sharding_propagation.cc
修改前:
1
2
3
4
5
6
7
|
HloSharding alternative_sub_sharding =
user.sharding().GetSubSharding(user.shape(), {i});
if (user.operand(i) == &instruction &&
hlo_sharding_util::IsShardingMoreSpecific(alternative_sub_sharding,
sub_sharding)) {
sub_sharding = alternative_sub_sharding;
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
|
if (user.operand(i) == &instruction) {
// 只有当该 operand 值得关注时才计算 GetSubSharding,
// 因为它相对昂贵。
HloSharding alternative_sub_sharding =
user.sharding().GetSubSharding(user.shape(), {i});
if (hlo_sharding_util::IsShardingMoreSpecific(
alternative_sub_sharding, sub_sharding)) {
sub_sharding = alternative_sub_sharding;
}
}
|
示例:不要急切更新统计信息;按需计算。
不要在非常频繁的分配/释放调用中更新统计信息。相反,在调用频率低得多的 Stats() 方法被调用时,按需计算统计信息。
示例:在 Google Web 服务器的查询处理中预分配 10 个节点,而不是 200 个。
一个简单改动,让 Web 服务器 CPU 使用量降低了 7.5%。
querytree.h
修改前:
1
|
static const int kInitParseTreeSize = 200; // querynode 池的初始大小。
|
修改后:
1
|
static const int kInitParseTreeSize = 10; // querynode 池的初始大小。
|
示例:改变搜索顺序,使吞吐量提升 19%。
一个旧搜索系统(约 2000 年)有两层索引:一层包含全文索引,另一层只包含标题和锚文本词项的索引。过去我们会先搜索较小的标题/锚文本层。反直觉的是,我们发现先搜索较大的全文索引层成本更低,因为如果我们到达全文层末尾,就可以完全跳过标题/锚文本层(它是全文层的子集)。这种情况相当常见,因此我们降低了处理一次查询所需的平均磁盘寻道次数。
背景信息可参见 The Anatomy of a Large-Scale Hypertextual Web Search Engine 中关于标题和锚文本处理的讨论。
特化代码
某个对性能敏感的调用点,可能并不需要通用库提供的完整通用性。在这种情况下,如果专用代码能够提升性能,可以考虑编写专用代码,而不是调用通用代码。
示例:为 Histogram 类编写定制打印代码,比 sprintf 快 4 倍。
这段代码对性能敏感,因为监控系统从各类服务器收集统计信息时会调用它。
histogram_export.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
void Histogram::PopulateBuckets(const string &prefix,
expvar::MapProto *const var) const {
...
for (int i = min_bucket; i <= max_bucket; ++i) {
const double count = BucketCount(i);
if (!export_empty_buckets && count == 0.0) continue;
acc += count;
// 离散直方图导出 bucket 的标签格式
// 指定包含式上界,这与原始 Histogram 实现
// 相同。该格式不适用于非离散直方图,
// 因此它们使用半开区间,
// 并用 “_” 而不是 “-” 作为分隔符,
// 以便区分这些格式。
string key =
options_.export_cumulative_counts() ?
StringPrintf("%.12g", boundaries_->BucketLimit(i)) :
options_.discrete() ?
StringPrintf("%.0f-%.0f",
ceil(boundaries_->BucketStart(i)),
ceil(boundaries_->BucketLimit(i)) - 1.0) :
StringPrintf("%.12g_%.12g",
boundaries_->BucketStart(i),
boundaries_->BucketLimit(i));
EscapeMapKey(&key);
const double value = options_.export_cumulative_counts() ? acc : count;
expvar::AddMapFloat(StrCat(prefix,
options_.export_bucket_key_prefix(),
key),
value * count_mult,
var);
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
// 按 format 格式化 “val”。如果 “need_escape” 为 true,
// 则 format 可能产生包含 “.” 的输出,结果将被转义。
// 如果 “need_escape” 为 false,则调用方保证 format
// 生成的数字不会包含任何 “.” 字符,
// 因此可以避免调用 EscapeKey。
// 如有必要,该函数可以自由使用 “*scratch” 作为临时空间,
// 生成的 StringPiece 也可能指向 “*scratch” 内部。
static StringPiece FormatNumber(const char* format,
bool need_escape,
double val, string* scratch) {
// 该例程被特化为只处理有限数量的格式。
DCHECK(StringPiece(format) == "%.0f" || StringPiece(format) == "%.12g");
scratch->clear();
if (val == trunc(val) && val >= kint32min && val <= kint32max) {
// 可直接使用 StrAppend 的整数。
StrAppend(scratch, static_cast<int32>(val));
return StringPiece(*scratch);
} else if (isinf(val)) {
// 无穷大,直接表示为 “inf”。
return StringPiece("inf", 3);
} else {
// 根据 “format” 格式化,并在需要时转义。
StringAppendF(scratch, format, val);
if (need_escape) {
EscapeMapKey(scratch);
} else {
DCHECK(!StringPiece(*scratch).contains("."));
}
return StringPiece(*scratch);
}
}
...
void Histogram::PopulateBuckets(const string &prefix,
expvar::MapProto *const var) const {
...
const string full_key_prefix = StrCat(prefix,
options_.export_bucket_key_prefix());
string key = full_key_prefix; // key 将以 “full_key_prefix” 开头。
string start_scratch;
string limit_scratch;
const bool cumul_counts = options_.export_cumulative_counts();
const bool discrete = options_.discrete();
for (int i = min_bucket; i <= max_bucket; ++i) {
const double count = BucketCount(i);
if (!export_empty_buckets && count == 0.0) continue;
acc += count;
// 离散直方图导出 bucket 的标签格式
// 指定包含式上界,这与原始 Histogram 实现
// 相同。该格式不适用于非离散直方图,
// 因此它们使用半开区间,
// 并用 “_” 而不是 “-” 作为分隔符,
// 以便区分这些格式。
key.resize(full_key_prefix.size()); // 从 full_key_prefix 开始。
DCHECK_EQ(key, full_key_prefix);
const double limit = boundaries_->BucketLimit(i);
if (cumul_counts) {
StrAppend(&key, FormatNumber("%.12g", true, limit, &limit_scratch));
} else {
const double start = boundaries_->BucketStart(i);
if (discrete) {
StrAppend(&key,
FormatNumber("%.0f", false, ceil(start), &start_scratch),
"-",
FormatNumber("%.0f", false, ceil(limit) - 1.0,
&limit_scratch));
} else {
StrAppend(&key,
FormatNumber("%.12g", true, start, &start_scratch),
"_",
FormatNumber("%.12g", true, limit, &limit_scratch));
}
}
const double value = cumul_counts ? acc : count;
// 添加到 map 变量。
expvar::AddMapFloat(key, value * count_mult, var);
}
}
|
示例:为 VLOG(1)、VLOG(2) 等添加特化,以提升速度并减小代码体积。
VLOG 是整个代码库中大量使用的宏。这个改动避免了在几乎每个调用点传递额外的整数常量(如果日志级别在调用点是常量,几乎总是如此,例如 VLOG(1) << ...),从而节省代码空间。
vlog_is_on.h
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
class VLogSite final {
public:
...
bool IsEnabled(int level) {
int stale_v = v_.load(std::memory_order_relaxed);
if (ABSL_PREDICT_TRUE(level > stale_v)) {
return false;
}
// 我们把快速路径之外的所有内容,即 vlogging 已初始化
// 但未启用的情况,放到非内联函数后面以减少代码大小。
return SlowIsEnabled(stale_v, level);
}
...
private:
...
ABSL_ATTRIBUTE_NOINLINE
bool SlowIsEnabled(int stale_v, int level);
...
};
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
class VLogSite final {
public:
...
bool IsEnabled(int level) {
int stale_v = v_.load(std::memory_order_relaxed);
if (ABSL_PREDICT_TRUE(level > stale_v)) {
return false;
}
// 我们把快速路径之外的所有内容,即 vlogging 已初始化
// 但未启用的情况,放到非内联函数后面以减少代码大小。
// “level” 几乎总是调用点常量,因此通过对 1、2、3 级
// 做特殊处理,可以节省一些代码空间。
#if defined(__has_builtin) && __has_builtin(__builtin_constant_p)
if (__builtin_constant_p(level)) {
if (level == 0) return SlowIsEnabled0(stale_v);
if (level == 1) return SlowIsEnabled1(stale_v);
if (level == 2) return SlowIsEnabled2(stale_v);
if (level == 3) return SlowIsEnabled3(stale_v);
if (level == 4) return SlowIsEnabled4(stale_v);
if (level == 5) return SlowIsEnabled5(stale_v);
}
#endif
return SlowIsEnabled(stale_v, level);
...
private:
...
ABSL_ATTRIBUTE_NOINLINE
bool SlowIsEnabled(int stale_v, int level);
ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled0(int stale_v);
ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled1(int stale_v);
ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled2(int stale_v);
ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled3(int stale_v);
ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled4(int stale_v);
ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled5(int stale_v);
...
};
|
vlog_is_on.cc
修改后:
1
2
3
4
5
6
|
bool VLogSite::SlowIsEnabled0(int stale_v) { return SlowIsEnabled(stale_v, 0); }
bool VLogSite::SlowIsEnabled1(int stale_v) { return SlowIsEnabled(stale_v, 1); }
bool VLogSite::SlowIsEnabled2(int stale_v) { return SlowIsEnabled(stale_v, 2); }
bool VLogSite::SlowIsEnabled3(int stale_v) { return SlowIsEnabled(stale_v, 3); }
bool VLogSite::SlowIsEnabled4(int stale_v) { return SlowIsEnabled(stale_v, 4); }
bool VLogSite::SlowIsEnabled5(int stale_v) { return SlowIsEnabled(stale_v, 5); }
|
示例:在可能时用简单前缀匹配替换 RE2 调用。
read_matcher.cc
修改前:
1
2
3
4
5
6
|
enum MatchItemType {
MATCH_TYPE_INVALID,
MATCH_TYPE_RANGE,
MATCH_TYPE_EXACT,
MATCH_TYPE_REGEXP,
};
|
修改后:
1
2
3
4
5
6
7
|
enum MatchItemType {
MATCH_TYPE_INVALID,
MATCH_TYPE_RANGE,
MATCH_TYPE_EXACT,
MATCH_TYPE_REGEXP,
MATCH_TYPE_PREFIX, // regexp “.*” 的特殊类型。
};
|
read_matcher.cc
修改前:
1
|
p->type = MATCH_TYPE_REGEXP;
|
修改后:
1
2
3
4
5
6
7
|
term.NonMetaPrefix().CopyToString(&p->prefix);
if (term.RegexpSuffix() == ".*") {
// 对匹配任意内容的 regexp 做特殊处理,
// 这样可以绕过 RE2::FullMatch。
p->type = MATCH_TYPE_PREFIX;
} else {
p->type = MATCH_TYPE_REGEXP;
|
示例:使用 StrCat 而不是 StringPrintf 格式化 IP 地址。
ipaddress.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
string IPAddress::ToString() const {
char buf[INET6_ADDRSTRLEN];
switch (address_family_) {
case AF_INET:
CHECK(inet_ntop(AF_INET, &addr_.addr4, buf, INET6_ADDRSTRLEN) != NULL);
return buf;
case AF_INET6:
CHECK(inet_ntop(AF_INET6, &addr_.addr6, buf, INET6_ADDRSTRLEN) != NULL);
return buf;
case AF_UNSPEC:
LOG(DFATAL) << "Calling ToString() on an empty IPAddress";
return "";
default:
LOG(FATAL) << "Unknown address family " << address_family_;
}
}
...
string IPAddressToURIString(const IPAddress& ip) {
switch (ip.address_family()) {
case AF_INET6:
return StringPrintf("[%s]", ip.ToString().c_str());
default:
return ip.ToString();
}
}
...
string SocketAddress::ToString() const {
return IPAddressToURIString(host_) + StringPrintf(":%u", port_);
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
string IPAddress::ToString() const {
char buf[INET6_ADDRSTRLEN];
switch (address_family_) {
case AF_INET: {
uint32 addr = gntohl(addr_.addr4.s_addr);
int a1 = static_cast<int>((addr >> 24) & 0xff);
int a2 = static_cast<int>((addr >> 16) & 0xff);
int a3 = static_cast<int>((addr >> 8) & 0xff);
int a4 = static_cast<int>(addr & 0xff);
return StrCat(a1, ".", a2, ".", a3, ".", a4);
}
case AF_INET6:
CHECK(inet_ntop(AF_INET6, &addr_.addr6, buf, INET6_ADDRSTRLEN) != NULL);
return buf;
case AF_UNSPEC:
LOG(DFATAL) << "Calling ToString() on an empty IPAddress";
return "";
default:
LOG(FATAL) << "Unknown address family " << address_family_;
}
}
...
string IPAddressToURIString(const IPAddress& ip) {
switch (ip.address_family()) {
case AF_INET6:
return StrCat("[", ip.ToString(), "]");
default:
return ip.ToString();
}
}
...
string SocketAddress::ToString() const {
return StrCat(IPAddressToURIString(host_), ":", port_);
}
|
使用缓存避免重复工作
示例:基于大型序列化 proto 的预计算 fingerprint 进行缓存。
dp_ops.cc
修改前:
1
2
3
4
5
|
InputOutputMappingProto mapping_proto;
PLAQUE_OP_REQUIRES(
mapping_proto.ParseFromStringPiece(GetAttrMappingProto(state)),
absl::InternalError("Failed to parse InputOutputMappingProto"));
ParseMapping(mapping_proto);
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
uint64 mapping_proto_fp = GetAttrMappingProtoFp(state);
{
absl::MutexLock l(&fp_to_iometa_mu);
if (fp_to_iometa == nullptr) {
fp_to_iometa =
new absl::flat_hash_map<uint64, std::unique_ptr<ProgramIOMetadata>>;
}
auto it = fp_to_iometa->find(mapping_proto_fp);
if (it != fp_to_iometa->end()) {
io_metadata_ = it->second.get();
} else {
auto serial_proto = GetAttrMappingProto(state);
DCHECK_EQ(mapping_proto_fp, Fingerprint(serial_proto));
InputOutputMappingProto mapping_proto;
PLAQUE_OP_REQUIRES(
mapping_proto.ParseFromStringPiece(GetAttrMappingProto(state)),
absl::InternalError("Failed to parse InputOutputMappingProto"));
auto io_meta = ParseMapping(mapping_proto);
io_metadata_ = io_meta.get();
(*fp_to_iometa)[mapping_proto_fp] = std::move(io_meta);
}
}
|
让编译器更容易优化
编译器可能很难穿透多层抽象进行优化,因为它必须对代码整体行为做保守假设,也可能无法做出正确的速度与体积权衡。应用程序员通常更了解系统行为,可以通过把代码改写到更低层的操作方式来帮助编译器。不过,只有在性能剖析显示确实存在问题时才这样做,因为编译器通常自己就能做对。查看性能关键例程生成的汇编代码,可以帮助你理解编译器是否“做对了”。Pprof 提供了非常有用的[源码与反汇编交错显示][annotated source],并带有性能数据标注。一些可能有用的技巧:
-
避免在热点函数中调用其他函数(让编译器可以避免帧设置成本)。
-
将慢路径代码移到单独的尾调用函数中。
-
在大量使用少量数据前,先把它复制到局部变量中。
这可以让编译器假设它不会与其他数据发生别名,从而可能改善自动向量化和寄存器分配。
-
手动展开非常热的循环。
示例:用指向底层数组的裸指针替换 absl::Span,加速 ShapeUtil::ForEachState。
shape_util.h
修改前:
1
2
3
4
5
6
7
8
9
|
struct ForEachState {
ForEachState(const Shape& s, absl::Span<const int64_t> b,
absl::Span<const int64_t> c, absl::Span<const int64_t> i);
~ForEachState();
const Shape& shape;
const absl::Span<const int64_t> base;
const absl::Span<const int64_t> count;
const absl::Span<const int64_t> incr;
|
修改后:
1
2
3
4
5
6
7
8
9
10
|
struct ForEachState {
ForEachState(const Shape& s, absl::Span<const int64_t> b,
absl::Span<const int64_t> c, absl::Span<const int64_t> i);
inline ~ForEachState() = default;
const Shape& shape;
// 指向传入 span 所表示数组的指针。
const int64_t* const base;
const int64_t* const count;
const int64_t* const incr;
|
示例:手动展开循环冗余校验(CRC)计算循环。
crc.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
void CRC32::Extend(uint64 *lo, uint64 *hi, const void *bytes, size_t length)
const {
...
// 每次处理 4 字节。
while ((p + 4) <= e) {
uint32 c = l ^ WORD(p);
p += 4;
l = this->table3_[c & 0xff] ^
this->table2_[(c >> 8) & 0xff] ^
this->table1_[(c >> 16) & 0xff] ^
this->table0_[c >> 24];
}
// 处理最后几个字节。
while (p != e) {
int c = (l & 0xff) ^ *p++;
l = this->table0_[c] ^ (l >> 8);
}
*lo = l;
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
void CRC32::Extend(uint64 *lo, uint64 *hi, const void *bytes, size_t length)
const {
...
#define STEP { \
uint32 c = l ^ WORD(p); \
p += 4; \
l = this->table3_[c & 0xff] ^ \
this->table2_[(c >> 8) & 0xff] ^ \
this->table1_[(c >> 16) & 0xff] ^ \
this->table0_[c >> 24]; \
}
// 每次处理 16 字节。
while ((e-p) >= 16) {
STEP;
STEP;
STEP;
STEP;
}
// 每次处理 4 字节。
while ((p + 4) <= e) {
STEP;
}
#undef STEP
// 处理最后几个字节。
while (p != e) {
int c = (l & 0xff) ^ *p++;
l = this->table0_[c] ^ (l >> 8);
}
*lo = l;
}
|
示例:解析 Spanner key 时一次处理四个字符。
key.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
void Key::InitSeps(const char* start) {
const char* base = &rep_[0];
const char* limit = base + rep_.size();
const char* s = start;
DCHECK_GE(s, base);
DCHECK_LT(s, limit);
for (int i = 0; i < 3; i++) {
s = (const char*)memchr(s, '#', limit - s);
DCHECK(s != NULL);
seps_[i] = s - base;
s++;
}
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
inline const char* ScanBackwardsForSep(const char* base, const char* p) {
while (p >= base + 4) {
if (p[0] == '#') return p;
if (p[-1] == '#') return p-1;
if (p[-2] == '#') return p-2;
if (p[-3] == '#') return p-3;
p -= 4;
}
while (p >= base && *p != '#') p--;
return p;
}
void Key::InitSeps(const char* start) {
const char* base = &rep_[0];
const char* limit = base + rep_.size();
const char* s = start;
DCHECK_GE(s, base);
DCHECK_LT(s, limit);
// 我们从字符串末尾向后走,而不是向前走,
// 因为目录名可能很长,且肯定不包含
// 任何 “#” 字符。
const char* p = ScanBackwardsForSep(s, limit - 1);
DCHECK(*p == '#');
seps_[2] = p - base;
p--;
p = ScanBackwardsForSep(s, p);
DCHECK(*p == '#');
seps_[1] = p - base;
p--;
p = ScanBackwardsForSep(s, p);
DCHECK(*p == '#');
seps_[0] = p - base;
}
|
示例:将 ABSL_LOG(FATAL) 改为 ABSL_DCHECK(false),避免帧设置成本。
arena_cleanup.h
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t Size(Tag tag) {
if (!EnableSpecializedTags()) return sizeof(DynamicNode);
switch (tag) {
case Tag::kDynamic:
return sizeof(DynamicNode);
case Tag::kString:
return sizeof(TaggedNode);
case Tag::kCord:
return sizeof(TaggedNode);
default:
ABSL_LOG(FATAL) << "Corrupted cleanup tag: " << static_cast<int>(tag);
return sizeof(DynamicNode);
}
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t Size(Tag tag) {
if (!EnableSpecializedTags()) return sizeof(DynamicNode);
switch (tag) {
case Tag::kDynamic:
return sizeof(DynamicNode);
case Tag::kString:
return sizeof(TaggedNode);
case Tag::kCord:
return sizeof(TaggedNode);
default:
ABSL_DCHECK(false) << "Corrupted cleanup tag: " << static_cast<int>(tag);
return sizeof(DynamicNode);
}
}
|
降低统计信息收集成本
需要在系统统计信息及其他行为信息的价值,与维护这些信息的成本之间做权衡。额外信息通常能帮助人们理解和改进高层行为,但维护它们也可能很昂贵。
没有用处的统计信息可以直接删除。
示例:停止维护 SelectServer 中 alarm 和 closure 数量这类昂贵统计。
这是把设置 alarm 的时间从 771 ns 降到 271 ns 的改动之一。
selectserver.h
修改前:
1
2
3
4
5
6
7
8
9
10
|
class SelectServer {
public:
...
protected:
...
scoped_ptr<MinuteTenMinuteHourStat> num_alarms_stat_;
...
scoped_ptr<MinuteTenMinuteHourStat> num_closures_stat_;
...
};
|
修改后:
1
2
3
4
5
6
|
// Selectserver 类。
class SelectServer {
...
protected:
...
};
|
/selectserver.cc
修改前:
1
2
3
4
5
6
7
8
9
|
void SelectServer::AddAlarmInternal(Alarmer* alarmer,
int offset_in_ms,
int id,
bool is_periodic) {
...
alarms_->insert(alarm);
num_alarms_stat_->IncBy(1);
...
}
|
修改后:
1
2
3
4
5
6
7
8
|
void SelectServer::AddAlarmInternal(Alarmer* alarmer,
int offset_in_ms,
int id,
bool is_periodic) {
...
alarms_->Add(alarm);
...
}
|
/selectserver.cc
修改前:
1
2
3
4
5
6
|
void SelectServer::RemoveAlarm(Alarmer* alarmer, int id) {
...
alarms_->erase(alarm);
num_alarms_stat_->IncBy(-1);
...
}
|
修改后:
1
2
3
4
5
|
void SelectServer::RemoveAlarm(Alarmer* alarmer, int id) {
...
alarms_->Remove(alarm);
...
}
|
通常可以只为系统处理的元素样本维护统计信息或其他属性(例如 RPC 请求、输入记录、用户)。许多子系统都使用这种方法(如 tcmalloc 分配跟踪、/requestz 状态页、Dapper 样本)。
使用采样时,应在合适的情况下考虑降低采样率。
示例:只为 doc info 请求样本维护统计信息。
采样让我们可以在大多数请求中避免触碰 39 个直方图和 MinuteTenMinuteHour 统计。
generic-leaf-stats.cc
1
|
... code that touches 39 histograms to update various stats on every request ...
|
1
2
3
4
5
6
7
|
// 定期添加到直方图。
if (TryLockToUpdateHistogramsDocInfo(docinfo_stats, bucket)) {
// 仅当应该采样该请求以维护统计信息时,
// 返回 true 并获取 bucket->lock。
... code that touches 39 histograms to update various stats ...
bucket->lock.Unlock();
}
|
示例:降低采样率,并让采样决策更快。
这个改动将采样率从 1/10 降到 1/32。此外,我们现在只为被采样事件保留执行时间统计,并通过使用 2 的幂取模来加速采样决策。这段代码会在 Google Meet 视频会议系统的每个数据包上调用;在 COVID 疫情初期,用户迅速转向更多线上会议,因此需要做性能优化来跟上容量需求。
packet_executor.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
class ScopedPerformanceMeasurement {
public:
explicit ScopedPerformanceMeasurement(PacketExecutor* packet_executor)
: packet_executor_(packet_executor),
tracer_(packet_executor->packet_executor_trace_threshold_,
kClosureTraceName) {
// ThreadCPUUsage 是昂贵调用。在撰写时,
// 它耗时超过 400ns,大约比 absl::Now 慢 30 倍,
// 因此我们只采样 10% 的 closure 以降低成本。
if (packet_executor->closures_executed_ % 10 == 0) {
thread_cpu_usage_start_ = base::ThreadCPUUsage();
}
// 在可能执行上述昂贵调用后再采样开始时间,
// 以免污染墙钟时间测量。
run_start_time_ = absl::Now();
}
~ScopedPerformanceMeasurement() {
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
ScopedPerformanceMeasurement::ScopedPerformanceMeasurement(
PacketExecutor* packet_executor)
: packet_executor_(packet_executor),
tracer_(packet_executor->packet_executor_trace_threshold_,
kClosureTraceName) {
// ThreadCPUUsage 是昂贵调用。在撰写时,
// 它耗时超过 400ns,大约比 absl::Now 慢 30 倍,
// 因此我们只采样 1/32 的 closure 以降低成本。
if (packet_executor->closures_executed_ % 32 == 0) {
thread_cpu_usage_start_ = base::ThreadCPUUsage();
}
// 在可能执行上述昂贵调用后再采样开始时间,
// 以免污染墙钟时间测量。
run_start_time_ = absl::Now();
}
|
packet_executor.cc
修改前:
1
2
3
4
5
6
7
8
9
|
~ScopedPerformanceMeasurement() {
auto run_end_time = absl::Now();
auto run_duration = run_end_time - run_start_time_;
if (thread_cpu_usage_start_.has_value()) {
...
}
closure_execution_time->Record(absl::ToInt64Microseconds(run_duration));
|
修改后:
1
2
3
4
5
6
7
8
|
ScopedPerformanceMeasurement::~ScopedPerformanceMeasurement() {
auto run_end_time = absl::Now();
auto run_duration = run_end_time - run_start_time_;
if (thread_cpu_usage_start_.has_value()) {
...
closure_execution_time->Record(absl::ToInt64Microseconds(run_duration));
}
|
基准测试结果:
1
2
3
4
5
|
Run on (40 X 2793 MHz CPUs); 2020-03-24T20:08:19.991412535-07:00
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark Base (ns) New (ns) Improvement
----------------------------------------------------------------------------
BM_PacketOverhead_mean 224 85 +62.0%
|
避免在热点路径上记录日志
日志语句可能很昂贵,即使该语句的日志级别实际上不会输出任何内容。例如,ABSL_VLOG 的实现至少需要一次加载和一次比较,这在热点代码路径中可能成为问题。此外,日志代码的存在还可能抑制编译器优化。可以考虑从热点路径中完全移除日志。
示例:从内存分配器核心路径中移除日志。
这是一个更大改动中的一小部分。
gpu_bfc_allocator.cc
1
2
3
4
5
6
7
8
9
10
11
|
void GPUBFCAllocator::SplitChunk(...) {
...
VLOG(6) << "Adding to chunk map: " << new_chunk->ptr;
...
}
...
void GPUBFCAllocator::DeallocateRawInternal(void* ptr) {
...
VLOG(6) << "Chunk at " << c->ptr << " no longer in use";
...
}
|
1
2
3
4
5
6
7
|
void GPUBFCAllocator::SplitChunk(...) {
...
}
...
void GPUBFCAllocator::DeallocateRawInternal(void* ptr) {
...
}
|
示例:在嵌套循环外预先计算日志是否启用。
image_similarity.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
for (int j = 0; j < output_subimage_size_y; j++) {
int j1 = j - rad + output_to_integral_subimage_y;
int j2 = j1 + 2 * rad + 1;
// 为该行输出创建指针,同时考虑到相对
// 完整图像的 offset。
double *image_diff_ptr = &(*image_diff)(j + min_j, min_i);
for (int i = 0; i < output_subimage_size_x; i++) {
...
if (VLOG_IS_ON(3)) {
...
}
...
}
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
const bool vlog_3 = DEBUG_MODE ? VLOG_IS_ON(3) : false;
for (int j = 0; j < output_subimage_size_y; j++) {
int j1 = j - rad + output_to_integral_subimage_y;
int j2 = j1 + 2 * rad + 1;
// 为该行输出创建指针,同时考虑到相对
// 完整图像的 offset。
double *image_diff_ptr = &(*image_diff)(j + min_j, min_i);
for (int i = 0; i < output_subimage_size_x; i++) {
...
if (vlog_3) {
...
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
|
Run on (40 X 2801 MHz CPUs); 2016-05-16T15:55:32.250633072-07:00
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark Base (ns) New (ns) Improvement
------------------------------------------------------------------
BM_NCCPerformance/16 29104 26372 +9.4%
BM_NCCPerformance/64 473235 425281 +10.1%
BM_NCCPerformance/512 30246238 27622009 +8.7%
BM_NCCPerformance/1k 125651445 113361991 +9.8%
BM_NCCLimitedBoundsPerformance/16 8314 7498 +9.8%
BM_NCCLimitedBoundsPerformance/64 143508 132202 +7.9%
BM_NCCLimitedBoundsPerformance/512 9335684 8477567 +9.2%
BM_NCCLimitedBoundsPerformance/1k 37223897 34201739 +8.1%
|
示例:预先计算日志是否启用,并在辅助例程中使用该结果。
periodic_call.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
VLOG(1) << Logid()
<< "MaybeScheduleAlarmAtNextTick. Time until next real time: "
<< time_until_next_real_time;
...
uint64 next_virtual_time_ms =
next_virtual_time_ms_ - num_ticks * kResolutionMs;
CHECK_GE(next_virtual_time_ms, 0);
ScheduleAlarm(now, delay, next_virtual_time_ms);
}
void ScheduleNextAlarm(uint64 current_virtual_time_ms)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
if (calls_.empty()) {
VLOG(1) << Logid() << "No calls left, entering idle mode";
next_real_time_ = absl::InfiniteFuture();
return;
}
uint64 next_virtual_time_ms = FindNextVirtualTime(current_virtual_time_ms);
auto delay =
absl::Milliseconds(next_virtual_time_ms - current_virtual_time_ms);
ScheduleAlarm(GetClock().TimeNow(), delay, next_virtual_time_ms);
}
// 该函数调度的 alarm 会取代之前所有已调度的
// alarm。这通过 `scheduling_sequence_number_` 保证。
void ScheduleAlarm(absl::Time now, absl::Duration delay,
uint64 virtual_time_ms)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
next_real_time_ = now + delay;
next_virtual_time_ms_ = virtual_time_ms;
++ref_count_; // Alarm 持有一个引用。
++scheduling_sequence_number_;
VLOG(1) << Logid() << "ScheduleAlarm. Time : "
<< absl::FormatTime("%M:%S.%E3f", now, absl::UTCTimeZone())
<< ", delay: " << delay << ", virtual time: " << virtual_time_ms
<< ", refs: " << ref_count_
<< ", seq: " << scheduling_sequence_number_
<< ", executor: " << executor_;
executor_->AddAfter(
delay, new Alarm(this, virtual_time_ms, scheduling_sequence_number_));
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
const bool vlog_1 = VLOG_IS_ON(1);
if (vlog_1) {
VLOG(1) << Logid()
<< "MaybeScheduleAlarmAtNextTick. Time until next real time: "
<< time_until_next_real_time;
}
...
uint64 next_virtual_time_ms =
next_virtual_time_ms_ - num_ticks * kResolutionMs;
CHECK_GE(next_virtual_time_ms, 0);
ScheduleAlarm(now, delay, next_virtual_time_ms, vlog_1);
}
void ScheduleNextAlarm(uint64 current_virtual_time_ms, bool vlog_1)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
if (calls_.empty()) {
if (vlog_1) {
VLOG(1) << Logid() << "No calls left, entering idle mode";
}
next_real_time_ = absl::InfiniteFuture();
return;
}
uint64 next_virtual_time_ms = FindNextVirtualTime(current_virtual_time_ms);
auto delay =
absl::Milliseconds(next_virtual_time_ms - current_virtual_time_ms);
ScheduleAlarm(GetClock().TimeNow(), delay, next_virtual_time_ms, vlog_1);
}
// 该函数调度的 alarm 会取代之前所有已调度的
// alarm。这通过 `scheduling_sequence_number_` 保证。
void ScheduleAlarm(absl::Time now, absl::Duration delay,
uint64 virtual_time_ms,
bool vlog_1)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
next_real_time_ = now + delay;
next_virtual_time_ms_ = virtual_time_ms;
++ref_count_; // Alarm 持有一个引用。
++scheduling_sequence_number_;
if (vlog_1) {
VLOG(1) << Logid() << "ScheduleAlarm. Time : "
<< absl::FormatTime("%M:%S.%E3f", now, absl::UTCTimeZone())
<< ", delay: " << delay << ", virtual time: " << virtual_time_ms
<< ", refs: " << ref_count_
<< ", seq: " << scheduling_sequence_number_
<< ", executor: " << executor_;
}
executor_->AddAfter(
delay, new Alarm(this, virtual_time_ms, scheduling_sequence_number_));
}
|
代码大小方面的考量
性能不只是运行时速度。有时也值得考虑软件选择对生成代码体积的影响。代码体积大意味着更长的编译和链接时间、臃肿的二进制、更多内存使用、更大的指令缓存压力,以及对分支预测器等微架构结构的其他负面影响。在编写会被许多地方使用的底层库代码,或编写预期会为许多不同类型实例化的模板代码时,思考这些问题尤其重要。
减少代码体积的有效技巧会因编程语言而有很大差异。下面是一些我们认为对 C++ 代码有用的技巧(C++ 代码很容易因为过度使用模板和内联而膨胀)。
精简常被内联的代码
被广泛调用的函数一旦与内联结合,可能会对代码体积产生巨大影响。
示例:加速 TF_CHECK_OK。
避免创建 Ok 对象,并把致命错误消息的复杂格式化移到非内联路径,而不是在每个调用点执行,从而节省代码空间。
status.h
修改前:
1
2
|
#define TF_CHECK_OK(val) CHECK_EQ(::tensorflow::Status::OK(), (val))
#define TF_QCHECK_OK(val) QCHECK_EQ(::tensorflow::Status::OK(), (val))
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
extern tensorflow::string* TfCheckOpHelperOutOfLine(
const ::tensorflow::Status& v, const char* msg);
inline tensorflow::string* TfCheckOpHelper(::tensorflow::Status v,
const char* msg) {
if (v.ok()) return nullptr;
return TfCheckOpHelperOutOfLine(v, msg);
}
#define TF_CHECK_OK(val) \
while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
LOG(FATAL) << *(_result)
#define TF_QCHECK_OK(val) \
while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
LOG(QFATAL) << *(_result)
|
status.cc
修改后:
1
2
3
4
5
6
7
8
9
|
string* TfCheckOpHelperOutOfLine(const ::tensorflow::Status& v,
const char* msg) {
string r("Non-OK-status: ");
r += msg;
r += " status: ";
r += v.ToString();
// 会泄漏字符串,但它只用于 fatal 错误消息。
return new string(r);
}
|
示例:让每个 RETURN_IF_ERROR 调用点减少 79 字节代码。
-
增加专门供 RETURN_IF_ERROR 使用的适配器类。
-
不在 RETURN_IF_ERROR 的快速路径上构造/析构 StatusBuilder。
-
不内联某些 StatusBuilder 方法,因为快速路径上已经不再需要它们。
-
避免不必要的 ~Status 调用。
示例:让 CHECK_GE 性能提升 4.5 倍,并将代码大小从 125 字节缩小到 77 字节。
logging.h
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
struct CheckOpString {
CheckOpString(string* str) : str_(str) { }
~CheckOpString() { delete str_; }
operator bool() const { return str_ == NULL; }
string* str_;
};
...
#define DEFINE_CHECK_OP_IMPL(name, op) \
template <class t1, class t2> \
inline string* Check##name##Impl(const t1& v1, const t2& v2, \
const char* names) { \
if (v1 op v2) return NULL; \
else return MakeCheckOpString(v1, v2, names); \
} \
string* Check##name##Impl(int v1, int v2, const char* names);
DEFINE_CHECK_OP_IMPL(EQ, ==)
DEFINE_CHECK_OP_IMPL(NE, !=)
DEFINE_CHECK_OP_IMPL(LE, <=)
DEFINE_CHECK_OP_IMPL(LT, < )
DEFINE_CHECK_OP_IMPL(GE, >=)
DEFINE_CHECK_OP_IMPL(GT, > )
#undef DEFINE_CHECK_OP_IMPL
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
struct CheckOpString {
CheckOpString(string* str) : str_(str) { }
// 没有析构函数:如果 str_ 非 NULL,说明即将 LOG(FATAL),
// 因此清理 str_ 没有意义。
operator bool() const { return str_ == NULL; }
string* str_;
};
...
extern string* MakeCheckOpStringIntInt(int v1, int v2, const char* names);
template<int, int>
string* MakeCheckOpString(const int& v1, const int& v2, const char* names) {
return MakeCheckOpStringIntInt(v1, v2, names);
}
...
#define DEFINE_CHECK_OP_IMPL(name, op) \
template <class t1, class t2> \
inline string* Check##name##Impl(const t1& v1, const t2& v2, \
const char* names) { \
if (v1 op v2) return NULL; \
else return MakeCheckOpString(v1, v2, names); \
} \
inline string* Check##name##Impl(int v1, int v2, const char* names) { \
if (v1 op v2) return NULL; \
else return MakeCheckOpString(v1, v2, names); \
}
DEFINE_CHECK_OP_IMPL(EQ, ==)
DEFINE_CHECK_OP_IMPL(NE, !=)
DEFINE_CHECK_OP_IMPL(LE, <=)
DEFINE_CHECK_OP_IMPL(LT, < )
DEFINE_CHECK_OP_IMPL(GE, >=)
DEFINE_CHECK_OP_IMPL(GT, > )
#undef DEFINE_CHECK_OP_IMPL
|
logging.cc
修改后:
1
2
3
4
5
|
string* MakeCheckOpStringIntInt(int v1, int v2, const char* names) {
strstream ss;
ss << names << " (" << v1 << " vs. " << v2 << ")";
return new string(ss.str(), ss.pcount());
}
|
谨慎内联
内联通常能提升性能,但有时会增加代码体积,却没有相应的性能收益(某些情况下甚至会因为指令缓存压力增加而造成性能下降)。
示例:减少 TensorFlow 中的内联。
该改动停止内联许多非性能敏感函数(例如错误路径和 op 注册代码)。此外,一些性能敏感函数的慢路径被移到非内联函数中。
这些改动让典型二进制中的 TensorFlow 符号大小减少 12.2%(从 8814545 字节降到 7740233 字节)。
示例:Protocol Buffer 库改动:对于 ≥ 128 字节的消息,避免为编码消息长度生成昂贵的内联代码,改为调用共享的非内联例程。
这不仅让重要的大型二进制更小,也让它们更快。
某个大型二进制中,一个重度内联例程每行源码生成的代码字节数。第一个数字表示某一源码行在所有内联位置生成的总字节数。
修改前:
1
2
3
4
5
6
7
8
9
|
. 0 1825 template <typename MessageType>
. 0 1826 inline uint8* WireFormatLite::InternalWriteMessage(
. 0 1827 int field_number, const MessageType& value, uint8* target,
. 0 1828 io::EpsCopyOutputStream* stream) {
>>> 389246 1829 target = WriteTagToArray(field_number, WIRETYPE_LENGTH_DELIMITED, target);
>>> 5454640 1830 target = io::CodedOutputStream::WriteVarint32ToArray(
>>> 337837 1831 static_cast<uint32>(value.GetCachedSize()), target);
>>> 1285539 1832 return value._InternalSerialize(target, stream);
. 0 1833 }
|
此改动后的新代码大小输出如下:
1
2
3
4
5
6
7
8
9
|
. 0 1825 template <typename MessageType>
. 0 1826 inline uint8* WireFormatLite::InternalWriteMessage(
. 0 1827 int field_number, const MessageType& value, uint8* target,
. 0 1828 io::EpsCopyOutputStream* stream) {
>>> 450612 1829 target = WriteTagToArray(field_number, WIRETYPE_LENGTH_DELIMITED, target);
>> 9609 1830 target = io::CodedOutputStream::WriteVarint32ToArrayOutOfLine(
>>> 434668 1831 static_cast<uint32>(value.GetCachedSize()), target);
>>> 1597394 1832 return value._InternalSerialize(target, stream);
. 0 1833 }
|
coded_stream.h
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
class PROTOBUF_EXPORT CodedOutputStream {
...
// 类似 WriteVarint32(),但直接写入目标数组,
// 并将较少见路径放到非内联路径,而不是内联。
static uint8* WriteVarint32ToArrayOutOfLine(uint32 value, uint8* target);
...
};
...
inline uint8* CodedOutputStream::WriteVarint32ToArrayOutOfLine(uint32 value,
uint8* target) {
target[0] = static_cast<uint8>(value);
if (value < 0x80) {
return target + 1;
} else {
return WriteVarint32ToArrayOutOfLineHelper(value, target);
}
}
|
coded_stream.cc
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
uint8* CodedOutputStream::WriteVarint32ToArrayOutOfLineHelper(uint32 value,
uint8* target) {
DCHECK_GE(value, 0x80);
target[0] |= static_cast<uint8>(0x80);
value >>= 7;
target[1] = static_cast<uint8>(value);
if (value < 0x80) {
return target + 2;
}
target += 2;
do {
// 打开刚写入字节中的 continuation bit。
target[-1] |= static_cast<uint8>(0x80);
value >>= 7;
*target = static_cast<uint8>(value);
++target;
} while (value >= 0x80);
return target;
}
|
示例:减少 absl::flat_hash_set 和 absl::flat_hash_map 的代码大小。
让一些大型二进制体积减少约 0.5%。
示例:不使用 protobuf arena 时,不内联字符串分配和释放。
public/arenastring.h
修改前:
1
2
3
4
5
6
7
8
|
if (IsDefault(default_value)) {
std::string* new_string = new std::string();
tagged_ptr_.Set(new_string);
return new_string;
} else {
return UnsafeMutablePointer();
}
}
|
修改后:
1
2
3
4
5
6
|
if (IsDefault(default_value)) {
return SetAndReturnNewString();
} else {
return UnsafeMutablePointer();
}
}
|
internal/arenastring.cc
修改后:
1
2
3
4
5
|
std::string* ArenaStringPtr::SetAndReturnNewString() {
std::string* new_string = new std::string();
tagged_ptr_.Set(new_string);
return new_string;
}
|
示例:避免内联某些例程。创建接受 const char* 而不是 const std::string& 的例程变体,以避免每个调用点都生成 std::string 构造代码。
op.h
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
class OpDefBuilderWrapper {
public:
explicit OpDefBuilderWrapper(const char name[]) : builder_(name) {}
OpDefBuilderWrapper& Attr(std::string spec) {
builder_.Attr(std::move(spec));
return *this;
}
OpDefBuilderWrapper& Input(std::string spec) {
builder_.Input(std::move(spec));
return *this;
}
OpDefBuilderWrapper& Output(std::string spec) {
builder_.Output(std::move(spec));
return *this;
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
class OpDefBuilderWrapper {
public:
explicit OpDefBuilderWrapper(const char name[]) : builder_(name) {}
OpDefBuilderWrapper& Attr(std::string spec) {
builder_.Attr(std::move(spec));
return *this;
}
OpDefBuilderWrapper& Attr(const char* spec) TF_ATTRIBUTE_NOINLINE {
return Attr(std::string(spec));
}
OpDefBuilderWrapper& Input(std::string spec) {
builder_.Input(std::move(spec));
return *this;
}
OpDefBuilderWrapper& Input(const char* spec) TF_ATTRIBUTE_NOINLINE {
return Input(std::string(spec));
}
OpDefBuilderWrapper& Output(std::string spec) {
builder_.Output(std::move(spec));
return *this;
}
OpDefBuilderWrapper& Output(const char* spec) TF_ATTRIBUTE_NOINLINE {
return Output(std::string(spec));
}
|
减少模板实例化
模板代码在实例化时,可能会为模板参数的每种可能组合生成重复代码。
示例:用普通参数替换模板参数。
将一个以 bool 为模板参数的大型例程改为把该 bool 作为额外普通参数传入。(这个 bool 只用于在两个字符串常量中选择一个,因此运行时检查完全可以接受。)这把该大型例程的实例化数量从 287 减少到 143。
sharding_util_ops.cc
修改前:
1
2
3
4
5
6
7
8
9
10
|
template <bool Split>
Status GetAndValidateAttributes(OpKernelConstruction* ctx,
std::vector<int32>& num_partitions,
int& num_slices, std::vector<int32>& paddings,
bool& has_paddings) {
absl::string_view num_partitions_attr_name =
Split ? kNumSplitsAttrName : kNumConcatsAttrName;
...
return OkStatus();
}
|
修改后:
1
2
3
4
5
6
7
8
9
|
Status GetAndValidateAttributes(bool split, OpKernelConstruction* ctx,
std::vector<int32>& num_partitions,
int& num_slices, std::vector<int32>& paddings,
bool& has_paddings) {
absl::string_view num_partitions_attr_name =
split ? kNumSplitsAttrName : kNumConcatsAttrName;
...
return OkStatus();
}
|
示例:将模板构造函数中的大块代码移到非模板的共享基类构造函数中。
同时减少模板实例化数量:从每种组合一次,变为每个相关类型各一次。
sharding_util_ops.cc
修改前:
1
2
3
4
5
6
7
8
|
template <typename Device, typename T>
class XlaSplitNDBaseOp : public OpKernel {
public:
explicit XlaSplitNDBaseOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
OP_REQUIRES_OK(
ctx, GetAndValidateAttributes(/*split=*/true, ctx, num_splits_,
num_slices_, paddings_, has_paddings_));
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
|
// 用于节省代码空间的共享基类。
class XlaSplitNDShared : public OpKernel {
public:
explicit XlaSplitNDShared(OpKernelConstruction* ctx) TF_ATTRIBUTE_NOINLINE
: OpKernel(ctx),
num_slices_(1),
has_paddings_(false) {
GetAndValidateAttributes(/*split=*/true, ctx, num_splits_, num_slices_,
paddings_, has_paddings_);
}
|
示例:减少 absl::flat_hash_set 和 absl::flat_hash_map 的生成代码大小。
减少容器操作
应考虑 map 和其他容器操作的影响,因为每次调用这类操作都可能生成大量代码。
示例:将连续许多次 map 插入(用于初始化 emoji 字符哈希表)改成一次批量插入操作(在链接进许多二进制的库中,文本段从 188KB 降到 360 字节)。😊
textfallback_init.h
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
|
inline void AddEmojiFallbacks(TextFallbackMap *map) {
(*map)[0xFE000] = &kFE000;
(*map)[0xFE001] = &kFE001;
(*map)[0xFE002] = &kFE002;
(*map)[0xFE003] = &kFE003;
(*map)[0xFE004] = &kFE004;
(*map)[0xFE005] = &kFE005;
...
(*map)[0xFEE7D] = &kFEE7D;
(*map)[0xFEEA0] = &kFEEA0;
(*map)[0xFE331] = &kFE331;
};
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
inline void AddEmojiFallbacks(TextFallbackMap *map) {
#define PAIR(x) {0x##x, &k##x}
// clang-format off
map->insert({
PAIR(FE000),
PAIR(FE001),
PAIR(FE002),
PAIR(FE003),
PAIR(FE004),
PAIR(FE005),
...
PAIR(FEE7D),
PAIR(FEEA0),
PAIR(FE331)});
// clang-format on
#undef PAIR
};
|
示例:停止内联一个大量使用 InlinedVector 操作的函数。
将一个原本在 .h 文件中被内联的超长例程移到 .cc 文件中(内联它没有实际性能收益)。
reduction_ops_common.h
1
2
3
4
|
Status Simplify(const Tensor& data, const Tensor& axis,
const bool keep_dims) {
... Eighty line routine body ...
}
|
1
|
Status Simplify(const Tensor& data, const Tensor& axis, const bool keep_dims);
|
并行化与同步
利用并行性
现代机器有很多核心,而且经常没有被充分利用。因此,昂贵工作可以通过并行化更快完成。最常见的方法是并行处理不同项目,并在完成后合并结果。通常会先把项目划分为批次,以避免为每个项目单独支付并行执行成本。
示例:四路并行化使 token 编码速率提升约 3.6 倍。
blocked-token-coder.cc
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
MutexLock l(&encoder_threads_lock);
if (encoder_threads == NULL) {
encoder_threads = new ThreadPool(NumCPUs());
encoder_threads->SetStackSize(262144);
encoder_threads->StartWorkers();
}
encoder_threads->Add
(NewCallback(this,
&BlockedTokenEncoder::EncodeRegionInThread,
region_tokens, N, region,
stats,
controller_->GetClosureWithCost
(NewCallback(&DummyCallback), N)));
|
示例:并行化使解码性能提升 5 倍。
coding.cc
1
2
3
|
for (int c = 0; c < clusters->size(); c++) {
RET_CHECK_OK(DecodeBulkForCluster(...);
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
struct SubTask {
absl::Status result;
absl::Notification done;
};
std::vector<SubTask> tasks(clusters->size());
for (int c = 0; c < clusters->size(); c++) {
options_.executor->Schedule([&, c] {
tasks[c].result = DecodeBulkForCluster(...);
tasks[c].done.Notify();
});
}
for (int c = 0; c < clusters->size(); c++) {
tasks[c].done.WaitForNotification();
}
for (int c = 0; c < clusters->size(); c++) {
RETURN_IF_ERROR(tasks[c].result);
}
|
并行化对系统性能的影响应仔细测量。如果没有空闲 CPU,或者内存带宽已经饱和,并行化可能没有帮助,甚至可能有害。
摊销加锁成本
避免细粒度加锁,以降低热点路径中 Mutex 操作的成本。注意:只有当改动不会增加锁竞争时才应这样做。
示例:只获取一次锁来释放整棵查询节点树,而不是为树中每个节点重复获取锁。
mustang-query.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
|
// 查询节点池。
ThreadSafeFreeList<MustangQuery> pool_(256);
...
void MustangQuery::Release(MustangQuery* node) {
if (node == NULL)
return;
for (int i=0; i < node->children_->size(); ++i)
Release((*node->children_)[i]);
node->children_->clear();
pool_.Delete(node);
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// 查询节点池。
Mutex pool_lock_;
FreeList<MustangQuery> pool_(256);
...
void MustangQuery::Release(MustangQuery* node) {
if (node == NULL)
return;
MutexLock l(&pool_lock_);
ReleaseLocked(node);
}
void MustangQuery::ReleaseLocked(MustangQuery* node) {
#ifndef NDEBUG
pool_lock_.AssertHeld();
#endif
if (node == NULL)
return;
for (int i=0; i < node->children_->size(); ++i)
ReleaseLocked((*node->children_)[i]);
node->children_->clear();
pool_.Delete(node);
}
|
保持临界区短小
避免在临界区内执行昂贵工作。尤其要警惕那些看似无害、但可能会执行 RPC 或访问文件的代码。
示例:减少临界区中触碰的缓存行数量。
细致调整数据结构,显著减少访问的缓存行数量,并让一次 ML 训练运行性能提升 3.3%。
-
将某些每节点类型属性预计算为 NodeItem 数据结构中的位,
这意味着在临界区内处理出边时,可以避免触碰 Node* 对象。
-
修改 ExecutorState::ActivateNodes,让每条出边使用目标节点的 NodeItem,
而不是触碰 *item->node 对象中的字段。
通常这意味着访问所需边数据时,总共只触碰 1 到 2 个缓存行,
而不是 ~2 + O(num_outgoing edges) 个缓存行;
对于许多核心执行的大图,这也会降低 TLB 压力。
示例:持有 Mutex 时避免 RPC。
trainer.cc
修改前:
1
2
3
4
5
6
|
{
// 通知参数服务器我们即将开始。
MutexLock l(&lock_);
model_ = model;
MaybeRecordProgress(last_global_step_);
}
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
|
bool should_start_record_progress = false;
int64 step_for_progress = -1;
{
// 通知参数服务器我们即将开始。
MutexLock l(&lock_);
model_ = model;
should_start_record_progress = ShouldStartRecordProgress();
step_for_progress = last_global_step_;
}
if (should_start_record_progress) {
StartRecordProgress(step_for_progress);
}
|
此外,要警惕那些会在 Mutex 解锁前运行的昂贵析构函数。
(当 Mutex 解锁由 ~MutexUnlock 触发时,这种情况经常发生。)
如果线程安全性允许,在 MutexLock 之前声明带有昂贵析构函数的对象可能会有帮助。
通过分片减少竞争
有时,一个受 Mutex 保护且竞争很高的数据结构,可以安全拆分成多个分片,每个分片拥有自己的 Mutex。(注意:这要求不同分片之间没有跨分片不变量。)
示例:将缓存分成 16 个分片,使多线程负载下吞吐量提升约 2 倍。
cache.cc
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
class ShardedLRUCache : public Cache {
private:
LRUCache shard_[kNumShards];
port::Mutex id_mutex_;
uint64_t last_id_;
static inline uint32_t HashSlice(const Slice& s) {
return Hash(s.data(), s.size(), 0);
}
static uint32_t Shard(uint32_t hash) {
return hash >> (32 - kNumShardBits);
}
...
virtual Handle* Lookup(const Slice& key) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Lookup(key, hash);
}
|
示例:对用于跟踪调用的 Spanner 数据结构进行分片。
transaction_manager.cc
修改前:
1
2
3
4
5
|
absl::MutexLock l(&active_calls_in_mu_);
ActiveCallMap::const_iterator iter = active_calls_in_.find(m->tid());
if (iter != active_calls_in_.end()) {
iter->second.ExtractElements(&m->tmp_calls_);
}
|
修改后:
1
2
3
4
5
6
|
ActiveCalls::LockedShard shard(active_calls_in_, m->tid());
const ActiveCallMap& active_calls_map = shard.active_calls_map();
ActiveCallMap::const_iterator iter = active_calls_map.find(m->tid());
if (iter != active_calls_map.end()) {
iter->second.ExtractElements(&m->tmp_calls_);
}
|
如果相关数据结构是 map,可以考虑改用并发哈希 map 实现。
要小心用于分片选择的信息。例如,如果使用哈希值中的某些位进行分片选择,而这些相同位稍后又被再次使用,后续使用可能会因为看到偏斜的哈希值分布而表现很差。
示例:修正用于分片选择的信息,避免哈希表问题。
netmon_map_impl.h
修改前:
1
2
3
4
5
6
7
8
|
ConnectionBucket* GetBucket(Index index) {
// 重新哈希,确保我们不是基于原始哈希
// 对 bucket 进行分区。如果 num_buckets_ 是 2 的幂,
// 那样会降低 bucket 的熵。
size_t original_hash = absl::Hash<Index>()(index);
int hash = absl::Hash<size_t>()(original_hash) % num_buckets_;
return &buckets_[hash];
}
|
修改后:
1
2
3
4
5
6
|
ConnectionBucket* GetBucket(Index index) {
absl::Hash<std::pair<Index, size_t>> hasher{};
// 将哈希值与 42 组合,以防分片选择使用
// 与底层哈希表相同的位。
return &buckets_[hasher({index, 42}) % num_buckets_];
}
|
示例:对用于跟踪调用的 Spanner 数据结构进行分片。
这个 CL 将 ActiveCallMap 分成 64 个分片。每个分片由单独的互斥锁保护。给定事务会被映射到且只映射到一个分片。新增 LockedShard(tid) 接口,用于以线程安全方式访问某个事务对应的 ActiveCallMap。示例用法:
transaction_manager.cc
修改前:
1
2
3
4
|
{
absl::MutexLock l(&active_calls_in_mu_);
delayed_locks_timer_ring_.Add(delayed_locks_flush_time_ms, tid);
}
|
修改后:
1
2
3
4
|
{
ActiveCalls::LockedShard shard(active_calls_in_, tid);
shard.delayed_locks_timer_ring().Add(delayed_locks_flush_time_ms, tid);
}
|
结果显示,在使用 8192 个 fiber 运行基准测试时,总体墙钟时间减少了 69%。
1
2
3
4
|
Benchmark Time(ns) CPU(ns) Iterations
------------------------------------------------------------------
BM_ActiveCalls/8k 11854633492 98766564676 10
BM_ActiveCalls/16k 26356203552 217325836709 10
|
1
2
3
4
|
Benchmark Time(ns) CPU(ns) Iterations
------------------------------------------------------------------
BM_ActiveCalls/8k 3696794642 39670670110 10
BM_ActiveCalls/16k 7366284437 79435705713 10
|
SIMD 指令
可以探索使用现代 CPU 提供的 SIMD 指令一次处理多个项目是否能带来加速(例如参见下面批量操作一节中对 absl::flat_hash_map 的讨论)。
减少伪共享
如果不同线程访问不同的可变数据,考虑把这些不同数据项放到不同缓存行上,例如在 C++ 中使用 alignas 指令。不过这些指令很容易误用,并可能显著增加对象大小,因此要确保性能测量能证明它们的使用是合理的。
示例:将经常变更的字段与其他字段分隔到不同缓存行。
histogram.h
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
HistogramOptions options_;
...
internal::HistogramBoundaries *boundaries_;
...
std::vector<double> buckets_;
double min_; // 最小值。
double max_; // 最大值。
double count_; // 出现总次数。
double sum_; // 值之和。
double sum_of_squares_; // 值平方之和。
...
RegisterVariableExporter *exporter_;
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
HistogramOptions options_;
...
internal::HistogramBoundaries *boundaries_;
...
RegisterVariableExporter *exporter_;
...
// 将下面字段放入专用缓存行,因为它们经常
// 被修改,这样可以避免潜在的伪共享。
...
#ifndef SWIG
alignas(ABSL_CACHELINE_SIZE)
#endif
std::vector<double> buckets_;
double min_; // 最小值。
double max_; // 最大值。
double count_; // 出现总次数。
double sum_; // 值之和。
double sum_of_squares_; // 值平方之和。
|
降低上下文切换频率
示例:内联处理小工作项,而不是放到设备线程池上。
cast_op.cc
修改后:
1
2
3
4
5
6
7
8
9
10
|
template <typename Device, typename Tout, typename Tin>
void CastMaybeInline(const Device& d, typename TTypes<Tout>::Flat o,
typename TTypes<Tin>::ConstFlat i) {
if (o.size() * (sizeof(Tin) + sizeof(Tout)) < 16384) {
// CPU 上的小型 cast:内联执行。
o = i.template cast<Tout>();
} else {
o.device(d) = i.template cast<Tout>();
}
}
|
使用带缓冲的通道进行流水线处理
Channel 可以是无缓冲的,这意味着写入者会阻塞,直到读取者准备好取走项目。当 channel 用于同步时,无缓冲 channel 可能有用;但当 channel 用于增加并行性时,就不适合了。
考虑无锁方案
有时,无锁数据结构相比传统的 Mutex 保护数据结构能带来明显差异。不过,直接操作原子变量可能很[危险][atomic danger]。应优先使用更高层的抽象。
示例:使用无锁 map 管理 RPC channel 缓存。
RPC stub 缓存中的条目每秒会被读取数千次,但很少修改。切换到合适的无锁 map 可将查找延迟降低 3%-5%。
示例:使用固定词典和无锁哈希 map 加速 IsValidTokenId 判断。
dynamic_token_class_manager.h
修改前:
1
2
3
4
5
6
7
|
mutable Mutex mutex_;
// 该哈希 map 的密度由以下事实保证:
// 动态词典会先复用之前分配的 TokenId,
// 然后才尝试分配新的 TokenId。
dense_hash_map<TokenId, common::LocalTokenClassId> tid_to_cid_
GUARDED_BY(mutex_);
|
修改后:
1
2
3
4
5
6
|
// 对该哈希 map 的读访问应使用
// `epoch_gc_`::(EnterFast / LeaveFast)。写入者应定期
// 通过调用 LockFreeHashMap::CreateGC 对已删除条目做 GC。
typedef util::gtl::LockFreeHashMap<TokenId, common::LocalTokenClassId>
TokenIdTokenClassIdMap;
TokenIdTokenClassIdMap tid_to_cid_;
|
Protocol Buffer 建议
Protobuf 是一种方便的数据表示,尤其适合需要通过网络发送或持久化存储的数据。然而,它们可能带来显著性能成本。例如,一段填充 1000 个点列表并汇总 Y 坐标的代码,从 protobuf 改为 C++ 结构体 std::vector 后,速度提升了 20 倍!
示例:两个版本的基准测试代码。
1
2
|
name old time/op new time/op delta
BenchmarkIteration 17.4µs ± 5% 0.8µs ± 1% -95.30% (p=0.000 n=11+12)
|
Protobuf 版本:
1
2
3
4
5
6
7
|
message PointProto {
int32 x = 1;
int32 y = 2;
}
message PointListProto {
repeated PointProto points = 1;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
void SumProto(const PointListProto& vec) {
int sum = 0;
for (const PointProto& p : vec.points()) {
sum += p.y();
}
ABSL_VLOG(1) << sum;
}
void BenchmarkIteration() {
PointListProto points;
points.mutable_points()->Reserve(1000);
for (int i = 0; i < 1000; i++) {
PointProto* p = points.add_points();
p->set_x(i);
p->set_y(i * 2);
}
SumProto(points);
}
|
非 protobuf 版本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
struct PointStruct {
int x;
int y;
};
void SumVector(const std::vector<PointStruct>& vec) {
int sum = 0;
for (const PointStruct& p : vec) {
sum += p.y;
}
ABSL_VLOG(1) << sum;
}
void BenchmarkIteration() {
std::vector<PointStruct> points;
points.reserve(1000);
for (int i = 0; i < 1000; i++) {
points.push_back({i, i * 2});
}
SumVector(points);
}
|
此外,protobuf 版本会给二进制增加几 KB 的代码和数据。看起来不多,但在包含许多 protobuf 类型的系统中会快速累积。这种体积增加会造成指令缓存和数据缓存压力,从而带来性能问题。
下面是一些与 protobuf 性能相关的技巧:
示例:不要不必要地使用 protobuf。
鉴于上面提到的 20 倍性能差异,如果某些数据永远不会被序列化或解析,你很可能不应该把它放进 protocol buffer。Protocol buffer 的目的是让数据结构易于序列化和反序列化,但它们可能带来显著的代码体积、内存和 CPU 开销。如果你只是想要 DebugString、可复制性等其他便利,不要使用它们。
示例:避免不必要的消息层级。
消息层级有助于以更易读的方式组织信息。然而,额外的消息层级会带来内存分配、函数调用、缓存未命中、更大的序列化消息等开销。
例如,不要这样:
1
2
3
4
5
6
7
8
9
|
message Foo {
optional Bar bar = 1;
}
message Bar {
optional Baz baz = 1;
}
message Baz {
optional int32 count = 1;
}
|
优先这样:
1
2
3
|
message Foo {
optional int32 count = 1;
}
|
一个 protocol buffer 消息会对应 C++ 生成代码中的一个消息类,并在线上传输时发出 tag 和 payload 长度。为了携带一个整数,旧形式需要更多分配(和释放),并生成更多代码。因此,所有 protocol buffer 操作(解析、序列化、计算大小等)都会变得更昂贵,因为它们必须遍历消息树。新形式没有这些开销,因此更高效。
示例:对频繁出现的字段使用较小字段号。
Protobuf 使用变长整数来表示字段号和 wire format 的组合(参见 protobuf 编码文档)。字段号在 1 到 15 之间时,该表示占 1 字节;字段号在 16 到 2047 之间时,占 2 字节。(通常应避免使用 2048 或更大的字段号。)
对于性能敏感的 protobuf,可以考虑预留一些较小字段号以供未来扩展。
示例:谨慎选择 int32、sint32、fixed32 和 uint32(64 位变体同理)。
通常使用 int32 或 int64,但对于哈希码这样的大值,应使用 fixed32 或 fixed64;对于经常为负的值,应使用 sint32 或 sint64。
varint 编码小整数时占用更少字节,可以节省空间,但解码成本更高。不过,对于负值或大值,它可能占用更多空间。在这种情况下,使用 fixed32 或 fixed64(而不是 uint32 或 uint64)可以减小体积,并让编码和解码便宜得多。对于较小的负整数,使用 sint32 或 sint64,而不是 int32 或 int64。
示例:在 proto2 中,通过标注 [packed=true] 打包 repeated 数值字段。
在 proto2 中,repeated 值默认序列化为一系列 (tag, value) 对。这效率较低,因为每个元素都必须解码 tag。
packed repeated primitive 会先序列化 payload 长度,然后是不带 tag 的值。使用定宽值时,我们在开始解析时就知道最终大小,因此可以避免重新分配,也就是没有重新分配成本。对于 payload 中有多少 varint,我们仍然不知道,可能仍需支付重新分配成本。
在 proto3 中,repeated 字段默认是 packed。
packed 最适合 fixed32、fixed64、float、double 等定宽值,因为整个编码长度可以通过元素数量乘以固定值大小预先确定,而不必计算每个单独元素的长度。
示例:对二进制数据和大值使用 bytes,而不是 string。
string 类型保存 UTF-8 编码文本,有时需要验证。bytes 类型可以保存任意字节序列(非文本数据),通常比 string 更合适也更高效。
示例:考虑使用 string_type = VIEW 来避免复制。
解析期间复制大型 string 或 bytes 字段很昂贵。通常可以通过将字段标记为 string_type = VIEW 来避免这种成本。
1
2
3
4
|
message Image {
...
bytes jpeg_encoding = 4 [features.(pb.cpp).string_type=VIEW];
}
|
如果没有 VIEW 注解,解析 protocol buffer 时,潜在很大的字段内容会从序列化 protocol buffer 复制到内存中的 string 对象。根据 string 或 bytes 字段数量以及字段大小,复制开销可能非常显著。
ParseFromStringWithAliasing 之类的例程不会复制大型二进制 blob,而是使用 absl::string_view 引用原始后备字符串。注意,后备字符串(序列化 protocol buffer)必须比包含该别名的 protocol buffer 实例活得更久。
示例:对大字段考虑使用 Cord,以减少复制成本。
用 [ctype=CORD] 注解大型 bytes 和 string 字段可能减少复制成本。该注解会把字段表示从 std::string 改为 absl::Cord。absl::Cord 使用引用计数和基于树的存储来降低复制和追加成本。如果 protocol buffer 被序列化到 cord,那么解析带 [ctype=CORD] 的 string 或 bytes 字段可以避免复制字段内容。
1
2
3
4
|
message Document {
...
bytes html = 4 [ctype = CORD];
}
|
Cord 字段的性能取决于长度分布和访问模式。应使用基准测试验证此类改动。
示例:在 C++ 代码中使用 protobuf arena。
考虑使用 arena 节省分配和释放成本,尤其是对于包含 repeated、string 或 message 字段的 protobuf。
message 和 string 字段会在堆上分配(即使顶层 protocol buffer 对象是在栈上分配的)。如果一个 protocol buffer 消息有很多子 message 字段和 string 字段,分配和释放成本可能很显著。Arena 可以摊销分配成本,并让释放几乎免费;它还通过从连续内存块分配来改善内存局部性。
示例:保持 .proto 文件短小。
不要在单个 .proto 文件中放太多消息。只要依赖某个 .proto 文件中的任何内容,整个文件都会被链接器拉入,即使其中大部分都未使用。这会增加构建时间和二进制大小。可以使用扩展和 Any 来避免对包含许多消息类型的大型 .proto 文件产生硬依赖。
示例:考虑以序列化形式存储 protocol buffer,即使是在内存中。
内存中的 protobuf 对象内存占用很大(通常是 wire format 大小的 5 倍),并且可能分散在许多缓存行上。因此,如果应用需要长期保留许多 protobuf 对象,考虑以序列化形式存储它们。
示例:避免 protobuf map 字段。
Protobuf map 字段存在性能问题,通常超过它们提供的少量语法便利。优先使用从 protobuf 内容初始化的非 protobuf map:
msg.proto
1
|
map<string, bytes> env_variables = 5;
|
1
2
3
4
5
|
message Var {
string key = 1;
bytes value = 2;
}
repeated Var env_variables = 5;
|
示例:使用只包含部分字段的 protobuf 消息定义。
如果你只想访问大型消息类型中的少数字段,考虑定义自己的 protocol buffer 消息类型来模拟原始类型,但只定义你关心的字段。示例如下:
1
2
3
4
5
6
7
8
|
message FullMessage {
optional int32 field1 = 1;
optional BigMessage field2 = 2;
optional int32 field3 = 3;
repeated AnotherBigMessage field4 = 4;
...
optional int32 field100 = 100;
}
|
1
2
3
4
|
message SubsetMessage {
optional int32 field3 = 3;
optional int32 field88 = 88;
}
|
通过把序列化的 FullMessage 解析成 SubsetMessage,一百个字段中只有两个会被解析,其他字段会被视为未知字段。在合适时,可以考虑使用丢弃未知字段的 API,以进一步提升性能。
示例:尽可能复用 protobuf 对象。
在循环外声明 protobuf 对象,使其已分配存储可以在多次循环迭代之间复用。
C++ 专项建议
absl::flat_hash_map 和 absl::flat_hash_set
Absl 哈希表 通常比 C++ 标准库容器(如 std::map 和 std::unordered_map)性能更好。
示例:加速 LanguageFromCode(使用 absl::flat_hash_map 替代 __gnu_cxx::hash_map)。
languages.cc
修改前:
1
2
3
4
|
class CodeToLanguage
...
: public __gnu_cxx::hash_map<absl::string_view, i18n::languages::Language,
CodeHash, CodeCompare> {
|
修改后:
1
2
3
4
|
class CodeToLanguage
...
: public absl::flat_hash_map<absl::string_view, i18n::languages::Language,
CodeHash, CodeCompare> {
|
基准测试结果:
1
2
|
name old time/op new time/op delta
BM_CodeToLanguage 19.4ns ± 1% 10.2ns ± 3% -47.47% (p=0.000 n=8+10)
|
示例:加速统计信息发布/取消发布(这是较早的改动,因此使用 dense_hash_map,当时还没有 absl::flat_hash_map)。
publish.cc
修改前:
1
2
|
typedef hash_map<uint64, Publication*> PublicationMap;
static PublicationMap* publications = NULL;
|
修改后:
1
2
|
typedef dense_hash_map<uint64, Publication*> PublicationMap;;
static PublicationMap* publications GUARDED_BY(mu) = NULL;
|
示例:使用 dense_hash_map 而不是 hash_map 跟踪 SelectServer alarm(今天会使用 absl::flat_hash_map)。
alarmer.h
修改前:
1
|
typedef hash_map<int, Alarm*> AlarmList;
|
修改后:
1
|
typedef dense_hash_map<int, Alarm*> AlarmList;
|
absl::btree_map 和 absl::btree_set
absl::btree_map 和 absl::btree_set 会在每个树节点中存储多个条目。相比 C++ 标准库有序容器(如 std::map),这有几个优势。首先,指向子树节点的指针开销通常会显著降低。其次,由于给定 btree 节点中的条目或 key/value 在内存中连续存储,缓存效率通常也显著更好。
示例:使用 btree_set 而不是 std::set 表示使用非常频繁的工作队列。
register_allocator.h
1
|
using container_type = std::set<WorklistItem>;
|
1
|
using container_type = absl::btree_set<WorklistItem>;
|
util::bitmap::InlinedBitVector
util::bitmap::InlinedBitvector 可以内联存储较短的位向量,因此通常比 std::vector<bool> 或其他 bitmap 类型更合适。
示例:使用 InlinedBitVector 而不是 std::vector,再用 FindNextBitSet 查找下一个关注项。
block_encoder.cc
修改前:
1
2
3
4
5
6
|
vector<bool> live_reads(nreads);
...
for (int offset = 0; offset < b_.block_width(); offset++) {
...
for (int r = 0; r < nreads; r++) {
if (live_reads[r]) {
|
修改后:
1
2
3
4
5
6
|
util::bitmap::InlinedBitVector<4096> live_reads(nreads);
...
for (int offset = 0; offset < b_.block_width(); offset++) {
...
for (size_t r = 0; live_reads.FindNextSetBit(&r); r++) {
DCHECK(live_reads[r]);
|
absl::InlinedVector
absl::InlinedVector 会内联存储少量元素(数量通过第二个模板参数配置)。这让不超过该数量的小 vector 通常拥有更好的缓存效率,并在元素数量较少时完全避免分配后备存储数组。
示例:在多处使用 InlinedVector 而不是 std::vector。
bundle.h
1
2
3
4
5
6
7
8
|
class Bundle {
public:
...
private:
// (已分槽指令,未分槽立即数操作数)序列。
std::vector<InstructionRecord> instructions_;
...
};
|
1
2
3
4
5
6
7
8
|
class Bundle {
public:
...
private:
// (已分槽指令,未分槽立即数操作数)序列。
absl::InlinedVector<InstructionRecord, 2> instructions_;
...
};
|
gtl::vector32
通过使用只支持 32 位大小范围的定制 vector 类型来节省空间。
示例:简单类型变更为 Spanner 节省约 8TiB 内存。
table_ply.h
修改前:
1
2
3
4
5
6
7
8
9
10
|
class TablePly {
...
// 返回此文件中为该表存储的数据列集合。
const std::vector<FamilyId>& modified_data_columns() const {
return modified_data_columns_;
}
...
private:
...
std::vector<FamilyId> modified_data_columns_; // 表中的数据列。
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
|
#include "util/gtl/vector32.h"
...
// 返回此文件中为该表存储的数据列集合。
absl::Span<const FamilyId> modified_data_columns() const {
return modified_data_columns_;
}
...
...
// 表中的数据列。
gtl::vector32<FamilyId> modified_data_columns_;
|
gtl::small_map
gtl::small_map 使用内联数组存储最多一定数量的唯一 key-value 元素;当空间不足时,会自动升级为由用户指定的 map 类型作为后备。
示例:在 tflite_model 中使用 gtl::small_map。
tflite_model.cc
修改前:
1
|
using ChoiceIdToContextMap = gtl::flat_hash_map<int, TFLiteContext*>;
|
修改后:
1
2
|
using ChoiceIdToContextMap =
gtl::small_map<gtl::flat_hash_map<int, TFLiteContext*>>;
|
gtl::small_ordered_set
gtl::small_ordered_set 是针对关联容器(如 std::set 或 absl::btree_multiset)的优化。它使用固定数组存储一定数量的元素;空间不足时,再退回到 set 或 multiset。对于通常较小的 set,这可能比直接使用 set 快得多,因为 set 是为大型数据集优化的。该改动减少了缓存占用,并缩短了临界区长度。
示例:使用 gtl::small_ordered_set 保存 listener 集合。
broadcast_stream.h
修改前:
1
2
3
4
5
6
|
class BroadcastStream : public ParsedRtpTransport {
...
private:
...
std::set<ParsedRtpTransport*> listeners_ ABSL_GUARDED_BY(listeners_mutex_);
};
|
修改后:
1
2
3
4
5
6
7
|
class BroadcastStream : public ParsedRtpTransport {
...
private:
...
using ListenersSet =
gtl::small_ordered_set<std::set<ParsedRtpTransport*>, 10>;
ListenersSet listeners_ ABSL_GUARDED_BY(listeners_mutex_);
|
gtl::intrusive_list
gtl::intrusive_list<T> 是双向链表,链表指针嵌入在 T 类型元素内部。相比 std::list<T*>,它为每个元素节省一个缓存行和一次间接访问。
示例:使用 intrusive_list 跟踪每个索引行更新的进行中请求。
row-update-sender-inflight-set.h
修改前:
1
|
std::set<int64> inflight_requests_ GUARDED_BY(mu_);
|
修改后:
1
2
3
4
5
6
7
|
class SeqNum : public gtl::intrusive_link<SeqNum> {
...
int64 val_ = -1;
...
};
...
gtl::intrusive_list<SeqNum> inflight_requests_ GUARDED_BY(mu_);
|
限制 absl::Status 和 absl::StatusOr 的使用
虽然 absl::Status 和 absl::StatusOr 类型相当高效,但即使在成功路径上也有非零开销。因此,对于不需要返回有意义错误详情(甚至可能永远不会失败)的热点例程,应避免使用它们:
示例:避免 RoundUpToAlignment() 函数返回 StatusOr。
best_fit_allocator.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
absl::StatusOr<int64> BestFitAllocator::RoundUpToAlignment(int64 bytes) const {
TPU_RET_CHECK_GE(bytes, 0);
const int64 max_aligned = MathUtil::RoundDownTo<int64>(
std::numeric_limits<int64>::max(), alignment_in_bytes_);
if (bytes > max_aligned) {
return util::ResourceExhaustedErrorBuilder(ABSL_LOC)
<< "Attempted to allocate "
<< strings::HumanReadableNumBytes::ToString(bytes)
<< " which after aligning to "
<< strings::HumanReadableNumBytes::ToString(alignment_in_bytes_)
<< " cannot be expressed as an int64.";
}
return MathUtil::RoundUpTo<int64>(bytes, alignment_in_bytes_);
}
|
best_fit_allocator.h
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
|
// 将 bytes 向上取整到 alignment_ 的最近倍数。
// 要求:bytes >= 0。
// 要求:结果不会溢出 int64。
// 要求:alignment_in_bytes_ 是 2 的幂(在构造函数中检查)。
int64 RoundUpToAlignment(int64 bytes) const {
DCHECK_GE(bytes, 0);
DCHECK_LE(bytes, max_aligned_bytes_);
int64 result =
((bytes + (alignment_in_bytes_ - 1)) & ~(alignment_in_bytes_ - 1));
DCHECK_EQ(result, MathUtil::RoundUpTo<int64>(bytes, alignment_in_bytes_));
return result;
}
|
示例:添加 ShapeUtil::ForEachIndexNoStatus,避免为 tensor 的每个元素创建 Status 返回对象。
shape_util.h
修改前:
1
2
3
4
5
6
7
|
using ForEachVisitorFunction =
absl::FunctionRef<StatusOr<bool>(absl::Span<const int64_t>)>;
...
static void ForEachIndex(const Shape& shape, absl::Span<const int64_t> base,
absl::Span<const int64_t> count,
absl::Span<const int64_t> incr,
const ForEachVisitorFunction& visitor_function);
|
修改后:
1
2
3
4
5
6
7
|
using ForEachVisitorFunctionNoStatus =
absl::FunctionRef<bool(absl::Span<const int64_t>)>;
...
static void ForEachIndexNoStatus(
const Shape& shape, absl::Span<const int64_t> base,
absl::Span<const int64_t> count, absl::Span<const int64_t> incr,
const ForEachVisitorFunctionNoStatus& visitor_function);
|
literal.cc
修改前:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
ShapeUtil::ForEachIndex(
result_shape, [&](absl::Span<const int64_t> output_index) {
for (int64_t i = 0, end = dimensions.size(); i < end; ++i) {
scratch_source_index[i] = output_index[dimensions[i]];
}
int64_t dest_index = IndexUtil::MultidimensionalIndexToLinearIndex(
result_shape, output_index);
int64_t source_index = IndexUtil::MultidimensionalIndexToLinearIndex(
shape(), scratch_source_index);
memcpy(dest_data + primitive_size * dest_index,
source_data + primitive_size * source_index, primitive_size);
return true;
});
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
ShapeUtil::ForEachIndexNoStatus(
result_shape, [&](absl::Span<const int64_t> output_index) {
// 计算 dest_index。
int64_t dest_index = IndexUtil::MultidimensionalIndexToLinearIndex(
result_shape, result_minor_to_major, output_index);
// 计算 source_index。
int64_t source_index;
for (int64_t i = 0, end = dimensions.size(); i < end; ++i) {
scratch_source_array[i] = output_index[dimensions[i]];
}
if (src_shape_dims == 1) {
// 该情况的快速路径。
source_index = scratch_source_array[0];
DCHECK_EQ(source_index,
IndexUtil::MultidimensionalIndexToLinearIndex(
src_shape, src_minor_to_major, scratch_source_span));
} else {
source_index = IndexUtil::MultidimensionalIndexToLinearIndex(
src_shape, src_minor_to_major, scratch_source_span);
}
// 将一个元素从 source 中的 source_index 移到 dest 中的 dest_index。
memcpy(dest_data + PRIMITIVE_SIZE * dest_index,
source_data + PRIMITIVE_SIZE * source_index, PRIMITIVE_SIZE);
return true;
});
|
示例:在 TF_CHECK_OK 中,避免为了测试 ok() 而创建 Ok 对象。
status.h
修改前:
1
2
|
#define TF_CHECK_OK(val) CHECK_EQ(::tensorflow::Status::OK(), (val))
#define TF_QCHECK_OK(val) QCHECK_EQ(::tensorflow::Status::OK(), (val))
|
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
extern tensorflow::string* TfCheckOpHelperOutOfLine(
const ::tensorflow::Status& v, const char* msg);
inline tensorflow::string* TfCheckOpHelper(::tensorflow::Status v,
const char* msg) {
if (v.ok()) return nullptr;
return TfCheckOpHelperOutOfLine(v, msg);
}
#define TF_CHECK_OK(val) \
while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
LOG(FATAL) << *(_result)
#define TF_QCHECK_OK(val) \
while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
LOG(QFATAL) << *(_result)
|
示例:从远程过程调用(RPC)的热点路径中移除 StatusOr。
从热点路径移除 StatusOr,消除了早前改动在 RPC 基准测试中造成的 14% CPU 回退。
privacy_context.h
修改前:
1
2
|
absl::StatusOr<privacy::context::PrivacyContext> GetRawPrivacyContext(
const CensusHandle& h);
|
privacy_context_statusfree.h
修改后:
1
2
3
4
5
6
7
8
9
10
11
|
enum class Result {
kSuccess,
kNoRootScopedData,
kNoPrivacyContext,
kNoDDTContext,
kDeclassified,
kNoPrequestContext
};
...
Result GetRawPrivacyContext(const CensusHandle& h,
PrivacyContext* privacy_context);
|
批量操作
如果可能,应一次处理多个项目,而不是一次只处理一个。
示例:absl::flat_hash_map 使用单条 SIMD 指令,对一组 key 中每个 key 比较一个哈希字节。
参见 Swiss Table 设计说明,以及 Matt Kulukundis 相关的 CppCon 2017 和 CppCon 2019 演讲。
raw_hash_set.h
修改后:
1
2
3
4
5
6
|
// 返回位掩码,表示与 hash 匹配的 slot 位置。
BitMask<uint32_t> Match(h2_t hash) const {
auto ctrl = _mm_loadu_si128(reinterpret_cast<const __m128i*>(pos));
auto match = _mm_set1_epi8(hash);
return BitMask<uint32_t>(_mm_movemask_epi8(_mm_cmpeq_epi8(match, ctrl)));
}
|
示例:用单次操作处理多个字节并进行修正,而不是逐字节检查该怎么做。
ordered-code.cc
修改前:
1
2
3
4
5
6
7
8
9
|
int len = 0;
while (val > 0) {
len++;
buf[9 - len] = (val & 0xff);
val >>= 8;
}
buf[9 - len - 1] = (unsigned char)len;
len++;
FastStringAppend(dest, reinterpret_cast<const char*>(buf + 9 - len), len);
|
修改后:
1
2
3
4
5
|
BigEndian::Store(val, buf + 1); // buf[0] 可能需要用于长度。
const unsigned int length = OrderedNumLength(val);
char* start = buf + 9 - length - 1;
*start = length;
AppendUpto9(dest, start, length + 1);
|
示例:通过更高效地分块处理多个交错输入缓冲区,提升 Reed-Solomon 处理速度。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
Run on (12 X 3501 MHz CPUs); 2016-09-27T16:04:55.065995192-04:00
CPU: Intel Haswell with HyperThreading (6 cores) dL1:32KB dL2:256KB dL3:15MB
Benchmark Base (ns) New (ns) Improvement
------------------------------------------------------------------
BM_OneOutput/3/2 466867 351818 +24.6%
BM_OneOutput/4/2 563130 474756 +15.7%
BM_OneOutput/5/3 815393 688820 +15.5%
BM_OneOutput/6/3 897246 780539 +13.0%
BM_OneOutput/8/4 1270489 1137149 +10.5%
BM_AllOutputs/3/2 848772 642942 +24.3%
BM_AllOutputs/4/2 1067647 638139 +40.2%
BM_AllOutputs/5/3 1739135 1151369 +33.8%
BM_AllOutputs/6/3 2045817 1456744 +28.8%
BM_AllOutputs/8/4 3012958 2484937 +17.5%
BM_AllOutputsSetUpOnce/3/2 717310 493371 +31.2%
BM_AllOutputsSetUpOnce/4/2 833866 600060 +28.0%
BM_AllOutputsSetUpOnce/5/3 1537870 1137357 +26.0%
BM_AllOutputsSetUpOnce/6/3 1802353 1398600 +22.4%
BM_AllOutputsSetUpOnce/8/4 3166930 2455973 +22.4%
|
示例:一次解码四个整数(约 2004 年)。
引入 GroupVarInt 格式,它一次用 5-17 字节编码/解码一组 4 个变长整数,而不是一次处理一个整数。用新格式解码一组 4 个整数所需时间约为分别解码 4 个 varint 编码整数的 1/3。
groupvarint.cc
修改后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
const char* DecodeGroupVar(const char* p, int N, uint32* dest) {
assert(groupvar_initialized);
assert(N % 4 == 0);
while (N) {
uint8 tag = *p;
p++;
uint8* lenptr = &groupvar_table[tag].length[0];
#define GET_NEXT \
do { \
uint8 len = *lenptr; \
*dest = UNALIGNED_LOAD32(p) & groupvar_mask[len]; \
dest++; \
p += len; \
lenptr++; \
} while (0)
GET_NEXT;
GET_NEXT;
GET_NEXT;
GET_NEXT;
#undef GET_NEXT
N -= 4;
}
return p;
}
|
示例:一次编码一组 4 个 k 位数字。
添加 KBitStreamEncoder 和 KBitStreamDecoder 类,用于一次将 4 个 k 位数字编码/解码到位流中。由于 K 在编译期已知,编码和解码可以非常高效。例如,由于一次编码 4 个数字,代码可以假设流始终是字节对齐(k 为偶数)或半字节对齐(k 为奇数)。
展示多种技巧的 CL
有时单个 CL 会包含多个性能改进,并使用前面提到的许多技巧。观察这些 CL 中的改动类型,有助于培养一种思维方式:当系统某部分被确认是瓶颈后,如何做出通用改动来提升其性能。
示例:将 GPU 内存分配器加速约 40%。
GPUBFCAllocator 的分配/释放速度提升 36-48%:
-
使用句柄编号而不是 Chunk 指针来标识 chunk。Chunk 数据结构现在分配在 vector 中,句柄是该 vector 中指向特定 chunk 的索引。这样 Chunk 中的 next 和 prev 指针可以是 ChunkHandle(4 字节),而不是 Chunk*(8 字节)。
-
当 Chunk 对象不再使用时,我们维护一个 Chunk 对象空闲链表,链表头由 ChunkHandle free_chunks_list_ 指定,并通过 Chunk->next 指向下一个空闲条目。结合第 1 点,这使我们在分配器中可以避免对 Chunk 对象进行堆分配/释放,除非 vector 偶尔增长。同时,所有 Chunk 对象的内存也变成连续的。
-
不再让 bins_ 数据结构使用 std::set,也不再通过 lower_bound 根据 byte_size 查找合适的 bin;我们改用 bin 数组,并用 log₂(byte_size/256) 这样的函数做索引。这样可以用几个位操作定位 bin,而不是做二叉搜索树查找。它还允许把所有 Bin 数据结构的存储分配在连续数组中,而不是分散在许多不同缓存行上。这减少了多线程执行分配时需要在核心之间移动的缓存行数量。
-
为 GPUBFCAllocator::AllocateRaw 添加快速路径,先尝试在不涉及 retry_helper_ 的情况下分配内存。如果初次尝试失败(返回 nullptr),再进入 retry_helper_;但通常可以避免多层过程调用,以及带多个参数的 std::function 分配/释放。
-
注释掉大多数 VLOG 调用。需要调试时,可以有选择地取消注释并重新编译。
添加多线程基准测试,用于测试竞争下的分配。
在我配有 Titan X 显卡的台式机上,ptb_word_lm 从每秒 8036 个词提升到每秒 8272 个词(+2.9%)。
1
2
3
4
5
6
7
8
9
10
11
12
|
Run on (40 X 2801 MHz CPUs); 2016/02/16-15:12:49
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark Base (ns) New (ns) Improvement
------------------------------------------------------------------
BM_Allocation 347 184 +47.0%
BM_AllocationThreaded/1 351 181 +48.4%
BM_AllocationThreaded/4 2470 1975 +20.0%
BM_AllocationThreaded/16 11846 9507 +19.7%
BM_AllocationDelayed/1 392 199 +49.2%
BM_AllocationDelayed/10 285 169 +40.7%
BM_AllocationDelayed/100 245 149 +39.2%
BM_AllocationDelayed/1000 238 151 +36.6%
|
示例:通过一组杂项改动,将 Pathways 吞吐量提升约 20%。
-
将一批特殊的快速 descriptor 解析函数统一成单个 ParsedDescriptor 类,并在更多地方使用该类,以避免昂贵的完整解析调用。
-
将几个 protocol buffer 字段从 string 改为 bytes(避免不必要的 UTF-8 检查及相关错误处理代码)。
-
DescriptorProto.inlined_contents 现在是 string,而不是 Cord(预期只用于较小的 tensor)。这需要在 tensor_util.cc 中增加一批复制辅助函数(现在需要同时支持 string 和 Cord)。
-
在几个地方使用 flat_hash_map 替代 std::unordered_map。
-
添加 MemoryManager::LookupMany 供 Stack op 使用,而不是为每个批元素调用 Lookup。该改动减少了加锁等设置开销。
-
移除 TransferDispatchOp 中一些不必要的字符串创建。
-
在同一进程内,从一个组件向另一个组件传输一批 1000 个 1KB tensor 的性能结果:
1
2
|
修改前: 227.01 steps/sec
After: 272.52 steps/sec (+20% throughput)
|
示例:通过一系列改动,使 XLA 编译器性能提升约 15%。
一些用于加速 XLA 编译的改动:
-
在 SortComputationsByContent 中,如果比较函数里 a == b,则返回 false,以避免序列化和 fingerprint 长 computation 字符串。
-
将 CHECK 改为 DCHECK,避免在 HloComputation::ComputeInstructionPostOrder 中触碰额外缓存行。
-
避免在 CoreSequencer::IsVectorSyncHoldSatisfied() 中对 front instruction 做昂贵复制。
-
重写双参数 HloComputation::ToString 和 HloComputation::ToCord 例程,让大部分工作基于追加到 std::string,而不是追加到 Cord。
-
修改 PerformanceCounterSet::Increment,只执行一次哈希表查找,而不是两次。
-
精简 Scoreboard::Update 代码。
一个重要模型的 XLA 编译时间整体加速 14%。
示例:加速 Google Meet 应用代码中的低层日志。
加速位于每个数据包关键路径上的 ScopedLogId。
-
移除看起来只是用于观察不变量是否被破坏的 LOG_EVERY_N(ERROR, ...) 消息。
-
内联 PushLogId 和 PopLogid() 例程(因为去掉 LOG_EVERY_N_SECONDS(ERROR, ...) 语句后,它们已经足够小,可以内联)。
-
改用大小为 4 的固定数组和一个 int size 变量来维护线程本地状态。
由于它本来就从不会超过大小 4,InlinedVector 的功能比实际需要更通用。
1
2
3
4
5
6
7
8
9
10
11
|
Base: Baseline plus the code in scoped_logid_test.cc to add the benchmark
New: This changelist
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark Base (ns) New (ns) Improvement
----------------------------------------------------------------------------
BM_ScopedLogId/threads:1 8 4 +52.6%
BM_ScopedLogId/threads:2 8 4 +51.9%
BM_ScopedLogId/threads:4 8 4 +52.9%
BM_ScopedLogId/threads:8 8 4 +52.1%
BM_ScopedLogId/threads:16 11 6 +44.0%
|
示例:通过改进 Shape 处理,将 XLA 编译时间减少约 31%。
若干用于改善 XLA 编译器性能的改动:
-
通过几种方式改善 ShapeUtil::ForEachIndex... 迭代性能:
-
在 ShapeUtil::ForEachState 中,只保存 span 所表示数组的指针,而不是完整 span 对象。
-
预先形成指向 ShapeUtil::ForEachState::indexes vector 的 ShapeUtil::ForEachState::indexes_span,而不是在每次循环迭代中从 vector 构造 span。
-
保存指向 ShapeUtil::ForEachState::indexes vector 后备存储的 ShapeUtil::ForEachState::indexes_ptr 指针,让 ShapeUtil::ForEachState::IncrementDim() 可以使用简单数组操作,而不是更昂贵的 vector::operator[]。
-
保存 ShapeUtil::ForEachState::minor_to_major 数组指针,在构造函数中通过 shape.layout().minor_to_major().data() 初始化,而不是在每次迭代中对每个维度调用 LayoutUtil::Minor(...)。
-
内联 ShapeUtil::ForEachState 构造函数和 ShapeUtil::ForEachState::IncrementDim() 例程。
-
对不需要传入函数返回 Status 的调用点,改善 ShapeUtil::ForEachIndex 迭代性能。做法是引入 ShapeUtil::ForEachIndexNoStatus 变体,它接受 ForEachVisitorFunctionNoStatus(返回普通 bool)。这比接受 ForEachVisitorFunction 的 ShapeUtil::ForEachIndex 更快,因为后者返回 StatusOr,会在迭代的每个元素上产生昂贵的 StatusOr 析构调用。
-
在 LiteralBase::Broadcast 和 GenerateReduceOutputElement 中使用这个 ShapeUtil::ForEachIndexNoStatus 变体。
-
通过几种方式改善 LiteralBase::Broadcast 性能:
-
在 literal.cc 中引入模板化的 BroadcastHelper 例程,并针对不同 primitive 字节大小特化。没有这个改动时,primitive_size 是运行时变量,编译器无法很好优化每个元素上的 memcpy,会调用假设字节数较大的通用 memcpy 路径,尽管这里的大小只是很小的 2 的幂(通常是 1、2、4 或 8)。
-
在 LiteralBase::Broadcast 例程开始处只调用一次 shape(),从而避免每次 Broadcast 调用中约 5 + num_dimensions + num_result_elements 次虚调用中的绝大部分。散落各处、看起来无害的 shape() 调用最终会变成 root_piece().subshape(),其中 subshape() 是虚函数。
-
在 BroadcastHelper 例程中,对源维度为 1 的情况做特殊处理,避免在这种情况下调用 IndexUtil::MultiDimensionalIndexToLinearIndex。
-
在 BroadcastHelper 中,使用指向 scratch_source_index vector 后备存储的 scratch_source_array 指针变量,并直接使用它来避免逐元素代码中的 vector::operator[] 操作。还在逐元素循环外预计算指向 scratch_source_index vector 的 scratch_source_span,避免为每个元素从 vector 构造 span。
-
引入三参数版 IndexUtil::MultiDimensionalIndexToLinearIndex,由调用方传入与 shape 参数相关的 minor_to_major span。在 BroadcastHelper 中使用它,让 src 和 dst shape 每次 Broadcast 只计算一次,而不是每复制一个元素计算一次。
-
在 ShardingPropagation::GetShardingFromUser 中,对于 HloOpcode::kTuple 情况,只有在找到感兴趣的 operand 后才调用 user.sharding().GetSubSharding(...)。避免急切调用,让某次长编译中该例程的 CPU 时间从 43.7s 降到 2.0s。
-
为 ShapeUtil::ForEachIndex、Literal::Broadcast 和新的 ShapeUtil::ForEachIndexNoStatus 添加基准测试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
Base is with the benchmark additions of
BM_ForEachIndex and BM_BroadcastVectorToMatrix (and BUILD file change to add
benchmark dependency), but no other changes.
New is this cl
Run on (72 X 1357.56 MHz CPU s) CPU Caches: L1 Data 32 KiB (x36)
L1 Instruction 32 KiB (x36) L2 Unified 1024 KiB (x36) L3 Unified 25344 KiB (x2)
Benchmark Base (ns) New (ns) Improvement
----------------------------------------------------------------------------
BM_MakeShape 18.40 18.90 -2.7%
BM_MakeValidatedShape 35.80 35.60 +0.6%
BM_ForEachIndex/0 57.80 55.80 +3.5%
BM_ForEachIndex/1 90.90 85.50 +5.9%
BM_ForEachIndex/2 1973606 1642197 +16.8%
|
新增的 ForEachIndexNoStatus 明显快于 ForEachIndex 变体(它只存在于这个新 CL 中,但 BM_ForEachIndexNoStatus/NUM 完成的基准工作与上面的 BM_ForEachIndex/NUM 结果可比)。
1
2
3
4
5
|
Benchmark Base (ns) New (ns) Improvement
----------------------------------------------------------------------------
BM_ForEachIndexNoStatus/0 0 46.90 ----
BM_ForEachIndexNoStatus/1 0 65.60 ----
BM_ForEachIndexNoStatus/2 0 1001277 ----
|
Broadcast 性能提升约 58%。
1
2
3
4
5
|
Benchmark Base (ns) New (ns) Improvement
----------------------------------------------------------------------------
BM_BroadcastVectorToMatrix/16/16 5556 2374 +57.3%
BM_BroadcastVectorToMatrix/16/1024 319510 131075 +59.0%
BM_BroadcastVectorToMatrix/1024/1024 20216949 8408188 +58.4%
|
对大型语言模型进行提前编译的宏观结果(程序不只做 XLA 编译,但略少于一半时间花在 XLA 相关代码中):
基线程序总体耗时:573 秒。使用此 CL 后总体耗时:465 秒(+19% 改进)。
运行该程序时,编译两个最大 XLA 程序所花时间:
基线:141s + 143s = 284s。使用此 CL:99s + 95s = 194s(+31% 改进)。
示例:在 Plaque(一个分布式执行框架)中,将大型程序编译时间减少约 22%。
通过小幅调整让编译加速约 22%。
-
加速检测两个节点是否共享同一个来源。
以前我们会按排序顺序获取每个节点的来源,然后做有序求交。
现在我们把一个节点的来源放入哈希表,
再遍历另一个节点的来源并查询该哈希表。
-
在步骤 1 中复用同一个临时哈希表。
-
生成已编译 proto 时,维护一个以 pair<package, opname> 为 key 的单个 btree,而不是 btree 的 btree。
-
在前述 btree 中存储指向 opdef 的指针,而不是复制 opdef。
大型程序(约 45K op)上的速度测量:
1
2
|
name old time/op new time/op delta
BM_CompileLarge 28.5s ± 2% 22.4s ± 2% -21.61% (p=0.008 n=5+5)
|
示例:MapReduce 改进(wordcount 基准约 2 倍加速)。
MapReduce 加速点:
-
修改了 SafeCombinerMapOutput 类的 combiner 数据结构。以前使用 hash_multimap,表中插入的每个唯一 key/value 都有一个哈希表条目;现在改用 hash_map(其中 ValuePtr 是值和重复计数的链表)。这有三方面帮助:
-
它显著减少内存使用,因为每个值只使用 sizeof(ValuePtr) + value_len 字节,而不是 sizeof(SafeCombinerKey) + sizeof(StringPiece) + value_len + 新哈希表条目开销。这意味着 reducer buffer 刷新的频率更低。
-
它显著更快,因为当为表中已有 key 插入新值时,我们避免了额外哈希表条目,而是把该值挂到该 key 的值链表中。
-
由于链表中的每个值都关联一个重复计数,因此可以表示如下序列:
1
2
3
4
5
|
Output(key, "1");
Output(key, "1");
Output(key, "1");
Output(key, "1");
Output(key, "1");
|
作为 key 对应链表中的单个条目,重复计数为 5。内部会向用户级 combining 函数产出五次 1。(类似技巧或许也可以应用在 reduce 端。)
-
(小改动)在默认 MapReductionBase::KeyFingerprintSharding 函数中添加 nshards == 1 测试;如果只使用 1 个 reduce shard,就完全避免对 key 计算 fingerprint(因为此时无需检查 key,直接返回 0 即可)。
-
在每次向 combiner 添加 key/value 都会调用的代码路径中,将一些 VLOG(3) 语句改为 DVLOG(3)。
将某个 wordcount 基准的耗时从 12.56s 降到 6.55s。
示例:重写 SelectServer 中的 alarm 处理代码,显著提升性能(添加+删除一个 alarm 从 771 ns 降到 271 ns)。
重写了 SelectServer 中的 alarm 处理代码,以显著提升其性能。
改动:
-
将 AlarmQueue 从 set 改为 AdjustablePriorityQueue。这显著加快了 alarm 处理,把添加和删除一个 alarm 的时间从 771 纳秒降到 281 纳秒。该改动避免了每次设置 alarm 时对 STL set 红黑树节点的一次分配/释放,并且缓存局部性也好得多(因为 AdjustablePriorityQueue 是基于 vector 实现的堆,而不是红黑树);每次通过 selectserver 循环操作 AlarmQueue 时,触碰的缓存行更少。
-
将 Alarmer 中的 AlarmList 从 hash_map 转为 dense_hash_map,避免每次添加/删除 alarm 时再发生一次分配/释放(这也改善了添加/删除 alarm 时的缓存局部性)。
-
移除 num_alarms_stat_ 和 num_closures_stat_ 这两个 MinuteTenMinuteHourStat 对象以及对应的导出变量。虽然监控这些指标看起来不错,但实践中它们会给关键网络代码增加显著开销。即使把这些变量保留为 Atomic32 而不是 MinuteTenMinuteHourStat,也会让添加和删除 alarm 的成本从 281 纳秒增加到 340 纳秒。
基准测试结果
1
2
3
|
Benchmark Time(ns) CPU(ns) Iterations
-----------------------------------------------------------
BM_AddAlarm/1 902 771 777777
|
此改动后
1
2
3
|
Benchmark Time(ns) CPU(ns) Iterations
-----------------------------------------------------------
BM_AddAlarm/1 324 281 2239999
|
示例:索引服务速度提升 3.3 倍!
2001 年,在计划从磁盘索引服务切换到内存索引服务时,我们发现了许多性能问题。这个改动修复了其中许多问题,使我们在双处理器 Pentium III 机器上的 2GB 内存索引中,从每秒 150 次内存查询提升到超过 500 次。
-
对索引块解码速度做了大量性能改进(微基准中从 8.9 MB/s 提升到 13.1 MB/s)。
-
现在我们在解码期间对块做校验和。这使得所有 getsymbol 操作都可以不做边界检查。
-
使用一些粗糙但有效的宏,在整个循环期间把 BitDecoder 的各个字段保存在局部变量中,并在循环结束后写回。
-
使用内联汇编访问 Intel 芯片上的 bsf 指令,用于 getUnary(查找一个 word 中第一个 1 bit 的索引)。
-
将值解码到 vector 时,在循环外调整 vector 大小,然后沿着 vector 移动指针,而不是每次存储值都做带边界检查的访问。
-
解码 docid 期间,将 docid 保持在本地 docid 空间中,以避免乘以 num_shards_。只有真正需要实际 docid 值时,才乘以 num_shards_ 并加上 my_shard_。
-
IndexBlockDecoder 现在导出 AdvanceToDocid 接口,返回第一个 docid ≥ d 的索引。这允许扫描基于本地 docid 进行,而不必在客户端为块中每个索引调用 GetDocid(index) 时,把每个本地 docid 强制转换为全局 docid。
-
文档位置数据现在按需解码,而不是在客户端请求块中任一文档的位置数据时,就急切解码整个块。
-
如果正在解码的索引块结尾距离页边界不到 4 字节,就把它复制到本地缓冲区。这样我们总能通过 4 字节加载读取 bit 解码缓冲区,而不必担心越过 mmap 页尾导致段错误。
-
各种评分数据结构只初始化前 nterms_ 个元素,而不是初始化所有 MAX_TERMS 元素(某些情况下,我们曾经会为每个被评分文档不必要地 memset 20K 到 100K 数据)。
-
当正在计算的值为 0 时,避免对中间评分值执行 round_to_int 以及后续计算(后续计算只是把 0 写回到我们已经 memset 为 0 的位置,而这是最常见情况)。
-
将评分数据结构上的边界检查改成 debug 模式断言。
下一篇:每周性能技巧 #97:良性生态循环
下一节