DataFusion Window Function Execution Analysis
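API: Using Window Functions
Window functions are an SQL query extension aimed at data analysis. The figure below shows where they fall in the query execution order.
In general, the syntax of a window function is as follows:
Source: https://duckdb.org/docs/stable/sql/functions/window_functions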
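DataFusion supports window functions, but the current version is still less complete than DuckDB. The problem I have found so far is that frame bounds accept only literal expressions. This rules out dynamic ranges derived from the current row, such as the typical "year-to-date total for the same period last year" calculation.
In DuckDB, it can be expressed as: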
```sql
SUM(SUM(amount)) OVER (
    ORDER BY order_date
    RANGE BETWEEN to_days(order_date - (date_trunc('year', order_date) - INTERVAL 1 YEAR)) PRECEDING
              AND INTERVAL 1 YEAR PRECEDING
)
```
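To make the limitation concrete, here is a minimal Rust sketch of what that frame computes. It assumes one pre-aggregated row per order_date (the inner SUM(amount) already applied), rows sorted by date, and dates represented as hypothetical (year, month, day) tuples; it illustrates the frame semantics only and is not how DataFusion or DuckDB evaluate the query. Both frame bounds are derived from the current row's date, which is exactly what a literal-only frame cannot express.

```rust
/// A calendar date as (year, month, day). Tuple comparison is lexicographic,
/// so it matches chronological order.
type Date = (i32, u32, u32);

/// For rows sorted by date (one pre-aggregated row per date), returns for each row
/// the sum of `amounts` over the row-dependent RANGE frame
/// [Jan 1 of the previous year, same calendar day of the previous year].
fn prev_year_ytd(dates: &[Date], amounts: &[i64]) -> Vec<i64> {
    assert_eq!(dates.len(), amounts.len());
    // prefix[i] = amounts[0] + ... + amounts[i - 1]
    let mut prefix = vec![0i64; amounts.len() + 1];
    for (i, a) in amounts.iter().enumerate() {
        prefix[i + 1] = prefix[i] + a;
    }
    dates
        .iter()
        .map(|&(y, m, d)| {
            // Both bounds depend on the current row (dynamic frame).
            let lo: Date = (y - 1, 1, 1);
            let hi: Date = (y - 1, m, d); // inclusive, like "INTERVAL 1 YEAR PRECEDING"
            let start = dates.partition_point(|x| *x < lo);
            let end = dates.partition_point(|x| *x <= hi);
            prefix[end] - prefix[start]
        })
        .collect()
}
```

Because the rows are sorted, each frame is located with two binary searches and summed via prefix sums.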
One goal of this analysis is to study how DataFusion executes window functions and to assess the feasibility of adding this kind of capability.
SPI: Creating Custom Window Functions
DataFusion supports four kinds of user-defined functions:
- udf: scalar functions
- udtf: table functions, such as csv_read
- udwf: User-Defined Window Function, a window function evaluated one partition at a time. It mainly targets non-aggregate window functions. See StandardWindowExpr::evaluate in datafusion/physical-expr/src/window/standard.rs; the table below was compiled from that source code and does not match the code comments.

  | uses_window_frame | supports_bounded_execution | include_rank | function_to_implement | functions |
  |---|---|---|---|---|
  | true | * | * | evaluate | nth |
  | false (default) | * | true | evaluate_all_with_rank | rank, dense_rank, cume_dist |
  | false | * | false (default) | evaluate_all | |

  - uses_window_frame: true means the function is evaluated over each row's window frame; false means it ignores the frame and is evaluated over the whole partition.
    - nth: true
    - others: false
  - supports_bounded_execution is true for:
    - lead
    - lag
    - row_number
    - nth
    - rank
    - dense_rank
  - include_rank is true for:
    - cume_dist
    - rank
- udaf: User-Defined Aggregate Function, for functions such as SUM and COUNT. For writing a custom one, see advanced udaf; the core interfaces are Accumulator and GroupsAccumulator.
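The dispatch described in the udwf table can be modelled with a small, self-contained sketch. The trait below is a simplified, hypothetical stand-in for PartitionEvaluator (plain i64 slices instead of ArrayRef, no Result), and the branch order follows the table: uses_window_frame first, then include_rank, otherwise evaluate_all. RowNumber, Rank and FirstValue are toy evaluators written for this sketch, not DataFusion's implementations.

```rust
use std::ops::Range;

/// Hypothetical, simplified stand-in for PartitionEvaluator.
trait PartitionEvaluator {
    fn uses_window_frame(&self) -> bool { false }
    fn include_rank(&self) -> bool { false }
    fn evaluate(&mut self, _values: &[i64], _range: &Range<usize>) -> i64 {
        unimplemented!("only needed when uses_window_frame() is true")
    }
    fn evaluate_all(&mut self, _values: &[i64], _num_rows: usize) -> Vec<i64> {
        unimplemented!("needed when neither flag is set")
    }
    fn evaluate_all_with_rank(&self, _num_rows: usize, _ranks: &[Range<usize>]) -> Vec<i64> {
        unimplemented!("only needed when include_rank() is true")
    }
}

/// `frames[i]` is the frame of row i; `ranks` are the peer groups
/// (runs of rows with equal ORDER BY keys).
fn evaluate_partition(
    ev: &mut dyn PartitionEvaluator,
    values: &[i64],
    frames: &[Range<usize>],
    ranks: &[Range<usize>],
) -> Vec<i64> {
    if ev.uses_window_frame() {
        // One call per row, each with that row's frame.
        frames.iter().map(|frame| ev.evaluate(values, frame)).collect()
    } else if ev.include_rank() {
        // Only the peer-group boundaries are needed.
        ev.evaluate_all_with_rank(values.len(), ranks)
    } else {
        // One call for the whole partition.
        ev.evaluate_all(values, values.len())
    }
}

/// row_number: ignores the frame.
struct RowNumber;
impl PartitionEvaluator for RowNumber {
    fn evaluate_all(&mut self, _values: &[i64], num_rows: usize) -> Vec<i64> {
        (1..=num_rows as i64).collect()
    }
}

/// rank: 1-based position of the first row of each peer group.
struct Rank;
impl PartitionEvaluator for Rank {
    fn include_rank(&self) -> bool { true }
    fn evaluate_all_with_rank(&self, _num_rows: usize, ranks: &[Range<usize>]) -> Vec<i64> {
        ranks
            .iter()
            .flat_map(|r| std::iter::repeat(r.start as i64 + 1).take(r.end - r.start))
            .collect()
    }
}

/// Simplified nth_value(x, 1): first value of each row's frame (assumes non-empty frames).
struct FirstValue;
impl PartitionEvaluator for FirstValue {
    fn uses_window_frame(&self) -> bool { true }
    fn evaluate(&mut self, values: &[i64], range: &Range<usize>) -> i64 {
        values[range.start]
    }
}
```

With frames from ROWS BETWEEN 1 PRECEDING AND CURRENT ROW, FirstValue receives one range per row, whereas Rank only ever sees the peer-group boundaries.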
```mermaid
classDiagram
    class WindowUDFImpl {
        <<trait>>
        +partition_evaluator(args: PartitionEvaluatorArgs) Result~Box~dyn PartitionEvaluator~~
    }
    class PartitionEvaluator {
        <<trait>>
        +is_causal() bool
        +uses_window_frame() bool
        +supports_bounded_execution() bool
        +include_rank() bool
        +evaluate(values: &[ArrayRef], range: &Range~usize~) Result~ScalarValue~
        +evaluate_all(values: &[ArrayRef], num_rows: usize) Result~ArrayRef~
        +evaluate_all_with_rank(num_rows: usize, ranks_in_partition: &[Range~usize~]) Result~ArrayRef~
    }
    WindowUDFImpl .. PartitionEvaluator

    class AggregateUDFImpl {
        <<trait>>
        +accumulator(args: AccumulatorArgs) Result~Box~dyn Accumulator~~
        +create_groups_accumulator(args: AccumulatorArgs) Result~Box~dyn GroupsAccumulator~~
    }
    class Accumulator {
        <<trait>>
        +update_batch(values: &[ArrayRef]) Result~()~
        +retract_batch(values: &[ArrayRef]) Result~()~
        +state() Result~Vec~ScalarValue~~
        +merge_batch(states: &[ArrayRef]) Result~()~
        +evaluate() Result~ScalarValue~
    }
    class GroupsAccumulator {
        <<trait>>
        +update_batch(values: &[ArrayRef], group_indices: &[usize], opt_filter: Option~&BooleanArray~, total_num_groups: usize) Result~()~
        +state(emit_to: EmitTo) Result~Vec~ArrayRef~~
        +merge_batch(values: &[ArrayRef], group_indices: &[usize], opt_filter: Option~&BooleanArray~, total_num_groups: usize) Result~()~
        +evaluate(emit_to: EmitTo) Result~ArrayRef~
    }
    AggregateUDFImpl .. Accumulator
    AggregateUDFImpl .. GroupsAccumulator
```
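retract_batch is what lets an aggregate follow a frame whose start moves forward: each row is added once with update_batch and removed with retract_batch when it leaves the frame, instead of re-aggregating the whole frame for every row. The sketch below illustrates the idea with a hypothetical, simplified SimpleAccumulator trait (i64 slices instead of ArrayRef, no Result); it is not DataFusion's Accumulator.

```rust
/// Hypothetical, simplified stand-in for Accumulator.
trait SimpleAccumulator {
    fn update_batch(&mut self, values: &[i64]);
    fn retract_batch(&mut self, values: &[i64]);
    fn evaluate(&self) -> i64;
}

#[derive(Default)]
struct SumAccumulator {
    sum: i64,
}

impl SimpleAccumulator for SumAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }
    fn retract_batch(&mut self, values: &[i64]) {
        self.sum -= values.iter().sum::<i64>();
    }
    fn evaluate(&self) -> i64 {
        self.sum
    }
}

/// Evaluates the aggregate over `ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW`.
fn sliding_eval(acc: &mut dyn SimpleAccumulator, values: &[i64], preceding: usize) -> Vec<i64> {
    let mut out = Vec::with_capacity(values.len());
    let mut start = 0; // first row still held by the accumulator
    for i in 0..values.len() {
        acc.update_batch(&values[i..i + 1]); // row i enters the frame
        let frame_start = i.saturating_sub(preceding);
        if frame_start > start {
            // Rows that have left the frame are removed, not recomputed.
            acc.retract_batch(&values[start..frame_start]);
            start = frame_start;
        }
        out.push(acc.evaluate());
    }
    out
}
```

With an unbounded frame start, retract_batch is never called and update_batch alone suffices, which corresponds to the PlainAggregateWindowExpr case.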
```mermaid
classDiagram
    class WindowExpr {
        <<trait>>
        +evaluate_args(batch: &RecordBatch) Result~Vec~ArrayRef~~
        +evaluate(batch: &RecordBatch) Result~ArrayRef~
        +evaluate_stateful(partition_batches: &PartitionBatches, window_agg_state: &mut PartitionWindowAggStates) Result~()~
    }
    note for AggregateWindowExpr "for user-defined-aggregate-function"
    class AggregateWindowExpr {
        <<trait>>
    }
    note for StandardWindowExpr "for user-defined-window-function"
    class StandardWindowExpr {
        <<struct>>
    }
    class SlidingAggregateWindowExpr {
        <<struct>>
    }
    class PlainAggregateWindowExpr {
        <<struct>>
    }
    WindowExpr <|-- AggregateWindowExpr
    AggregateWindowExpr <|-- SlidingAggregateWindowExpr
    AggregateWindowExpr <|-- PlainAggregateWindowExpr
    WindowExpr <|-- StandardWindowExpr
```
Combinations of frame bounds, window_expr and operator:
- operator & window_expr for aggregate functions

  | frame start | frame end | window_expr | operator |
  |---|---|---|---|
  | unbounded preceding | unbounded following | PlainAggregateWindowExpr | WindowAggExec |
  | unbounded preceding | expr following or current row | PlainAggregateWindowExpr | BoundedWindowAggExec |
  | expr preceding or current row | unbounded following | SlidingAggregateWindowExpr | WindowAggExec |
  | expr preceding or current row | expr following or current row | SlidingAggregateWindowExpr | BoundedWindowAggExec |

- operator & window_expr for user-defined window functions

  | frame start | frame end | window_expr | operator |
  |---|---|---|---|
  | unbounded preceding | unbounded following | StandardWindowExpr | WindowAggExec |
  | unbounded preceding | expr following or current row | StandardWindowExpr | BoundedWindowAggExec |
  | expr preceding or current row | unbounded following | StandardWindowExpr | WindowAggExec |
  | expr preceding or current row | expr following or current row | StandardWindowExpr | BoundedWindowAggExec |
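The two tables reduce to two independent decisions, sketched below: for aggregates the frame start picks the window_expr (Plain when it is unbounded preceding, Sliding otherwise), and the frame end picks the operator (WindowAggExec when it is unbounded following, BoundedWindowAggExec otherwise). The Bound enum and choose function are hypothetical helpers that only encode the tables; DataFusion's planner may also weigh other factors, such as the input ordering.

```rust
/// Whether a frame bound is UNBOUNDED, or bounded (`<expr> PRECEDING/FOLLOWING`
/// or `CURRENT ROW`). Hypothetical helper type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Bound {
    Unbounded,
    Bounded,
}

/// Returns (window_expr, operator) as listed in the tables.
fn choose(frame_start: Bound, frame_end: Bound, is_aggregate: bool) -> (&'static str, &'static str) {
    let window_expr = match (is_aggregate, frame_start) {
        (false, _) => "StandardWindowExpr",
        // Frame start never moves: rows are only added, never retracted.
        (true, Bound::Unbounded) => "PlainAggregateWindowExpr",
        // Frame start moves: rows leaving the frame must be retracted.
        (true, Bound::Bounded) => "SlidingAggregateWindowExpr",
    };
    let operator = match frame_end {
        // A result may depend on the last row of the partition.
        Bound::Unbounded => "WindowAggExec",
        // Results can be emitted before the whole partition has arrived.
        Bound::Bounded => "BoundedWindowAggExec",
    };
    (window_expr, operator)
}
```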
Operator: WindowAggExec Analysis
Operator: BoundedWindowAggExec
Window function execution call stack
<BoundedWindowAggStream as futures_core::stream::Stream>::poll_next
BoundedWindowAggStream::poll_next_inner
BoundedWindowAggStream::compute_aggregates
(Diagram: compute_aggregates calls WindowExpr::evaluate_stateful, implemented by AggregateWindowExpr, i.e. SlidingAggregateWindowExpr and PlainAggregateWindowExpr, and by StandardWindowExpr.)
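```rust
use std::ops::Range;

// ArrayRef comes from arrow and IndexMap from the indexmap crate;
// WindowFrameContext, WindowFn, PartitionKey and PartitionBatchState are DataFusion types.

/// One WindowState is kept per partition.
struct WindowState {
    state: WindowAggState,
    window_fn: WindowFn,
}

struct WindowAggState {
    /// Range of rows the window function is currently computed over
    window_frame_range: Range<usize>,
    window_frame_ctx: Option<WindowFrameContext>,
    /// Index of the last row whose result has been calculated
    last_calculated_index: usize,
    /// Offset introduced by rows pruned from the partition buffer
    offset_pruned_rows: usize,
    /// Results calculated so far
    out_col: ArrayRef,
    /// Rows still to be produced to stay in sync with the input batch
    n_row_result_missing: usize,
    /// Whether all data for this partition has been received
    is_end: bool,
}

/// Buffered input rows, keyed by partition
type PartitionBatches = IndexMap<PartitionKey, PartitionBatchState>;
/// Window state, keyed by partition
type PartitionWindowAggStates = IndexMap<PartitionKey, WindowState>;
```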
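To see how fields such as last_calculated_index and offset_pruned_rows come into play, here is a toy model of incremental, per-partition evaluation in the spirit of BoundedWindowAggExec: rows arrive in batches, each partition keeps a buffer, results are computed only for rows not yet calculated, and rows no future frame needs are pruned. ToyState and process_batch are hypothetical; a HashMap stands in for IndexMap, and the window is fixed to SUM(value) OVER (PARTITION BY key ROWS BETWEEN 1 PRECEDING AND CURRENT ROW).

```rust
use std::collections::HashMap;

/// Toy per-partition state, loosely modelled on WindowAggState (not DataFusion code).
#[derive(Default)]
struct ToyState {
    buffer: Vec<i64>,             // rows still buffered for this partition
    last_calculated_index: usize, // first buffer index without a computed result
    offset_pruned_rows: usize,    // rows dropped from the front of `buffer` so far
    out: Vec<i64>,                // results produced so far
}

/// Processes one input batch of (partition_key, value) rows.
fn process_batch(states: &mut HashMap<String, ToyState>, batch: &[(&str, i64)]) {
    // Route each incoming row to its partition's buffer.
    for &(key, value) in batch {
        states.entry(key.to_string()).or_default().buffer.push(value);
    }
    for st in states.values_mut() {
        // Compute results only for rows not calculated yet.
        for i in st.last_calculated_index..st.buffer.len() {
            // Frame of row i is [i - 1, i]; buffer[i - 1] is still present
            // because pruning always keeps the last row.
            let prev = if i > 0 { st.buffer[i - 1] } else { 0 };
            st.out.push(prev + st.buffer[i]);
        }
        st.last_calculated_index = st.buffer.len();
        // The next row's frame only needs the current last row; prune the rest.
        let prune = st.buffer.len().saturating_sub(1);
        st.buffer.drain(..prune);
        st.offset_pruned_rows += prune;
        st.last_calculated_index -= prune;
    }
}
```

Keeping only what future frames need is what bounds the memory per partition; indices stored in the state must then be shifted by the number of pruned rows.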