性能优化建议

本节阅读量:

本文翻译自 Abseil 官网的 Performance Hints

多年来,Jeff 和 Sanjay 在各种代码的性能调优方面进行了大量深入研究,自谷歌创立之初,提升软件的性能就一直是他们的重点工作,因为这能让谷歌为更多用户提供更多服务。他们撰写了本文,旨在总结进行此类工作时所遵循的一些通用原则和具体技巧。下面挑选出具有说明性的源代码更改(change lists,更改列表,下面简称 CLs),以举例说明各种方法和技巧。下文中的大部分具体建议都涉及 C++,但这些通用原则同样适用于其他语言。本文重点关注单个二进制文件的通用性能调优,并不涉及分布式系统或机器学习(ML)硬件性能调优(这些领域本身就非常庞大)。我们希望本文能对其他人有所帮助。

本文档中的许多示例都包含代码片段,用于演示这些技术。请注意,其中一些代码片段提到了各种内部Google代码库抽象概念。尽管如此,我们还是将其包含在内,因为我们认为这些示例足够独立,即使不熟悉这些抽象概念细节的人也能理解。

编写代码时考虑性能很重要

高德纳(Knuth)有一句话经常被断章取义地引用:“过早优化是万恶之源”。完整语句如下:“我们应该忘记大约97%时间里的小效率提升:过早优化是万恶之源。然而,我们也不应错过那关键的3%的机会。”本文讨论的就是这关键的3%,而高德纳还有一句更引人注目的名言,同样出自他的著作:

“从示例2到示例2a的速度提升仅约12%,许多人会认为这微不足道。当今许多软件工程师普遍持有的传统观念是,不必在意小的效率提升;但我认为,这只是对那些因小失大的程序员所表现出的滥用行为的过度反应,这些程序员无法调试或维护他们“优化”后的程序。在成熟的工程学科中,轻松获得的12%提升,从来不会被视为微不足道;我认为,在软件工程领域也应秉持同样的观点。当然,对于一次性任务,我不会费心进行这样的优化,但当涉及到编写高质量程序时,我不想将自己局限于那些剥夺我这种效率的工具。”

很多人会说:“先把代码写得尽可能简单,等能进行性能分析时再优化性能。”但这种方法通常并不正确:

  1. 如果在开发大型系统时完全忽视所有性能问题,最终将导致性能分布均匀、无明显热点,因为性能损失分散在各处,难以确定优化的切入点。
  2. 如果你正在开发一个将被他人使用的库,最终遇到性能问题的用户往往难以自行优化——他们需要理解其他团队或人员编写的代码细节,并与相关方协商性能优化的重要性。
  3. 当系统被大量使用时,进行重大改动将变得更为困难。
  4. 因此,难以判断哪些性能问题能通过简单方式解决,最终可能采取代价高昂的方案,例如为应对负载问题而过度复制数据,或对服务进行过度分配资源。

因此,我们建议在编写代码时,只要不会显著影响代码的可读性或复杂度,应尽量选择性能更优的实现方式。

预估

在编写代码时,若能预判当前代码对性能的影响程度,便能做出更明智的决策(例如:为提升性能值得增加多少代码复杂度)。以下是在编码过程中估算性能的实用建议:

  • 是否为测试代码?若是,主要需关注算法与数据结构的渐近复杂度(注:开发与验证的循环周期时间同样重要,避免编写运行时间过长的测试)。
  • 是否为应用专属代码?若是,此时需判断该段代码的性能敏感度——通常无需过度复杂化:只需区分是初始化/配置代码(如服务启动流程),还是将频繁执行的热点路径代码(如处理每个请求的逻辑),即可基本确定优化优先级。
  • 是否为多应用共用的库代码?此类代码的性能敏感度难以预判,因此更需遵循本文所述的简单优化原则。例如:当需存储通常元素较少的vector时,应使用 absl::InlinedVector 而非 std::vector。此类优化手段易于实现,且不会增加系统全局复杂度。与其后续发现代码实际消耗大量资源,不如从一开始就具备更高性能,这样在性能分析时也更易定位下一轮优化点。

在选择性能特性可能存在差异的方案时,可借助粗略估算进行更深入的分析。此类估算能快速提供不同方案性能的粗略对比,从而在无需实际实现的情况下,直接排除部分备选方案。

以下是此类估算的具体操作步骤:

  1. 估算各类底层操作的执行次数。例如:磁盘寻道次数、网络往返次数、传输字节数等。
  2. 计算每类高成本操作的估算代价并求和。将各类操作的估算成本(如每磁盘寻道耗时、每网络往返延迟)相乘后累加,得出系统资源消耗的总成本。
  3. 前面两点所述的消耗,指的是资源消耗。如果你更关心延时,并且系统存在并发处理的情况,一些操作可能会重叠执行(实际延迟可能低于简单累加值),此时需进行更精细的分析。

下面的表格,列出了每种操作的大概消耗,可能对你有所参考:

L1 cache 访问                                 0.5 ns
L2 cache 访问                                   3 ns
分支预测失败                                     5 ns
加锁/解锁 (无竞争)                               15 ns
内存访问                                        50 ns
使用 Snappy 压缩 1 KB 数据                    1,000 ns
从 SSD 读取 4 KB 数据                        20,000 ns
同一数据中心内往返(延迟                       50,000 ns
从内存顺序读 1MB 数据                         64,000 ns
从 100 Gbps 网络读取 1MB 数据                100,000 ns
从 SSD 读取 1MB 数据                      1,000,000 ns
磁盘寻道                                  5,000,000 ns
从磁盘顺序读 1MB 数据                     10,000,000 ns
跨洲际网络传输                            150,000,000 ns

前表列出了部分基础底层操作的粗略成本。您可能还需要跟踪与您的系统相关的高层操作的预估成本。例如,您可能需要了解从SQL数据库中读取单条数据的粗略成本、与云服务交互的延迟时间,或渲染简单HTML页面所需时间。如果您不了解不同操作的相关成本,就无法进行合理的粗略估算!

样例: 对 10 亿 4 字节数据进行快速排序的耗时预估

一个高效的快速排序算法需对规模为N的数组进行约log(N)次遍历。每次遍历中,数组内容将从内存流式传输至处理器缓存,分区代码会将每个元素与基准元素比较一次。我们汇总主要成本如下:

  1. 内存带宽:数组占用4 GB(4字节/元素 × 10亿元素)。假设 CPU 单核内存带宽约16 GB/s,单次遍历耗时约0.25秒。当 N 为 2^30 时,需约 30 次遍历,总内存传输成本约为 7.5 秒。
  2. 分支预测错误成本:总比较次数为 N × log(N) = 300 亿次,假设其中 50%(150亿次)预测错误。每次错误预测成本5纳秒,总错误成本 = 150亿 × 5纳秒 = 75秒(注:本分析假设正确预测分支成本为零)。
  3. 总估算成本: 7.5秒(内存访问) + 75秒(分支预测错误) = 约82.5秒

若需进一步优化分析以纳入处理器缓存影响(尽管基于前述分析,分支预测错误成本已占主导,此优化实际必要性较低),仍可作为示例补充说明:假设系统配备32MB L3缓存,且数据从L3缓存传输至处理器的开销可忽略不计。L3缓存可容纳2^23个元素(约838万元素),因此最后22次遍历可直接在缓存中操作数据(倒数第23次遍历将数据载入L3缓存,后续遍历均基于缓存数据执行)。由此内存传输成本从7.5秒(30次传输)降至2.5秒(10次传输:4GB × 10 ÷ 16GB/s)。

样例: 加载有 30 张图片的网页

对比两种设计方案(假设原始图像存储于磁盘,单张图像约1MB):

  1. 串行读取: 每张图像需执行1次寻道+1次传输,总耗时5ms(寻道)+ 10ms(传输)= 15ms/张。30张图像总耗时:30 × 15ms = 450ms
  2. 并行读取(图像均匀分布于K个磁盘):资源使用估算不变,但延迟将降低约K倍(忽略方差影响,例如某磁盘可能承载多于1/K的图像)。若在数百个磁盘的分布式文件系统上运行,预期延迟降至约15ms
  3. 单SSD方案(所有图像存于单块SSD):顺序读取性能优化为20μs(寻道)+ 1ms(传输)/张,总耗时 约为 30ms(30 × 1.02ms)

测量

前一节已给出一些关于编写代码时如何思考性能的建议。然而,在实际开始优化或遇到涉及性能、简洁性等多方面权衡的情况之前,您需要评估潜在的性能收益。有效的衡量性能是进行相关工作时最需要掌握的首要工具。

顺便提一下,对不熟悉的代码进行性能分析,也是了解代码库结构及运行方式的有效途径。检查程序动态调用图中频繁调用的函数及对应代码,能够帮助您对代码运行时的整体流程形成高层次理解,从而增强您在稍不熟悉的代码中实施性能优化的信心。

性能分析工具与实用技巧

有许多可用的性能分析工具。首先,使用 pprof,它提供良好的高层次性能信息,并且易于在本地和生产代码中使用。此外,如果需要更详细的性能信息,请尝试 perf

这里有一些使用 pprof 的实用技巧:

  • 使用合适的调试和优化 flags 来构建生产二进制文件。
  • 如果可能,请编写一个覆盖你正在优化的代码的「微型基准测试」。这能够缩短性能改进的反馈时间,帮助验证性能改进的效果,并有助于防止未来的性能退化。然而,微型基准测试可能存在「陷阱」,导致其无法代表整个系统的实际性能。可以搜索「cpp benchmarks」这个库来辅助编写微型基准测试。
  • 使用基准测试库来「输出性能计数器读数」,不仅能提高精度,还能更深入地了解程序行为。
  • 锁竞争常常会降低CPU使用率。某些互斥锁的实现支持锁竞争分析。
  • 可以使用「xprof」进行机器学习性能分析。

当性能分析没有发现热点怎么办

你经常会遇到一些情况,此时你的CPU性能分析图显示为平缓状态(没有明显的性能瓶颈)。这通常代表所有容易获取的优化点都已经处理完毕。如果你发现自己处于这种状况,可以考虑以下一些建议:

  • 不要低估许多小优化的价值!在某个子系统中实现二十次单独的1%改进非常有可能,整体上意味着相当可观的提升(这类工作的开展通常依赖于稳定且高质量的微基准测试)。
  • 在调用栈的顶部附近查找循环(火焰图可帮助分析)。潜在地,该循环或其调用的代码可能通过重构来提高效率。例如,你可能在循环里遍历输入的节点和边,逐步构建复杂图结构,这可以修改为一次性传递整个输入来构建图结构。这可以消除初始化图的代码中每条边初始化时大量内部检查。
  • 后退一步,关注调用栈高层的结构性变化,而不是纠结于微小的优化。下面列出的算法改进技术在进行此类分析时可能会有所帮助。
  • 查找过于通用的代码,并用定制化或更底层的实现方式替换。例如,如果某个应用程序反复使用正则表达式进行匹配,而简单的前缀匹配就已足够,则应考虑停止使用正则表达式。
  • 尝试减少内存分配次数,并优先处理对分配次数贡献最大的部分。这将产生两个效果:(1) 它会直接减少在分配器中花费的时间(以及GC语言中的垃圾回收时间);(2) 通常还会减少缓存未命中,因为在使用tcmalloc的长期运行程序中,每次分配往往会指向不同的缓存行。
  • 收集其他类型的性能剖析结果,特别是基于硬件性能计数器的数据。此类数据可能会指出遇到高缓存缺失率的情形。可以参考性能剖析工具和技巧部分描述的技术。

API 考量

以下一些技术可能需要更改数据结构和函数签名,这可能会影响调用者。请尝试组织代码,使得性能改进可以在封装边界内进行,而不会影响公共接口。如果您的模块具有深度(通过少量接口访问重要功能),这将更容易实现。

广泛使用的API需要添加新功能,非常困难。在添加新功能时要小心,因为这些会限制未来的实现,并且对于不需要新功能的用户来说,会导致不必要的成本增加。例如,许多C++标准库容器承诺迭代器稳定性,在标准库的实现中,这会显著增加内存分配次数,尽管许多用户并不需要这种稳定性。

以下列出了一些具体的技巧。请仔细考虑由此引入的API可用性问题与性能优势之间的权衡。

批量API

提供批量操作以减少昂贵的API调用或利用算法进行改进。

样例一:添加 MemoryManager::LookupMany 批量操作接口

添加批量接口,这同时简化了新批量接口的函数签名:客户端只需要知道所有键是否都被找到,因此我们可以返回一个bool值而非状态对象

老代码:

1
2
3
4
class MemoryManager {
 public:
  ...
  util::StatusOr<LiveTensor> Lookup(const TensorIdProto& id);

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class MemoryManager {
 public:
  ...
  util::StatusOr<LiveTensor> Lookup(const TensorIdProto& id);

  // 查找指定的tensor
  struct LookupKey {
    ClientHandle client;
    uint64 local_id;
  };
  bool LookupMany(absl::Span<const LookupKey> keys,
                  absl::Span<tensorflow::Tensor> tensors);

样例二:添加批量 ObjectStore::DeleteRefs API 以摊销锁的开销

老代码:

1
2
3
4
5
template <typename T>
class ObjectStore {
 public:
  ...
  absl::Status DeleteRef(Ref);

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
template <typename T>
class ObjectStore {
 public:
  ...
  absl::Status DeleteRef(Ref);

// 删除多个引用。对于每个引用,如果没有任何其他引用指向同一个对象,则该对象将被删除。任何错误时返回非OK状态。
  absl::Status DeleteRefs(absl::Span<const Ref> refs);
  ...
template <typename T>
absl::Status ObjectStore<T>::DeleteRefs(absl::Span<const Ref> refs) {
  util::Status result;
  absl::MutexLock l(&mu_);
  for (auto ref : refs) {
    result.Update(DeleteRefLocked(ref));
  }
  return result;
}

之前使用的方式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void HandleBatch(int, const plaque::Batch& input) override {
  for (const auto& t : input) {
    auto in = In(t);
    PLAQUE_OP_ASSIGN_OR_RETURN(const auto& handles, in.handles());
    for (const auto handle : handles.value->handles()) {
      PLAQUE_OP_RETURN_IF_ERROR(in_buffer_store_
                                    ? bstore_->DeleteRef(handle)
                                    : tstore_->DeleteRef(handle));
    }
  }
}

修改使用DeleteRefs后的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void HandleBatch(int, const plaque::Batch& input) override {
  for (const auto& t : input) {
    auto in = In(t);
    PLAQUE_OP_ASSIGN_OR_RETURN(const auto& handles, in.handles());
    if (in_buffer_store_) {
      PLAQUE_OP_RETURN_IF_ERROR(
          bstore_->DeleteRefs(handles.value->handles()));
    } else {
      PLAQUE_OP_RETURN_IF_ERROR(
          tstore_->DeleteRefs(handles.value->handles()));
    }
  }
}

示例:使用 Floyd 堆构建法 进行高效初始化。

堆的批量初始化可以在 O(N) 时间内完成;而逐个添加元素,并在每次添加后更新堆性质,则需要 O(N lg(N)) 时间。

有时很难让调用方直接改用新的批量 API。在这种情况下,可以在内部使用批量 API,并缓存结果,供后续非批量 API 调用复用:

示例:缓存块解码结果,供后续调用使用。

每次查找都需要解码包含 K 个条目的整个块。可以把解码后的条目存入缓存,并在后续查找时查询缓存。

lexicon.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void GetTokenString(int pos, std::string* out) const {
  ...
  absl::FixedArray<LexiconEntry, 32> entries(pos + 1);

  // 解码直到 pos 为止(包含 pos)的所有词典条目。
  for (int i = 0; i <= pos; ++i) {
    p = util::coding::TwoValuesVarint::Decode32(p, &entries[i].remaining,
                                                &entries[i].shared);
    entries[i].remaining_str = p;
    p += entries[i].remaining;  // remaining 字节跟在每个条目之后。
  }

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
mutable std::vector<absl::InlinedVector<std::string, 16>> cache_;
...
void GetTokenString(int pos, std::string* out) const {
  ...
  DCHECK_LT(skentry, cache_.size());
  if (!cache_[skentry].empty()) {
    *out = cache_[skentry][pos];
    return;
  }
  ...
  // 初始化缓存。
  ...
  const char* prev = p;
  for (int i = 0; i < block_sz; ++i) {
    uint32 shared, remaining;
    p = TwoValuesVarint::Decode32(p, &remaining, &shared);
    auto& cur = cache_[skentry].emplace_back();
    gtl::STLStringResizeUninitialized(&cur, remaining + shared);

    std::memcpy(cur.data(), prev, shared);
    std::memcpy(cur.data() + shared, p, remaining);
    prev = cur.data();
    p += remaining;
  }
  *out = cache_[skentry][pos];

视图类型

函数参数应优先使用视图类型(例如 std::string_viewstd::Span<T>absl::FunctionRef<R(Args...)>),除非需要转移数据所有权。这些类型可以减少复制,并允许调用方自行选择容器类型(例如,一个调用方可能使用 std::vector,另一个调用方可能使用 absl::InlinedVector)。

预分配/预计算的参数

对于频繁调用的例程,有时允许上层调用方传入自己持有的数据结构,或者传入调用方已经拥有、而被调例程又需要的信息,会很有帮助。这样可以避免底层例程被迫分配自己的临时数据结构,或重新计算已经可用的信息。

示例:为 RPC_Stats::RecordRPC 增加一个变体,允许客户端传入已有的 WallTime 值。

rpc-stats.h

修改前:

1
static void RecordRPC(const Name &name, const RPC_Stats_Measurement& m);

修改后:

1
2
static void RecordRPC(const Name &name, const RPC_Stats_Measurement& m,
                      WallTime now);

clientchannel.cc

修改前:

1
2
3
const WallTime now = WallTime_Now();
...
RPC_Stats::RecordRPC(stats_name, m);

修改后:

1
2
3
const WallTime now = WallTime_Now();
...
RPC_Stats::RecordRPC(stats_name, m, now);

线程兼容类型与线程安全类型

一个类型可以是线程兼容的(由外部同步),也可以是线程安全的(由内部同步)。大多数通用类型应该设计为线程兼容。这样,不需要线程安全的调用方就不必为它付出成本。

示例:由于调用方已经自行同步,将类改为线程兼容。

hitless-transfer-phase.cc

修改前:

1
2
3
4
5
TransferPhase HitlessTransferPhase::get() const {
  static CallsiteMetrics cm("HitlessTransferPhase::get");
  MonitoredMutexLock l(&cm, &mutex_);
  return phase_;
}

修改后:

1
TransferPhase HitlessTransferPhase::get() const { return phase_; }

hitless-transfer-phase.cc

修改前:

1
2
3
4
5
bool HitlessTransferPhase::AllowAllocate() const {
  static CallsiteMetrics cm("HitlessTransferPhase::AllowAllocate");
  MonitoredMutexLock l(&cm, &mutex_);
  return phase_ == TransferPhase::kNormal || phase_ == TransferPhase::kBrownout;
}

修改后:

1
2
3
bool HitlessTransferPhase::AllowAllocate() const {
  return phase_ == TransferPhase::kNormal || phase_ == TransferPhase::kBrownout;
}

不过,如果某个类型的典型用法确实需要同步,应优先把同步放到类型内部。这样就可以在不影响调用方的前提下,按需调整同步机制来提升性能(例如通过分片减少竞争)。

算法改进

最关键的性能改进机会通常来自算法改进,例如把 O(N²) 算法变成 O(N lg(N)) 或 O(N),避免潜在的指数级行为,等等。这类机会在稳定代码中并不常见,但在编写新代码时值得特别留意。下面是一些对既有代码做出此类改进的例子:

示例:按逆后序将节点加入环检测结构。

之前我们逐个把图节点和边加入环检测数据结构,这会在每条边上产生昂贵开销。现在我们按逆后序加入整张图,使环检测变得非常简单。

graphcycles.h

修改前:

1
2
3
4
5
6
class GraphCycles : public util_graph::Graph {
 public:
  GraphCycles();
  ~GraphCycles() override;

  using Node = util_graph::Node;

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class GraphCycles : public util_graph::Graph {
 public:
  GraphCycles();
  ~GraphCycles() override;

  using Node = util_graph::Node;

  // InitFrom 添加 src 中的所有节点和边;如果成功则返回 true,
  // 如果遇到环则返回 false。
  // 要求:此前还没有向 GraphCycles 添加任何节点和边。
  bool InitFrom(const util_graph::Graph& src);

graphcycles.cc

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
bool GraphCycles::InitFrom(const util_graph::Graph& src) {
  ...
  // 按拓扑顺序分配 rank,这样初始化期间不需要任何重排。
  // 对于无环图,DFS 会按逆拓扑顺序离开节点,
  // 因此我们在离开节点时为它们分配递减的 rank。
  Rank last_rank = n;
  auto leave = [&](util_graph::Node node) {
    DCHECK(r->rank[node] == kMissingNodeRank);
    NodeInfo* nn = &r->nodes[node];
    nn->in = kNil;
    nn->out = kNil;
    r->rank[node] = --last_rank;
  };
  util_graph::DFSAll(src, std::nullopt, leave);

  // 添加所有边(同时检测环)。
  bool have_cycle = false;
  util_graph::PerEdge(src, [&](util_graph::Edge e) {
    DCHECK_NE(r->rank[e.src], kMissingNodeRank);
    DCHECK_NE(r->rank[e.dst], kMissingNodeRank);
    if (r->rank[e.src] >= r->rank[e.dst]) {
      have_cycle = true;
    } else if (!HasEdge(e.src, e.dst)) {
      EdgeListAddNode(r, &r->nodes[e.src].out, e.dst);
      EdgeListAddNode(r, &r->nodes[e.dst].in, e.src);
    }
  });
  if (have_cycle) {
    return false;
  } else {
    DCHECK(CheckInvariants());
    return true;
  }
}

graph_partitioner.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
absl::Status MergeGraph::Init() {
  const Graph& graph = *compiler_->graph();
  clusters_.resize(graph.NodeLimit());
  graph.PerNode([&](Node node) {
    graph_->AddNode(node);
    NodeList* n = new NodeList;
    n->push_back(node);
    clusters_[node] = n;
  });
  absl::Status s;
  PerEdge(graph, [&](Edge e) {
    if (!s.ok()) return;
    if (graph_->HasEdge(e.src, e.dst)) return;  // 已添加
    if (!graph_->InsertEdge(e.src, e.dst)) {
      s = absl::InvalidArgumentError("cycle in the original graph");
    }
  });
  return s;
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
absl::Status MergeGraph::Init() {
  const Graph& graph = *compiler_->graph();
  if (!graph_->InitFrom(graph)) {
    return absl::InvalidArgumentError("cycle in the original graph");
  }
  clusters_.resize(graph.NodeLimit());
  graph.PerNode([&](Node node) {
    NodeList* n = new NodeList;
    n->push_back(node);
    clusters_[node] = n;
  });
  return absl::OkStatus();
}

示例:用更好的算法替换互斥锁实现内置的死锁检测系统。

将死锁检测算法替换为一个快约 50 倍的新算法,并且可以轻松扩展到数百万个互斥锁(旧算法依赖 2K 限制来避免性能断崖)。新代码基于论文:A dynamic topological sort algorithm for directed acyclic graphs,作者 David J. Pearce、Paul H. J. Kelly,发表于 Journal of Experimental Algorithmics (JEA),第 11 卷,2006 年,文章编号 1.7。

新算法占用 O(|V|+|E|) 空间(旧算法需要 O(|V|^2) 位)。锁获取顺序图非常稀疏,因此空间占用小得多。这个算法也很简单:核心大约 100 行 C++。由于代码现在可以扩展到更多 Mutex,我们得以放宽人为设置的 2K 限制,并由此发现了一些真实程序中潜伏的死锁。

基准测试结果:这些测试在 DEBUG 模式下运行,因为死锁检测主要在 debug 模式中启用。基准测试参数(如 /2k)表示被跟踪的节点数量。在旧算法默认的 2k 限制下,新算法每次 InsertEdge 只需 0.5 微秒,而旧算法需要 22 微秒。新算法也能轻松扩展到更大的图,而旧算法很快就会撑不住。

1
2
3
4
5
DEBUG: Benchmark            Time(ns)    CPU(ns) Iterations
----------------------------------------------------------
DEBUG: BM_StressTest/2k        23553      23566      29086
DEBUG: BM_StressTest/4k        45879      45909      15287
DEBUG: BM_StressTest/16k      776938     777472        817
1
2
3
4
5
DEBUG: BM_StressTest/2k          392        393   10485760
DEBUG: BM_StressTest/4k          392        393   10485760
DEBUG: BM_StressTest/32k         407        407   10485760
DEBUG: BM_StressTest/256k        456        456   10485760
DEBUG: BM_StressTest/1M          534        534   10485760

示例:用哈希表(O(1) 查找)替换 IntervalMap(O(lg N) 查找)。

初始代码使用 IntervalMap,因为它看起来适合支持相邻块合并。但实际上哈希表就足够了,因为相邻块可以通过哈希表查找找到。这个改动(加上该 CL 中的其他改动)让 tpu::BestFitAllocator 的性能提升约 4 倍。

best_fit_allocator.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
using Block = gtl::IntervalMap<int64, BlockState>::Entry;
...
// pair(地址范围,BlockState)的映射,每个分配有一个条目,
// 覆盖范围 [0, allocatable_range_end_)。相邻的 kFree 和
// kReserved 块会被合并。相邻的 kAllocated 块不会
// 被合并。
gtl::IntervalMap<int64, BlockState> block_list_;

// 所有空闲块的集合,按分配策略排序。相邻
// 空闲块会被合并。
std::set<Block, BlockSelector> free_list_;

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// BlockTable 中 offset 的更快哈希函数。
struct OffsetHash {
  ABSL_ATTRIBUTE_ALWAYS_INLINE size_t operator()(int64 value) const {
    uint64 m = value;
    m *= uint64_t{0x9ddfea08eb382d69};
    return static_cast<uint64_t>(m ^ (m >> 32));
  }
};

// 哈希表将块起始地址映射到块信息。
// 我们在该信息中包含前一个块的长度,
// 这样就能找到可合并的前序块。
struct HashTableEntry {
  BlockState state;
  int64 my_length;
  int64 prev_length;  // 如果没有前一个块,则为 0。
};
using BlockTable = absl::flat_hash_map<int64, HashTableEntry, OffsetHash>;

示例:用哈希表查找(O(N))替换有序列表求交(O(N log N))。

旧代码为了检测两个节点是否共享同一个来源,会按排序顺序获取每个节点的来源,然后对两个有序列表求交。新代码把一个节点的来源放进哈希表,再遍历另一个节点的来源并查询哈希表。

1
2
name             old time/op  new time/op  delta
BM_CompileLarge   28.5s ± 2%   22.4s ± 2%  -21.61%  (p=0.008 n=5+5)

示例:实现良好的哈希函数,让操作从 O(N) 变为 O(1)。

location.h

修改前:

1
2
3
4
5
6
// Location 对象的哈希器。
struct LocationHash {
  size_t operator()(const Location* key) const {
    return key != nullptr ? util_hash::Hash(key->address()) : 0;
  }
};

修改后:

1
2
3
4
5
6
7
size_t HashLocation(const Location& loc);
...
struct LocationHash {
  size_t operator()(const Location* key) const {
    return key != nullptr ? HashLocation(*key) : 0;
  }
};

location.cc

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
size_t HashLocation(const Location& loc) {
  util_hash::MurmurCat m;

  // 将一些较简单的特征编码到单个值中。
  m.AppendAligned((loc.dynamic() ? 1 : 0)                    //
                  | (loc.append_shard_to_address() ? 2 : 0)  //
                  | (loc.is_any() ? 4 : 0)                   //
                  | (!loc.any_of().empty() ? 8 : 0)          //
                  | (loc.has_shardmap() ? 16 : 0)            //
                  | (loc.has_sharding() ? 32 : 0));

  if (loc.has_shardmap()) {
    m.AppendAligned(loc.shardmap().output() |
                    static_cast<uint64_t>(loc.shardmap().stmt()) << 20);
  }
  if (loc.has_sharding()) {
    uint64_t num = 0;
    switch (loc.sharding().type_case()) {
      case Sharding::kModShard:
        num = loc.sharding().mod_shard();
        break;
      case Sharding::kRangeSplit:
        num = loc.sharding().range_split();
        break;
      case Sharding::kNumShards:
        num = loc.sharding().num_shards();
        break;
      default:
        num = 0;
        break;
    }
    m.AppendAligned(static_cast<uint64_t>(loc.sharding().type_case()) |
                    (num << 3));
  }

  auto add_string = [&m](absl::string_view s) {
    if (!s.empty()) {
      m.Append(s.data(), s.size());
    }
  };

  add_string(loc.address());
  add_string(loc.lb_policy());

  // 我们不包含 any_of,因为计算一个不受顺序和重复影响的
  // 哈希值比较复杂。
  return m.GetHash();
}

更好的内存表示

仔细考虑关键数据结构的内存占用和缓存占用,通常能带来很大的收益。下面这些数据结构侧重于让常见操作触碰更少的缓存行。这里的细致设计可以:(a)避免昂贵的缓存未命中;(b)减少内存总线流量,从而加速当前程序以及同一台机器上运行的其他程序。它们依赖一些常见技巧,你在设计自己的数据结构时也可能会用到。

紧凑的数据结构

对于经常访问的数据,或占据应用大量内存的数据,应使用紧凑表示。紧凑表示可以通过减少触碰的缓存行、降低内存总线带宽占用,显著减少内存使用并提升性能。不过也要注意缓存行竞争

内存布局

对于内存占用或缓存占用很大的类型,应仔细考虑其内存布局。

  • 重新排列字段,减少对齐要求不同的字段之间产生的填充字节(参见类布局讨论)。
  • 如果存储的数据可以放进更小的数值类型,就使用更小的数值类型。
  • 如果不小心处理,枚举值有时会占用整个机器字。可以考虑使用更小的表示形式(例如使用 enum class OpType : uint8_t { ... },而不是 enum class OpType { ... })。
  • 调整字段顺序,让经常一起访问的字段彼此更近,这可以减少常见操作触碰的缓存行数量。
  • 将热点只读字段和热点可变字段分开,避免写入可变字段时把只读字段从附近缓存中驱逐出去。
  • 移动冷数据,让它不要和热数据挨在一起。可以把冷数据放到结构体末尾、放到一层间接引用之后,或放到单独数组中。
  • 考虑用位级或字节级编码把数据压缩到更少字节中。这可能会变复杂,因此只有当相关数据被封装在经过充分测试的模块中,并且总体内存使用减少显著时才这样做。此外,还要注意副作用,例如常用数据对齐不足,或访问压缩表示时代码成本更高。此类改动应使用基准测试验证。

用索引代替指针

在现代 64 位机器上,指针占 64 位。如果数据结构里有大量指针,T* 形式的间接引用很容易吃掉大量内存。可以考虑改用指向数组 T[] 或其他数据结构的整数索引。这样不仅引用本身会更小(如果索引数量足够少,可以放进 32 位或更少位数),所有 T[] 元素也会连续存储,通常能带来更好的缓存局部性。

批量存储

避免使用那种每存一个元素就单独分配一个对象的数据结构(例如 C++ 中的 std::mapstd::unordered_map)。可以考虑使用分块或扁平表示的类型,让多个元素在内存中紧密存放(例如 C++ 中的 std::vectorabsl::flat_hash_{map,set})。这类类型往往有更好的缓存行为,并且分配器开销也更少。

一种有用的技巧是把元素划分到多个块中,每个块容纳固定数量的元素。这种技巧可以在保持良好渐近行为的同时,显著降低数据结构的缓存占用。

对于某些数据结构,一个块就足以容纳所有元素(例如字符串和 vector)。其他类型(例如 absl::flat_hash_map)也使用了这种技巧。

内联存储

有些容器类型针对存储少量元素做了优化。它们会在对象本身内部为少量元素提供空间,并在元素数量较小时完全避免分配。当这类类型的实例经常被构造(例如作为频繁执行代码中的栈变量),或者同时存活的实例很多时,这会非常有帮助。如果某个容器通常只包含少量元素,可以考虑使用带内联存储的类型,例如 InlinedVector

注意:如果 sizeof(T) 很大,内联存储容器可能不是最佳选择,因为内联的后备存储本身也会很大。

不必要的嵌套 map

有时,嵌套 map 数据结构可以替换为带复合 key 的单层 map。这可以显著降低查找和插入成本。

示例:将嵌套 btree 转为以复合键为 key 的 btree,以减少分配并改善缓存占用。

graph_splitter.cc

修改前:

1
absl::btree_map<std::string, absl::btree_map<std::string, OpDef>> ops;

修改后:

1
2
3
4
// btree 从 {package_name, op_name} 映射到对应的 const Opdef*。
absl::btree_map<std::pair<absl::string_view, absl::string_view>,
                const OpDef*>
    ops;

注意:如果第一层 map 的 key 很大,继续使用嵌套 map 可能更好:

示例:改用嵌套 map,使微基准性能提升 76%。

之前我们使用单层哈希表,key 由一个字符串路径和其他数字子 key 组成。平均每个路径会出现在约 1000 个 key 中。我们把哈希表拆成两层:第一层以路径为 key,每个第二层哈希表只保存某个路径下的子 key 到数据的映射。这样把路径存储的内存使用降低了 1000 倍,同时也加速了同一路径下许多子 key 被一起访问的场景。

Arena

Arena 可以帮助降低内存分配成本,同时还能把独立分配的对象紧密排列在一起,通常占用更少缓存行,并消除大部分析构成本。对于包含许多子对象的复杂数据结构,它们可能最有效。可以考虑为 arena 提供合适的初始大小,这有助于减少分配。

注意:Arena 很容易被误用,例如把太多短生命周期对象放进长生命周期 arena 中,这可能会不必要地膨胀内存占用。

用数组代替 map

如果 map 的取值域可以用小整数表示,或者本身是枚举;又或者 map 的元素非常少,那么有时可以用数组或某种 vector 替代 map。

示例:用数组代替 flat_map

rtp_controller.h

修改前:

1
const gtl::flat_map<int, int> payload_type_to_clock_frequency_;

修改后:

1
2
3
4
5
6
7
// 一个按 payload_type 索引到 clock freq 的 map(用简单数组实现),
// 表示该 payload type 的 clock freq(或 0)。
struct PayloadTypeToClockRateMap {
  int map[128];
};
...
const PayloadTypeToClockRateMap payload_type_to_clock_frequency_;

用位向量代替 set

如果 set 的取值域可以用小整数表示,那么可以用位向量替代 set(InlinedBitVector 通常是不错的选择)。在这种表示上,集合操作也可以通过按位布尔运算高效实现(OR 表示并集,AND 表示交集,等等)。

示例:Spanner placement 系统:用每个 zone 一位的位向量替换 dense_hash_set

zone_set.h

修改前:

1
2
3
4
5
6
class ZoneSet: public dense_hash_set<ZoneId> {
 public:
  ...
  bool Contains(ZoneId zone) const {
    return count(zone) > 0;
  }

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class ZoneSet {
  ...
  // 当且仅当集合包含 “zone” 时返回 true。
  bool ContainsZone(ZoneId zone) const {
    return zone < b_.size() && b_.get_bit(zone);
  }
  ...
 private:
  int size_;          // 已插入 zone 的数量。
  util::bitmap::InlinedBitVector<256> b_;

基准测试结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CPU: AMD Opteron (4 cores) dL1:64KB dL2:1024KB
Benchmark                          Base (ns)  New (ns) Improvement
------------------------------------------------------------------
BM_Evaluate/1                            960       676    +29.6%
BM_Evaluate/2                           1661      1138    +31.5%
BM_Evaluate/3                           2305      1640    +28.9%
BM_Evaluate/4                           3053      2135    +30.1%
BM_Evaluate/5                           3780      2665    +29.5%
BM_Evaluate/10                          7819      5739    +26.6%
BM_Evaluate/20                         17922     12338    +31.2%
BM_Evaluate/40                         36836     26430    +28.2%

示例:用位矩阵跟踪操作数之间的可达性属性,而不是使用哈希表。

hlo_computation.h

修改前:

1
2
3
using TransitiveOperandMap =
    std::unordered_map<const HloInstruction*,
                       std::unordered_set<const HloInstruction*>>;

修改后:

1
2
3
4
5
6
7
class HloComputation::ReachabilityMap {
  ...
  // 从 HloInstruction* 到数字的密集 id 分配。
  tensorflow::gtl::FlatMap<const HloInstruction*, int> ids_;
  // 当且仅当 b 可从 a 到达时,matrix_(a,b) 为 true。
  tensorflow::core::Bitmap matrix_;
};

减少内存分配

内存分配会带来成本:

  1. 它会增加分配器中消耗的时间。
  2. 新分配的对象可能需要昂贵的初始化;当不再需要时,有时还需要对应的昂贵析构。
  3. 每次分配往往都会落在新的缓存行上,因此分散在许多独立分配中的数据,会比分散在更少分配中的数据拥有更大的缓存占用。

带垃圾回收的运行时有时会通过把连续分配顺序放在内存中,来缓解第 3 个问题。

避免不必要的分配

示例:减少分配使基准测试吞吐量提升 21%。

memory_manager.cc

修改前:

1
2
3
4
5
LiveTensor::LiveTensor(tf::Tensor t, std::shared_ptr<const DeviceInfo> dinfo,
                       bool is_batched)
    : tensor(std::move(t)),
      device_info(dinfo ? std::move(dinfo) : std::make_shared<DeviceInfo>()),
      is_batched(is_batched) {

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
static const std::shared_ptr<DeviceInfo>& empty_device_info() {
  static std::shared_ptr<DeviceInfo>* result =
      new std::shared_ptr<DeviceInfo>(new DeviceInfo);
  return *result;
}

LiveTensor::LiveTensor(tf::Tensor t, std::shared_ptr<const DeviceInfo> dinfo,
                       bool is_batched)
    : tensor(std::move(t)), is_batched(is_batched) {
  if (dinfo) {
    device_info = std::move(dinfo);
  } else {
    device_info = empty_device_info();
  }

示例:尽可能使用静态分配的零向量,而不是分配一个 vector 再填充零。

embedding_executor_8bit.cc

1
2
3
4
5
6
7
8
// EmbeddingLookUpT 的实际实现使用模板参数,
// 而不是对象成员,以提升性能。
template <bool Mean, bool SymmetricInputRange>
static tensorflow::Status EmbeddingLookUpT(...) {
    ...
  std::unique_ptr<tensorflow::quint8[]> zero_data(
      new tensorflow::quint8[max_embedding_width]);
  memset(zero_data.get(), 0, sizeof(tensorflow::quint8) * max_embedding_width);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 足够处理大多数 embedding 宽度的大小。
static const int kTypicalMaxEmbedding = 256;
static tensorflow::quint8 static_zero_data[kTypicalMaxEmbedding];  // 全为零。
...
// EmbeddingLookUpT 的实际实现使用模板参数,
// 而不是对象成员,以提升性能。
template <bool Mean, bool SymmetricInputRange>
static tensorflow::Status EmbeddingLookUpT(...) {
    ...
  std::unique_ptr<tensorflow::quint8[]> zero_data_backing(nullptr);

  // 获取指向一段内存区域的指针,该区域至少包含
  // “max_embedding_width” 个 quint8 零值。
  tensorflow::quint8* zero_data;
  if (max_embedding_width <= ARRAYSIZE(static_zero_data)) {
    // static_zero_data 足够大,因此不需要分配零数据。
    zero_data = &static_zero_data[0];
  } else {
    // static_zero_data 不够大:需要分配零数据。
    zero_data_backing =
        absl::make_unique<tensorflow::quint8[]>(max_embedding_width);
    memset(zero_data_backing.get(), 0,
           sizeof(tensorflow::quint8) * max_embedding_width);
    zero_data = zero_data_backing.get();
  }

此外,当对象生命周期受作用域限制时,应优先使用栈分配而不是堆分配(不过对于大对象,要小心栈帧大小)。

调整容器大小或预留容量

当 vector(或其他容器类型)的最大大小或预期最大大小提前已知时,应预先设置容器的后备存储大小(例如在 C++ 中使用 resizereserve)。

示例:预先设置 vector 大小并填充,而不是执行 N 次 push_back

indexblockdecoder.cc

修改前:

1
2
3
4
5
6
7
for (int i = 0; i < ndocs-1; i++) {
  uint32 delta;
  ERRORCHECK(b->GetRice(rice_base, &delta));
  docs_.push_back(DocId(my_shard_ + (base + delta) * num_shards_));
  base = base + delta + 1;
}
docs_.push_back(last_docid_);

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
docs_.resize(ndocs);
DocId* docptr = &docs_[0];
for (int i = 0; i < ndocs-1; i++) {
  uint32 delta;
  ERRORCHECK(b.GetRice(rice_base, &delta));
  *docptr = DocId(my_shard_ + (base + delta) * num_shards_);
  docptr++;
  base = base + delta + 1;
}
*docptr = last_docid_;

注意:不要用 resizereserve 每次只增长一个元素,因为这可能导致二次复杂度行为。另外,如果元素构造成本较高,优先使用一次初始 reserve,然后多次 push_backemplace_back,而不是一开始就 resize,因为后者会让构造调用次数翻倍。

尽可能避免复制

  • 在可能时,优先移动数据结构,而不是复制。
  • 如果生命周期不是问题,在临时数据结构中存储指针或索引,而不是对象副本。例如,如果本地 map 用于从传入的 proto 列表中选择一组 proto,可以让 map 只保存指向传入 proto 的指针,而不是复制可能深度嵌套的数据。另一个常见例子是排序索引 vector,而不是直接排序大型对象 vector,因为后者会产生大量复制/移动成本。

示例:通过 gRPC 接收 tensor 时避免一次额外复制。

发送约 400KB tensor 的基准测试加速了约 10-15%:

1
2
3
Benchmark              Time(ns)    CPU(ns) Iterations
-----------------------------------------------------
BM_RPC/30/98k_mean    148764691 1369998944       1000
1
2
3
Benchmark              Time(ns)    CPU(ns) Iterations
-----------------------------------------------------
BM_RPC/30/98k_mean    131595940 1216998084       1000

示例:移动大型 options 结构,而不是复制。

index.cc

修改前:

1
return search_iterators::DocPLIteratorFactory::Create(opts);

修改后:

1
return search_iterators::DocPLIteratorFactory::Create(std::move(opts));

示例:使用 std::sort 而不是 std::stable_sort,从而避免稳定排序实现中的内部复制。

encoded-vector-hits.h

修改前:

1
2
std::stable_sort(hits_.begin(), hits_.end(),
                 gtl::OrderByField(&HitWithPayloadOffset::docid));

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
struct HitWithPayloadOffset {
  search_iterators::LocalDocId64 docid;
  int first_payload_offset;  // payload vector 中的 offset。
  int num_payloads;

  bool operator<(const HitWithPayloadOffset& other) const {
    return (docid < other.docid) ||
           (docid == other.docid &&
            first_payload_offset < other.first_payload_offset);
  }
};
    ...
    std::sort(hits_.begin(), hits_.end());

复用临时对象

在循环内部声明的容器或对象,会在每次循环迭代时重新创建。这可能导致昂贵的构造、析构和扩容。把声明提升到循环外可以实现复用,并可能带来显著性能提升。(由于语言语义限制,或无法确保程序等价,编译器通常不能自行完成这种提升。)

示例:把变量定义提升到循环迭代之外。

autofdo_profile_utils.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
auto iterator = absl::WrapUnique(sstable->GetIterator());
while (!iterator->done()) {
  T profile;
  if (!profile.ParseFromString(iterator->value_view())) {
    return absl::InternalError(
        "Failed to parse mem_block to specified profile type.");
  }
  ...
  iterator->Next();
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
auto iterator = absl::WrapUnique(sstable->GetIterator());
T profile;
while (!iterator->done()) {
  if (!profile.ParseFromString(iterator->value_view())) {
    return absl::InternalError(
        "Failed to parse mem_block to specified profile type.");
  }
  ...
  iterator->Next();
}

示例:在循环外定义 protobuf 变量,让其已分配存储可以在多次迭代间复用。

stats-router.cc

修改前:

1
2
3
4
5
6
7
8
9
for (auto& r : routers_to_update) {
  ...
  ResourceRecord record;
  {
    MutexLock agg_lock(r.agg->mutex());
    r.agg->AddResourceRecordUsages(measure_indices, &record);
  }
  ...
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
ResourceRecord record;
for (auto& r : routers_to_update) {
  ...
  record.Clear();
  {
    MutexLock agg_lock(r.agg->mutex());
    r.agg->AddResourceRecordUsages(measure_indices, &record);
  }
  ...
}

示例:反复序列化到同一个 std::string

program_rep.cc

修改前:

1
2
3
4
5
6
7
8
std::string DeterministicSerialization(const proto2::Message& m) {
  std::string result;
  proto2::io::StringOutputStream sink(&result);
  proto2::io::CodedOutputStream out(&sink);
  out.SetSerializationDeterministic(true);
  m.SerializePartialToCodedStream(&out);
  return result;
}

修改后:

1
2
3
4
5
6
7
8
9
absl::string_view DeterministicSerializationTo(const proto2::Message& m,
                                               std::string* scratch) {
  scratch->clear();
  proto2::io::StringOutputStream sink(scratch);
  proto2::io::CodedOutputStream out(&sink);
  out.SetSerializationDeterministic(true);
  m.SerializePartialToCodedStream(&out);
  return absl::string_view(*scratch);
}

注意:protobuf、string、vector、容器等往往会增长到曾经存储过的最大值大小。因此,周期性地重建它们(例如每使用 N 次后)可以帮助降低内存需求和重新初始化成本。

避免不必要的工作

提升性能最有效的一类方法,也许就是避免做不必做的工作。它可以有很多形式:为常见情况创建专门路径以避开更通用也更昂贵的计算、预计算、把工作延迟到真正需要时再做、把工作提升到执行频率更低的代码位置,等等。下面按几个代表性类别列出许多相关示例。

为常见情况提供快速路径

代码通常会写成覆盖所有情况,但其中某些子情况比其他情况简单得多,也常见得多。例如,vector::push_back 通常有足够空间容纳新元素,但它也包含在空间不足时扩容底层存储的代码。适当关注代码结构,可以让常见的简单情况更快,同时又不显著伤害非常见情况的性能。

示例:让快速路径覆盖更多常见情况。

增加对尾部单个 ASCII 字节的处理,而不是只让该例程处理 4 字节倍数的情况。这样可以避免对例如 5 字节的全 ASCII 字符串调用较慢的通用例程。

utf8statetable.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 基于状态表扫描 UTF-8 stringpiece。
// 始终扫描完整 UTF-8 字符。
// 设置已扫描字节数,并返回退出原因。
// 针对 7-bit ASCII 0000..007f 全部有效的情况进行优化。
int UTF8GenericScanFastAscii(const UTF8ScanObj* st, absl::string_view str,
                             int* bytes_consumed) {
                             ...
  int exit_reason;
  do {
    // 一次跳过 8 个 ASCII 字节;不存在字节序问题。
    while ((src_limit - src >= 8) &&
           (((UNALIGNED_LOAD32(src + 0) | UNALIGNED_LOAD32(src + 4)) &
             0x80808080) == 0)) {
      src += 8;
    }
    // 对剩余部分运行状态表。
    int rest_consumed;
    exit_reason = UTF8GenericScan(
        st, absl::ClippedSubstr(str, src - initial_src), &rest_consumed);
    src += rest_consumed;
  } while (exit_reason == kExitDoAgain);

  *bytes_consumed = src - initial_src;
  return exit_reason;
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 基于状态表扫描 UTF-8 stringpiece。
// 始终扫描完整 UTF-8 字符。
// 设置已扫描字节数,并返回退出原因。
// 针对 7-bit ASCII 0000..007f 全部有效的情况进行优化。
int UTF8GenericScanFastAscii(const UTF8ScanObj* st, absl::string_view str,
                             int* bytes_consumed) {
                             ...
  int exit_reason = kExitOK;
  do {
    // 一次跳过 8 个 ASCII 字节;不存在字节序问题。
    while ((src_limit - src >= 8) &&
           (((UNALIGNED_LOAD32(src + 0) | UNALIGNED_LOAD32(src + 4)) &
             0x80808080) == 0)) {
      src += 8;
    }
    while (src < src_limit && Is7BitAscii(*src)) { // 跳过 ASCII 字节。
      src++;
    }
    if (src < src_limit) {
      // 对剩余部分运行状态表。
      int rest_consumed;
      exit_reason = UTF8GenericScan(
          st, absl::ClippedSubstr(str, src - initial_src), &rest_consumed);
      src += rest_consumed;
    }
  } while (exit_reason == kExitDoAgain);

  *bytes_consumed = src - initial_src;
  return exit_reason;
}

示例:为 InlinedVector 提供更简单的快速路径。

inlined_vector.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
auto Storage<T, N, A>::Resize(ValueAdapter values, size_type new_size) -> void {
  StorageView storage_view = MakeStorageView();

  IteratorValueAdapter<MoveIterator> move_values(
      MoveIterator(storage_view.data));

  AllocationTransaction allocation_tx(GetAllocPtr());
  ConstructionTransaction construction_tx(GetAllocPtr());

  absl::Span<value_type> construct_loop;
  absl::Span<value_type> move_construct_loop;
  absl::Span<value_type> destroy_loop;

  if (new_size > storage_view.capacity) {
  ...
  } else if (new_size > storage_view.size) {
    construct_loop = {storage_view.data + storage_view.size,
                      new_size - storage_view.size};
  } else {
    destroy_loop = {storage_view.data + new_size, storage_view.size - new_size};
  }

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
auto Storage<T, N, A>::Resize(ValueAdapter values, size_type new_size) -> void {
  StorageView storage_view = MakeStorageView();
  auto* const base = storage_view.data;
  const size_type size = storage_view.size;
  auto* alloc = GetAllocPtr();
  if (new_size <= size) {
    // 销毁多余的旧元素。
    inlined_vector_internal::DestroyElements(alloc, base + new_size,
                                             size - new_size);
  } else if (new_size <= storage_view.capacity) {
    // 原地构造新元素。
    inlined_vector_internal::ConstructElements(alloc, base + size, &values,
                                               new_size - size);
  } else {
  ...
  }

示例:为初始化 1-D 到 4-D tensor 的常见情况提供快速路径。

tensor_shape.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
template <class Shape>
TensorShapeBase<Shape>::TensorShapeBase(gtl::ArraySlice<int64> dim_sizes) {
  set_tag(REP16);
  set_data_type(DT_INVALID);
  set_ndims_byte(0);
  set_num_elements(1);
  for (int64 s : dim_sizes) {
    AddDim(internal::SubtleMustCopy(s));
  }
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
template <class Shape>
void TensorShapeBase<Shape>::InitDims(gtl::ArraySlice<int64> dim_sizes) {
  DCHECK_EQ(tag(), REP16);

  // 允许小于 kint64max^0.25 的大小,这样下面的四路乘法
  // 不会溢出。
  static const uint64 kMaxSmall = 0xd744;
  static_assert(kMaxSmall * kMaxSmall * kMaxSmall * kMaxSmall <= kint64max,
                "bad overflow check");
  bool large_size = false;
  for (auto s : dim_sizes) {
    if (s > kMaxSmall) {
      large_size = true;
      break;
    }
  }

  if (!large_size) {
    // 每个 size 都适合 16 位;对 dims 属于 {1,2,3,4} 的情况使用快速路径。
    uint16* dst = as16()->dims_;
    switch (dim_sizes.size()) {
      case 1: {
        set_ndims_byte(1);
        const int64 size = dim_sizes[0];
        const bool neg = Set16(kIsPartial, dst, 0, size);
        set_num_elements(neg ? -1 : size);
        return;
      }
      case 2: {
        set_ndims_byte(2);
        const int64 size0 = dim_sizes[0];
        const int64 size1 = dim_sizes[1];
        bool neg = Set16(kIsPartial, dst, 0, size0);
        neg |= Set16(kIsPartial, dst, 1, size1);
        set_num_elements(neg ? -1 : (size0 * size1));
        return;
      }
      case 3: {
      ...
      }
      case 4: {
      ...
      }
    }
  }

  set_ndims_byte(0);
  set_num_elements(1);
  for (int64 s : dim_sizes) {
    AddDim(internal::SubtleMustCopy(s));
  }
}

示例:让 varint 解析器快速路径只覆盖 1 字节情况,而不是同时覆盖 1 字节和 2 字节情况。

减小(内联)快速路径的大小,可以减少代码体积和指令缓存压力,从而提升性能。

parse_context.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
template <typename T>
PROTOBUF_NODISCARD const char* VarintParse(const char* p, T* out) {
  auto ptr = reinterpret_cast<const uint8_t*>(p);
  uint32_t res = ptr[0];
  if (!(res & 0x80)) {
    *out = res;
    return p + 1;
  }
  uint32_t byte = ptr[1];
  res += (byte - 1) << 7;
  if (!(byte & 0x80)) {
    *out = res;
    return p + 2;
  }
  return VarintParseSlow(p, res, out);
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
template <typename T>
PROTOBUF_NODISCARD const char* VarintParse(const char* p, T* out) {
  auto ptr = reinterpret_cast<const uint8_t*>(p);
  uint32_t res = ptr[0];
  if (!(res & 0x80)) {
    *out = res;
    return p + 1;
  }
  return VarintParseSlow(p, res, out);
}

parse_context.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
std::pair<const char*, uint32_t> VarintParseSlow32(const char* p,
                                                   uint32_t res) {
  for (std::uint32_t i = 2; i < 5; i++) {
  ...
}
...
std::pair<const char*, uint64_t> VarintParseSlow64(const char* p,
                                                   uint32_t res32) {
  uint64_t res = res32;
  for (std::uint32_t i = 2; i < 10; i++) {
  ...
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
std::pair<const char*, uint32_t> VarintParseSlow32(const char* p,
                                                   uint32_t res) {
  for (std::uint32_t i = 1; i < 5; i++) {
  ...
}
...
std::pair<const char*, uint64_t> VarintParseSlow64(const char* p,
                                                   uint32_t res32) {
  uint64_t res = res32;
  for (std::uint32_t i = 1; i < 10; i++) {
  ...
}

示例:如果没有发生错误,在 RPC_Stats_Measurement 加法中跳过大量工作。

rpc-stats.h

修改前:

1
2
3
struct RPC_Stats_Measurement {
  ...
  double errors[RPC::NUM_ERRORS];

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
struct RPC_Stats_Measurement {
  ...
  double get_errors(int index) const { return errors[index]; }
  void set_errors(int index, double value) {
    errors[index] = value;
    any_errors_set = true;
  }
 private:
  ...
  // 将其设为 private,以便跟踪这些值中是否有任何一个
  // 被设置为非零值。
  double errors[RPC::NUM_ERRORS];
  bool any_errors_set;  // 当且仅当任意 errors[i] 值非零时为 true。

rpc-stats.cc

修改前:

1
2
3
4
5
6
void RPC_Stats_Measurement::operator+=(const RPC_Stats_Measurement& x) {
  ...
  for (int i = 0; i < RPC::NUM_ERRORS; ++i) {
    errors[i] += x.errors[i];
  }
}

修改后:

1
2
3
4
5
6
7
8
9
void RPC_Stats_Measurement::operator+=(const RPC_Stats_Measurement& x) {
  ...
  if (x.any_errors_set) {
    for (int i = 0; i < RPC::NUM_ERRORS; ++i) {
      errors[i] += x.errors[i];
    }
    any_errors_set = true;
  }
}

示例:根据字符串首字节做数组查找,通常可以避免对整个字符串计算 fingerprint。

soft-tokens-helper.cc

修改前:

1
2
3
4
bool SoftTokensHelper::IsSoftToken(const StringPiece& token) const {
  return soft_tokens_.find(Fingerprint(token.data(), token.size())) !=
      soft_tokens_.end();
}

soft-tokens-helper.h

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SoftTokensHelper {
 ...
 private:
  ...
  // 由于 soft token 大多与标点相关,出于性能考虑,
  // 我们维护数组 filter_。当且仅当任何 soft token
  // 以字节值 “i” 开头时,filter_[i] 为 true。这样可避免
  // 在常见情况下对 term 计算 fingerprint,因为只需基于首字节做数组
  // 查找;如果 filter_[b] 为 false,
  // 就可以立即返回 false。
  bool          filter_[256];
  ...
};

inline bool SoftTokensHelper::IsSoftToken(const StringPiece& token) const {
  if (token.size() >= 1) {
    char first_char = token.data()[0];
    if (!filter_[first_char]) {
      return false;
    }
  }
  return IsSoftTokenFallback(token);
}

soft-tokens-helper.cc

修改后:

1
2
3
4
bool SoftTokensHelper::IsSoftTokenFallback(const StringPiece& token) const {
  return soft_tokens_.find(Fingerprint(token.data(), token.size())) !=
      soft_tokens_.end();
}

一次性预计算昂贵信息

示例:预计算 TensorFlow 图执行节点属性,以便快速排除某些不常见情况。

executor.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
struct NodeItem {
  ...
  bool kernel_is_expensive = false;  // 当且仅当 kernel->IsExpensive() 时为 true。
  bool kernel_is_async = false;      // 当且仅当 kernel->AsAsync() != nullptr 时为 true。
  bool is_merge = false;             // 当且仅当 IsMerge(node) 时为 true。
  ...
  if (IsEnter(node)) {
  ...
  } else if (IsExit(node)) {
  ...
  } else if (IsNextIteration(node)) {
  ...
  } else {
    // 大多数节点的普通路径。
    ...
  }

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct NodeItem {
  ...
  bool kernel_is_expensive : 1;  // 当且仅当 kernel->IsExpensive() 时为 true。
  bool kernel_is_async : 1;      // 当且仅当 kernel->AsAsync() != nullptr 时为 true。
  bool is_merge : 1;             // 当且仅当 IsMerge(node) 时为 true。
  bool is_enter : 1;             // 当且仅当 IsEnter(node) 时为 true。
  bool is_exit : 1;              // 当且仅当 IsExit(node) 时为 true。
  bool is_control_trigger : 1;   // 当且仅当 IsControlTrigger(node) 时为 true。
  bool is_sink : 1;              // 当且仅当 IsSink(node) 时为 true。
  // 当且仅当 IsEnter(node) || IsExit(node) || IsNextIteration(node) 时为 true。
  bool is_enter_exit_or_next_iter : 1;
  ...
  if (!item->is_enter_exit_or_next_iter) {
    // 不需要特殊处理的节点类型的快速路径。
    DCHECK_EQ(input_frame, output_frame);
    ...
  } else if (item->is_enter) {
  ...
  } else if (item->is_exit) {
  ...
  } else {
    DCHECK(IsNextIteration(node));
    ...
  }

示例:预计算 256 元素数组,并在 trigram 初始化时使用。

byte_trigram_classifier.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
void ByteTrigramClassifier::VerifyModel(void) const {
  ProbT class_sums[num_classes_];
  for (int cls = 0; cls < num_classes_; cls++) {
    class_sums[cls] = 0;
  }
  for (ByteNgramId id = 0; id < trigrams_.num_trigrams(); id++) {
    for (int cls = 0; cls < num_classes_; ++cls) {
      class_sums[cls] += Prob(trigram_probs_[id].log_probs[cls]);
    }
  }
  ...
}                         

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void ByteTrigramClassifier::VerifyModel(void) const {
  CHECK_EQ(sizeof(ByteLogProbT), 1);
  ProbT fast_prob[256];
  for (int b = 0; b < 256; b++) {
    fast_prob[b] = Prob(static_cast<ByteLogProbT>(b));
  }

  ProbT class_sums[num_classes_];
  for (int cls = 0; cls < num_classes_; cls++) {
    class_sums[cls] = 0;
  }
  for (ByteNgramId id = 0; id < trigrams_.num_trigrams(); id++) {
    for (int cls = 0; cls < num_classes_; ++cls) {
      class_sums[cls] += fast_prob[trigram_probs_[id].log_probs[cls]];
    }
  }
  ...
}                         

一般建议:在模块边界检查畸形输入,而不是在内部反复检查。

将昂贵计算移到循环外

示例:将边界计算移到循环外。

literal_linearizer.cc

修改前:

1
2
for (int64 i = 0; i < src_shape.dimensions(dimension_numbers.front());
     ++i) {

修改后:

1
2
3
4
int64 dim_front = src_shape.dimensions(dimension_numbers.front());
const uint8* src_buffer_data = src_buffer.data();
uint8* dst_buffer_data = dst_buffer.data();
for (int64 i = 0; i < dim_front; ++i) {

延迟昂贵计算

示例:将 GetSubSharding 调用延迟到需要时再执行,把 CPU 时间从 43 秒减少到 2 秒。

sharding_propagation.cc

修改前:

1
2
3
4
5
6
7
HloSharding alternative_sub_sharding =
    user.sharding().GetSubSharding(user.shape(), {i});
if (user.operand(i) == &instruction &&
    hlo_sharding_util::IsShardingMoreSpecific(alternative_sub_sharding,
                                              sub_sharding)) {
  sub_sharding = alternative_sub_sharding;
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
if (user.operand(i) == &instruction) {
  // 只有当该 operand 值得关注时才计算 GetSubSharding,
  // 因为它相对昂贵。
  HloSharding alternative_sub_sharding =
      user.sharding().GetSubSharding(user.shape(), {i});
  if (hlo_sharding_util::IsShardingMoreSpecific(
          alternative_sub_sharding, sub_sharding)) {
    sub_sharding = alternative_sub_sharding;
  }
}

示例:不要急切更新统计信息;按需计算。

不要在非常频繁的分配/释放调用中更新统计信息。相反,在调用频率低得多的 Stats() 方法被调用时,按需计算统计信息。

示例:在 Google Web 服务器的查询处理中预分配 10 个节点,而不是 200 个。

一个简单改动,让 Web 服务器 CPU 使用量降低了 7.5%。

querytree.h

修改前:

1
static const int kInitParseTreeSize = 200;   // querynode 池的初始大小。

修改后:

1
static const int kInitParseTreeSize = 10;   // querynode 池的初始大小。

示例:改变搜索顺序,使吞吐量提升 19%。

一个旧搜索系统(约 2000 年)有两层索引:一层包含全文索引,另一层只包含标题和锚文本词项的索引。过去我们会先搜索较小的标题/锚文本层。反直觉的是,我们发现先搜索较大的全文索引层成本更低,因为如果我们到达全文层末尾,就可以完全跳过标题/锚文本层(它是全文层的子集)。这种情况相当常见,因此我们降低了处理一次查询所需的平均磁盘寻道次数。

背景信息可参见 The Anatomy of a Large-Scale Hypertextual Web Search Engine 中关于标题和锚文本处理的讨论。

特化代码

某个对性能敏感的调用点,可能并不需要通用库提供的完整通用性。在这种情况下,如果专用代码能够提升性能,可以考虑编写专用代码,而不是调用通用代码。

示例:为 Histogram 类编写定制打印代码,比 sprintf 快 4 倍。

这段代码对性能敏感,因为监控系统从各类服务器收集统计信息时会调用它。

histogram_export.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void Histogram::PopulateBuckets(const string &prefix,
                                expvar::MapProto *const var) const {
                                ...
  for (int i = min_bucket; i <= max_bucket; ++i) {
    const double count = BucketCount(i);
    if (!export_empty_buckets && count == 0.0) continue;
    acc += count;
    // 离散直方图导出 bucket 的标签格式
    // 指定包含式上界,这与原始 Histogram 实现
    // 相同。该格式不适用于非离散直方图,
    // 因此它们使用半开区间,
    // 并用 “_” 而不是 “-” 作为分隔符,
    // 以便区分这些格式。
    string key =
        options_.export_cumulative_counts() ?
            StringPrintf("%.12g", boundaries_->BucketLimit(i)) :
        options_.discrete() ?
            StringPrintf("%.0f-%.0f",
                         ceil(boundaries_->BucketStart(i)),
                         ceil(boundaries_->BucketLimit(i)) - 1.0) :
            StringPrintf("%.12g_%.12g",
                         boundaries_->BucketStart(i),
                         boundaries_->BucketLimit(i));
    EscapeMapKey(&key);
    const double value = options_.export_cumulative_counts() ? acc : count;
    expvar::AddMapFloat(StrCat(prefix,
                               options_.export_bucket_key_prefix(),
                               key),
                        value * count_mult,
                        var);
  }

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// 按 format 格式化 “val”。如果 “need_escape” 为 true,
// 则 format 可能产生包含 “.” 的输出,结果将被转义。
// 如果 “need_escape” 为 false,则调用方保证 format
// 生成的数字不会包含任何 “.” 字符,
// 因此可以避免调用 EscapeKey。
// 如有必要,该函数可以自由使用 “*scratch” 作为临时空间,
// 生成的 StringPiece 也可能指向 “*scratch” 内部。
static StringPiece FormatNumber(const char* format,
                                bool need_escape,
                                double val, string* scratch) {
  // 该例程被特化为只处理有限数量的格式。
  DCHECK(StringPiece(format) == "%.0f" || StringPiece(format) == "%.12g");

  scratch->clear();
  if (val == trunc(val) && val >= kint32min && val <= kint32max) {
    // 可直接使用 StrAppend 的整数。
    StrAppend(scratch, static_cast<int32>(val));
    return StringPiece(*scratch);
  } else if (isinf(val)) {
    // 无穷大,直接表示为 “inf”。
    return StringPiece("inf", 3);
  } else {
    // 根据 “format” 格式化,并在需要时转义。
    StringAppendF(scratch, format, val);
    if (need_escape) {
      EscapeMapKey(scratch);
    } else {
      DCHECK(!StringPiece(*scratch).contains("."));
    }
    return StringPiece(*scratch);
  }
}
...
void Histogram::PopulateBuckets(const string &prefix,
                                expvar::MapProto *const var) const {
                                ...
  const string full_key_prefix = StrCat(prefix,
                                        options_.export_bucket_key_prefix());
  string key = full_key_prefix;  // key 将以 “full_key_prefix” 开头。
  string start_scratch;
  string limit_scratch;
  const bool cumul_counts = options_.export_cumulative_counts();
  const bool discrete = options_.discrete();
  for (int i = min_bucket; i <= max_bucket; ++i) {
    const double count = BucketCount(i);
    if (!export_empty_buckets && count == 0.0) continue;
    acc += count;
    // 离散直方图导出 bucket 的标签格式
    // 指定包含式上界,这与原始 Histogram 实现
    // 相同。该格式不适用于非离散直方图,
    // 因此它们使用半开区间,
    // 并用 “_” 而不是 “-” 作为分隔符,
    // 以便区分这些格式。
    key.resize(full_key_prefix.size());  // 从 full_key_prefix 开始。
    DCHECK_EQ(key, full_key_prefix);

    const double limit = boundaries_->BucketLimit(i);
    if (cumul_counts) {
      StrAppend(&key, FormatNumber("%.12g", true, limit, &limit_scratch));
    } else {
      const double start = boundaries_->BucketStart(i);
      if (discrete) {
        StrAppend(&key,
                  FormatNumber("%.0f", false, ceil(start), &start_scratch),
                  "-",
                  FormatNumber("%.0f", false, ceil(limit) - 1.0,
                               &limit_scratch));
      } else {
        StrAppend(&key,
                  FormatNumber("%.12g", true, start, &start_scratch),
                  "_",
                  FormatNumber("%.12g", true, limit, &limit_scratch));
      }
    }
    const double value = cumul_counts ? acc : count;

    // 添加到 map 变量。
    expvar::AddMapFloat(key, value * count_mult, var);
  }
}

示例:为 VLOG(1)VLOG(2) 等添加特化,以提升速度并减小代码体积。

VLOG 是整个代码库中大量使用的宏。这个改动避免了在几乎每个调用点传递额外的整数常量(如果日志级别在调用点是常量,几乎总是如此,例如 VLOG(1) << ...),从而节省代码空间。

vlog_is_on.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class VLogSite final {
 public:
  ...
  bool IsEnabled(int level) {
    int stale_v = v_.load(std::memory_order_relaxed);
    if (ABSL_PREDICT_TRUE(level > stale_v)) {
      return false;
    }

    // 我们把快速路径之外的所有内容,即 vlogging 已初始化
    // 但未启用的情况,放到非内联函数后面以减少代码大小。
    return SlowIsEnabled(stale_v, level);
  }
  ...
 private:
  ...
  ABSL_ATTRIBUTE_NOINLINE
  bool SlowIsEnabled(int stale_v, int level);
  ...
};

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class VLogSite final {
 public:
  ...
  bool IsEnabled(int level) {
    int stale_v = v_.load(std::memory_order_relaxed);
    if (ABSL_PREDICT_TRUE(level > stale_v)) {
      return false;
    }

    // 我们把快速路径之外的所有内容,即 vlogging 已初始化
    // 但未启用的情况,放到非内联函数后面以减少代码大小。
    // “level” 几乎总是调用点常量,因此通过对 1、2、3 级
    // 做特殊处理,可以节省一些代码空间。
#if defined(__has_builtin) && __has_builtin(__builtin_constant_p)
    if (__builtin_constant_p(level)) {
      if (level == 0) return SlowIsEnabled0(stale_v);
      if (level == 1) return SlowIsEnabled1(stale_v);
      if (level == 2) return SlowIsEnabled2(stale_v);
      if (level == 3) return SlowIsEnabled3(stale_v);
      if (level == 4) return SlowIsEnabled4(stale_v);
      if (level == 5) return SlowIsEnabled5(stale_v);
    }
#endif
    return SlowIsEnabled(stale_v, level);
    ...
 private:
  ...
  ABSL_ATTRIBUTE_NOINLINE
  bool SlowIsEnabled(int stale_v, int level);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled0(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled1(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled2(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled3(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled4(int stale_v);
  ABSL_ATTRIBUTE_NOINLINE bool SlowIsEnabled5(int stale_v);
  ...
};

vlog_is_on.cc

修改后:

1
2
3
4
5
6
bool VLogSite::SlowIsEnabled0(int stale_v) { return SlowIsEnabled(stale_v, 0); }
bool VLogSite::SlowIsEnabled1(int stale_v) { return SlowIsEnabled(stale_v, 1); }
bool VLogSite::SlowIsEnabled2(int stale_v) { return SlowIsEnabled(stale_v, 2); }
bool VLogSite::SlowIsEnabled3(int stale_v) { return SlowIsEnabled(stale_v, 3); }
bool VLogSite::SlowIsEnabled4(int stale_v) { return SlowIsEnabled(stale_v, 4); }
bool VLogSite::SlowIsEnabled5(int stale_v) { return SlowIsEnabled(stale_v, 5); }

示例:在可能时用简单前缀匹配替换 RE2 调用。

read_matcher.cc

修改前:

1
2
3
4
5
6
enum MatchItemType {
  MATCH_TYPE_INVALID,
  MATCH_TYPE_RANGE,
  MATCH_TYPE_EXACT,
  MATCH_TYPE_REGEXP,
};

修改后:

1
2
3
4
5
6
7
enum MatchItemType {
  MATCH_TYPE_INVALID,
  MATCH_TYPE_RANGE,
  MATCH_TYPE_EXACT,
  MATCH_TYPE_REGEXP,
  MATCH_TYPE_PREFIX,   // regexp “.*” 的特殊类型。
};

read_matcher.cc

修改前:

1
p->type = MATCH_TYPE_REGEXP;

修改后:

1
2
3
4
5
6
7
term.NonMetaPrefix().CopyToString(&p->prefix);
if (term.RegexpSuffix() == ".*") {
  // 对匹配任意内容的 regexp 做特殊处理,
  // 这样可以绕过 RE2::FullMatch。
  p->type = MATCH_TYPE_PREFIX;
} else {
  p->type = MATCH_TYPE_REGEXP;

示例:使用 StrCat 而不是 StringPrintf 格式化 IP 地址。

ipaddress.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
string IPAddress::ToString() const {
  char buf[INET6_ADDRSTRLEN];

  switch (address_family_) {
    case AF_INET:
      CHECK(inet_ntop(AF_INET, &addr_.addr4, buf, INET6_ADDRSTRLEN) != NULL);
      return buf;
    case AF_INET6:
      CHECK(inet_ntop(AF_INET6, &addr_.addr6, buf, INET6_ADDRSTRLEN) != NULL);
      return buf;
    case AF_UNSPEC:
      LOG(DFATAL) << "Calling ToString() on an empty IPAddress";
      return "";
    default:
      LOG(FATAL) << "Unknown address family " << address_family_;
  }
}
...
string IPAddressToURIString(const IPAddress& ip) {
  switch (ip.address_family()) {
    case AF_INET6:
      return StringPrintf("[%s]", ip.ToString().c_str());
    default:
      return ip.ToString();
  }
}
...
string SocketAddress::ToString() const {
  return IPAddressToURIString(host_) + StringPrintf(":%u", port_);
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
string IPAddress::ToString() const {
  char buf[INET6_ADDRSTRLEN];

  switch (address_family_) {
    case AF_INET: {
      uint32 addr = gntohl(addr_.addr4.s_addr);
      int a1 = static_cast<int>((addr >> 24) & 0xff);
      int a2 = static_cast<int>((addr >> 16) & 0xff);
      int a3 = static_cast<int>((addr >> 8) & 0xff);
      int a4 = static_cast<int>(addr & 0xff);
      return StrCat(a1, ".", a2, ".", a3, ".", a4);
    }
    case AF_INET6:
      CHECK(inet_ntop(AF_INET6, &addr_.addr6, buf, INET6_ADDRSTRLEN) != NULL);
      return buf;
    case AF_UNSPEC:
      LOG(DFATAL) << "Calling ToString() on an empty IPAddress";
      return "";
    default:
      LOG(FATAL) << "Unknown address family " << address_family_;
  }
}
...
string IPAddressToURIString(const IPAddress& ip) {
  switch (ip.address_family()) {
    case AF_INET6:
      return StrCat("[", ip.ToString(), "]");
    default:
      return ip.ToString();
  }
}
...
string SocketAddress::ToString() const {
  return StrCat(IPAddressToURIString(host_), ":", port_);
}

使用缓存避免重复工作

示例:基于大型序列化 proto 的预计算 fingerprint 进行缓存。

dp_ops.cc

修改前:

1
2
3
4
5
InputOutputMappingProto mapping_proto;
PLAQUE_OP_REQUIRES(
    mapping_proto.ParseFromStringPiece(GetAttrMappingProto(state)),
    absl::InternalError("Failed to parse InputOutputMappingProto"));
ParseMapping(mapping_proto);

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
uint64 mapping_proto_fp = GetAttrMappingProtoFp(state);
{
  absl::MutexLock l(&fp_to_iometa_mu);
  if (fp_to_iometa == nullptr) {
    fp_to_iometa =
        new absl::flat_hash_map<uint64, std::unique_ptr<ProgramIOMetadata>>;
  }
  auto it = fp_to_iometa->find(mapping_proto_fp);
  if (it != fp_to_iometa->end()) {
    io_metadata_ = it->second.get();
  } else {
    auto serial_proto = GetAttrMappingProto(state);
    DCHECK_EQ(mapping_proto_fp, Fingerprint(serial_proto));
    InputOutputMappingProto mapping_proto;
    PLAQUE_OP_REQUIRES(
        mapping_proto.ParseFromStringPiece(GetAttrMappingProto(state)),
        absl::InternalError("Failed to parse InputOutputMappingProto"));
    auto io_meta = ParseMapping(mapping_proto);
    io_metadata_ = io_meta.get();
    (*fp_to_iometa)[mapping_proto_fp] = std::move(io_meta);
  }
}

让编译器更容易优化

编译器可能很难穿透多层抽象进行优化,因为它必须对代码整体行为做保守假设,也可能无法做出正确的速度与体积权衡。应用程序员通常更了解系统行为,可以通过把代码改写到更低层的操作方式来帮助编译器。不过,只有在性能剖析显示确实存在问题时才这样做,因为编译器通常自己就能做对。查看性能关键例程生成的汇编代码,可以帮助你理解编译器是否“做对了”。Pprof 提供了非常有用的[源码与反汇编交错显示][annotated source],并带有性能数据标注。一些可能有用的技巧:

  1. 避免在热点函数中调用其他函数(让编译器可以避免帧设置成本)。

  2. 将慢路径代码移到单独的尾调用函数中。

  3. 在大量使用少量数据前,先把它复制到局部变量中。 这可以让编译器假设它不会与其他数据发生别名,从而可能改善自动向量化和寄存器分配。

  4. 手动展开非常热的循环。

示例:用指向底层数组的裸指针替换 absl::Span,加速 ShapeUtil::ForEachState

shape_util.h

修改前:

1
2
3
4
5
6
7
8
9
struct ForEachState {
  ForEachState(const Shape& s, absl::Span<const int64_t> b,
               absl::Span<const int64_t> c, absl::Span<const int64_t> i);
  ~ForEachState();

  const Shape& shape;
  const absl::Span<const int64_t> base;
  const absl::Span<const int64_t> count;
  const absl::Span<const int64_t> incr;

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
struct ForEachState {
  ForEachState(const Shape& s, absl::Span<const int64_t> b,
               absl::Span<const int64_t> c, absl::Span<const int64_t> i);
  inline ~ForEachState() = default;

  const Shape& shape;
  // 指向传入 span 所表示数组的指针。
  const int64_t* const base;
  const int64_t* const count;
  const int64_t* const incr;

示例:手动展开循环冗余校验(CRC)计算循环。

crc.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
void CRC32::Extend(uint64 *lo, uint64 *hi, const void *bytes, size_t length)
                      const {
                      ...
  // 每次处理 4 字节。
  while ((p + 4) <= e) {
    uint32 c = l ^ WORD(p);
    p += 4;
    l = this->table3_[c & 0xff] ^
        this->table2_[(c >> 8) & 0xff] ^
        this->table1_[(c >> 16) & 0xff] ^
        this->table0_[c >> 24];
  }

  // 处理最后几个字节。
  while (p != e) {
    int c = (l & 0xff) ^ *p++;
    l = this->table0_[c] ^ (l >> 8);
  }
  *lo = l;
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void CRC32::Extend(uint64 *lo, uint64 *hi, const void *bytes, size_t length)
                      const {
                      ...
#define STEP {                                  \
    uint32 c = l ^ WORD(p);                     \
    p += 4;                                     \
    l = this->table3_[c & 0xff] ^               \
        this->table2_[(c >> 8) & 0xff] ^        \
        this->table1_[(c >> 16) & 0xff] ^       \
        this->table0_[c >> 24];                 \
}

  // 每次处理 16 字节。
  while ((e-p) >= 16) {
    STEP;
    STEP;
    STEP;
    STEP;
  }

  // 每次处理 4 字节。
  while ((p + 4) <= e) {
    STEP;
  }
#undef STEP

  // 处理最后几个字节。
  while (p != e) {
    int c = (l & 0xff) ^ *p++;
    l = this->table0_[c] ^ (l >> 8);
  }
  *lo = l;
}

示例:解析 Spanner key 时一次处理四个字符。

  • 手动展开循环,一次处理四个字符,而不是使用 memchr

  • 手动展开查找名称分隔段的循环。

  • 由于名称的第一部分可能最长,因此向后查找以 # 分隔的名称片段,而不是向前查找。

key.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
void Key::InitSeps(const char* start) {
  const char* base = &rep_[0];
  const char* limit = base + rep_.size();
  const char* s = start;

  DCHECK_GE(s, base);
  DCHECK_LT(s, limit);

  for (int i = 0; i < 3; i++) {
    s = (const char*)memchr(s, '#', limit - s);
    DCHECK(s != NULL);
    seps_[i] = s - base;
    s++;
  }
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
inline const char* ScanBackwardsForSep(const char* base, const char* p) {
  while (p >= base + 4) {
    if (p[0] == '#') return p;
    if (p[-1] == '#') return p-1;
    if (p[-2] == '#') return p-2;
    if (p[-3] == '#') return p-3;
    p -= 4;
  }
  while (p >= base && *p != '#') p--;
  return p;
}

void Key::InitSeps(const char* start) {
  const char* base = &rep_[0];
  const char* limit = base + rep_.size();
  const char* s = start;

  DCHECK_GE(s, base);
  DCHECK_LT(s, limit);

  // 我们从字符串末尾向后走,而不是向前走,
  // 因为目录名可能很长,且肯定不包含
  // 任何 “#” 字符。
  const char* p = ScanBackwardsForSep(s, limit - 1);
  DCHECK(*p == '#');
  seps_[2] = p - base;
  p--;

  p = ScanBackwardsForSep(s, p);
  DCHECK(*p == '#');
  seps_[1] = p - base;
  p--;

  p = ScanBackwardsForSep(s, p);
  DCHECK(*p == '#');
  seps_[0] = p - base;
}

示例:将 ABSL_LOG(FATAL) 改为 ABSL_DCHECK(false),避免帧设置成本。

arena_cleanup.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t Size(Tag tag) {
  if (!EnableSpecializedTags()) return sizeof(DynamicNode);

  switch (tag) {
    case Tag::kDynamic:
      return sizeof(DynamicNode);
    case Tag::kString:
      return sizeof(TaggedNode);
    case Tag::kCord:
      return sizeof(TaggedNode);
    default:
      ABSL_LOG(FATAL) << "Corrupted cleanup tag: " << static_cast<int>(tag);
      return sizeof(DynamicNode);
  }
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t Size(Tag tag) {
  if (!EnableSpecializedTags()) return sizeof(DynamicNode);

  switch (tag) {
    case Tag::kDynamic:
      return sizeof(DynamicNode);
    case Tag::kString:
      return sizeof(TaggedNode);
    case Tag::kCord:
      return sizeof(TaggedNode);
    default:
      ABSL_DCHECK(false) << "Corrupted cleanup tag: " << static_cast<int>(tag);
      return sizeof(DynamicNode);
  }
}

降低统计信息收集成本

需要在系统统计信息及其他行为信息的价值,与维护这些信息的成本之间做权衡。额外信息通常能帮助人们理解和改进高层行为,但维护它们也可能很昂贵。

没有用处的统计信息可以直接删除。

示例:停止维护 SelectServer 中 alarm 和 closure 数量这类昂贵统计。

这是把设置 alarm 的时间从 771 ns 降到 271 ns 的改动之一。

selectserver.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class SelectServer {
 public:
 ...
 protected:
  ...
  scoped_ptr<MinuteTenMinuteHourStat> num_alarms_stat_;
  ...
  scoped_ptr<MinuteTenMinuteHourStat> num_closures_stat_;
  ...
};

修改后:

1
2
3
4
5
6
// Selectserver 类。
class SelectServer {
 ...
 protected:
 ...
};

/selectserver.cc

修改前:

1
2
3
4
5
6
7
8
9
void SelectServer::AddAlarmInternal(Alarmer* alarmer,
                                    int offset_in_ms,
                                    int id,
                                    bool is_periodic) {
                                    ...
  alarms_->insert(alarm);
  num_alarms_stat_->IncBy(1);
  ...
}

修改后:

1
2
3
4
5
6
7
8
void SelectServer::AddAlarmInternal(Alarmer* alarmer,
                                    int offset_in_ms,
                                    int id,
                                    bool is_periodic) {
                                    ...
  alarms_->Add(alarm);
  ...
}

/selectserver.cc

修改前:

1
2
3
4
5
6
void SelectServer::RemoveAlarm(Alarmer* alarmer, int id) {
      ...
      alarms_->erase(alarm);
      num_alarms_stat_->IncBy(-1);
      ...
}

修改后:

1
2
3
4
5
void SelectServer::RemoveAlarm(Alarmer* alarmer, int id) {
      ...
      alarms_->Remove(alarm);
      ...
}

通常可以只为系统处理的元素样本维护统计信息或其他属性(例如 RPC 请求、输入记录、用户)。许多子系统都使用这种方法(如 tcmalloc 分配跟踪、/requestz 状态页、Dapper 样本)。

使用采样时,应在合适的情况下考虑降低采样率。

示例:只为 doc info 请求样本维护统计信息。

采样让我们可以在大多数请求中避免触碰 39 个直方图和 MinuteTenMinuteHour 统计。

generic-leaf-stats.cc

1
... code that touches 39 histograms to update various stats on every request ...
1
2
3
4
5
6
7
// 定期添加到直方图。
if (TryLockToUpdateHistogramsDocInfo(docinfo_stats, bucket)) {
  // 仅当应该采样该请求以维护统计信息时,
  // 返回 true 并获取 bucket->lock。
  ... code that touches 39 histograms to update various stats ...
  bucket->lock.Unlock();
}

示例:降低采样率,并让采样决策更快。

这个改动将采样率从 1/10 降到 1/32。此外,我们现在只为被采样事件保留执行时间统计,并通过使用 2 的幂取模来加速采样决策。这段代码会在 Google Meet 视频会议系统的每个数据包上调用;在 COVID 疫情初期,用户迅速转向更多线上会议,因此需要做性能优化来跟上容量需求。

packet_executor.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class ScopedPerformanceMeasurement {
 public:
  explicit ScopedPerformanceMeasurement(PacketExecutor* packet_executor)
      : packet_executor_(packet_executor),
        tracer_(packet_executor->packet_executor_trace_threshold_,
                kClosureTraceName) {
    // ThreadCPUUsage 是昂贵调用。在撰写时,
    // 它耗时超过 400ns,大约比 absl::Now 慢 30 倍,
    // 因此我们只采样 10% 的 closure 以降低成本。
    if (packet_executor->closures_executed_ % 10 == 0) {
      thread_cpu_usage_start_ = base::ThreadCPUUsage();
    }

    // 在可能执行上述昂贵调用后再采样开始时间,
    // 以免污染墙钟时间测量。
    run_start_time_ = absl::Now();
  }

  ~ScopedPerformanceMeasurement() {

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
ScopedPerformanceMeasurement::ScopedPerformanceMeasurement(
    PacketExecutor* packet_executor)
    : packet_executor_(packet_executor),
      tracer_(packet_executor->packet_executor_trace_threshold_,
              kClosureTraceName) {
  // ThreadCPUUsage 是昂贵调用。在撰写时,
  // 它耗时超过 400ns,大约比 absl::Now 慢 30 倍,
  // 因此我们只采样 1/32 的 closure 以降低成本。
  if (packet_executor->closures_executed_ % 32 == 0) {
    thread_cpu_usage_start_ = base::ThreadCPUUsage();
  }

  // 在可能执行上述昂贵调用后再采样开始时间,
  // 以免污染墙钟时间测量。
  run_start_time_ = absl::Now();
}

packet_executor.cc

修改前:

1
2
3
4
5
6
7
8
9
~ScopedPerformanceMeasurement() {
  auto run_end_time = absl::Now();
  auto run_duration = run_end_time - run_start_time_;

  if (thread_cpu_usage_start_.has_value()) {
  ...
  }

  closure_execution_time->Record(absl::ToInt64Microseconds(run_duration));

修改后:

1
2
3
4
5
6
7
8
ScopedPerformanceMeasurement::~ScopedPerformanceMeasurement() {
  auto run_end_time = absl::Now();
  auto run_duration = run_end_time - run_start_time_;

  if (thread_cpu_usage_start_.has_value()) {
    ...
    closure_execution_time->Record(absl::ToInt64Microseconds(run_duration));
  }

基准测试结果:

1
2
3
4
5
Run on (40 X 2793 MHz CPUs); 2020-03-24T20:08:19.991412535-07:00
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_PacketOverhead_mean                               224          85    +62.0%

避免在热点路径上记录日志

日志语句可能很昂贵,即使该语句的日志级别实际上不会输出任何内容。例如,ABSL_VLOG 的实现至少需要一次加载和一次比较,这在热点代码路径中可能成为问题。此外,日志代码的存在还可能抑制编译器优化。可以考虑从热点路径中完全移除日志。

示例:从内存分配器核心路径中移除日志。

这是一个更大改动中的一小部分。

gpu_bfc_allocator.cc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void GPUBFCAllocator::SplitChunk(...) {
  ...
  VLOG(6) << "Adding to chunk map: " << new_chunk->ptr;
  ...
}
...
void GPUBFCAllocator::DeallocateRawInternal(void* ptr) {
  ...
  VLOG(6) << "Chunk at " << c->ptr << " no longer in use";
  ...
}
1
2
3
4
5
6
7
void GPUBFCAllocator::SplitChunk(...) {
...
}
...
void GPUBFCAllocator::DeallocateRawInternal(void* ptr) {
...
}

示例:在嵌套循环外预先计算日志是否启用。

image_similarity.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
for (int j = 0; j < output_subimage_size_y; j++) {
  int j1 = j - rad + output_to_integral_subimage_y;
  int j2 = j1 + 2 * rad + 1;
  // 为该行输出创建指针,同时考虑到相对
  // 完整图像的 offset。
  double *image_diff_ptr = &(*image_diff)(j + min_j, min_i);

  for (int i = 0; i < output_subimage_size_x; i++) {
    ...
    if (VLOG_IS_ON(3)) {
    ...
    }
    ...
  }
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
const bool vlog_3 = DEBUG_MODE ? VLOG_IS_ON(3) : false;

for (int j = 0; j < output_subimage_size_y; j++) {
  int j1 = j - rad + output_to_integral_subimage_y;
  int j2 = j1 + 2 * rad + 1;
  // 为该行输出创建指针,同时考虑到相对
  // 完整图像的 offset。
  double *image_diff_ptr = &(*image_diff)(j + min_j, min_i);

  for (int i = 0; i < output_subimage_size_x; i++) {
    ...
    if (vlog_3) {
    ...
    }
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Run on (40 X 2801 MHz CPUs); 2016-05-16T15:55:32.250633072-07:00
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark                          Base (ns)  New (ns) Improvement
------------------------------------------------------------------
BM_NCCPerformance/16                   29104     26372     +9.4%
BM_NCCPerformance/64                  473235    425281    +10.1%
BM_NCCPerformance/512               30246238  27622009     +8.7%
BM_NCCPerformance/1k              125651445  113361991     +9.8%
BM_NCCLimitedBoundsPerformance/16       8314      7498     +9.8%
BM_NCCLimitedBoundsPerformance/64     143508    132202     +7.9%
BM_NCCLimitedBoundsPerformance/512   9335684   8477567     +9.2%
BM_NCCLimitedBoundsPerformance/1k   37223897  34201739     +8.1%

示例:预先计算日志是否启用,并在辅助例程中使用该结果。

periodic_call.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  VLOG(1) << Logid()
          << "MaybeScheduleAlarmAtNextTick. Time until next real time: "
          << time_until_next_real_time;
          ...
  uint64 next_virtual_time_ms =
      next_virtual_time_ms_ - num_ticks * kResolutionMs;
  CHECK_GE(next_virtual_time_ms, 0);
  ScheduleAlarm(now, delay, next_virtual_time_ms);
}

void ScheduleNextAlarm(uint64 current_virtual_time_ms)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
  if (calls_.empty()) {
    VLOG(1) << Logid() << "No calls left, entering idle mode";
    next_real_time_ = absl::InfiniteFuture();
    return;
  }
  uint64 next_virtual_time_ms = FindNextVirtualTime(current_virtual_time_ms);
  auto delay =
      absl::Milliseconds(next_virtual_time_ms - current_virtual_time_ms);
  ScheduleAlarm(GetClock().TimeNow(), delay, next_virtual_time_ms);
}

// 该函数调度的 alarm 会取代之前所有已调度的
// alarm。这通过 `scheduling_sequence_number_` 保证。
void ScheduleAlarm(absl::Time now, absl::Duration delay,
                   uint64 virtual_time_ms)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
  next_real_time_ = now + delay;
  next_virtual_time_ms_ = virtual_time_ms;
  ++ref_count_;  // Alarm 持有一个引用。
  ++scheduling_sequence_number_;
  VLOG(1) << Logid() << "ScheduleAlarm. Time : "
          << absl::FormatTime("%M:%S.%E3f", now, absl::UTCTimeZone())
          << ", delay: " << delay << ", virtual time: " << virtual_time_ms
          << ", refs: " << ref_count_
          << ", seq: " << scheduling_sequence_number_
          << ", executor: " << executor_;

  executor_->AddAfter(
      delay, new Alarm(this, virtual_time_ms, scheduling_sequence_number_));
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
  const bool vlog_1 = VLOG_IS_ON(1);

  if (vlog_1) {
    VLOG(1) << Logid()
            << "MaybeScheduleAlarmAtNextTick. Time until next real time: "
            << time_until_next_real_time;
  }
  ...
  uint64 next_virtual_time_ms =
      next_virtual_time_ms_ - num_ticks * kResolutionMs;
  CHECK_GE(next_virtual_time_ms, 0);
  ScheduleAlarm(now, delay, next_virtual_time_ms, vlog_1);
}

void ScheduleNextAlarm(uint64 current_virtual_time_ms, bool vlog_1)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
  if (calls_.empty()) {
    if (vlog_1) {
      VLOG(1) << Logid() << "No calls left, entering idle mode";
    }
    next_real_time_ = absl::InfiniteFuture();
    return;
  }
  uint64 next_virtual_time_ms = FindNextVirtualTime(current_virtual_time_ms);
  auto delay =
      absl::Milliseconds(next_virtual_time_ms - current_virtual_time_ms);
  ScheduleAlarm(GetClock().TimeNow(), delay, next_virtual_time_ms, vlog_1);
}

// 该函数调度的 alarm 会取代之前所有已调度的
// alarm。这通过 `scheduling_sequence_number_` 保证。
void ScheduleAlarm(absl::Time now, absl::Duration delay,
                   uint64 virtual_time_ms,
                   bool vlog_1)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
  next_real_time_ = now + delay;
  next_virtual_time_ms_ = virtual_time_ms;
  ++ref_count_;  // Alarm 持有一个引用。
  ++scheduling_sequence_number_;
  if (vlog_1) {
    VLOG(1) << Logid() << "ScheduleAlarm. Time : "
            << absl::FormatTime("%M:%S.%E3f", now, absl::UTCTimeZone())
            << ", delay: " << delay << ", virtual time: " << virtual_time_ms
            << ", refs: " << ref_count_
            << ", seq: " << scheduling_sequence_number_
            << ", executor: " << executor_;
  }

  executor_->AddAfter(
      delay, new Alarm(this, virtual_time_ms, scheduling_sequence_number_));
}

代码大小方面的考量

性能不只是运行时速度。有时也值得考虑软件选择对生成代码体积的影响。代码体积大意味着更长的编译和链接时间、臃肿的二进制、更多内存使用、更大的指令缓存压力,以及对分支预测器等微架构结构的其他负面影响。在编写会被许多地方使用的底层库代码,或编写预期会为许多不同类型实例化的模板代码时,思考这些问题尤其重要。

减少代码体积的有效技巧会因编程语言而有很大差异。下面是一些我们认为对 C++ 代码有用的技巧(C++ 代码很容易因为过度使用模板和内联而膨胀)。

精简常被内联的代码

被广泛调用的函数一旦与内联结合,可能会对代码体积产生巨大影响。

示例:加速 TF_CHECK_OK

避免创建 Ok 对象,并把致命错误消息的复杂格式化移到非内联路径,而不是在每个调用点执行,从而节省代码空间。

status.h

修改前:

1
2
#define TF_CHECK_OK(val) CHECK_EQ(::tensorflow::Status::OK(), (val))
#define TF_QCHECK_OK(val) QCHECK_EQ(::tensorflow::Status::OK(), (val))

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
extern tensorflow::string* TfCheckOpHelperOutOfLine(
    const ::tensorflow::Status& v, const char* msg);
inline tensorflow::string* TfCheckOpHelper(::tensorflow::Status v,
                                           const char* msg) {
  if (v.ok()) return nullptr;
  return TfCheckOpHelperOutOfLine(v, msg);
}
#define TF_CHECK_OK(val)                                           \
  while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
  LOG(FATAL) << *(_result)
#define TF_QCHECK_OK(val)                                          \
  while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
  LOG(QFATAL) << *(_result)

status.cc

修改后:

1
2
3
4
5
6
7
8
9
string* TfCheckOpHelperOutOfLine(const ::tensorflow::Status& v,
                                 const char* msg) {
  string r("Non-OK-status: ");
  r += msg;
  r += " status: ";
  r += v.ToString();
  // 会泄漏字符串,但它只用于 fatal 错误消息。
  return new string(r);
}

示例:让每个 RETURN_IF_ERROR 调用点减少 79 字节代码。

  • 增加专门供 RETURN_IF_ERROR 使用的适配器类。

  • 不在 RETURN_IF_ERROR 的快速路径上构造/析构 StatusBuilder

  • 不内联某些 StatusBuilder 方法,因为快速路径上已经不再需要它们。

  • 避免不必要的 ~Status 调用。

示例:让 CHECK_GE 性能提升 4.5 倍,并将代码大小从 125 字节缩小到 77 字节。

logging.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct CheckOpString {
  CheckOpString(string* str) : str_(str) { }
  ~CheckOpString() { delete str_; }
  operator bool() const { return str_ == NULL; }
  string* str_;
};
...
#define DEFINE_CHECK_OP_IMPL(name, op) \
  template <class t1, class t2> \
  inline string* Check##name##Impl(const t1& v1, const t2& v2, \
                                   const char* names) { \
    if (v1 op v2) return NULL; \
    else return MakeCheckOpString(v1, v2, names); \
  } \
  string* Check##name##Impl(int v1, int v2, const char* names);
DEFINE_CHECK_OP_IMPL(EQ, ==)
DEFINE_CHECK_OP_IMPL(NE, !=)
DEFINE_CHECK_OP_IMPL(LE, <=)
DEFINE_CHECK_OP_IMPL(LT, < )
DEFINE_CHECK_OP_IMPL(GE, >=)
DEFINE_CHECK_OP_IMPL(GT, > )
#undef DEFINE_CHECK_OP_IMPL

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct CheckOpString {
  CheckOpString(string* str) : str_(str) { }
  // 没有析构函数:如果 str_ 非 NULL,说明即将 LOG(FATAL),
  // 因此清理 str_ 没有意义。
  operator bool() const { return str_ == NULL; }
  string* str_;
};
...
extern string* MakeCheckOpStringIntInt(int v1, int v2, const char* names);

template<int, int>
string* MakeCheckOpString(const int& v1, const int& v2, const char* names) {
  return MakeCheckOpStringIntInt(v1, v2, names);
}
...
#define DEFINE_CHECK_OP_IMPL(name, op) \
  template <class t1, class t2> \
  inline string* Check##name##Impl(const t1& v1, const t2& v2, \
                                   const char* names) { \
    if (v1 op v2) return NULL; \
    else return MakeCheckOpString(v1, v2, names); \
  } \
  inline string* Check##name##Impl(int v1, int v2, const char* names) { \
    if (v1 op v2) return NULL; \
    else return MakeCheckOpString(v1, v2, names); \
  }
DEFINE_CHECK_OP_IMPL(EQ, ==)
DEFINE_CHECK_OP_IMPL(NE, !=)
DEFINE_CHECK_OP_IMPL(LE, <=)
DEFINE_CHECK_OP_IMPL(LT, < )
DEFINE_CHECK_OP_IMPL(GE, >=)
DEFINE_CHECK_OP_IMPL(GT, > )
#undef DEFINE_CHECK_OP_IMPL

logging.cc

修改后:

1
2
3
4
5
string* MakeCheckOpStringIntInt(int v1, int v2, const char* names) {
  strstream ss;
  ss << names << " (" << v1 << " vs. " << v2 << ")";
  return new string(ss.str(), ss.pcount());
}

谨慎内联

内联通常能提升性能,但有时会增加代码体积,却没有相应的性能收益(某些情况下甚至会因为指令缓存压力增加而造成性能下降)。

示例:减少 TensorFlow 中的内联。

该改动停止内联许多非性能敏感函数(例如错误路径和 op 注册代码)。此外,一些性能敏感函数的慢路径被移到非内联函数中。

这些改动让典型二进制中的 TensorFlow 符号大小减少 12.2%(从 8814545 字节降到 7740233 字节)。

示例:Protocol Buffer 库改动:对于 ≥ 128 字节的消息,避免为编码消息长度生成昂贵的内联代码,改为调用共享的非内联例程。

这不仅让重要的大型二进制更小,也让它们更快。

某个大型二进制中,一个重度内联例程每行源码生成的代码字节数。第一个数字表示某一源码行在所有内联位置生成的总字节数。

修改前:

1
2
3
4
5
6
7
8
9
.           0   1825 template <typename MessageType>
.           0   1826 inline uint8* WireFormatLite::InternalWriteMessage(
.           0   1827     int field_number, const MessageType& value, uint8* target,
.           0   1828     io::EpsCopyOutputStream* stream) {
>>>    389246   1829   target = WriteTagToArray(field_number, WIRETYPE_LENGTH_DELIMITED, target);
>>>   5454640   1830   target = io::CodedOutputStream::WriteVarint32ToArray(
>>>    337837   1831       static_cast<uint32>(value.GetCachedSize()), target);
>>>   1285539   1832   return value._InternalSerialize(target, stream);
.           0   1833 }

此改动后的新代码大小输出如下:

1
2
3
4
5
6
7
8
9
.           0   1825 template <typename MessageType>
.           0   1826 inline uint8* WireFormatLite::InternalWriteMessage(
.           0   1827     int field_number, const MessageType& value, uint8* target,
.           0   1828     io::EpsCopyOutputStream* stream) {
>>>    450612   1829   target = WriteTagToArray(field_number, WIRETYPE_LENGTH_DELIMITED, target);
>>       9609   1830   target = io::CodedOutputStream::WriteVarint32ToArrayOutOfLine(
>>>    434668   1831       static_cast<uint32>(value.GetCachedSize()), target);
>>>   1597394   1832   return value._InternalSerialize(target, stream);
.           0   1833 }

coded_stream.h

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class PROTOBUF_EXPORT CodedOutputStream {
  ...
  // 类似 WriteVarint32(),但直接写入目标数组,
  // 并将较少见路径放到非内联路径,而不是内联。
  static uint8* WriteVarint32ToArrayOutOfLine(uint32 value, uint8* target);
  ...
};
...
inline uint8* CodedOutputStream::WriteVarint32ToArrayOutOfLine(uint32 value,
                                                               uint8* target) {
  target[0] = static_cast<uint8>(value);
  if (value < 0x80) {
    return target + 1;
  } else {
    return WriteVarint32ToArrayOutOfLineHelper(value, target);
  }
}

coded_stream.cc

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
uint8* CodedOutputStream::WriteVarint32ToArrayOutOfLineHelper(uint32 value,
                                                              uint8* target) {
  DCHECK_GE(value, 0x80);
  target[0] |= static_cast<uint8>(0x80);
  value >>= 7;
  target[1] = static_cast<uint8>(value);
  if (value < 0x80) {
    return target + 2;
  }
  target += 2;
  do {
    // 打开刚写入字节中的 continuation bit。
    target[-1] |= static_cast<uint8>(0x80);
    value >>= 7;
    *target = static_cast<uint8>(value);
    ++target;
  } while (value >= 0x80);
  return target;
}

示例:减少 absl::flat_hash_setabsl::flat_hash_map 的代码大小。

  • 将不依赖具体哈希表类型的代码提取到公共的(非内联)函数中。

  • 谨慎放置 ABSL_ATTRIBUTE_NOINLINE 指令。

  • 将一些慢路径移到非内联路径。

让一些大型二进制体积减少约 0.5%。

示例:不使用 protobuf arena 时,不内联字符串分配和释放。

public/arenastring.h

修改前:

1
2
3
4
5
6
7
8
  if (IsDefault(default_value)) {
    std::string* new_string = new std::string();
    tagged_ptr_.Set(new_string);
    return new_string;
  } else {
    return UnsafeMutablePointer();
  }
}

修改后:

1
2
3
4
5
6
  if (IsDefault(default_value)) {
    return SetAndReturnNewString();
  } else {
    return UnsafeMutablePointer();
  }
}

internal/arenastring.cc

修改后:

1
2
3
4
5
std::string* ArenaStringPtr::SetAndReturnNewString() {
  std::string* new_string = new std::string();
  tagged_ptr_.Set(new_string);
  return new_string;
}

示例:避免内联某些例程。创建接受 const char* 而不是 const std::string& 的例程变体,以避免每个调用点都生成 std::string 构造代码。

op.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class OpDefBuilderWrapper {
 public:
  explicit OpDefBuilderWrapper(const char name[]) : builder_(name) {}
  OpDefBuilderWrapper& Attr(std::string spec) {
    builder_.Attr(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Input(std::string spec) {
    builder_.Input(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Output(std::string spec) {
    builder_.Output(std::move(spec));
    return *this;
  }

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class OpDefBuilderWrapper {
 public:
  explicit OpDefBuilderWrapper(const char name[]) : builder_(name) {}
  OpDefBuilderWrapper& Attr(std::string spec) {
    builder_.Attr(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Attr(const char* spec) TF_ATTRIBUTE_NOINLINE {
    return Attr(std::string(spec));
  }
  OpDefBuilderWrapper& Input(std::string spec) {
    builder_.Input(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Input(const char* spec) TF_ATTRIBUTE_NOINLINE {
    return Input(std::string(spec));
  }
  OpDefBuilderWrapper& Output(std::string spec) {
    builder_.Output(std::move(spec));
    return *this;
  }
  OpDefBuilderWrapper& Output(const char* spec) TF_ATTRIBUTE_NOINLINE {
    return Output(std::string(spec));
  }

减少模板实例化

模板代码在实例化时,可能会为模板参数的每种可能组合生成重复代码。

示例:用普通参数替换模板参数。

将一个以 bool 为模板参数的大型例程改为把该 bool 作为额外普通参数传入。(这个 bool 只用于在两个字符串常量中选择一个,因此运行时检查完全可以接受。)这把该大型例程的实例化数量从 287 减少到 143。

sharding_util_ops.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
template <bool Split>
Status GetAndValidateAttributes(OpKernelConstruction* ctx,
                                std::vector<int32>& num_partitions,
                                int& num_slices, std::vector<int32>& paddings,
                                bool& has_paddings) {
  absl::string_view num_partitions_attr_name =
      Split ? kNumSplitsAttrName : kNumConcatsAttrName;
      ...
  return OkStatus();
}

修改后:

1
2
3
4
5
6
7
8
9
Status GetAndValidateAttributes(bool split, OpKernelConstruction* ctx,
                                std::vector<int32>& num_partitions,
                                int& num_slices, std::vector<int32>& paddings,
                                bool& has_paddings) {
  absl::string_view num_partitions_attr_name =
      split ? kNumSplitsAttrName : kNumConcatsAttrName;
      ...
  return OkStatus();
}

示例:将模板构造函数中的大块代码移到非模板的共享基类构造函数中。

同时减少模板实例化数量:从每种组合一次,变为每个相关类型各一次。

sharding_util_ops.cc

修改前:

1
2
3
4
5
6
7
8
template <typename Device, typename T>
class XlaSplitNDBaseOp : public OpKernel {
 public:
  explicit XlaSplitNDBaseOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(
        ctx, GetAndValidateAttributes(/*split=*/true, ctx, num_splits_,
                                      num_slices_, paddings_, has_paddings_));
  }

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 用于节省代码空间的共享基类。
class XlaSplitNDShared : public OpKernel {
 public:
  explicit XlaSplitNDShared(OpKernelConstruction* ctx) TF_ATTRIBUTE_NOINLINE
      : OpKernel(ctx),
        num_slices_(1),
        has_paddings_(false) {
    GetAndValidateAttributes(/*split=*/true, ctx, num_splits_, num_slices_,
                             paddings_, has_paddings_);
  }

示例:减少 absl::flat_hash_setabsl::flat_hash_map 的生成代码大小。

  • 将不依赖具体哈希表类型的代码提取到公共的(非内联)函数中。

  • 谨慎放置 ABSL_ATTRIBUTE_NOINLINE 指令。

  • 将一些慢路径移出内联路径。

减少容器操作

应考虑 map 和其他容器操作的影响,因为每次调用这类操作都可能生成大量代码。

示例:将连续许多次 map 插入(用于初始化 emoji 字符哈希表)改成一次批量插入操作(在链接进许多二进制的库中,文本段从 188KB 降到 360 字节)。😊

textfallback_init.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
inline void AddEmojiFallbacks(TextFallbackMap *map) {
  (*map)[0xFE000] = &kFE000;
  (*map)[0xFE001] = &kFE001;
  (*map)[0xFE002] = &kFE002;
  (*map)[0xFE003] = &kFE003;
  (*map)[0xFE004] = &kFE004;
  (*map)[0xFE005] = &kFE005;
  ...
  (*map)[0xFEE7D] = &kFEE7D;
  (*map)[0xFEEA0] = &kFEEA0;
  (*map)[0xFE331] = &kFE331;
};

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
inline void AddEmojiFallbacks(TextFallbackMap *map) {
#define PAIR(x) {0x##x, &k##x}
  // clang-format off
  map->insert({
    PAIR(FE000),
    PAIR(FE001),
    PAIR(FE002),
    PAIR(FE003),
    PAIR(FE004),
    PAIR(FE005),
    ...
    PAIR(FEE7D),
    PAIR(FEEA0),
    PAIR(FE331)});
  // clang-format on
#undef PAIR
};

示例:停止内联一个大量使用 InlinedVector 操作的函数。

将一个原本在 .h 文件中被内联的超长例程移到 .cc 文件中(内联它没有实际性能收益)。

reduction_ops_common.h

1
2
3
4
Status Simplify(const Tensor& data, const Tensor& axis,
                const bool keep_dims) {
  ... Eighty line routine body ...
}
1
Status Simplify(const Tensor& data, const Tensor& axis, const bool keep_dims);

并行化与同步

利用并行性

现代机器有很多核心,而且经常没有被充分利用。因此,昂贵工作可以通过并行化更快完成。最常见的方法是并行处理不同项目,并在完成后合并结果。通常会先把项目划分为批次,以避免为每个项目单独支付并行执行成本。

示例:四路并行化使 token 编码速率提升约 3.6 倍。

blocked-token-coder.cc

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
MutexLock l(&encoder_threads_lock);
if (encoder_threads == NULL) {
  encoder_threads = new ThreadPool(NumCPUs());
  encoder_threads->SetStackSize(262144);
  encoder_threads->StartWorkers();
}
encoder_threads->Add
    (NewCallback(this,
                 &BlockedTokenEncoder::EncodeRegionInThread,
                 region_tokens, N, region,
                 stats,
                 controller_->GetClosureWithCost
                 (NewCallback(&DummyCallback), N)));

示例:并行化使解码性能提升 5 倍。

coding.cc

1
2
3
for (int c = 0; c < clusters->size(); c++) {
  RET_CHECK_OK(DecodeBulkForCluster(...);
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
struct SubTask {
  absl::Status result;
  absl::Notification done;
};

std::vector<SubTask> tasks(clusters->size());
for (int c = 0; c < clusters->size(); c++) {
  options_.executor->Schedule([&, c] {
    tasks[c].result = DecodeBulkForCluster(...);
    tasks[c].done.Notify();
  });
}
for (int c = 0; c < clusters->size(); c++) {
  tasks[c].done.WaitForNotification();
}
for (int c = 0; c < clusters->size(); c++) {
  RETURN_IF_ERROR(tasks[c].result);
}

并行化对系统性能的影响应仔细测量。如果没有空闲 CPU,或者内存带宽已经饱和,并行化可能没有帮助,甚至可能有害。

摊销加锁成本

避免细粒度加锁,以降低热点路径中 Mutex 操作的成本。注意:只有当改动不会增加锁竞争时才应这样做。

示例:只获取一次锁来释放整棵查询节点树,而不是为树中每个节点重复获取锁。

mustang-query.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 查询节点池。
ThreadSafeFreeList<MustangQuery> pool_(256);
...
void MustangQuery::Release(MustangQuery* node) {
  if (node == NULL)
    return;
  for (int i=0; i < node->children_->size(); ++i)
    Release((*node->children_)[i]);
  node->children_->clear();
  pool_.Delete(node);
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 查询节点池。
Mutex pool_lock_;
FreeList<MustangQuery> pool_(256);
...
void MustangQuery::Release(MustangQuery* node) {
  if (node == NULL)
    return;
  MutexLock l(&pool_lock_);
  ReleaseLocked(node);
}

void MustangQuery::ReleaseLocked(MustangQuery* node) {
#ifndef NDEBUG
  pool_lock_.AssertHeld();
#endif
  if (node == NULL)
    return;
  for (int i=0; i < node->children_->size(); ++i)
    ReleaseLocked((*node->children_)[i]);
  node->children_->clear();
  pool_.Delete(node);
}

保持临界区短小

避免在临界区内执行昂贵工作。尤其要警惕那些看似无害、但可能会执行 RPC 或访问文件的代码。

示例:减少临界区中触碰的缓存行数量。

细致调整数据结构,显著减少访问的缓存行数量,并让一次 ML 训练运行性能提升 3.3%。

  • 将某些每节点类型属性预计算为 NodeItem 数据结构中的位, 这意味着在临界区内处理出边时,可以避免触碰 Node* 对象。

  • 修改 ExecutorState::ActivateNodes,让每条出边使用目标节点的 NodeItem, 而不是触碰 *item->node 对象中的字段。 通常这意味着访问所需边数据时,总共只触碰 1 到 2 个缓存行, 而不是 ~2 + O(num_outgoing edges) 个缓存行; 对于许多核心执行的大图,这也会降低 TLB 压力。

示例:持有 Mutex 时避免 RPC。

trainer.cc

修改前:

1
2
3
4
5
6
{
  // 通知参数服务器我们即将开始。
  MutexLock l(&lock_);
  model_ = model;
  MaybeRecordProgress(last_global_step_);
}

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
bool should_start_record_progress = false;
int64 step_for_progress = -1;
{
  // 通知参数服务器我们即将开始。
  MutexLock l(&lock_);
  model_ = model;
  should_start_record_progress = ShouldStartRecordProgress();
  step_for_progress = last_global_step_;
}
if (should_start_record_progress) {
  StartRecordProgress(step_for_progress);
}

此外,要警惕那些会在 Mutex 解锁前运行的昂贵析构函数。 (当 Mutex 解锁由 ~MutexUnlock 触发时,这种情况经常发生。) 如果线程安全性允许,在 MutexLock 之前声明带有昂贵析构函数的对象可能会有帮助。

通过分片减少竞争

有时,一个受 Mutex 保护且竞争很高的数据结构,可以安全拆分成多个分片,每个分片拥有自己的 Mutex。(注意:这要求不同分片之间没有跨分片不变量。)

示例:将缓存分成 16 个分片,使多线程负载下吞吐量提升约 2 倍。

cache.cc

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class ShardedLRUCache : public Cache {
 private:
  LRUCache shard_[kNumShards];
  port::Mutex id_mutex_;
  uint64_t last_id_;

  static inline uint32_t HashSlice(const Slice& s) {
    return Hash(s.data(), s.size(), 0);
  }

  static uint32_t Shard(uint32_t hash) {
    return hash >> (32 - kNumShardBits);
  }
  ...
  virtual Handle* Lookup(const Slice& key) {
    const uint32_t hash = HashSlice(key);
    return shard_[Shard(hash)].Lookup(key, hash);
  }

示例:对用于跟踪调用的 Spanner 数据结构进行分片。

transaction_manager.cc

修改前:

1
2
3
4
5
absl::MutexLock l(&active_calls_in_mu_);
ActiveCallMap::const_iterator iter = active_calls_in_.find(m->tid());
if (iter != active_calls_in_.end()) {
  iter->second.ExtractElements(&m->tmp_calls_);
}

修改后:

1
2
3
4
5
6
ActiveCalls::LockedShard shard(active_calls_in_, m->tid());
const ActiveCallMap& active_calls_map = shard.active_calls_map();
ActiveCallMap::const_iterator iter = active_calls_map.find(m->tid());
if (iter != active_calls_map.end()) {
  iter->second.ExtractElements(&m->tmp_calls_);
}

如果相关数据结构是 map,可以考虑改用并发哈希 map 实现。

要小心用于分片选择的信息。例如,如果使用哈希值中的某些位进行分片选择,而这些相同位稍后又被再次使用,后续使用可能会因为看到偏斜的哈希值分布而表现很差。

示例:修正用于分片选择的信息,避免哈希表问题。

netmon_map_impl.h

修改前:

1
2
3
4
5
6
7
8
ConnectionBucket* GetBucket(Index index) {
  // 重新哈希,确保我们不是基于原始哈希
  // 对 bucket 进行分区。如果 num_buckets_ 是 2 的幂,
  // 那样会降低 bucket 的熵。
  size_t original_hash = absl::Hash<Index>()(index);
  int hash = absl::Hash<size_t>()(original_hash) % num_buckets_;
  return &buckets_[hash];
}

修改后:

1
2
3
4
5
6
ConnectionBucket* GetBucket(Index index) {
  absl::Hash<std::pair<Index, size_t>> hasher{};
  // 将哈希值与 42 组合,以防分片选择使用
  // 与底层哈希表相同的位。
  return &buckets_[hasher({index, 42}) % num_buckets_];
}

示例:对用于跟踪调用的 Spanner 数据结构进行分片。

这个 CL 将 ActiveCallMap 分成 64 个分片。每个分片由单独的互斥锁保护。给定事务会被映射到且只映射到一个分片。新增 LockedShard(tid) 接口,用于以线程安全方式访问某个事务对应的 ActiveCallMap。示例用法:

transaction_manager.cc

修改前:

1
2
3
4
{
  absl::MutexLock l(&active_calls_in_mu_);
  delayed_locks_timer_ring_.Add(delayed_locks_flush_time_ms, tid);
}

修改后:

1
2
3
4
{
  ActiveCalls::LockedShard shard(active_calls_in_, tid);
  shard.delayed_locks_timer_ring().Add(delayed_locks_flush_time_ms, tid);
}

结果显示,在使用 8192 个 fiber 运行基准测试时,总体墙钟时间减少了 69%。

1
2
3
4
Benchmark                   Time(ns)        CPU(ns)     Iterations
------------------------------------------------------------------
BM_ActiveCalls/8k        11854633492     98766564676            10
BM_ActiveCalls/16k       26356203552    217325836709            10
1
2
3
4
Benchmark                   Time(ns)        CPU(ns)     Iterations
------------------------------------------------------------------
BM_ActiveCalls/8k         3696794642     39670670110            10
BM_ActiveCalls/16k        7366284437     79435705713            10

SIMD 指令

可以探索使用现代 CPU 提供的 SIMD 指令一次处理多个项目是否能带来加速(例如参见下面批量操作一节中对 absl::flat_hash_map 的讨论)。

减少伪共享

如果不同线程访问不同的可变数据,考虑把这些不同数据项放到不同缓存行上,例如在 C++ 中使用 alignas 指令。不过这些指令很容易误用,并可能显著增加对象大小,因此要确保性能测量能证明它们的使用是合理的。

示例:将经常变更的字段与其他字段分隔到不同缓存行。

histogram.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
HistogramOptions options_;
...
internal::HistogramBoundaries *boundaries_;
...
std::vector<double> buckets_;

double min_;             // 最小值。
double max_;             // 最大值。
double count_;           // 出现总次数。
double sum_;             // 值之和。
double sum_of_squares_;  // 值平方之和。
...
RegisterVariableExporter *exporter_;

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
  HistogramOptions options_;
  ...
  internal::HistogramBoundaries *boundaries_;
  ...
  RegisterVariableExporter *exporter_;
  ...
  // 将下面字段放入专用缓存行,因为它们经常
  // 被修改,这样可以避免潜在的伪共享。
  ...
#ifndef SWIG
  alignas(ABSL_CACHELINE_SIZE)
#endif
  std::vector<double> buckets_;

  double min_;             // 最小值。
  double max_;             // 最大值。
  double count_;           // 出现总次数。
  double sum_;             // 值之和。
  double sum_of_squares_;  // 值平方之和。

降低上下文切换频率

示例:内联处理小工作项,而不是放到设备线程池上。

cast_op.cc

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
template <typename Device, typename Tout, typename Tin>
void CastMaybeInline(const Device& d, typename TTypes<Tout>::Flat o,
                     typename TTypes<Tin>::ConstFlat i) {
  if (o.size() * (sizeof(Tin) + sizeof(Tout)) < 16384) {
    // CPU 上的小型 cast:内联执行。
    o = i.template cast<Tout>();
  } else {
    o.device(d) = i.template cast<Tout>();
  }
}

使用带缓冲的通道进行流水线处理

Channel 可以是无缓冲的,这意味着写入者会阻塞,直到读取者准备好取走项目。当 channel 用于同步时,无缓冲 channel 可能有用;但当 channel 用于增加并行性时,就不适合了。

考虑无锁方案

有时,无锁数据结构相比传统的 Mutex 保护数据结构能带来明显差异。不过,直接操作原子变量可能很[危险][atomic danger]。应优先使用更高层的抽象。

示例:使用无锁 map 管理 RPC channel 缓存。

RPC stub 缓存中的条目每秒会被读取数千次,但很少修改。切换到合适的无锁 map 可将查找延迟降低 3%-5%。

示例:使用固定词典和无锁哈希 map 加速 IsValidTokenId 判断。

dynamic_token_class_manager.h

修改前:

1
2
3
4
5
6
7
mutable Mutex mutex_;

// 该哈希 map 的密度由以下事实保证:
// 动态词典会先复用之前分配的 TokenId,
// 然后才尝试分配新的 TokenId。
dense_hash_map<TokenId, common::LocalTokenClassId> tid_to_cid_
    GUARDED_BY(mutex_);

修改后:

1
2
3
4
5
6
// 对该哈希 map 的读访问应使用
// `epoch_gc_`::(EnterFast / LeaveFast)。写入者应定期
// 通过调用 LockFreeHashMap::CreateGC 对已删除条目做 GC。
typedef util::gtl::LockFreeHashMap<TokenId, common::LocalTokenClassId>
    TokenIdTokenClassIdMap;
TokenIdTokenClassIdMap tid_to_cid_;

Protocol Buffer 建议

Protobuf 是一种方便的数据表示,尤其适合需要通过网络发送或持久化存储的数据。然而,它们可能带来显著性能成本。例如,一段填充 1000 个点列表并汇总 Y 坐标的代码,从 protobuf 改为 C++ 结构体 std::vector 后,速度提升了 20 倍

示例:两个版本的基准测试代码。

1
2
name                old time/op  new time/op  delta
BenchmarkIteration  17.4µs ± 5%   0.8µs ± 1%  -95.30%  (p=0.000 n=11+12)

Protobuf 版本:

1
2
3
4
5
6
7
message PointProto {
  int32 x = 1;
  int32 y = 2;
}
message PointListProto {
  repeated PointProto points = 1;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void SumProto(const PointListProto& vec) {
  int sum = 0;
  for (const PointProto& p : vec.points()) {
    sum += p.y();
  }
  ABSL_VLOG(1) << sum;
}

void BenchmarkIteration() {
  PointListProto points;
  points.mutable_points()->Reserve(1000);
  for (int i = 0; i < 1000; i++) {
    PointProto* p = points.add_points();
    p->set_x(i);
    p->set_y(i * 2);
  }
  SumProto(points);
}

非 protobuf 版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
struct PointStruct {
  int x;
  int y;
};

void SumVector(const std::vector<PointStruct>& vec) {
  int sum = 0;
  for (const PointStruct& p : vec) {
    sum += p.y;
  }
  ABSL_VLOG(1) << sum;
}

void BenchmarkIteration() {
  std::vector<PointStruct> points;
  points.reserve(1000);
  for (int i = 0; i < 1000; i++) {
    points.push_back({i, i * 2});
  }
  SumVector(points);
}

此外,protobuf 版本会给二进制增加几 KB 的代码和数据。看起来不多,但在包含许多 protobuf 类型的系统中会快速累积。这种体积增加会造成指令缓存和数据缓存压力,从而带来性能问题。

下面是一些与 protobuf 性能相关的技巧:

示例:不要不必要地使用 protobuf。

鉴于上面提到的 20 倍性能差异,如果某些数据永远不会被序列化或解析,你很可能不应该把它放进 protocol buffer。Protocol buffer 的目的是让数据结构易于序列化和反序列化,但它们可能带来显著的代码体积、内存和 CPU 开销。如果你只是想要 DebugString、可复制性等其他便利,不要使用它们。

示例:避免不必要的消息层级。

消息层级有助于以更易读的方式组织信息。然而,额外的消息层级会带来内存分配、函数调用、缓存未命中、更大的序列化消息等开销。

例如,不要这样:

1
2
3
4
5
6
7
8
9
message Foo {
  optional Bar bar = 1;
}
message Bar {
  optional Baz baz = 1;
}
message Baz {
  optional int32 count = 1;
}

优先这样:

1
2
3
message Foo {
  optional int32 count = 1;
}

一个 protocol buffer 消息会对应 C++ 生成代码中的一个消息类,并在线上传输时发出 tag 和 payload 长度。为了携带一个整数,旧形式需要更多分配(和释放),并生成更多代码。因此,所有 protocol buffer 操作(解析、序列化、计算大小等)都会变得更昂贵,因为它们必须遍历消息树。新形式没有这些开销,因此更高效。

示例:对频繁出现的字段使用较小字段号。

Protobuf 使用变长整数来表示字段号和 wire format 的组合(参见 protobuf 编码文档)。字段号在 1 到 15 之间时,该表示占 1 字节;字段号在 16 到 2047 之间时,占 2 字节。(通常应避免使用 2048 或更大的字段号。)

对于性能敏感的 protobuf,可以考虑预留一些较小字段号以供未来扩展。

示例:谨慎选择 int32sint32fixed32uint32(64 位变体同理)。

通常使用 int32int64,但对于哈希码这样的大值,应使用 fixed32fixed64;对于经常为负的值,应使用 sint32sint64

varint 编码小整数时占用更少字节,可以节省空间,但解码成本更高。不过,对于负值或大值,它可能占用更多空间。在这种情况下,使用 fixed32fixed64(而不是 uint32uint64)可以减小体积,并让编码和解码便宜得多。对于较小的负整数,使用 sint32sint64,而不是 int32int64

示例:在 proto2 中,通过标注 [packed=true] 打包 repeated 数值字段。

在 proto2 中,repeated 值默认序列化为一系列 (tag, value) 对。这效率较低,因为每个元素都必须解码 tag。

packed repeated primitive 会先序列化 payload 长度,然后是不带 tag 的值。使用定宽值时,我们在开始解析时就知道最终大小,因此可以避免重新分配,也就是没有重新分配成本。对于 payload 中有多少 varint,我们仍然不知道,可能仍需支付重新分配成本。

在 proto3 中,repeated 字段默认是 packed。

packed 最适合 fixed32fixed64floatdouble 等定宽值,因为整个编码长度可以通过元素数量乘以固定值大小预先确定,而不必计算每个单独元素的长度。

示例:对二进制数据和大值使用 bytes,而不是 string

string 类型保存 UTF-8 编码文本,有时需要验证。bytes 类型可以保存任意字节序列(非文本数据),通常比 string 更合适也更高效。

示例:考虑使用 string_type = VIEW 来避免复制。

解析期间复制大型 string 或 bytes 字段很昂贵。通常可以通过将字段标记为 string_type = VIEW 来避免这种成本。

1
2
3
4
message Image {
  ...
  bytes jpeg_encoding = 4 [features.(pb.cpp).string_type=VIEW];
}

如果没有 VIEW 注解,解析 protocol buffer 时,潜在很大的字段内容会从序列化 protocol buffer 复制到内存中的 string 对象。根据 string 或 bytes 字段数量以及字段大小,复制开销可能非常显著。

ParseFromStringWithAliasing 之类的例程不会复制大型二进制 blob,而是使用 absl::string_view 引用原始后备字符串。注意,后备字符串(序列化 protocol buffer)必须比包含该别名的 protocol buffer 实例活得更久。

示例:对大字段考虑使用 Cord,以减少复制成本。

[ctype=CORD] 注解大型 bytesstring 字段可能减少复制成本。该注解会把字段表示从 std::string 改为 absl::Cordabsl::Cord 使用引用计数和基于树的存储来降低复制和追加成本。如果 protocol buffer 被序列化到 cord,那么解析带 [ctype=CORD] 的 string 或 bytes 字段可以避免复制字段内容。

1
2
3
4
message Document {
  ...
  bytes html = 4 [ctype = CORD];
}

Cord 字段的性能取决于长度分布和访问模式。应使用基准测试验证此类改动。

示例:在 C++ 代码中使用 protobuf arena。

考虑使用 arena 节省分配和释放成本,尤其是对于包含 repeated、string 或 message 字段的 protobuf。

message 和 string 字段会在堆上分配(即使顶层 protocol buffer 对象是在栈上分配的)。如果一个 protocol buffer 消息有很多子 message 字段和 string 字段,分配和释放成本可能很显著。Arena 可以摊销分配成本,并让释放几乎免费;它还通过从连续内存块分配来改善内存局部性。

示例:保持 .proto 文件短小。

不要在单个 .proto 文件中放太多消息。只要依赖某个 .proto 文件中的任何内容,整个文件都会被链接器拉入,即使其中大部分都未使用。这会增加构建时间和二进制大小。可以使用扩展和 Any 来避免对包含许多消息类型的大型 .proto 文件产生硬依赖。

示例:考虑以序列化形式存储 protocol buffer,即使是在内存中。

内存中的 protobuf 对象内存占用很大(通常是 wire format 大小的 5 倍),并且可能分散在许多缓存行上。因此,如果应用需要长期保留许多 protobuf 对象,考虑以序列化形式存储它们。

示例:避免 protobuf map 字段。

Protobuf map 字段存在性能问题,通常超过它们提供的少量语法便利。优先使用从 protobuf 内容初始化的非 protobuf map:

msg.proto

1
map<string, bytes> env_variables = 5;
1
2
3
4
5
message Var {
  string key = 1;
  bytes value = 2;
}
repeated Var env_variables = 5;

示例:使用只包含部分字段的 protobuf 消息定义。

如果你只想访问大型消息类型中的少数字段,考虑定义自己的 protocol buffer 消息类型来模拟原始类型,但只定义你关心的字段。示例如下:

1
2
3
4
5
6
7
8
message FullMessage {
  optional int32 field1 = 1;
  optional BigMessage field2 = 2;
  optional int32 field3 = 3;
  repeated AnotherBigMessage field4 = 4;
  ...
  optional int32 field100 = 100;
}
1
2
3
4
message SubsetMessage {
  optional int32 field3 = 3;
  optional int32 field88 = 88;
}

通过把序列化的 FullMessage 解析成 SubsetMessage,一百个字段中只有两个会被解析,其他字段会被视为未知字段。在合适时,可以考虑使用丢弃未知字段的 API,以进一步提升性能。

示例:尽可能复用 protobuf 对象。

在循环外声明 protobuf 对象,使其已分配存储可以在多次循环迭代之间复用。

C++ 专项建议

absl::flat_hash_map 和 absl::flat_hash_set

Absl 哈希表 通常比 C++ 标准库容器(如 std::mapstd::unordered_map)性能更好。

示例:加速 LanguageFromCode(使用 absl::flat_hash_map 替代 __gnu_cxx::hash_map)。

languages.cc

修改前:

1
2
3
4
class CodeToLanguage
    ...
    : public __gnu_cxx::hash_map<absl::string_view, i18n::languages::Language,
                                 CodeHash, CodeCompare> {

修改后:

1
2
3
4
class CodeToLanguage
    ...
    : public absl::flat_hash_map<absl::string_view, i18n::languages::Language,
                                 CodeHash, CodeCompare> {

基准测试结果:

1
2
name               old time/op  new time/op  delta
BM_CodeToLanguage  19.4ns ± 1%  10.2ns ± 3%  -47.47%  (p=0.000 n=8+10)

示例:加速统计信息发布/取消发布(这是较早的改动,因此使用 dense_hash_map,当时还没有 absl::flat_hash_map)。

publish.cc

修改前:

1
2
typedef hash_map<uint64, Publication*> PublicationMap;
static PublicationMap* publications = NULL;

修改后:

1
2
typedef dense_hash_map<uint64, Publication*> PublicationMap;;
static PublicationMap* publications GUARDED_BY(mu) = NULL;

示例:使用 dense_hash_map 而不是 hash_map 跟踪 SelectServer alarm(今天会使用 absl::flat_hash_map)。

alarmer.h

修改前:

1
typedef hash_map<int, Alarm*> AlarmList;

修改后:

1
typedef dense_hash_map<int, Alarm*> AlarmList;

absl::btree_map 和 absl::btree_set

absl::btree_mapabsl::btree_set 会在每个树节点中存储多个条目。相比 C++ 标准库有序容器(如 std::map),这有几个优势。首先,指向子树节点的指针开销通常会显著降低。其次,由于给定 btree 节点中的条目或 key/value 在内存中连续存储,缓存效率通常也显著更好。

示例:使用 btree_set 而不是 std::set 表示使用非常频繁的工作队列。

register_allocator.h

1
using container_type = std::set<WorklistItem>;
1
using container_type = absl::btree_set<WorklistItem>;

util::bitmap::InlinedBitVector

util::bitmap::InlinedBitvector 可以内联存储较短的位向量,因此通常比 std::vector<bool> 或其他 bitmap 类型更合适。

示例:使用 InlinedBitVector 而不是 std::vector,再用 FindNextBitSet 查找下一个关注项。

block_encoder.cc

修改前:

1
2
3
4
5
6
vector<bool> live_reads(nreads);
...
for (int offset = 0; offset < b_.block_width(); offset++) {
  ...
  for (int r = 0; r < nreads; r++) {
    if (live_reads[r]) {

修改后:

1
2
3
4
5
6
util::bitmap::InlinedBitVector<4096> live_reads(nreads);
...
for (int offset = 0; offset < b_.block_width(); offset++) {
  ...
  for (size_t r = 0; live_reads.FindNextSetBit(&r); r++) {
    DCHECK(live_reads[r]);

absl::InlinedVector

absl::InlinedVector 会内联存储少量元素(数量通过第二个模板参数配置)。这让不超过该数量的小 vector 通常拥有更好的缓存效率,并在元素数量较少时完全避免分配后备存储数组。

示例:在多处使用 InlinedVector 而不是 std::vector

bundle.h

1
2
3
4
5
6
7
8
class Bundle {
 public:
 ...
 private:
  // (已分槽指令,未分槽立即数操作数)序列。
  std::vector<InstructionRecord> instructions_;
  ...
};
1
2
3
4
5
6
7
8
class Bundle {
 public:
 ...
 private:
  // (已分槽指令,未分槽立即数操作数)序列。
  absl::InlinedVector<InstructionRecord, 2> instructions_;
  ...
};

gtl::vector32

通过使用只支持 32 位大小范围的定制 vector 类型来节省空间。

示例:简单类型变更为 Spanner 节省约 8TiB 内存。

table_ply.h

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class TablePly {
    ...
    // 返回此文件中为该表存储的数据列集合。
    const std::vector<FamilyId>& modified_data_columns() const {
      return modified_data_columns_;
    }
    ...
   private:
    ...
    std::vector<FamilyId> modified_data_columns_;  // 表中的数据列。

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#include "util/gtl/vector32.h"
    ...
    // 返回此文件中为该表存储的数据列集合。
    absl::Span<const FamilyId> modified_data_columns() const {
      return modified_data_columns_;
    }
    ...

    ...
    // 表中的数据列。
    gtl::vector32<FamilyId> modified_data_columns_;

gtl::small_map

gtl::small_map 使用内联数组存储最多一定数量的唯一 key-value 元素;当空间不足时,会自动升级为由用户指定的 map 类型作为后备。

示例:在 tflite_model 中使用 gtl::small_map

tflite_model.cc

修改前:

1
using ChoiceIdToContextMap = gtl::flat_hash_map<int, TFLiteContext*>;

修改后:

1
2
using ChoiceIdToContextMap =
    gtl::small_map<gtl::flat_hash_map<int, TFLiteContext*>>;

gtl::small_ordered_set

gtl::small_ordered_set 是针对关联容器(如 std::setabsl::btree_multiset)的优化。它使用固定数组存储一定数量的元素;空间不足时,再退回到 set 或 multiset。对于通常较小的 set,这可能比直接使用 set 快得多,因为 set 是为大型数据集优化的。该改动减少了缓存占用,并缩短了临界区长度。

示例:使用 gtl::small_ordered_set 保存 listener 集合。

broadcast_stream.h

修改前:

1
2
3
4
5
6
class BroadcastStream : public ParsedRtpTransport {
 ...
 private:
  ...
  std::set<ParsedRtpTransport*> listeners_ ABSL_GUARDED_BY(listeners_mutex_);
};

修改后:

1
2
3
4
5
6
7
class BroadcastStream : public ParsedRtpTransport {
 ...
 private:
  ...
  using ListenersSet =
      gtl::small_ordered_set<std::set<ParsedRtpTransport*>, 10>;
  ListenersSet listeners_ ABSL_GUARDED_BY(listeners_mutex_);

gtl::intrusive_list

gtl::intrusive_list<T> 是双向链表,链表指针嵌入在 T 类型元素内部。相比 std::list<T*>,它为每个元素节省一个缓存行和一次间接访问。

示例:使用 intrusive_list 跟踪每个索引行更新的进行中请求。

row-update-sender-inflight-set.h

修改前:

1
std::set<int64> inflight_requests_ GUARDED_BY(mu_);

修改后:

1
2
3
4
5
6
7
class SeqNum : public gtl::intrusive_link<SeqNum> {
  ...
  int64 val_ = -1;
  ...
};
...
gtl::intrusive_list<SeqNum> inflight_requests_ GUARDED_BY(mu_);

限制 absl::Status 和 absl::StatusOr 的使用

虽然 absl::Statusabsl::StatusOr 类型相当高效,但即使在成功路径上也有非零开销。因此,对于不需要返回有意义错误详情(甚至可能永远不会失败)的热点例程,应避免使用它们:

示例:避免 RoundUpToAlignment() 函数返回 StatusOr

best_fit_allocator.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
absl::StatusOr<int64> BestFitAllocator::RoundUpToAlignment(int64 bytes) const {
  TPU_RET_CHECK_GE(bytes, 0);

  const int64 max_aligned = MathUtil::RoundDownTo<int64>(
      std::numeric_limits<int64>::max(), alignment_in_bytes_);
  if (bytes > max_aligned) {
    return util::ResourceExhaustedErrorBuilder(ABSL_LOC)
           << "Attempted to allocate "
           << strings::HumanReadableNumBytes::ToString(bytes)
           << " which after aligning to "
           << strings::HumanReadableNumBytes::ToString(alignment_in_bytes_)
           << " cannot be expressed as an int64.";
  }

  return MathUtil::RoundUpTo<int64>(bytes, alignment_in_bytes_);
}

best_fit_allocator.h

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 将 bytes 向上取整到 alignment_ 的最近倍数。
// 要求:bytes >= 0。
// 要求:结果不会溢出 int64。
// 要求:alignment_in_bytes_ 是 2 的幂(在构造函数中检查)。
int64 RoundUpToAlignment(int64 bytes) const {
  DCHECK_GE(bytes, 0);
  DCHECK_LE(bytes, max_aligned_bytes_);
  int64 result =
      ((bytes + (alignment_in_bytes_ - 1)) & ~(alignment_in_bytes_ - 1));
  DCHECK_EQ(result, MathUtil::RoundUpTo<int64>(bytes, alignment_in_bytes_));
  return result;
}

示例:添加 ShapeUtil::ForEachIndexNoStatus,避免为 tensor 的每个元素创建 Status 返回对象。

shape_util.h

修改前:

1
2
3
4
5
6
7
using ForEachVisitorFunction =
    absl::FunctionRef<StatusOr<bool>(absl::Span<const int64_t>)>;
    ...
static void ForEachIndex(const Shape& shape, absl::Span<const int64_t> base,
                         absl::Span<const int64_t> count,
                         absl::Span<const int64_t> incr,
                         const ForEachVisitorFunction& visitor_function);

修改后:

1
2
3
4
5
6
7
using ForEachVisitorFunctionNoStatus =
    absl::FunctionRef<bool(absl::Span<const int64_t>)>;
    ...
static void ForEachIndexNoStatus(
    const Shape& shape, absl::Span<const int64_t> base,
    absl::Span<const int64_t> count, absl::Span<const int64_t> incr,
    const ForEachVisitorFunctionNoStatus& visitor_function);

literal.cc

修改前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
ShapeUtil::ForEachIndex(
    result_shape, [&](absl::Span<const int64_t> output_index) {
      for (int64_t i = 0, end = dimensions.size(); i < end; ++i) {
        scratch_source_index[i] = output_index[dimensions[i]];
      }
      int64_t dest_index = IndexUtil::MultidimensionalIndexToLinearIndex(
          result_shape, output_index);
      int64_t source_index = IndexUtil::MultidimensionalIndexToLinearIndex(
          shape(), scratch_source_index);
      memcpy(dest_data + primitive_size * dest_index,
             source_data + primitive_size * source_index, primitive_size);
      return true;
    });

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ShapeUtil::ForEachIndexNoStatus(
    result_shape, [&](absl::Span<const int64_t> output_index) {
      // 计算 dest_index。
      int64_t dest_index = IndexUtil::MultidimensionalIndexToLinearIndex(
          result_shape, result_minor_to_major, output_index);

      // 计算 source_index。
      int64_t source_index;
      for (int64_t i = 0, end = dimensions.size(); i < end; ++i) {
        scratch_source_array[i] = output_index[dimensions[i]];
      }
      if (src_shape_dims == 1) {
        // 该情况的快速路径。
        source_index = scratch_source_array[0];
        DCHECK_EQ(source_index,
                  IndexUtil::MultidimensionalIndexToLinearIndex(
                      src_shape, src_minor_to_major, scratch_source_span));
      } else {
        source_index = IndexUtil::MultidimensionalIndexToLinearIndex(
            src_shape, src_minor_to_major, scratch_source_span);
      }
      // 将一个元素从 source 中的 source_index 移到 dest 中的 dest_index。
      memcpy(dest_data + PRIMITIVE_SIZE * dest_index,
             source_data + PRIMITIVE_SIZE * source_index, PRIMITIVE_SIZE);
      return true;
    });

示例:在 TF_CHECK_OK 中,避免为了测试 ok() 而创建 Ok 对象。

status.h

修改前:

1
2
#define TF_CHECK_OK(val) CHECK_EQ(::tensorflow::Status::OK(), (val))
#define TF_QCHECK_OK(val) QCHECK_EQ(::tensorflow::Status::OK(), (val))

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
extern tensorflow::string* TfCheckOpHelperOutOfLine(
    const ::tensorflow::Status& v, const char* msg);
inline tensorflow::string* TfCheckOpHelper(::tensorflow::Status v,
                                           const char* msg) {
  if (v.ok()) return nullptr;
  return TfCheckOpHelperOutOfLine(v, msg);
}
#define TF_CHECK_OK(val)                                           \
  while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
  LOG(FATAL) << *(_result)
#define TF_QCHECK_OK(val)                                          \
  while (tensorflow::string* _result = TfCheckOpHelper(val, #val)) \
  LOG(QFATAL) << *(_result)

示例:从远程过程调用(RPC)的热点路径中移除 StatusOr

从热点路径移除 StatusOr,消除了早前改动在 RPC 基准测试中造成的 14% CPU 回退。

privacy_context.h

修改前:

1
2
absl::StatusOr<privacy::context::PrivacyContext> GetRawPrivacyContext(
    const CensusHandle& h);

privacy_context_statusfree.h

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
enum class Result {
  kSuccess,
  kNoRootScopedData,
  kNoPrivacyContext,
  kNoDDTContext,
  kDeclassified,
  kNoPrequestContext
};
...
Result GetRawPrivacyContext(const CensusHandle& h,
                            PrivacyContext* privacy_context);

批量操作

如果可能,应一次处理多个项目,而不是一次只处理一个。

示例:absl::flat_hash_map 使用单条 SIMD 指令,对一组 key 中每个 key 比较一个哈希字节。

参见 Swiss Table 设计说明,以及 Matt Kulukundis 相关的 CppCon 2017CppCon 2019 演讲。

raw_hash_set.h

修改后:

1
2
3
4
5
6
// 返回位掩码,表示与 hash 匹配的 slot 位置。
BitMask<uint32_t> Match(h2_t hash) const {
  auto ctrl = _mm_loadu_si128(reinterpret_cast<const __m128i*>(pos));
  auto match = _mm_set1_epi8(hash);
  return BitMask<uint32_t>(_mm_movemask_epi8(_mm_cmpeq_epi8(match, ctrl)));
}

示例:用单次操作处理多个字节并进行修正,而不是逐字节检查该怎么做。

ordered-code.cc

修改前:

1
2
3
4
5
6
7
8
9
int len = 0;
while (val > 0) {
  len++;
  buf[9 - len] = (val & 0xff);
  val >>= 8;
}
buf[9 - len - 1] = (unsigned char)len;
len++;
FastStringAppend(dest, reinterpret_cast<const char*>(buf + 9 - len), len);

修改后:

1
2
3
4
5
BigEndian::Store(val, buf + 1);  // buf[0] 可能需要用于长度。
const unsigned int length = OrderedNumLength(val);
char* start = buf + 9 - length - 1;
*start = length;
AppendUpto9(dest, start, length + 1);

示例:通过更高效地分块处理多个交错输入缓冲区,提升 Reed-Solomon 处理速度。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
Run on (12 X 3501 MHz CPUs); 2016-09-27T16:04:55.065995192-04:00
CPU: Intel Haswell with HyperThreading (6 cores) dL1:32KB dL2:256KB dL3:15MB
Benchmark                          Base (ns)  New (ns) Improvement
------------------------------------------------------------------
BM_OneOutput/3/2                      466867    351818    +24.6%
BM_OneOutput/4/2                      563130    474756    +15.7%
BM_OneOutput/5/3                      815393    688820    +15.5%
BM_OneOutput/6/3                      897246    780539    +13.0%
BM_OneOutput/8/4                     1270489   1137149    +10.5%
BM_AllOutputs/3/2                     848772    642942    +24.3%
BM_AllOutputs/4/2                    1067647    638139    +40.2%
BM_AllOutputs/5/3                    1739135   1151369    +33.8%
BM_AllOutputs/6/3                    2045817   1456744    +28.8%
BM_AllOutputs/8/4                    3012958   2484937    +17.5%
BM_AllOutputsSetUpOnce/3/2            717310    493371    +31.2%
BM_AllOutputsSetUpOnce/4/2            833866    600060    +28.0%
BM_AllOutputsSetUpOnce/5/3           1537870   1137357    +26.0%
BM_AllOutputsSetUpOnce/6/3           1802353   1398600    +22.4%
BM_AllOutputsSetUpOnce/8/4           3166930   2455973    +22.4%

示例:一次解码四个整数(约 2004 年)。

引入 GroupVarInt 格式,它一次用 5-17 字节编码/解码一组 4 个变长整数,而不是一次处理一个整数。用新格式解码一组 4 个整数所需时间约为分别解码 4 个 varint 编码整数的 1/3。

groupvarint.cc

修改后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const char* DecodeGroupVar(const char* p, int N, uint32* dest) {
  assert(groupvar_initialized);
  assert(N % 4 == 0);
  while (N) {
    uint8 tag = *p;
    p++;

    uint8* lenptr = &groupvar_table[tag].length[0];

#define GET_NEXT                                        \
    do {                                                \
      uint8 len = *lenptr;                              \
      *dest = UNALIGNED_LOAD32(p) & groupvar_mask[len]; \
      dest++;                                           \
      p += len;                                         \
      lenptr++;                                         \
    } while (0)
    GET_NEXT;
    GET_NEXT;
    GET_NEXT;
    GET_NEXT;
#undef GET_NEXT

    N -= 4;
  }
  return p;
}

示例:一次编码一组 4 个 k 位数字。

添加 KBitStreamEncoderKBitStreamDecoder 类,用于一次将 4 个 k 位数字编码/解码到位流中。由于 K 在编译期已知,编码和解码可以非常高效。例如,由于一次编码 4 个数字,代码可以假设流始终是字节对齐(k 为偶数)或半字节对齐(k 为奇数)。

展示多种技巧的 CL

有时单个 CL 会包含多个性能改进,并使用前面提到的许多技巧。观察这些 CL 中的改动类型,有助于培养一种思维方式:当系统某部分被确认是瓶颈后,如何做出通用改动来提升其性能。

示例:将 GPU 内存分配器加速约 40%。

GPUBFCAllocator 的分配/释放速度提升 36-48%:

  • 使用句柄编号而不是 Chunk 指针来标识 chunk。Chunk 数据结构现在分配在 vector 中,句柄是该 vector 中指向特定 chunk 的索引。这样 Chunk 中的 next 和 prev 指针可以是 ChunkHandle(4 字节),而不是 Chunk*(8 字节)。

  • Chunk 对象不再使用时,我们维护一个 Chunk 对象空闲链表,链表头由 ChunkHandle free_chunks_list_ 指定,并通过 Chunk->next 指向下一个空闲条目。结合第 1 点,这使我们在分配器中可以避免对 Chunk 对象进行堆分配/释放,除非 vector 偶尔增长。同时,所有 Chunk 对象的内存也变成连续的。

  • 不再让 bins_ 数据结构使用 std::set,也不再通过 lower_bound 根据 byte_size 查找合适的 bin;我们改用 bin 数组,并用 log₂(byte_size/256) 这样的函数做索引。这样可以用几个位操作定位 bin,而不是做二叉搜索树查找。它还允许把所有 Bin 数据结构的存储分配在连续数组中,而不是分散在许多不同缓存行上。这减少了多线程执行分配时需要在核心之间移动的缓存行数量。

  • GPUBFCAllocator::AllocateRaw 添加快速路径,先尝试在不涉及 retry_helper_ 的情况下分配内存。如果初次尝试失败(返回 nullptr),再进入 retry_helper_;但通常可以避免多层过程调用,以及带多个参数的 std::function 分配/释放。

  • 注释掉大多数 VLOG 调用。需要调试时,可以有选择地取消注释并重新编译。

添加多线程基准测试,用于测试竞争下的分配。

在我配有 Titan X 显卡的台式机上,ptb_word_lm 从每秒 8036 个词提升到每秒 8272 个词(+2.9%)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Run on (40 X 2801 MHz CPUs); 2016/02/16-15:12:49
CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark                          Base (ns)  New (ns) Improvement
------------------------------------------------------------------
BM_Allocation                            347       184    +47.0%
BM_AllocationThreaded/1                  351       181    +48.4%
BM_AllocationThreaded/4                 2470      1975    +20.0%
BM_AllocationThreaded/16               11846      9507    +19.7%
BM_AllocationDelayed/1                   392       199    +49.2%
BM_AllocationDelayed/10                  285       169    +40.7%
BM_AllocationDelayed/100                 245       149    +39.2%
BM_AllocationDelayed/1000                238       151    +36.6%

示例:通过一组杂项改动,将 Pathways 吞吐量提升约 20%。

  • 将一批特殊的快速 descriptor 解析函数统一成单个 ParsedDescriptor 类,并在更多地方使用该类,以避免昂贵的完整解析调用。

  • 将几个 protocol buffer 字段从 string 改为 bytes(避免不必要的 UTF-8 检查及相关错误处理代码)。

  • DescriptorProto.inlined_contents 现在是 string,而不是 Cord(预期只用于较小的 tensor)。这需要在 tensor_util.cc 中增加一批复制辅助函数(现在需要同时支持 string 和 Cord)。

  • 在几个地方使用 flat_hash_map 替代 std::unordered_map

  • 添加 MemoryManager::LookupMany 供 Stack op 使用,而不是为每个批元素调用 Lookup。该改动减少了加锁等设置开销。

  • 移除 TransferDispatchOp 中一些不必要的字符串创建。

  • 在同一进程内,从一个组件向另一个组件传输一批 1000 个 1KB tensor 的性能结果:

1
2
修改前: 227.01 steps/sec
After:  272.52 steps/sec (+20% throughput)

示例:通过一系列改动,使 XLA 编译器性能提升约 15%。

一些用于加速 XLA 编译的改动:

  • SortComputationsByContent 中,如果比较函数里 a == b,则返回 false,以避免序列化和 fingerprint 长 computation 字符串。

  • CHECK 改为 DCHECK,避免在 HloComputation::ComputeInstructionPostOrder 中触碰额外缓存行。

  • 避免在 CoreSequencer::IsVectorSyncHoldSatisfied() 中对 front instruction 做昂贵复制。

  • 重写双参数 HloComputation::ToStringHloComputation::ToCord 例程,让大部分工作基于追加到 std::string,而不是追加到 Cord。

  • 修改 PerformanceCounterSet::Increment,只执行一次哈希表查找,而不是两次。

  • 精简 Scoreboard::Update 代码。

一个重要模型的 XLA 编译时间整体加速 14%。

示例:加速 Google Meet 应用代码中的低层日志。

加速位于每个数据包关键路径上的 ScopedLogId

  • 移除看起来只是用于观察不变量是否被破坏的 LOG_EVERY_N(ERROR, ...) 消息。

  • 内联 PushLogIdPopLogid() 例程(因为去掉 LOG_EVERY_N_SECONDS(ERROR, ...) 语句后,它们已经足够小,可以内联)。

  • 改用大小为 4 的固定数组和一个 int size 变量来维护线程本地状态。 由于它本来就从不会超过大小 4,InlinedVector 的功能比实际需要更通用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Base: Baseline plus the code in scoped_logid_test.cc to add the benchmark
New: This changelist

CPU: Intel Ivybridge with HyperThreading (20 cores) dL1:32KB dL2:256KB dL3:25MB
Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_ScopedLogId/threads:1                               8           4    +52.6%
BM_ScopedLogId/threads:2                               8           4    +51.9%
BM_ScopedLogId/threads:4                               8           4    +52.9%
BM_ScopedLogId/threads:8                               8           4    +52.1%
BM_ScopedLogId/threads:16                             11           6    +44.0%

示例:通过改进 Shape 处理,将 XLA 编译时间减少约 31%。

若干用于改善 XLA 编译器性能的改动:

  • 通过几种方式改善 ShapeUtil::ForEachIndex... 迭代性能:

  • ShapeUtil::ForEachState 中,只保存 span 所表示数组的指针,而不是完整 span 对象。

  • 预先形成指向 ShapeUtil::ForEachState::indexes vector 的 ShapeUtil::ForEachState::indexes_span,而不是在每次循环迭代中从 vector 构造 span。

  • 保存指向 ShapeUtil::ForEachState::indexes vector 后备存储的 ShapeUtil::ForEachState::indexes_ptr 指针,让 ShapeUtil::ForEachState::IncrementDim() 可以使用简单数组操作,而不是更昂贵的 vector::operator[]

  • 保存 ShapeUtil::ForEachState::minor_to_major 数组指针,在构造函数中通过 shape.layout().minor_to_major().data() 初始化,而不是在每次迭代中对每个维度调用 LayoutUtil::Minor(...)

  • 内联 ShapeUtil::ForEachState 构造函数和 ShapeUtil::ForEachState::IncrementDim() 例程。

  • 对不需要传入函数返回 Status 的调用点,改善 ShapeUtil::ForEachIndex 迭代性能。做法是引入 ShapeUtil::ForEachIndexNoStatus 变体,它接受 ForEachVisitorFunctionNoStatus(返回普通 bool)。这比接受 ForEachVisitorFunctionShapeUtil::ForEachIndex 更快,因为后者返回 StatusOr,会在迭代的每个元素上产生昂贵的 StatusOr 析构调用。

  • LiteralBase::BroadcastGenerateReduceOutputElement 中使用这个 ShapeUtil::ForEachIndexNoStatus 变体。

  • 通过几种方式改善 LiteralBase::Broadcast 性能:

  • literal.cc 中引入模板化的 BroadcastHelper 例程,并针对不同 primitive 字节大小特化。没有这个改动时,primitive_size 是运行时变量,编译器无法很好优化每个元素上的 memcpy,会调用假设字节数较大的通用 memcpy 路径,尽管这里的大小只是很小的 2 的幂(通常是 1、2、4 或 8)。

  • LiteralBase::Broadcast 例程开始处只调用一次 shape(),从而避免每次 Broadcast 调用中约 5 + num_dimensions + num_result_elements 次虚调用中的绝大部分。散落各处、看起来无害的 shape() 调用最终会变成 root_piece().subshape(),其中 subshape() 是虚函数。

  • BroadcastHelper 例程中,对源维度为 1 的情况做特殊处理,避免在这种情况下调用 IndexUtil::MultiDimensionalIndexToLinearIndex

  • BroadcastHelper 中,使用指向 scratch_source_index vector 后备存储的 scratch_source_array 指针变量,并直接使用它来避免逐元素代码中的 vector::operator[] 操作。还在逐元素循环外预计算指向 scratch_source_index vector 的 scratch_source_span,避免为每个元素从 vector 构造 span。

  • 引入三参数版 IndexUtil::MultiDimensionalIndexToLinearIndex,由调用方传入与 shape 参数相关的 minor_to_major span。在 BroadcastHelper 中使用它,让 src 和 dst shape 每次 Broadcast 只计算一次,而不是每复制一个元素计算一次。

  • ShardingPropagation::GetShardingFromUser 中,对于 HloOpcode::kTuple 情况,只有在找到感兴趣的 operand 后才调用 user.sharding().GetSubSharding(...)。避免急切调用,让某次长编译中该例程的 CPU 时间从 43.7s 降到 2.0s。

  • ShapeUtil::ForEachIndexLiteral::Broadcast 和新的 ShapeUtil::ForEachIndexNoStatus 添加基准测试。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
Base is with the benchmark additions of
BM_ForEachIndex and BM_BroadcastVectorToMatrix (and BUILD file change to add
benchmark dependency), but no other changes.

New is this cl

Run on (72 X 1357.56 MHz CPU s) CPU Caches: L1 Data 32 KiB (x36)
L1 Instruction 32 KiB (x36) L2 Unified 1024 KiB (x36) L3 Unified 25344 KiB (x2)

Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_MakeShape                                       18.40       18.90     -2.7%
BM_MakeValidatedShape                              35.80       35.60     +0.6%
BM_ForEachIndex/0                                  57.80       55.80     +3.5%
BM_ForEachIndex/1                                  90.90       85.50     +5.9%
BM_ForEachIndex/2                               1973606     1642197     +16.8%

新增的 ForEachIndexNoStatus 明显快于 ForEachIndex 变体(它只存在于这个新 CL 中,但 BM_ForEachIndexNoStatus/NUM 完成的基准工作与上面的 BM_ForEachIndex/NUM 结果可比)。

1
2
3
4
5
Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_ForEachIndexNoStatus/0                             0        46.90    ----
BM_ForEachIndexNoStatus/1                             0        65.60    ----
BM_ForEachIndexNoStatus/2                             0     1001277     ----

Broadcast 性能提升约 58%。

1
2
3
4
5
Benchmark                                      Base (ns)    New (ns) Improvement
----------------------------------------------------------------------------
BM_BroadcastVectorToMatrix/16/16                   5556        2374     +57.3%
BM_BroadcastVectorToMatrix/16/1024               319510      131075     +59.0%
BM_BroadcastVectorToMatrix/1024/1024           20216949     8408188     +58.4%

对大型语言模型进行提前编译的宏观结果(程序不只做 XLA 编译,但略少于一半时间花在 XLA 相关代码中):

基线程序总体耗时:573 秒。使用此 CL 后总体耗时:465 秒(+19% 改进)。

运行该程序时,编译两个最大 XLA 程序所花时间:

基线:141s + 143s = 284s。使用此 CL:99s + 95s = 194s(+31% 改进)。

示例:在 Plaque(一个分布式执行框架)中,将大型程序编译时间减少约 22%。

通过小幅调整让编译加速约 22%。

  • 加速检测两个节点是否共享同一个来源。 以前我们会按排序顺序获取每个节点的来源,然后做有序求交。 现在我们把一个节点的来源放入哈希表, 再遍历另一个节点的来源并查询该哈希表。

  • 在步骤 1 中复用同一个临时哈希表。

  • 生成已编译 proto 时,维护一个以 pair<package, opname> 为 key 的单个 btree,而不是 btree 的 btree。

  • 在前述 btree 中存储指向 opdef 的指针,而不是复制 opdef。

大型程序(约 45K op)上的速度测量:

1
2
name             old time/op  new time/op  delta
BM_CompileLarge   28.5s ± 2%   22.4s ± 2%  -21.61%  (p=0.008 n=5+5)

示例:MapReduce 改进(wordcount 基准约 2 倍加速)。

MapReduce 加速点:

  • 修改了 SafeCombinerMapOutput 类的 combiner 数据结构。以前使用 hash_multimap,表中插入的每个唯一 key/value 都有一个哈希表条目;现在改用 hash_map(其中 ValuePtr 是值和重复计数的链表)。这有三方面帮助:

  • 它显著减少内存使用,因为每个值只使用 sizeof(ValuePtr) + value_len 字节,而不是 sizeof(SafeCombinerKey) + sizeof(StringPiece) + value_len + 新哈希表条目开销。这意味着 reducer buffer 刷新的频率更低。

  • 它显著更快,因为当为表中已有 key 插入新值时,我们避免了额外哈希表条目,而是把该值挂到该 key 的值链表中。

  • 由于链表中的每个值都关联一个重复计数,因此可以表示如下序列:

1
2
3
4
5
Output(key, "1");
Output(key, "1");
Output(key, "1");
Output(key, "1");
Output(key, "1");

作为 key 对应链表中的单个条目,重复计数为 5。内部会向用户级 combining 函数产出五次 1。(类似技巧或许也可以应用在 reduce 端。)

  • (小改动)在默认 MapReductionBase::KeyFingerprintSharding 函数中添加 nshards == 1 测试;如果只使用 1 个 reduce shard,就完全避免对 key 计算 fingerprint(因为此时无需检查 key,直接返回 0 即可)。

  • 在每次向 combiner 添加 key/value 都会调用的代码路径中,将一些 VLOG(3) 语句改为 DVLOG(3)

将某个 wordcount 基准的耗时从 12.56s 降到 6.55s。

示例:重写 SelectServer 中的 alarm 处理代码,显著提升性能(添加+删除一个 alarm 从 771 ns 降到 271 ns)。

重写了 SelectServer 中的 alarm 处理代码,以显著提升其性能。

改动:

  • AlarmQueueset 改为 AdjustablePriorityQueue。这显著加快了 alarm 处理,把添加和删除一个 alarm 的时间从 771 纳秒降到 281 纳秒。该改动避免了每次设置 alarm 时对 STL set 红黑树节点的一次分配/释放,并且缓存局部性也好得多(因为 AdjustablePriorityQueue 是基于 vector 实现的堆,而不是红黑树);每次通过 selectserver 循环操作 AlarmQueue 时,触碰的缓存行更少。

  • 将 Alarmer 中的 AlarmListhash_map 转为 dense_hash_map,避免每次添加/删除 alarm 时再发生一次分配/释放(这也改善了添加/删除 alarm 时的缓存局部性)。

  • 移除 num_alarms_stat_num_closures_stat_ 这两个 MinuteTenMinuteHourStat 对象以及对应的导出变量。虽然监控这些指标看起来不错,但实践中它们会给关键网络代码增加显著开销。即使把这些变量保留为 Atomic32 而不是 MinuteTenMinuteHourStat,也会让添加和删除 alarm 的成本从 281 纳秒增加到 340 纳秒。

基准测试结果

1
2
3
Benchmark                      Time(ns)  CPU(ns) Iterations
-----------------------------------------------------------
BM_AddAlarm/1                       902      771     777777

此改动后

1
2
3
Benchmark                      Time(ns)  CPU(ns) Iterations
-----------------------------------------------------------
BM_AddAlarm/1                       324      281    2239999

示例:索引服务速度提升 3.3 倍!

2001 年,在计划从磁盘索引服务切换到内存索引服务时,我们发现了许多性能问题。这个改动修复了其中许多问题,使我们在双处理器 Pentium III 机器上的 2GB 内存索引中,从每秒 150 次内存查询提升到超过 500 次。

  • 对索引块解码速度做了大量性能改进(微基准中从 8.9 MB/s 提升到 13.1 MB/s)。

  • 现在我们在解码期间对块做校验和。这使得所有 getsymbol 操作都可以不做边界检查。

  • 使用一些粗糙但有效的宏,在整个循环期间把 BitDecoder 的各个字段保存在局部变量中,并在循环结束后写回。

  • 使用内联汇编访问 Intel 芯片上的 bsf 指令,用于 getUnary(查找一个 word 中第一个 1 bit 的索引)。

  • 将值解码到 vector 时,在循环外调整 vector 大小,然后沿着 vector 移动指针,而不是每次存储值都做带边界检查的访问。

  • 解码 docid 期间,将 docid 保持在本地 docid 空间中,以避免乘以 num_shards_。只有真正需要实际 docid 值时,才乘以 num_shards_ 并加上 my_shard_

  • IndexBlockDecoder 现在导出 AdvanceToDocid 接口,返回第一个 docid ≥ d 的索引。这允许扫描基于本地 docid 进行,而不必在客户端为块中每个索引调用 GetDocid(index) 时,把每个本地 docid 强制转换为全局 docid。

  • 文档位置数据现在按需解码,而不是在客户端请求块中任一文档的位置数据时,就急切解码整个块。

  • 如果正在解码的索引块结尾距离页边界不到 4 字节,就把它复制到本地缓冲区。这样我们总能通过 4 字节加载读取 bit 解码缓冲区,而不必担心越过 mmap 页尾导致段错误。

  • 各种评分数据结构只初始化前 nterms_ 个元素,而不是初始化所有 MAX_TERMS 元素(某些情况下,我们曾经会为每个被评分文档不必要地 memset 20K 到 100K 数据)。

  • 当正在计算的值为 0 时,避免对中间评分值执行 round_to_int 以及后续计算(后续计算只是把 0 写回到我们已经 memset 为 0 的位置,而这是最常见情况)。

  • 将评分数据结构上的边界检查改成 debug 模式断言。

上一篇:性能指南目录

上一节

下一篇:每周性能技巧 #97:良性生态循环

下一节


本节目录