一个简单的关联、分组聚合查询的执行计划分析
sql
select name, count(freight), sum(freight)
from sale_orders so
left join customers c on c.customer_id = so.customer_id
where name = 'IB89Nf23kAom'
group by name;
Physical Plan
flowchart BT TS1[Table Scan: customers projections: customer_id,name filters: name = ? ] TS2[Table Scan: sale_orders projections: customer_id, freight filters: customer_id >= 1 ] F1[Filter customer_id <= 999999 ] HJ[Hash Join] Projection1[Projection name, freight ] HashGroupBy1[Hash Group By Groups: #0 Aggregates: count_start, sum #1 ] TS1 --> F1 TS2 --> HJ F1 --> HJ HJ --> Projection1 Projection1 --> HashGroupBy1
Pipeline
flowchart BT subgraph pipelin1 TableScan1[Table Scan: customers projections: customer_id,name filters: name = ? ] Filter1[Filter customer_id <= 999999 ] HashJoin1[Hash Join] end subgraph pipelin2 TableScan2[Table Scan: sale_orders projections: customer_id, freight filters: customer_id >= 1 ] Projection1[Projection name, freight ] HashGroupBy1[Hash Group By Groups: #0 Aggregates: count_start, sum #1 ] end TableScan1 --> Filter1 TableScan2 --> HashJoin1 Filter1 --> HashJoin1 HashJoin1 --> Projection1 Projection1 --> HashGroupBy1 pipelin1 --> pipelin2
- 执行计划拆分为多个 pipeline, 每个 pipeline 是一个从 Source 到 Sink 的 Push 模型.
- 如何根据物理执行计划生成 pipeline ?
- 多个 pipeline 之间构成了一个依赖图,依赖的 pipeline 执行完毕后,被依赖的 pipeline 才能执行。
- 每个 pipeline 会被拆分为多个 Task(分区并行),每个 Task 相当于一个最小的调度单元。
- 当 Pipeline 的所有 task 执行完毕,这个 pipeline 才执行完毕,可以执行被依赖的的 pipeline。
- 在 Pipeline 中有3种角色:
-
Source: 管道的起点,从外部设备读取数据。(IO 阻塞式:这是否会降低系统的处理能力?)
-
GetData(): 读取数据
Source 需要考虑并发,与单个任务相关的数据存储在 localState 中,与整个管道相关的数据存储在 globalState 中。
- globalState 存储在 pipeline 中,同一个 pipeline 的所有 task 共享,设计到并发安全控制时,需要加锁
- localState 存储在 PipelineExecutor 中。由 PipelineExecutor 负责创建和销毁。 不同的 Source 节点有自己的 localState/globalState 定义。
-[ ] 执行计划是如何处理分区的?
-
-
Operator: pure function
- Execute(): 处理单个批次的输入数据,无需考虑并发安全。
-
Sink: 管道的终点,需要协调多个任务的结果,进行合并
- Sink(): 处理单个批次的输入数据
- Combine(): 单个 Task 的全部 Source 处理完毕,进行单个任务内的合并。(其他任务可能会 Sink/Combine)
- Finalize(): 所有 Task 处理完毕,进行最终的合并。
考虑到存在并发问题,需要区分 localState 和 globalState,不同的 Sink 节点有自己的 localState/globalState 定义。
-
主要算子的执行逻辑
PhysicalTableScan (Source in pipeline1 & pipeline2)
- 分区如何处理? Source 侧如何处理并发?
- 每次 Pipeline Execute最多 50 个 Chunk?
- bind_data 指向 DataTable
- 为 TableScanBindData 增加 ToString 方法,方便调试
- unordered_map 无法显示, 考虑为 ScanFilterInfo/TableScanState 添加 ToString 方法
- 实际入口:TableScanFunc() 负责读取数据 // 边界:TableFunction,可能要对比 from table 与 from csv_read 的执行流程的区别
PhysicalFilter (Operator in pipeline1)
- filter 表达式的计算流程?(解释执行的成本如何?)
- 这一块的代码是否会进行 SIMD 优化?
BinaryExecutor::SelectGenericLoop<…>(const int *, const int *, const SelectionVector *,
const SelectionVector *, const SelectionVector *, unsigned long long, ValidityMask &,
ValidityMask &, SelectionVector *, SelectionVector *) binary_executor.hpp:444
--- 这个方法才是最终的向量执行代码,在前面有5-6层的解释和 dispatch 过程,其向量化是通过 compiler auto-vectorization 实现的。
BinaryExecutor::SelectGenericLoopSelSwitch<…>(const int *, const int *, const SelectionVector *,
const SelectionVector *, const SelectionVector *, unsigned long long, ValidityMask &,
ValidityMask &, SelectionVector *, SelectionVector *) binary_executor.hpp:459
BinaryExecutor::SelectGenericLoopSwitch<…>(const int *, const int *, const SelectionVector *,
const SelectionVector *, const SelectionVector *, unsigned long long, ValidityMask &,
ValidityMask &, SelectionVector *, SelectionVector *) binary_executor.hpp:475
BinaryExecutor::SelectGeneric<…>(Vector &, Vector &, const SelectionVector *,
unsigned long long, SelectionVector *, SelectionVector *) binary_executor.hpp:491
BinaryExecutor::Select<…>(Vector &, Vector &, const SelectionVector *,
unsigned long long, SelectionVector *, SelectionVector *) binary_executor.hpp:515
TemplatedSelectOperation<…>(Vector &, Vector &, optional_ptr<…>, unsigned long long,
optional_ptr<…>, optional_ptr<…>, optional_ptr<…>) execute_comparison.cpp:104
VectorOperations::LessThanEquals(Vector &, Vector &, optional_ptr<…>, unsigned long long,
optional_ptr<…>, optional_ptr<…>, optional_ptr<…>) execute_comparison.cpp:345
ExpressionExecutor::Select(const BoundComparisonExpression &, ExpressionState *,
const SelectionVector *, unsigned long long, SelectionVector *,
SelectionVector *) execute_comparison.cpp:369
ExpressionExecutor::Select(const Expression &, ExpressionState *, const SelectionVector *,
unsigned long long, SelectionVector *, SelectionVector *) expression_executor.cpp:236
ExpressionExecutor::SelectExpression(DataChunk &, SelectionVector &) expression_executor.cpp:90
PhysicalFilter::ExecuteInternal(ExecutionContext &, DataChunk &, DataChunk &, GlobalOperatorState &, OperatorState &) const physical_filter.cpp:45
......
当然,要对比一下在 release 模式下,是否会进行编译优化?从这个执行栈来看,这个解释过程还是很啰嗦的。如果进行更好的特化或者 TypedIR 的方式,相信在这一块的 执行效率会更高。
template <class LEFT_TYPE, class RIGHT_TYPE, class OP, bool NO_NULL, bool HAS_TRUE_SEL, bool HAS_FALSE_SEL>
static inline idx_t
SelectGenericLoop(const LEFT_TYPE *__restrict ldata, const RIGHT_TYPE *__restrict rdata,
const SelectionVector *__restrict lsel, const SelectionVector *__restrict rsel,
const SelectionVector *__restrict result_sel, idx_t count, ValidityMask &lvalidity,
ValidityMask &rvalidity, SelectionVector *true_sel, SelectionVector *false_sel) {
idx_t true_count = 0, false_count = 0;
for (idx_t i = 0; i < count; i++) {
auto result_idx = result_sel->get_index(i);
auto lindex = lsel->get_index(i);
auto rindex = rsel->get_index(i);
if ((NO_NULL || (lvalidity.RowIsValid(lindex) && rvalidity.RowIsValid(rindex))) &&
OP::Operation(ldata[lindex], rdata[rindex])) {
if (HAS_TRUE_SEL) {
true_sel->set_index(true_count++, result_idx);
}
} else {
if (HAS_FALSE_SEL) {
false_sel->set_index(false_count++, result_idx);
}
}
}
if (HAS_TRUE_SEL) {
return true_count;
} else {
return count - false_count;
}
}
TODO: 这个方法是最终的向量化执行代码,依赖于编译器的 auto-vectorization,这段代码是否高效呢? 可以截取一段特化的汇编代码,来看看其是否高效?
- l_sel, r_sel 的操作可否向量化?
- l_validity, r_validity 的操作可否向量化?
从代码执行流程来看,这个解释执行的过程似乎还是有较高的成本:
-
通过模版技术,根据不同的数据类型、操作符等分派编译,避免在循环处理中进行分支判断操作。
case class Node(operator, left, right): val leftVector = left.execute() val rightVector = right.execute() operator(leftVector, rightVector) -- 这里 operator 是一个向量化的特化版本函数 // a > b && a < 10 Node( BooleanAnd, Node( IntegerGT, a, b), Node( IntegerLT, a, 10) )
-
TypedIR 直接编译成为 SIMD 代码,在一个函数内,完成多个子表达式的计算,共享寄存器,减少 load 操作,执行速度会更快。
这个方式感觉是最理想的,不过开发的成本可能会比较高,而且这个JIT自身的开销如何也需要考虑。
-
此外,在这个SQL中,Optimizer 似乎引入了一个不必要的 filter: customer_id <= 999999 当行数很大时,这个耗时是没有必要的。
PhysicalHashJoin (Sink in pipeline1)
PhysicalHashJoin (operator in pipeline2)
PhysicalProjection (Operator in pipeline2)
PhysicalHashAggregate (Sink in pipeline2)
Pipeline 调度
-
Pipeline Task 的创建
-
主要数据结构
- 画一个 class diagram, 理清楚这几个类的 CRC。
- Event 这个类是干什么的?
-
Executor: (共享粒度)
- physical_plan
- owned_plan
- root_pipeline
- pipelines
-
Pipeline: 共享粒度,存储 global state,多线程访问需要考虑加锁
- source operator
- operators
- sink operator
- source_state: global state for Source
- sink.sink_state: global state for Sink
-
PipelineExecutor (线程粒度)
- pipeline
- thread_context
- ExecutionContext
- local_source_state
- local_sink_state
-
PipelineTask is a ExecutorTask(线程粒度) 在执行过程中有哪些是变化的?
- ExecutorTask:
- task ?
- executor: Executor
- event
- thread_context
- op ?
- pipeline: Pipeline
- pipeline_executor: PipelineExecutor (每个线程一个 PipelineExecutor 示例,存储 localState )
- ExecutorTask:
-
ExecutionContext(client: ClientContext&, thread, pipeline)