datafusion 窗口函数执行分析
API: 使用窗口函数
窗口函数是针对数据分析的一个 SQL 查询扩展,其执行顺序如下图中,
一般的,窗口函数的语法如下:
来源:https://duckdb.org/docs/stable/sql/functions/window_functions
datafusion 提供了对窗口函数的支持,不过,目前的版本支持程度仍然不如 duckdb,我目前发现的问题是: frame 中目前仅支持 literal expr, 这限制了基于当前行的 dynamic range 的支持能力,例如,典型的 上年同期年累 这样的计算。
在 duckdb 中,可以表示为
SUM( SUM(amount) ) over (order by order_date
range between to_days( order_date - (date_trunc('year', order_date'year') - interval 1 year)) preceding
and interval 1 year preceding )
本文分析的目的之一就是对 datafusion 的窗口函数执行机制进行研究,并评估为其添加上这类能力的可行性。
SPI:创建自定义的窗口函数
datafusion 中支持4种 自定义函数:
-
udf: scalar 函数
-
udtf: 表函数,如 csv_read 之类的函数
-
udwf: User Define Window Function:以 partition 为单位的窗口函数
主要针对非聚合类的窗口函数,参见:
see: datafusion/physical-expr/src/window/standard.rs StandardWindowExpr::evaluate(按照这个源代码整理,与代码注释对应不上)
[ uses_window_frame
][ supports_bounded_execution
][ include_rank
]function_to_implement functions true * * [ evaluate
]nth_value false (default) * true [ evaluate_all_with_rank
]rank, dense_rank, cume_dist false * false (default) [ evaluate_all
]
窗口函数:
category functions supported desc uses_window_frame supports_bounded_execution include_rank ranking cume_dist df, duckdb 累积百分比 X X true ranking dense_rank df, duckdb 1,2,2,3 风格的排名 X true true ranking rank df, duckdb 1,2,2,4 风格的排名 X true true ranking ntile(n) df, duckdb 按 n 等分 X X X ranking percent_rank df, duckdb 排名百分比 X X true ranking row_number df, duckdb 行号 X true X analytical first_value(expr) df, duckdb X X X analytical lag(expr, offset, default) df, duckdb X true X analytical last_value(expr) df, duckdb X X X analytical lead(expr, offset, default) df, duckdb X true X analytical nth_value(expr, n) df, duckdb true true X -
-
udaf: User Define Aggregate Function 针对形如 SUM, COUNT 之类的函数,自定义函数参考:advanced udaf 核心接口是 Accumulator, GroupsAccumulator
classDiagram class WindowUDFImpl { <<trait>> +partition_evaluator(PartitionEvaluatorArgs args) PartitionEvaluator } class PartitionEvaluator { <<trait>> +is_causal(): bool +uses_window_frame(): bool +supports_bounded_execution(): bool +include_rank(): bool +evaluate(&~ArrayRef~ values, &Range~usize~ range): Result<ScalarValue> +evaluate_all(values:&~ArrayRef~, num_rows:uszie): Result<ArrayRef> +evaluate_all_with_rank(usize num_rows, &[Range~usize~] ranks_in_partition) -> Result<ArrayRef> } WindowUDFImpl .. PartitionEvaluator class AggregateUDFImpl { <<trait>> +accumulator(AccumulatorArgs args): Result~Box~dyn Accumulator~~ +create_groups_accumulator(AccumulatorArgs args): Result<Box<dyn GroupsAccumulator>> } class Accumulator { <<trait>> +update_batch(&[ArrayRef]): Result<()> +retract_batch(&[ArrayRef]): Result<()> +state(): Result<Vec<ScalaValue>> +merge_batch(&[ArrayRef]): Result<()> +evaluate(): Result<ScalaValue> } class GroupsAccumulator { <<trait>> +update_batch(&[ArrayRef], Option<&BooleanArray>, usize): Result<()> +state(EmitTo): Result<ArrayRef> +merge_batch(&[ArrayRef], &[usize], Option<&BooleanArray>, usize): Result<()> +evaluate(EmitTo): Result<ArrayRef> } AggregateUDFImpl .. Accumulator AggregateUDFImpl .. GroupsAccumulator
classDiagram class WindowExpr { <<trait>> +evaluate_args(batch: &RecordBatch) Result~Vec<ArrayRef>~ +evaluate(batch: &RecordBatch) Result~ArrayRef~ +evaluate_stateful(partition_batches: &PartitionBatches, window_agg_state: &mut PartitionWindowAggStates) Result~()~ } note for AggregateWindowExpr "for user-defined-aggregate-function" class AggregateWindowExpr { <<trait>> } note for StandardWindowExpr "for user-defined-window-function" class StandardWindowExpr { <<struct>> } class SlidingAggregateWindowExpr { <<struct>> } class PlainAggregateWindowExpr { <<struct>> } WindowExpr <|-- AggregateWindowExpr AggregateWindowExpr <|-- SlidingAggregateWindowExpr AggregateWindowExpr <|-- PlainAggregateWindowExpr WindowExpr <|-- StandardWindowExpr
物理计划生成:选择算子、WindowExpr 决策树
flowchart TD A[ 1: function type ?] == window function ==> StandardWindowExpr A == aggregate function ==> C[frame start unbounded ?] A2[2: frame end bounded ?] A2 == unbounded following ==> WA[WindowAggExec] A2 == bounded ==> BWA[BoundedWindowAggExec] C == unbounded preceding ==> plain[PlainAggregateWindowExpr] C == bounded ==> sliding[SlidingAggregateWindowExpr]
具体,可以查看如下的代码实例,通过代码的调试等方式,可以帮助我们理解不同的算子下的执行流程:
function | operator | window expr | demo |
---|---|---|---|
aggregate | window agg exec | plain aggregate | test_sum_1 |
aggregate | window agg exec | sliding aggregate | test_sum2 |
aggregate | bounded window agg exec | plain aggregate | test_sum4 |
aggregate | bounded window agg exec | sliding aggregate | test_sum3 |
算子: WindowAggExec 分析
根据上述分析,WindowAggExec 的 frame 形如:between bounded and unbounded following
- 从上游获取 RecordBatch,追加到 self.batches 中,直至全部读取完成,进入到第2步。
- 将全部的 RecordBatch 合并为一个 RecordBatch
- 在 batch 上求值 sort columns (partition key, maybe + order key)
- 按照 partition key 对 batch 进行 partition,由于 batch 已经排序,因此,在batch 中每个分区的数据已经是连续存放的,一个分区的数据接着
上一个分区的数据。每个分区可以表示为 Range
- foreach partition,调用函数 WindowAggExec::compute_window_aggregates 进行窗口函数求值
- foreach window_expr 调用 window_expr.evaluate(batch) 求值(多个窗口函数可以共享同一个窗口)
- window_expr.evaluate(batch) : single partition, single window_exp
- foreach row in batch
AggregateWindowExpr::get_result_column
- 计算 当前行的 window range
- window_expr.
get_aggregate_result_inside_range
: evaluate for single row with a range(window)-
PlainAggregateWindowExpr::get_aggregate_result_inside_range: window 0..end end 是递增的
frame: 0 .. LAST
- 对比 last_range,将 shift in rows 调用 accumulator.update_batch
- 获取 accumulator.evaluate
-
SlidingAggregateWindowExpr::get_aggregate_result_inside_range:
frame: bounded .. LAST
- 对比 last_range,将 shift out rows 调用 accumulator.retract_batch
- 将 shift in rows 调用 accumulator.update_batch
- 获取 accumulator.evaluate
-
总之: SlidingAggregateWindowExpr::get_aggregate_result_inside_range 可以覆盖 PlainAggregateWindowExpr::get_aggregate_result_inside_range 的能力。
-
- foreach row in batch
- window_expr.evaluate(batch) : single partition, single window_exp
- foreach window_expr 调用 window_expr.evaluate(batch) 求值(多个窗口函数可以共享同一个窗口)
算子: BoundedWindowAggExec
对 frame.end 不是 unbounded following 的窗口类型,datafusion 视为 bounded window,使用 BoundedWindowAggExec 算子来处理该窗口函数的计算。
根据 frame.start 的不同,会选择不同类型的 WindowExpr:
- 对标准的窗口函数(UDWF),使用 StandardWindowExpr + PartitionEvaluator:
select *, rank() over (partition by product_id order by order_date) as rank1 from t1
- 对聚合类窗口函数(UDAF), frame.start 是 unbounded preceding 的,使用 PlainAggregateWindowExpr + Accumulator
select *, sum(amount) over (partition by product_id order by order_date rows between unbounded preceding and 1 following) as amounts1 from t1
- 对聚合类窗口函数(UDAF), frame.start 不是 unbounded preceding 的,使用 SlidingAggregateWindowExpr + Accumulator
select *, sum(amount) over (order by order_date range between interval '1 days' preceding and interval '1 days' following) as slide_amounts from t1
执行流程:
- BoundedWindowAggStream 从上游读取 RecordBatch (已按 window order by 进行排序)
- 根据 RecordBatch 更新 partition_key -> PartitionBatchState( record_batch )
- 调用 BoundedWindowAggStream::compute_aggregates 计算窗口函数
- foreach window_expr, call window_expr.evaluate_stateful (1 window_expr, multi partition)
- foreach partition, call window_expr.get_result_column (1 window_expr, 1 partition)
- foreach row which is not calculated previous
- 计算 row 对应的 frame range
- 如果 RecordBatch 满足 frame range,则调用 window_expr.get_aggregate_result_inside_range 进行聚合求值
- 相比上一行的frame,对移出的行调用 accumulator::retract_batch
- 对移入的行调用 accumulator::update_batch
- foreach row which is not calculated previous
- foreach partition, call window_expr.get_result_column (1 window_expr, 1 partition)
- foreach window_expr, call window_expr.evaluate_stateful (1 window_expr, multi partition)
#![allow(unused)] fn main() { struct BoundedWindowAggStream { window_expr: Vec<WindowExprRef>, input_buffer: RecordBatch, partition_buffers: PartitionBatches, // IndexMap<PartitionKey, PartitionBatchState> window_agg_states: Vec<PartitionWindowAggStates>, // indexed by window_expr, IndexMap<PartitionKey, WindowState> } struct PartitionBatchState { record_batch: RecordBatch, most_recent_row: Option<RecordBatch> is_end: bool, n_out_row: usize } struct WindowState { state: WindowAggState, window_fn: WindowFn, // 累加器会持有状态,通过 update_batch, retract_batch, merge_batch 更新状态 } struct WindowAggState { window_frame_range: Range<usize>, window_frame_ctx: Option<WindowFrameContext>, last_calculated_index: usize, offset_pruned_rows: usize, out_col: ArrayRef, n_row_result_missing: usize, is_end: bool } }
classDiagram class BoundedWindowAggStream { <<object>> window_exprs: Vec~WindowExprRef~ input_buffer: RecordBatch finished: bool } class PartitionBatchState { <<object>> record_batch: RecordBatch most_recent_row: Option<RecordBatch> is_end: bool n_out_row: usize } class WindowState { <<object>> state: WindowAggState window_fn: WindowFn } class WindowAggState { <<object>> window_frmae_range: Range<usize> window_frame_ctx: Option<WindowFrameContext> last_cacluated_index: usize offset_pruned_rows: usize out_col: ArrayRef n_row_result_missing: usize is_end: bool } class WindowFn { <<object>> partition_evaluator: PartitionEvaulator accumulator: Accumulator } BoundedWindowAggStream o-- PartitionBatchState : partition_buffers[PartitionKey] BoundedWindowAggStream o-- WindowState : window_agg_state[window_expr_idx][PartitionKey] WindowState o-- WindowAggState WindowState o-- WindowFn
BoundedWindowAggStream::poll_next_inner
- poll RecordBatch from input
BoundedWindowAggStream::compute_aggregates
- foreach window_expr,
window_expr::evaluate_stateful(partion_batches, partition_window_agg_states)
AggregateWindowExpr::evalute_stateful
foreach partition 1.
- foreach window_expr,
处理过程:
- 无需在读取了全部分区数据后,再进行窗口函数计算,可以在读入 batch 的过程中增量的处理。
- 思考:WindowAggExec 是否可以转换为 逆序后使用 BoundedWindowAggExec?