hash_group_by
We use the following example SQL statement to study how the hash_group_by operator works:
select name, count(freight), sum(freight)
from sale_orders so
left join customers c on c.customer_id = so.customer_id
where gender = 'M' and name like 'abc%' and freight > 10 and freight < 50
group by name
In the execution plan we can see the hash_group_by operator shown below. It is the Sink node of a pipeline, and the nodes that precede it are: join |> projection |> hash_group_by
┌───────────────────────────┐
│ HASH_GROUP_BY │
│ ──────────────────── │
│ Groups: #0 │
│ │
│ Aggregates: │
│ count_star() │
│ sum(#1) │
│ │
└─────────────┬─────────────┘
The corresponding source code is: src/execution/operator/aggregate/physical_hash_aggregate.cpp
class PhysicalHashAggregate : public PhysicalOperator {
//! The grouping sets: the grouping and aggregation settings taken from the SQL statement
GroupedAggregateData grouped_aggregate_data;
vector<unique_ptr<Expression>> groups; // group-by columns; in this example, name
vector<vector<idx_t>> grouping_functions; // GROUPING functions; not used in this example
vector<LogicalType> group_types; // types of the group-by columns; in this example, VARCHAR
vector<unique_ptr<Expression>> aggregates; // aggregate expressions; here count(freight) and sum(freight)
vector<LogicalType> payload_types; // type of the freight column
vector<LogicalType> aggregate_return_types; // return types of the aggregate functions; here BIGINT, BIGINT
vector<BoundAggregateExpression *> bindings; // binding information for the corresponding aggregate functions
idx_t filter_count;
vector<GroupingSet> grouping_sets;
//! The radix partitioned hash tables (one per grouping set)
vector<HashAggregateGroupingData> groupings;
unique_ptr<DistinctAggregateCollectionInfo> distinct_collection_info;
//! A recreation of the input chunk, with nulls for everything that isnt a group
vector<LogicalType> input_group_types;
// Filters given to Sink and friends
unsafe_vector<idx_t> non_distinct_filter;
unsafe_vector<idx_t> distinct_filter;
unordered_map<Expression *, size_t> filter_indexes;
}
class HashAggregateLocalSinkState: public LocalSinkState {
DataChunk aggregate_input_chunk;
vector<HashAggregateGroupingLocalState> grouping_states; // state kept for each grouping
// operator.local_state.grouping_states[group_idx].table_state.ht holds a hash table
AggregateFilterDataSet filter_set;
}
class HashAggregateGlobalSinkState: public GlobalSinkState {
vector<HashAggregateGroupingGlobalState> grouping_states;
vector<LogicalType> payload_types;
//! Whether or not the aggregate is finished
bool finished = false;
}
class AggregateFunction: public BaseScalarFunction {
//! The hashed aggregate state sizing function
aggregate_size_t state_size;
//! The hashed aggregate state initialization function
aggregate_initialize_t initialize;
//! The hashed aggregate update state function
aggregate_update_t update;
//! The hashed aggregate combine states function
aggregate_combine_t combine;
//! The hashed aggregate finalization function
aggregate_finalize_t finalize;
//! The simple aggregate update function (may be null)
aggregate_simple_update_t simple_update;
//! The windowed aggregate custom function (may be null)
aggregate_window_t window;
//! The windowed aggregate custom initialization function (may be null)
aggregate_wininit_t window_init = nullptr;
//! The bind function (may be null)
bind_aggregate_function_t bind;
//! The destructor method (may be null)
aggregate_destructor_t destructor;
//! The statistics propagation function (may be null)
aggregate_statistics_t statistics;
aggregate_serialize_t serialize;
aggregate_deserialize_t deserialize;
//! Whether or not the aggregate is order dependent
AggregateOrderDependent order_dependent;
//! Additional function info, passed to the bind
shared_ptr<AggregateFunctionInfo> function_info;
}
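To make the roles of state_size, initialize, update, combine, and finalize more concrete, here is a minimal sketch of a sum aggregate driven only through function pointers over an opaque state buffer. Everything in it (SimpleAggregate, SumState, the callback signatures, AggregateTwoPartitions) is hypothetical and heavily simplified; DuckDB's real callbacks work on vectors of values and states and take additional context arguments.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <new>
#include <vector>

// Hypothetical, simplified callback signatures (not DuckDB's real typedefs).
using state_size_fn = std::size_t (*)();
using initialize_fn = void (*)(unsigned char *state);
using update_fn = void (*)(unsigned char *state, int64_t input);
using combine_fn = void (*)(const unsigned char *source, unsigned char *target);
using finalize_fn = int64_t (*)(const unsigned char *state);

// A cut-down analogue of AggregateFunction: just the five core callbacks.
struct SimpleAggregate {
    state_size_fn state_size;
    initialize_fn initialize;
    update_fn update;
    combine_fn combine;
    finalize_fn finalize;
};

// The state that the sum aggregate stores inside the opaque buffer.
struct SumState {
    int64_t value;
};

static std::size_t SumStateSize() { return sizeof(SumState); }
static void SumInitialize(unsigned char *state) { new (state) SumState{0}; }
static void SumUpdate(unsigned char *state, int64_t input) {
    reinterpret_cast<SumState *>(state)->value += input;
}
static void SumCombine(const unsigned char *source, unsigned char *target) {
    reinterpret_cast<SumState *>(target)->value +=
        reinterpret_cast<const SumState *>(source)->value;
}
static int64_t SumFinalize(const unsigned char *state) {
    return reinterpret_cast<const SumState *>(state)->value;
}

SimpleAggregate MakeSum() {
    return SimpleAggregate{SumStateSize, SumInitialize, SumUpdate, SumCombine, SumFinalize};
}

// The caller knows nothing about SumState: it allocates state_size() bytes,
// then initializes, updates, combines, and finalizes through the pointers.
int64_t AggregateTwoPartitions(const SimpleAggregate &fn,
                               const std::vector<int64_t> &left,
                               const std::vector<int64_t> &right) {
    std::vector<unsigned char> left_state(fn.state_size());
    std::vector<unsigned char> right_state(fn.state_size());
    fn.initialize(left_state.data());
    fn.initialize(right_state.data());
    for (int64_t v : left) {
        fn.update(left_state.data(), v);
    }
    for (int64_t v : right) {
        fn.update(right_state.data(), v);
    }
    fn.combine(right_state.data(), left_state.data()); // merge right into left
    return fn.finalize(left_state.data());
}
```

The point of the indirection is that the hash table only needs to reserve state_size() bytes per group and call the pointers; it never has to know which aggregate it is running.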
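Set breakpoints in the Sink, Combine, and Finalize methods, then step through the operator in a debugger to trace its execution flow and better understand its data structures and algorithms.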
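Before stepping through the real code, it can help to have a rough mental model of the three phases. The sketch below is an assumption-laden toy version, not DuckDB's implementation: it uses std::unordered_map in place of the radix partitioned hash table, a hypothetical AggState for count and sum, and free functions named after the phases. Each worker sinks its rows into a local table, Combine merges local tables into a global one, and Finalize produces the result rows.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical per-group state for count(freight) and sum(freight).
struct AggState {
    int64_t count = 0;
    double sum = 0.0;
};

// Hypothetical stand-in for a hash table keyed by the group column (name).
using GroupTable = std::unordered_map<std::string, AggState>;
using Chunk = std::vector<std::pair<std::string, double>>;

// Sink: fold one batch of (name, freight) rows into a thread-local table.
void Sink(GroupTable &local, const Chunk &chunk) {
    for (const auto &row : chunk) {
        AggState &state = local[row.first]; // find the group or create it
        state.count += 1;
        state.sum += row.second;
    }
}

// Combine: merge one thread-local table into the shared global table.
void Combine(GroupTable &global, const GroupTable &local) {
    for (const auto &entry : local) {
        AggState &state = global[entry.first];
        state.count += entry.second.count;
        state.sum += entry.second.sum;
    }
}

// Finalize: turn the merged states into result rows, ordered by group key.
std::map<std::string, AggState> Finalize(const GroupTable &global) {
    return std::map<std::string, AggState>(global.begin(), global.end());
}
```

With two workers, one sinking ("abc1", 12.0) and ("abc2", 20.0) and the other sinking ("abc1", 30.0), the finalized result holds two groups, and abc1 has count 2 and sum 42.0. When debugging the real operator, the same three questions apply at each breakpoint: which table is being written, which tables are being merged, and when the states are turned into output.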
TODO:
- Understand how DuckDB uses templates to handle aggregate functions over different types.
- How is sum(x) mapped to a type-specific version of the function?
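As a starting point for these two questions, here is a sketch of one common pattern: a function template is instantiated once per input type, and a runtime switch on the type picks the matching instantiation and returns it as a type-erased function pointer. All names here (TypeId, TypedSumUpdate, GetSumUpdate) are hypothetical, and whether DuckDB follows this pattern in detail still needs to be confirmed in the source.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical runtime type tag, standing in for a physical type enum.
enum class TypeId { INT32, INT64, DOUBLE };

// Type-erased update signature: the caller no longer sees INPUT or STATE.
using sum_update_fn = void (*)(void *state, const void *input, std::size_t count);

// One template, instantiated per (input type, state type) pair.
template <class INPUT, class STATE>
void TypedSumUpdate(void *state, const void *input, std::size_t count) {
    auto *sum = static_cast<STATE *>(state);
    auto *values = static_cast<const INPUT *>(input);
    for (std::size_t i = 0; i < count; i++) {
        *sum += static_cast<STATE>(values[i]);
    }
}

// Runtime dispatch: map the argument type of sum(x) to an instantiation.
sum_update_fn GetSumUpdate(TypeId type) {
    switch (type) {
    case TypeId::INT32:
        return TypedSumUpdate<int32_t, int64_t>; // widen to avoid overflow
    case TypeId::INT64:
        return TypedSumUpdate<int64_t, int64_t>;
    case TypeId::DOUBLE:
        return TypedSumUpdate<double, double>;
    }
    throw std::invalid_argument("unsupported type for sum");
}
```

In this model the binder would call something like GetSumUpdate once per query, after the argument type of sum(x) is known, so the per-row loop runs without any further type checks.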