前言

duckdb 是一款优秀的 OLAP 数据库,有如下的特点:

  • 快速。

    我之前进行了一次 OLAP 数据库的性能测试,对比了包括 ClickHouse, DuckDB, StarRocks, DorisDB, DataFusion, Pola.rs 在内的 几款数据库, DuckDB 在好几项测试中,都处在领先位置。

    img.png

  • 轻量

    DuckDB 的二进制文件大小只有 40MB 左右,没有任何依赖。相比 clickhouse(~440M) 来说,小了一个数量级。

    而且,DuckDB 无需任何配置,即可使用,可处理 csv、parquet、json 等格式的文件数据,也可以使用 duckdb 自身的数据库格式,其使用 风格非常类似于 sqlite, 是一个 serverless 风格的数据库,也非常适合于嵌入式应用。

    除了发布版本的的轻量之外,duckdb 的源代码量也很轻量,并且可以在各种平台中编译,源代码的结构较为清晰,适合于学习。

  • SQL 支持能力强。

    作为一款轻量的数据库,我很惊讶于 duckdb 对 SQL 的支持程度:

    • 比如 JOIN 支持,很多的数据库都不支持 Full Join,Semi Join, Anti Join 等操作(虽然可以使用其他方式进行改写,但既不自然,也不便于性能优化)
    • 集合操作:支持 Union, Union All, Intersect, Except 等操作。后两者很多数据库都不支持。
    • 窗口函数的支持力度。mysql 算是支持比较好的,但无法表示出“同期年累积值”(去年1月1日至1年前的今天)这样的窗口范围。ClickHouse 则对基于 日期的窗口计算乏善可陈。在这方面, DuckDB 的支持力度是最好的。
    • CTE 支持。 没有 CTE 的 SQL 语句,很多时候会变得难以阅读,难以维护。我们在处理类似于 PowerBI 的双向筛选时,生成了非常复杂的 SQL 语句, 要理解这条 SQL 语句,有时1-2小时都讲不出清楚,但采用 CTE 后就变得简单、清晰很多。duckdb 良好的支持 CTE,并且有针对 CTE 的优化, 这使得不仅 SQL 简单、易读,而且查询性能也更优秀。
    • list 数据类型 的支持,以及丰富的 list 处理函数。 在我碰到的一个对 count(distinct X) 类型的度量进行小计、合计时,如果是传统的方式,势必针对每个聚合层次做一次重新计算,而采用 list 类型后,我们完全可以仅使用 一次 SCAN 结合 窗口函数,完成这个需求,这对于大数据量下的查询来说,可以显著提升性能。

    上面仅列举了这段时间来我发现的一些能力,估计,随着对 duckdb 的深入研究,以及定制能力的挖掘,藏在 duckdb 内部还会有更多的宝藏会冒出来。

  • 扩展能力。

    目前还只是简单的浏览了一下 Extensions 的文档,实验了一下 mysql 扩展提供的外部数据源 连接能力,体会到 duckdb 强大的扩展能力。

    • 数据类型扩展,参考 Inet Extension
    • 外部数据源扩展。参考 MySQL Extension,包括了函数、表函数、Pushdown 的扩展。
    • 自定义函数扩展。参考
      • scalar function
      • aggregate function
      • table function
      • window function

    当然,强大的扩展能力也是建立在良好的 模块化设计 的基础之上的。这也使得更有必要学习一下这款优秀的数据库的源代码了。

  • 活跃的社区

    DuckDB 目前来看,在社区是棘手可热的:

    • 官网版本更新频繁。转正为 1.0.0 版本后,目前,每个月都有一个小版本的更新。
    • 文档较为完善。
    • Blog 更新较为频繁,介绍了很多新的优化、特性。
    • Youtube 上有很多的视频教程。
    • 国内也有很多的关注者,在知乎上有很多的文章介绍 duckdb 的使用和源代码分析的文章。

作为一名数据分析产品的开发者,我在了解 DuckDB 后,越发感觉到需要深入的学习 duckdb, 主要有以下的几个原因:

  1. 引入 duckdb 作为公司核心产品的数据分析的引擎,简化产品产品架构、提升产品的能力,尤其是在性能方面的提升。

    目前,我们的产品有两个分析引擎:

    • SQL 分析引擎:将前端(仪表盘等可视化组件、AI)分析查询转换为 SQL 执行。目前这种方式对度量的复杂度有较大的局限,诸如时间的各种快速计算、 表间计算 等难以有效支持。 (对源数据库的 SQL 方言适配工作量过于复杂。当产品要支持 40+ 各异的数据源时,SQL 方言几乎是一件不可能的任务。)
    • MDX 分析引擎。 MDX 分析引擎对复杂度量的表达能力支持,理论上是无上限的,但这种方式的计算模型偏重于 Top-Down 的递归函数计算,在数据的访问 主要是 Pull 式的,在复杂的计算场景下,会面临性能问题。如果将 Top-Down + Pull 模型转换为 Bottom-Up + Push 模型,避免对数据源的重复 Scan 和 重复计算,是提升这类计算引擎的关键措施。

    对于 SQL 分析引擎,如何将一个复杂的 前端查询, 表示为 SQL 查询计划,在引入 duckdb 之前与 引入 duckdb 之后,在架构上会有很大的不同,甚至于 有进一步的空间(即我们直接生成 物理执行计划,并在物理执行计划中添加与前端分析匹配的自定义算子的方式),这一块,目前还处在一个相对朦胧的阶段, 有待内部进一步实验后,再进行分享。

    当然,无论是浅度的依赖,还是深度的依赖,都需要对 duckdb 的源代码有一个深入的了解,才能做到更多的驾驭能力,否则,很多东西是不敢想,不敢做,或者 是遇到内部问题,难以跨越的。

  2. 学习一款完整的 OLAP 分析引擎,了解更多的技术细节。

    SQL 执行自身是一个复杂的过程,从基础的火山模型,到向量版的火山模型、到 Pipeline 执行模型,从 Tree 解释执行到 JIT 编译执行,从传统的计算算子 到 向量化的 算子,每一个领域都有很多的算法、优化技术。学习这些知识,是一个很有乐趣的过程。 或许,duckdb 并不是终点,而可能是一个新的起点。

后续,我会在这个系列中,记录我对 duckdb 的学习过程,包括:

  • 数据结构
  • 整体处理流程
  • 特定的优化技术
  • 对 OLAP.NEXT 技术的一些思考记录

Parse

Resolve

Logical Plan

Optimizer

Physical Plan

Vector

向量化(Vectorization) 是 OLAP 的核心技术。在《MonetDB/X100: Hyper-Pipelining Query Execution》论文中,介绍了使用向量化技术带来的 性能提升,其主要原因是:

  1. 传统的以 tuple-at-a-time 为单位的 Pull 模型,CPU 大部份消耗在 tuple 的运输(pull)上,实际的计算占比 < 10%(图中为8.8%)。参见下图: img.png
  2. 即使在这 8.8% 的计算中,其 IPC(Instructions Per Cycle) 也不高,只有 0.8。即平均每个时钟周期只执行了 0.8 条指令。而现在的服务器 CPU 是多发射架构,intel skylake 每个时钟周期理论上可以执行4条指令。 这与 tuple-at-a-time 的计算模型有关。 img.png 来源:https://zhuanlan.zhihu.com/p/645343994 (具体的发射大小文档待查)
  3. 现代的 CPU 支持 SIMD 指令集,可以在一个时钟周期内执行多个相同操作的指令,这样可以提高计算效率。128/256/512 位的 SIMD 寄存器,可以在单条 指令中操作 4/8/16 个 32-bit 的计算 或者 2/4/8 个 64-bit 的计算。
  4. tuple-at-a-time 的计算模型,对 CPU 的缓存友好性不好。无论是 I-cache 还是 D-cache,这些都会影响 IPC 的提升。

使用向量化优化后,

  1. 从 tuple-at-a-time 的 Pull 模型,改变为 morsel-at-a-time 的 Push 模型,每个 morsel 的大小可以很好的匹配 CPU 的缓存行大小。从而 提升缓存的命中率。
  2. morsel-at-a-time 的模式,大大的减少了处理循环,提高了有效运算的占比。
  3. 结合 SIMD 指令,从原来的 0.8 的 IPC 显著提升到等效的 16-64, 计算处理也得以大幅度提升。

本文以 duckdb 的示例为例,介绍 Vector 的数据结构设计,常用的向量化操作,以及对这一块性能优化的一些思考。

DuckDB Vector 数据结构

参考文档:Vector官网介绍资料

class Vector {
  VectorType vector_type;  // FLAT_VECTOR, FSST_VECTOR, CONSTANT_VECTOR, DICTIONARY_VECTOR, SEQUENCE_VECTOR
  LogicalType type;   // boolean, integer, date, varchar, etc.
  data_ptr_t data;    // uint8_t*, 指向数据的指钨
  ValidityMask validity;   // 标识某个元素是否是 null
  shared_ptr<VectorBuffer> buffer;   // data 的容器
  shared_ptr<VectorBuffer> auxliary; // 向量的动态数据部份的数据容器
}

以基础的 FlatVector(integer) 为例,其内存布局如下:

   vector_type: i8  -- FLAT_VECTOR
   type: [i8, 24]
       - id: i8  -- INTEGER
       - physical_type: i8
       _ type_info : nullptr  [i8,16]
   data: i32*  -- 指向 buffer.data 中区域
   validity: [i8, 32] 
       - validity_mask: u64*  -- bitmap,其 owner 由 validity_data 持有
       - validity_data: shared_ptr<ValidityBuffer>
       - target_count: u64 = 2048
   buffer: shared_ptr<VectorBuffer> -- owner data
           - buffer_type
           - aux_data: nullptr
           - data: nullptr -- 管理 vector.data 的所有权
   auxliary: nullptr       -- FlatVector(integer) 不需要额外的存储空间
  • data: T_ITEM* 这里的 T_ITEM 是向量中元素的值类型,对 INTEGER 等定长类型,其值类型是 i32。

    • 对 VARCHAR,使用的是 string_t 结构,定义如下:

      union {     // 16 bytes
          struct {  // length > 12 时存储前4个字符,后面的字符存储在 auxliary 中
              uint32_t length;
              char prefix[4];
              char *ptr;
          } pointer;
          struct { // length <= 12 时,直接存储在 inlined 中
              uint32_t length;
              char inlined[12];
          } inlined;
      } value;
      

      思考: 如果 string_t 调整为 24 或者 32 字节大小,则 INLINE 可以存储 12/20 字节,在进行字符串的比较时,是否会有更好的性能?

      对 VARCHAR 类型来说,string_t 是定长的,可以作为数组形式存储在 buffer 中(vector.data 是对 buffer 中的引用),而变长部份 则存储在 auxliary 中。 这个存储设计非常类似于编程语言中的 stack 和 heap: stack 中存储定长部份,heap 中存储变长部份。

    • List 类型。DuckDB 支持 List ,不仅在 SQL 级别支持 list 数据类型及一系列的 list 操作函数,在引擎内部,也会依赖 List 类型来处理计算。

      struct list_entry_t {  // list_entrt_t 是定长的,存储在 buffer 中
        idx_t offset;  // 指向在 auxliary 中的变长部份开始位置
        idx_t length;  // list 的长度
      };
      

      思考:这种设计就无法支持 list 的动态添加了。在将 list 作为聚合函数时,随着新的数据加入,list会不断的增长,这个就需要使用其他的数据结构来支持了。

      • 目前还没有看懂这一块的向量的构建过程。
  • buffer: 数据的生命周期。

    (我之前的C++经验还停留在 C++ 11之前的手动new/delete是经验,现在通过智能指针后,delete操作就基本上不需要了)这种模式是引用智能指针后 C++ 的常见的内存管理模式,rust 的内存管理思想与这个也是高度一致的。虽然 Rust 的设计是从 C++ 的最佳实践中衍生出来的,不过,不妨碍我用 Rust 来 反向理解 C++。

    buffer 与 data 之间的关系满足:(在 Vector.Resize 方法中有这个假设:)

    // data: unique_ptr<[T_ITEM]>; // T_ITEM 数组
    // data = buffer->data.get()   
    auto new_data = make_unsafe_uniq_array_uninitialized<data_t>(target_size); // new [T_ITEM; n]
    memcpy(new_data.get(), resize_info_entry.data, old_size); // 
    resize_info_entry.buffer->SetData(std::move(new_data));   // buffer->data = new_data
    resize_info_entry.vec.data = resize_info_entry.buffer->GetData();  // data = buffer->data.get()
    
    • TODO 似乎有一些子类型的 VectorBuffer 不满足这个契约,例如 DictionaryBuffer 通过 sel_vector 来访问 data, VectorChildBuffer 通过重载的 data: Vecotr 来管理数据。这些子类型似乎并不满足 Vector::Resize 中的这个契约。
  • auxliary:

Execution

学习 duckdb 的源代码,我是首先从 Execution 这一部份开始的:

  • 虽然从结构上来看,Execution 是整个系统的最后一步,但秉着“以终为始”的学习原则, 从 Execution 开始,可以更好的了解整个系统的核心。从 Parser 到 Logical Plan 到 Optimizer 到 Physical Plan,都是在为 Execution 这一步做准备。
  • DuckDB 的性能优化的关键,也是在 Execution 这一步,其核心是 Pipeline 的机制和 Vector 的数据结构。

理解了 Pipeline 和 Vector,再结合几个主要的 Operator 原理,就可以对 DuckDB 的执行机制有一个初步的认识了。

参考文档

  1. DuckDB internals
  2. Push-Based Execution In DuckDB
  3. 知乎链接 DuckDB Push-Based Execution Model

代码调试技巧

  1. 在 duckdb shell 中 使用 explain statement 或者explain analyze statement 查看执行计划。
  2. PipelineExecutor::Execute(idx_t max_chunks) 是 Pipeline 的执行入口,可以从这里添加断点,开启一个 Pipeline 的调试。 -. watches: this->pipeline.ToString() 查看当前 Pipeline 的信息。 或者 this->pipeline.Print() 在 console 中 查看当前 Pipeline 的 信息。 一次SQL 执行会产生多个 Pipeline,仅在满足条件的 Pipeline 上设置断点。 -. watches: this->pipeline.source->ToString(ExplainFormat::TEXT) 查看当前 Pipeline 的 Source 信息。 -. 在调试的 Variables 面板中,可以查看 pipeline 的 source, operators, sink 的信息,在对应的 operator 上设置断点。
  3. source 节点的入口是: PhysicalXxxOperator::GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input), 可以在这里设置断点,查看 source 的执行流程。
  4. operator 节点的执行入口是:PhysicalXxxOperator::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, GlobalOperatorState &gstate, OperatorState &state
  5. sink 节点的执行有3个入口:
    • PhysicalXxxOperator::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input)
    • PhysicalXxxOperator::Combine(ExecutionContext &context, OperatorSinkCombineInput &input)
    • PhysicalXxxOperator::Finalize((Pipeline &pipeline, Event &event, ClientContext &context, OperatorSinkFinalizeInput &input)

在阅读代码过程中,可以通过上述的调试技巧,找到需要学习的代码的入口点,设置断点后,跟着调试器一步一步的阅读代码,并在 Variables 中查看各个变量的 值,理解数据流和代码执行流程。

接下来,我们将从一些简单的SQL 执行示例出发,来阅读 Execution 模块的源代码。

  1. example query: 两个表join后分组聚合

一个简单的关联、分组聚合查询的执行计划分析

sql

select name, count(freight), sum(freight) 
from sale_orders so 
left join customers c on c.customer_id = so.customer_id 
where name = 'IB89Nf23kAom' 
group by name;

Physical Plan

flowchart BT
    
    TS1[Table Scan: customers
    projections: customer_id,name
    filters: name = ? ]

    TS2[Table Scan: sale_orders
    projections: customer_id, freight
    filters: customer_id >= 1
    ]
    
    F1[Filter
    customer_id <= 999999
    ]

    HJ[Hash Join]
    
    Projection1[Projection
    name, freight
    ]

    HashGroupBy1[Hash Group By
    Groups: #0
    Aggregates: count_start, sum #1
    ]

    TS1 --> F1
    TS2 --> HJ
    F1 --> HJ
    HJ --> Projection1
    Projection1 --> HashGroupBy1

Pipeline

flowchart BT
    
    subgraph pipelin1
          TableScan1[Table Scan: customers
            projections: customer_id,name
            filters: name = ? 
        ]

           
        Filter1[Filter
            customer_id <= 999999
        ]

        HashJoin1[Hash Join]

    end 
    subgraph pipelin2
        TableScan2[Table Scan: sale_orders
            projections: customer_id, freight
            filters: customer_id >= 1
        ]

        Projection1[Projection
            name, freight
        ]

        HashGroupBy1[Hash Group By
            Groups: #0
            Aggregates: count_start, sum #1
        ]

    end
    
    TableScan1 --> Filter1
    TableScan2 --> HashJoin1
    Filter1 --> HashJoin1
    HashJoin1 --> Projection1
    Projection1 --> HashGroupBy1

    pipelin1 --> pipelin2
  1. 执行计划拆分为多个 pipeline, 每个 pipeline 是一个从 Source 到 Sink 的 Push 模型.
    • 如何根据物理执行计划生成 pipeline ?
  2. 多个 pipeline 之间构成了一个依赖图,依赖的 pipeline 执行完毕后,被依赖的 pipeline 才能执行。
  3. 每个 pipeline 会被拆分为多个 Task(分区并行),每个 Task 相当于一个最小的调度单元。
    • 当 Pipeline 的所有 task 执行完毕,这个 pipeline 才执行完毕,可以执行被依赖的的 pipeline。
  4. 在 Pipeline 中有3种角色:
    • Source: 管道的起点,从外部设备读取数据。(IO 阻塞式:这是否会降低系统的处理能力?)

      • GetData(): 读取数据

        Source 需要考虑并发,与单个任务相关的数据存储在 localState 中,与整个管道相关的数据存储在 globalState 中。

        • globalState 存储在 pipeline 中,同一个 pipeline 的所有 task 共享,设计到并发安全控制时,需要加锁
        • localState 存储在 PipelineExecutor 中。由 PipelineExecutor 负责创建和销毁。 不同的 Source 节点有自己的 localState/globalState 定义。

      -[ ] 执行计划是如何处理分区的?

    • Operator: pure function

      • Execute(): 处理单个批次的输入数据,无需考虑并发安全。
    • Sink: 管道的终点,需要协调多个任务的结果,进行合并

      • Sink(): 处理单个批次的输入数据
      • Combine(): 单个 Task 的全部 Source 处理完毕,进行单个任务内的合并。(其他任务可能会 Sink/Combine)
      • Finalize(): 所有 Task 处理完毕,进行最终的合并。

      考虑到存在并发问题,需要区分 localState 和 globalState,不同的 Sink 节点有自己的 localState/globalState 定义。

主要算子的执行逻辑

PhysicalTableScan (Source in pipeline1 & pipeline2)

  1. 分区如何处理? Source 侧如何处理并发?
  2. 每次 Pipeline Execute最多 50 个 Chunk?
  3. bind_data 指向 DataTable
    • 为 TableScanBindData 增加 ToString 方法,方便调试
    • unordered_map 无法显示, 考虑为 ScanFilterInfo/TableScanState 添加 ToString 方法
  4. 实际入口:TableScanFunc() 负责读取数据 // 边界:TableFunction,可能要对比 from table 与 from csv_read 的执行流程的区别

PhysicalFilter (Operator in pipeline1)

  1. filter 表达式的计算流程?(解释执行的成本如何?)
  2. 这一块的代码是否会进行 SIMD 优化?

BinaryExecutor::SelectGenericLoop<…>(const int *, const int *, const SelectionVector *, 
    const SelectionVector *, const SelectionVector *, unsigned long long, ValidityMask &, 
    ValidityMask &, SelectionVector *, SelectionVector *) binary_executor.hpp:444
    --- 这个方法才是最终的向量执行代码,在前面有5-6层的解释和 dispatch 过程,其向量化是通过 compiler auto-vectorization 实现的。
BinaryExecutor::SelectGenericLoopSelSwitch<…>(const int *, const int *, const SelectionVector *, 
    const SelectionVector *, const SelectionVector *, unsigned long long, ValidityMask &, 
    ValidityMask &, SelectionVector *, SelectionVector *) binary_executor.hpp:459
BinaryExecutor::SelectGenericLoopSwitch<…>(const int *, const int *, const SelectionVector *, 
    const SelectionVector *, const SelectionVector *, unsigned long long, ValidityMask &, 
    ValidityMask &, SelectionVector *, SelectionVector *) binary_executor.hpp:475
BinaryExecutor::SelectGeneric<…>(Vector &, Vector &, const SelectionVector *, 
    unsigned long long, SelectionVector *, SelectionVector *) binary_executor.hpp:491
BinaryExecutor::Select<…>(Vector &, Vector &, const SelectionVector *, 
    unsigned long long, SelectionVector *, SelectionVector *) binary_executor.hpp:515
TemplatedSelectOperation<…>(Vector &, Vector &, optional_ptr<…>, unsigned long long, 
    optional_ptr<…>, optional_ptr<…>, optional_ptr<…>) execute_comparison.cpp:104
VectorOperations::LessThanEquals(Vector &, Vector &, optional_ptr<…>, unsigned long long, 
    optional_ptr<…>, optional_ptr<…>, optional_ptr<…>) execute_comparison.cpp:345
    
ExpressionExecutor::Select(const BoundComparisonExpression &, ExpressionState *, 
    const SelectionVector *, unsigned long long, SelectionVector *, 
    SelectionVector *) execute_comparison.cpp:369
ExpressionExecutor::Select(const Expression &, ExpressionState *, const SelectionVector *, 
    unsigned long long, SelectionVector *, SelectionVector *) expression_executor.cpp:236
ExpressionExecutor::SelectExpression(DataChunk &, SelectionVector &) expression_executor.cpp:90
PhysicalFilter::ExecuteInternal(ExecutionContext &, DataChunk &, DataChunk &, GlobalOperatorState &, OperatorState &) const physical_filter.cpp:45
......

当然,要对比一下在 release 模式下,是否会进行编译优化?从这个执行栈来看,这个解释过程还是很啰嗦的。如果进行更好的特化或者 TypedIR 的方式,相信在这一块的 执行效率会更高。

template <class LEFT_TYPE, class RIGHT_TYPE, class OP, bool NO_NULL, bool HAS_TRUE_SEL, bool HAS_FALSE_SEL>
	static inline idx_t
	SelectGenericLoop(const LEFT_TYPE *__restrict ldata, const RIGHT_TYPE *__restrict rdata,
	                  const SelectionVector *__restrict lsel, const SelectionVector *__restrict rsel,
	                  const SelectionVector *__restrict result_sel, idx_t count, ValidityMask &lvalidity,
	                  ValidityMask &rvalidity, SelectionVector *true_sel, SelectionVector *false_sel) {
		idx_t true_count = 0, false_count = 0;
		for (idx_t i = 0; i < count; i++) {
			auto result_idx = result_sel->get_index(i);
			auto lindex = lsel->get_index(i);
			auto rindex = rsel->get_index(i);
			if ((NO_NULL || (lvalidity.RowIsValid(lindex) && rvalidity.RowIsValid(rindex))) &&
			    OP::Operation(ldata[lindex], rdata[rindex])) {
				if (HAS_TRUE_SEL) {
					true_sel->set_index(true_count++, result_idx);
				}
			} else {
				if (HAS_FALSE_SEL) {
					false_sel->set_index(false_count++, result_idx);
				}
			}
		}
		if (HAS_TRUE_SEL) {
			return true_count;
		} else {
			return count - false_count;
		}
	}

TODO: 这个方法是最终的向量化执行代码,依赖于编译器的 auto-vectorization,这段代码是否高效呢? 可以截取一段特化的汇编代码,来看看其是否高效?

  1. l_sel, r_sel 的操作可否向量化?
  2. l_validity, r_validity 的操作可否向量化?

从代码执行流程来看,这个解释执行的过程似乎还是有较高的成本:

  1. 通过模版技术,根据不同的数据类型、操作符等分派编译,避免在循环处理中进行分支判断操作。

     case class Node(operator, left, right):
         val leftVector = left.execute()
         val rightVector = right.execute()
         operator(leftVector, rightVector) -- 这里 operator 是一个向量化的特化版本函数
    
    // a > b && a < 10 
    Node( BooleanAnd, 
         Node( IntegerGT, a, b), 
         Node( IntegerLT, a, 10)
    )
    
  2. TypedIR 直接编译成为 SIMD 代码,在一个函数内,完成多个子表达式的计算,共享寄存器,减少 load 操作,执行速度会更快。

    这个方式感觉是最理想的,不过开发的成本可能会比较高,而且这个JIT自身的开销如何也需要考虑。

  3. 此外,在这个SQL中,Optimizer 似乎引入了一个不必要的 filter: customer_id <= 999999 当行数很大时,这个耗时是没有必要的。

PhysicalHashJoin (Sink in pipeline1)

PhysicalHashJoin (operator in pipeline2)

PhysicalProjection (Operator in pipeline2)

PhysicalHashAggregate (Sink in pipeline2)

Pipeline 调度

  1. Pipeline Task 的创建

  2. 主要数据结构

    • 画一个 class diagram, 理清楚这几个类的 CRC。
    • Event 这个类是干什么的?
    1. Executor: (共享粒度)

      • physical_plan
      • owned_plan
      • root_pipeline
      • pipelines
    2. Pipeline: 共享粒度,存储 global state,多线程访问需要考虑加锁

      • source operator
      • operators
      • sink operator
      • source_state: global state for Source
      • sink.sink_state: global state for Sink
    3. PipelineExecutor (线程粒度)

      • pipeline
      • thread_context
      • ExecutionContext
      • local_source_state
      • local_sink_state
    4. PipelineTask is a ExecutorTask(线程粒度) 在执行过程中有哪些是变化的?

      • ExecutorTask:
        • task ?
        • executor: Executor
        • event
        • thread_context
        • op ?
      • pipeline: Pipeline
      • pipeline_executor: PipelineExecutor (每个线程一个 PipelineExecutor 示例,存储 localState )
    5. ExecutionContext(client: ClientContext&, thread, pipeline)

Pipeline

Operators

seq_scan

filter

projection

projection

hash_join

hash_group_by

我们使用示例 SQL 语句来学习 hash_group_by 算子的原理:

select name, count(freight), sum(freight) 
from sale_orders so 
left join customers c on c.customer_id = so.customer_id 
where gender = 'M' and name like 'abc%'  and freight > 10 and freight < 50 
group by name

在执行计划中,可以看到如下的 hash_group_by 算子, 该算子是 pipeline 的 Sink 节点,其谦虚处理节点为: join |> projection |> hash_group_by

┌───────────────────────────┐
│       HASH_GROUP_BY       │
│    ────────────────────   │
│         Groups: #0        │
│                           │
│        Aggregates:        │
│        count_star()       │
│          sum(#1)          │
│                           │
└─────────────┬─────────────┘ 

对应的源代码为: src/execution/operator/aggregate/physical_hash_aggregate.cpp

class PhysicalHashAggregate : public PhysicalOperator {

	//! The grouping sets, SQL 中分组+聚合的设置信息
	GroupedAggregateData grouped_aggregate_data;
	
            vector<unique_ptr<Expression>> groups;  // 分组字段, 在这里例子中是 name
            vector<vector<idx_t>> grouping_functions;  // 这个例子中没有使用分组函数
            vector<LogicalType> group_types;        // 分组字段的类型, 在这里例子中是 VARCHAR


            vector<unique_ptr<Expression>> aggregates;  // 聚合表达式, 这里是 count(freight), sum(freight)
            vector<LogicalType> payload_types;         // freight 字段的类型
            vector<LogicalType> aggregate_return_types; // 聚合函数的返回类型, 这里是 BIGINT, BIGINT
            vector<BoundAggregateExpression *> bindings; // 对应聚合函数的 binding 信息
            idx_t filter_count;
        
	vector<GroupingSet> grouping_sets;
	//! The radix partitioned hash tables (one per grouping set)
	vector<HashAggregateGroupingData> groupings;
	unique_ptr<DistinctAggregateCollectionInfo> distinct_collection_info;
	//! A recreation of the input chunk, with nulls for everything that isnt a group
	vector<LogicalType> input_group_types;

	// Filters given to Sink and friends
	unsafe_vector<idx_t> non_distinct_filter;
	unsafe_vector<idx_t> distinct_filter;

	unordered_map<Expression *, size_t> filter_indexes;
}

class HashAggregateLocalSinkState: public LocalSinkState {
	DataChunk aggregate_input_chunk;
	
	vector<HashAggregateGroupingLocalState> grouping_states;  // 保存对每个grouping的状态信息
	// operator.local_state.grouping_states[group_idx].table_state.ht 存储一个 hashtable
	
	AggregateFilterDataSet filter_set;
}

class HashAggregateGlobalSinkState: public GlobalSinkState {
    vector<HashAggregateGroupingGlobalState> grouping_states;
	vector<LogicalType> payload_types;
	//! Whether or not the aggregate is finished
	bool finished = false;
}

class AggregateFunction: public BaseScalarFunction {
    //! The hashed aggregate state sizing function
	aggregate_size_t state_size;  // 
	//! The hashed aggregate state initialization function
	aggregate_initialize_t initialize;
	//! The hashed aggregate update state function
	aggregate_update_t update;
	//! The hashed aggregate combine states function
	aggregate_combine_t combine;
	//! The hashed aggregate finalization function
	aggregate_finalize_t finalize;
	//! The simple aggregate update function (may be null)
	aggregate_simple_update_t simple_update;
	//! The windowed aggregate custom function (may be null)
	aggregate_window_t window;
	//! The windowed aggregate custom initialization function (may be null)
	aggregate_wininit_t window_init = nullptr;

	//! The bind function (may be null)
	bind_aggregate_function_t bind;
	//! The destructor method (may be null)
	aggregate_destructor_t destructor;

	//! The statistics propagation function (may be null)
	aggregate_statistics_t statistics;

	aggregate_serialize_t serialize;
	aggregate_deserialize_t deserialize;
	//! Whether or not the aggregate is order dependent
	AggregateOrderDependent order_dependent;
	//! Additional function info, passed to the bind
	shared_ptr<AggregateFunctionInfo> function_info;
}

分别在 Sink 方法, Combine 方法, Finalize 方法中添加断点,调试执行该算子的代码,跟踪其执行流程,进一步理解该算子的数据结构、算法。

TODO:

  • 理解 duckdb 是如何通过 template 来实现不同类型的聚合函数的处理的。
  • sum(x) 是如何映射到特定版本的函数的?

Storage

CLI

read_csv 表函数分析

  • ReadCSVBind -- 返回表的 schema 信息,主要是列名 和 类型信息。

      typedef unique_ptr<FunctionData> (*table_function_bind_t)(
          ClientContext &context, 
          TableFunctionBindInput &input,
          vector<LogicalType> &return_types, 
          vector<string> &names
      );
    

    在 Plan 阶段,会调用, 返回值 FunctionData 会传递到后续调用中,例如 TableFunctionInput.bind_data;

  • ReadCSVFunction -- main function

     typedef void (*table_function_t)(
          ClientContext &context, 
          TableFunctionInput &data, 
          DataChunk &output
     );
    

    在 Pipeline 执行阶段,有 Source Operator 调用。

    1. 为什么是3个线程在执行?
    2. 这三个线程是如何协作的?例如并行处理文件的行数?
  • ReadCSVInitGlobal -- 可选

  • ReadCSVInitLocal -- 可选

    CVS Scanner: 按照每 8M 一个 scanner , 每个 scanner 分配一个扫描范围:

    • 如果一个 scanner 扫描完了当前block,需要继续扫描下一个 scanner 的数据,直到遇到一个换行符。
    • 对非第一个 scanner,会先忽略直到第一个换行符的数据

    优化: CSV Scanner 是否可以考虑使用 SIMD 加速?

    StringValueScanner: BaseScanner:

    • BaseScanner
      • iterator: CSVIterator
        • pos: CSVPosition( buffer_idx, buffer_pos) -- 当前位置
        • boundary: CSVBoundary( buffer_idx, buffer_pos, boundary_idx, end_pos ) -- 边界
      • CSVFileHandle
        • CSVOptions
    duckdb::CSVBuffer::CSVBuffer(duckdb::CSVFileHandle &, duckdb::ClientContext &, unsigned long long, unsigned long long, unsigned long long, unsigned long long) csv_buffer.cpp:28
    duckdb::CSVBuffer::CSVBuffer(duckdb::CSVFileHandle &, duckdb::ClientContext &, unsigned long long, unsigned long long, unsigned long long, unsigned long long) csv_buffer.cpp:25
    duckdb::make_shared_ptr<…>(duckdb::CSVFileHandle &, duckdb::ClientContext &, unsigned long long &, unsigned long long &&, unsigned long long &, unsigned long long &&) helper.hpp:73
    duckdb::CSVBuffer::Next(duckdb::CSVFileHandle &, unsigned long long, unsigned long long, bool &) csv_buffer.cpp:44
    duckdb::CSVBufferManager::ReadNextAndCacheIt() csv_buffer_manager.cpp:42
    duckdb::CSVBufferManager::GetBuffer(unsigned long long) csv_buffer_manager.cpp:71
    duckdb::CSVIterator::Next(duckdb::CSVBufferManager &) scanner_boundary.cpp:54
    duckdb::CSVGlobalState::Next(duckdb::optional_ptr<…>) global_csv_state.cpp:143
    duckdb::ReadCSVInitLocal(duckdb::ExecutionContext &, duckdb::TableFunctionInitInput &, duckdb::GlobalTableFunctionState *) read_csv.cpp:212
       -- 会调用 TableFunction.init_local 进行初始化,调用 global_state.Next() 获取 当前任务的一个 CSV-Scanner
    duckdb::TableScanLocalSourceState::TableScanLocalSourceState(duckdb::ExecutionContext &, duckdb::TableScanGlobalSourceState &, const duckdb::PhysicalTableScan &) physical_table_scan.cpp:75
    duckdb::TableScanLocalSourceState::TableScanLocalSourceState(duckdb::ExecutionContext &, duckdb::TableScanGlobalSourceState &, const duckdb::PhysicalTableScan &) physical_table_scan.cpp:71
    duckdb::make_uniq<…>(duckdb::ExecutionContext &, duckdb::TableScanGlobalSourceState &, const duckdb::PhysicalTableScan &) helper.hpp:65
    
    duckdb::PhysicalTableScan::GetLocalSourceState(duckdb::ExecutionContext &, duckdb::GlobalSourceState &) const physical_table_scan.cpp:84
       -- 每个 PipelineExecutor 中的 Source/Sink 都会有 local state
    duckdb::PipelineExecutor::PipelineExecutor(duckdb::ClientContext &, duckdb::Pipeline &) pipeline_executor.cpp:27
    
    duckdb::PipelineExecutor::PipelineExecutor(duckdb::ClientContext &, duckdb::Pipeline &) pipeline_executor.cpp:14
    duckdb::make_uniq<…>(duckdb::ClientContext &, duckdb::Pipeline &) helper.hpp:65
    duckdb::PipelineTask::ExecuteTask(duckdb::TaskExecutionMode) pipeline.cpp:34
    duckdb::ExecutorTask::Execute(duckdb::TaskExecutionMode) executor_task.cpp:44
    duckdb::TaskScheduler::ExecuteForever(std::__1::atomic<…> *) task_scheduler.cpp:189
    duckdb::ThreadExecuteTasks(duckdb::TaskScheduler *, std::__1::atomic<…> *) task_scheduler.cpp:279
    

    CSVGlobalState:

    • fileScans: vector<shared_ptr> ; 每一个 csv 文件一个 CSVFileScan
      • file_path
      • file_size: 文件大小
      • buffer_manager: 管理文件的多个 buffer
        • cacheed_buffers: vector -- 每个 CSVBuffer 对应一个 PipelineExecutor
          • buffer_idx
          • actual_buffer_size
          • handle -- 指向 buffer 数据 每次 CSVGlobalState.Next() 获取下一个 StringValueScanner 时,会分配一个 Buffer(8M),将文件数据读取到 buffer 中。在后面 scan 处理 时,从内存中进行读取。
    • 这一块的代码写得很复杂。
  • table_function.pushdown_complex_filter 可选

    在 Optimizer 阶段,会调用。

    • 在前面应该有一个处理 bind_data 的阶段,获取表的 types, names 等信息。

[ ] 需要整理出一个 table_function 的调用时序图,帮助理解每一个可选函数存在的价值。

阅读了上述信息后,可以开始评估编写我们自己的 table function 了,接下来,我们需要的一个场景是:

  1. 在 Java 中使用 JDBC 读取业务数据,将 ResultSet 写入到一个 DataChunk 中。(提供API 写 data chunk)
  2. 创建一个 table function,将 DataChunk 中的数据返回给 DuckDB。
    
    DataChunk chunk = new DataChunk();  
    // prvoide 
    connection.registerDataChunk("asdf", chunk);
    
    connection.executeQuery("select * from read_asdf");

或者: jdbc ResultSet -> arrow.vector.VectorSchemaRoot -> ArrowStreamReader -> arrow.c.ArrowArrayStream -> duckdb

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.ArrowArrayStreamListener;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class ArrowConversionExample {

    public static ArrowArrayStream convertToArrowArrayStream(VectorSchemaRoot root) throws IOException {
        BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out);

        // Write the VectorSchemaRoot to the output stream
        writer.start();
        writer.writeBatch();
        writer.end();

        // Create an ArrowArrayStream from the output stream
        ArrowArrayStream arrayStream = new ArrowArrayStream();
        arrayStream.setListener(new ArrowArrayStreamListener() {
            @Override
            public void onNext(ArrowRecordBatch batch) {
                // Handle the ArrowRecordBatch
            }

            @Override
            public void onError(Throwable t) {
                // Handle the error
            }

            @Override
            public void onCompleted() {
                // Handle completion
            }
        });

        // Initialize the ArrowArrayStream with the written data
        arrayStream.init(out.toByteArray(), allocator);

        return arrayStream;
    }

    public static void main(String[] args) throws IOException {
        // Example usage
        BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
        Schema schema = new Schema(/* define your schema here */);
        VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);

        // Populate the VectorSchemaRoot with data
        // ...

        ArrowArrayStream arrayStream = convertToArrowArrayStream(root);

        // Use the ArrowArrayStream
        // ...
    }
}

这个方式是有复制的,需要考虑如何减少复制。

  1. https://arrow.apache.org/docs/java/jdbc.html
  2. https://duckdb.org/docs/api/java#arrow-import
  • ArrowReader
    • Field(name, nullable, type, ...)
    • Schema(fields, metadata)
    • batches: List[ArrowRecordBatch]
      • ArrowRecordBatch( length, nodes, buffers) // each field encode in a buffer

这一块需要研究一下,整体成本相比自己实现可能会简单一些。

  • Pipeline::Schedule 负责创建 PipelineTask 并提交调度。 对 TableScan, 会调用 source_state->MaxThreads() 获取最大线程数,然后创建对应的 PipelineTask,一般会调用 对应 operator.globalState.MaxThreads() 对 CSV 文件,取文件大小 / 8M
  • TODO 理解 PipelineEvent

Extensions

Typed IR

使用 Typed IR 来作为 Physical Plan 的输入:

  1. 可以将 Executor 作为一个独立的模块。
  2. Executor 可以独立嵌入到的应用中,由其他的应用生成 TypedIR,然后交给 Executor 执行。
  3. TypedIR 可以手工编写,独立优化、调试,更好的进行新 operator 的开发、测试、优化工作。

subquery optimization

subquery 的优化包括:

  1. 将 行级的 subquery 转换为 batch的 JOIN 操作

表达能力:

  • 与 MDX 的 Top-Down + Pull 模型匹配,可以支持复杂的度量计算。
    • 类似于 DAX 的 Calculate 函数,修改查询上下文。

Misc

  1. DataType
  2. Value
  3. Vector[DataType]
    1. FlatVector
    2. DictionaryVector
    3. ConstantVector
    4. SequenceVector
    5. SelectorVector ......
  4. DataChunk
  5. Operator
    1. Filter
    2. Projection
    3. Join
    4. Hash Group - Aggregate
    5. Window
  6. Pipeline