Performance Compare: DataFusion vs DuckDB, Case 1
Background
CREATE TABLE customer_tags(customer_id INTEGER, tag_id INTEGER); -- 8M, average 1 customer 8 tags
CREATE TABLE customers(customer_id INTEGER, "name" VARCHAR, gender VARCHAR); -- 1M
CREATE TABLE purchase_items(purchase_item_id BIGINT,
purchase_order_id INTEGER,
product_id INTEGER,
quantity INTEGER,
price DOUBLE,
amount DOUBLE); -- 40M
CREATE TABLE purchase_orders(purchase_order_id INTEGER,
order_date DATE,
supplier_id INTEGER,
shop_id INTEGER,
freight DOUBLE); -- 1M
CREATE TABLE sale_items(sale_item_id BIGINT,
sale_order_id INTEGER,
product_id INTEGER,
quantity INTEGER,
price DOUBLE,
amount DOUBLE); -- 80M
CREATE TABLE sale_orders(sale_order_id INTEGER,
order_date DATE,
customer_id INTEGER,
shop_id INTEGER,
freight DOUBLE); -- 20M
CREATE TABLE suppliers(supplier_id INTEGER, "name" VARCHAR); -- 1000
CREATE TABLE tags(tag_id INTEGER, tag_name VARCHAR); -- 100
Query Case 1
select s.order_date, sum(si.amount)
from sale_items si left join sale_orders s on si.sale_order_id = s.sale_order_id -- 80M * 20M
group by s.order_date;
Result
- DuckDB v1.2.2: 336ms(user: 2.667s, sys: 54ms) -- duckdb timer on
samply profile 看sleep 1秒后后面的部分就是执行 SQL 的 profile.
DuckDB 的火焰图构成相对较为简单(下述数据来源于 explain analyze,与flame graph 有点对不上):
- build pipeline: 10ms
- proble pipeline
- scan: 40ms
- probe: 2.26s
- projection: 10ms
- hash group by: 470ms
- DataFusion v46.0.1 810ms(user: 5.47s, sys: 0.99s) -- timer command
samply profile
- HashJoin 算子的效率不够高。 其中 lookup 的开销占比 67%
接近1倍的性能差距。
- 从火焰图结构来看,duckdb 的方式更为简洁(栈调用深度很浅),datafusion 由于采用了大量的异步,导致火焰图也变得复杂。
- dataframe 的执行计划更为复杂,引入了 Repartition, CoalesceBatches 等算子,有较多的跨线程操作,这些理论上可能导致 性能下降。
理解 datafusion 中的算子
graph BT subgraph a[pipeline 1] ds1[datasource: sale_items * 10] --> rp1[repartition sale_order_id, 10] end subgraph b[pipeline 2] ds2[datasource: sale_orders * 10] --> rp2[repartition sale_order_id, 10] end subgraph c[pipeline 3] rp1 -.-> coal1[coalesce batches] coal1 -->|probe| hj1[hashjoin] rp2 -.-> coal2[coalesce batches] coal2 -->|build| hj1[hashjoin partitioned by sale_order_id] hj1 --> coal3[coalesce batches] coal3 --> prj[projection amount,order_date] prj --> aggr1["aggregate partial, order_date->sum(amount)"] aggr1 --> rp3[repartition order_date] end subgraph d[pipeline 4] rp3 -.-> coal4[coalesce batches] coal4 --> aggr2["aggregate final"] end
- pipeline 1,2: 调用栈
<datafusion_datasource::file_stream::FileStream as futures_core::stream::Stream>::poll_next -- DataSourceExec ... datafusion_physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}} -- RepartitionExec ... tokio::runtime::task::harness::poll_future [tokio-1.44.2/src/runtime/task/harness.rs] ... _pthread_start [libsystem_pthread.dylib]
- pipeline 3 - collect build side
<datafusion_physical_plan::coalesce_batches::CoalesceBatchesStream as futures_core::stream::Stream>::poll_next ... datafusion_physical_plan::joins::hash_join::HashJoinStream::collect_build_side datafusion_physical_plan::joins::hash_join::HashJoinStream::poll_next_impl <datafusion_physical_plan::joins::hash_join::HashJoinStream as futures_core::stream::Stream>::poll_next ... <datafusion_physical_plan::coalesce_batches::CoalesceBatchesStream as futures_core::stream::Stream>::poll_next ... <datafusion_physical_plan::projection::ProjectionStream as futures_core::stream::Stream>::poll_next ... <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next ... datafusion_physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}} tokio::runtime::task::core::Core<T,S>::poll::{{closure}} ... _pthread_start [libsystem_pthread.dylib]
- pipeline 3: fetch probe batch
<datafusion_physical_plan::coalesce_batches::CoalesceBatchesStream as futures_core::stream::Stream>::poll_next ... datafusion_physical_plan::joins::hash_join::HashJoinStream::fetch_probe_batch datafusion_physical_plan::joins::hash_join::HashJoinStream::poll_next_impl <datafusion_physical_plan::joins::hash_join::HashJoinStream as futures_core::stream::Stream>::poll_next ... <datafusion_physical_plan::coalesce_batches::CoalesceBatchesStream as futures_core::stream::Stream>::poll_next ... <datafusion_physical_plan::projection::ProjectionStream as futures_core::stream::Stream>::poll_next ... <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next ... datafusion_physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}} tokio::runtime::task::core::Core<T,S>::poll::{{closure}} [tokio-1.44.2/src/runtime/task/core.rs] ... _pthread_start [libsystem_pthread.dylib]
- pipeline 4: 由于 pipeline 4 的耗时很少,在火焰图上抓不到,理论上可以通过调试的方式获得 stack trace.
<datafusion_physical_plan::coalesce_batches::CoalesceBatchesStream as futures_core::stream::Stream>::poll_next ... <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next ... datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder::run_input::{{closure}} ... tokio::runtime::task::harness::Harness<T,S>::poll _pthread_start 0x0000000188c902e4
这 4 个 stacktrace 与途中的查询计划可以很好的匹配上:
-
datafusion 通过 async call chain 的方式实现了自上而下的 pull 式的执行计划。但仍然有隐含的 pipeline 概念。
以图中的 pipeline 1 为例:这里有2个算子:repartition <- datasource:
- 上层算子(这里是 hashjoin pipeline 的 probe 侧 CoalesceBatches 算子)调用 RepartitionExec 的 poll_next 方法
- RepartitionExec 的 poll_next 方法会调用 datasource 的 poll_next 方法,这里总是以同步的方式调用。
- datasource 的 poll_next 会异步读取数据,这里是一个异步源点,如果数据未就绪,则返回 Poll::Pending,并向上返回,直到把当前线程归还给线程池。
- 当操作系统异步读取数据就绪时,会唤醒之前的 Future 链,重新恢复执行。
以图中的 pipeline 3 为例,这个future chain 就更深了,从最底层的异步点 CoalesceBatch 到 顶层的 Repartition 共有 6 层,这会产生一定的开销, 当然,由于 datafusion 采用了 batch 的方式来处理数据,减少了调度的次数,从而减低了future 链式调用的开销。
采用 pull 模式时,当一个算子有多个输入时,存在轮询的需求,这一块的处理也需要特殊优化,否则可能会消耗不必要的CPU。
-
duckdb 则通过 push 模式实现算子间的数据流动,从火焰图的角度来看,push 模式的调用栈更浅,更清晰。 duckdb 目前似乎没有采用 coalesce batches 的方式,而是 morsel 的自然流动,这样做可能会导致下游算子的数据批量减少,从而降低 SIMD 的效率, 优点是避免不必要的临时数据存储
需要理解 datafusion 中的 repartition 算子 和 coalesce batches 算子
- Repartition 算子在多线程间的数据交换
- CoalesceBatches 算子对数据的合并
- 这两个算子的 data copy 开销以及缓存效率。
- 执行栈的分析
- hashjoin 算子的性能评估。
- 是否可以手动编写一个 physical plan 来替代 datafusion 的执行计划?
评估
- push vs pull? 两者逻辑上是等效的,之前有看到某些文章说:batch 处理不适合与 pull,这种说法是不准确的。datafusion 就采用了 pull 的方式 来处理处理,更贴近火山模型。
- 虽然逻辑上是等效的,但我个人更倾向于 push 模型,主要是 async 的调用链看起来没有那么爽?这个是不是我的错觉?
- datafusion 在这个 case 上的性能要比 duckdb 慢上不少,目前来看,主要的原因是:
- hash join 算子的实现效率
- duckdb 对 hash join 有更好的查询计划优化,尤其是 Dynamic Filter Pushdown from Joins
- 需要进一步评估 datafusion 算子的 copy 开销?这一块,在火焰图上还是比较明显的,duckdb 就少了很多。
- datafusion 在命令行的友好程度上相比 duckdb 要差很多,体现在:
- duckdb 支持更多的选项,例如 .timer 可以查看执行耗时,.rows/.columns 格式,.mode 设置多种 output 格式
- duckdb 的 explain analyze 结果更为简洁,可读性强
- duckdb 的 cli 对 SQL 的格式化、多行 SQL 输入、以及输入时的 tab complete 支持很好,datafusion 几乎没有支持。
- datafusion 对窗口函数的支持不完善,range spec 不支持 expr.
- 虽然 datafusion 的性能相比 duckdb 要差,但代码的结构要简单很多。当然,也可能是 rust 代码更易于阅读一些的缘故?所以,如果是选择 data frame 进行优化,我更倾向于 base on datafusion。
总体来说,作为一个SQL计算的基础库而言,datafusion 目前的成熟度还较低,使用起来估计会有更多的坑。