Analysis of the read_csv table function
- ReadCSVBind -- returns the table's schema information, mainly the column names and types.
typedef unique_ptr<FunctionData> (*table_function_bind_t)( ClientContext &context, TableFunctionBindInput &input, vector<LogicalType> &return_types, vector<string> &names );
Called during the Plan phase; the returned FunctionData is passed along to later calls, e.g. as TableFunctionInput.bind_data.
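For illustration, a minimal bind callback with this signature could look like the sketch below; MyBindData, MyScanBind, and the single BIGINT column are made up for the example and are not part of read_csv.
#include "duckdb.hpp"

using namespace duckdb;

// Hypothetical bind data carried from bind time to scan time.
struct MyBindData : public TableFunctionData {
    string path; // whatever the bind phase resolved from the function arguments
};

// Matches table_function_bind_t: fill in return_types/names and return the FunctionData.
static unique_ptr<FunctionData> MyScanBind(ClientContext &context, TableFunctionBindInput &input,
                                           vector<LogicalType> &return_types, vector<string> &names) {
    return_types.push_back(LogicalType::BIGINT); // expose one BIGINT column named "id"
    names.push_back("id");
    auto bind_data = make_uniq<MyBindData>();
    bind_data->path = input.inputs[0].GetValue<string>();
    return std::move(bind_data);
}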
- ReadCSVFunction -- the main scan function
typedef void (*table_function_t)( ClientContext &context, TableFunctionInput &data, DataChunk &output );
Called by the Source operator during pipeline execution (see the sketch after the questions below).
- Why are three threads executing?
- How do these three threads cooperate, e.g. when processing the file's rows in parallel?
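As a rough sketch of this callback's contract (continuing the naming from the bind sketch above, not read_csv's actual logic): the function fills the output chunk on each call and signals that it is done by emitting a chunk with cardinality 0. MyLocalState and the fixed 10-row limit are invented for the example.
// Hypothetical per-thread state; real read_csv keeps a StringValueScanner here.
struct MyLocalState : public LocalTableFunctionState {
    idx_t next_row = 0;
};

// Matches table_function_t: called repeatedly until it produces an empty chunk.
static void MyScanFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
    auto &state = data.local_state->Cast<MyLocalState>();
    idx_t count = 0;
    while (count < STANDARD_VECTOR_SIZE && state.next_row < 10) {
        output.SetValue(0, count, Value::BIGINT(static_cast<int64_t>(state.next_row++)));
        count++;
    }
    output.SetCardinality(count); // cardinality 0 tells the executor this source is exhausted
}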
- ReadCSVInitGlobal -- optional
- ReadCSVInitLocal -- optional
CSV Scanner: one scanner is created per 8 MB of the file, and each scanner is assigned a scan range:
- If a scanner finishes its current block in the middle of a row, it continues scanning into the next scanner's data until it reaches a newline (see the sketch below).
- Every scanner except the first skips the data up to the first newline in its range.
Optimization idea: could the CSV scanner be accelerated with SIMD?
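A simplified sketch of the boundary rule described in the two bullets above (plain C++ over an in-memory view of the file, not DuckDB's actual scanner code); AlignToLines is a made-up helper.
#include <cstddef>
#include <utility>

// Given a scanner's nominal [start, end) byte range, move the start forward to the
// first full row (unless this is the first scanner) and extend the end so that a row
// straddling the boundary is finished by the scanner that started it.
static std::pair<size_t, size_t> AlignToLines(const char *data, size_t file_size,
                                              size_t start, size_t end, bool first_scanner) {
    size_t aligned_start = start;
    if (!first_scanner) {
        while (aligned_start < file_size && data[aligned_start] != '\n') {
            aligned_start++; // skip the partial row owned by the previous scanner
        }
        if (aligned_start < file_size) {
            aligned_start++; // position just after the newline
        }
    }
    size_t aligned_end = end;
    while (aligned_end < file_size && data[aligned_end] != '\n') {
        aligned_end++; // keep reading into the next range until the current row ends
    }
    if (aligned_end < file_size) {
        aligned_end++; // include the terminating newline
    }
    return {aligned_start, aligned_end};
}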
StringValueScanner : BaseScanner
- BaseScanner
  - iterator: CSVIterator
    - pos: CSVPosition(buffer_idx, buffer_pos) -- current position
    - boundary: CSVBoundary(buffer_idx, buffer_pos, boundary_idx, end_pos) -- boundary
  - CSVFileHandle
  - CSVOptions
Call stack when ReadCSVInitLocal allocates a CSVBuffer (one frame per line):
duckdb::CSVBuffer::CSVBuffer(duckdb::CSVFileHandle &, duckdb::ClientContext &, unsigned long long, unsigned long long, unsigned long long, unsigned long long) csv_buffer.cpp:28
duckdb::CSVBuffer::CSVBuffer(duckdb::CSVFileHandle &, duckdb::ClientContext &, unsigned long long, unsigned long long, unsigned long long, unsigned long long) csv_buffer.cpp:25
duckdb::make_shared_ptr<…>(duckdb::CSVFileHandle &, duckdb::ClientContext &, unsigned long long &, unsigned long long &&, unsigned long long &, unsigned long long &&) helper.hpp:73
duckdb::CSVBuffer::Next(duckdb::CSVFileHandle &, unsigned long long, unsigned long long, bool &) csv_buffer.cpp:44
duckdb::CSVBufferManager::ReadNextAndCacheIt() csv_buffer_manager.cpp:42
duckdb::CSVBufferManager::GetBuffer(unsigned long long) csv_buffer_manager.cpp:71
duckdb::CSVIterator::Next(duckdb::CSVBufferManager &) scanner_boundary.cpp:54
duckdb::CSVGlobalState::Next(duckdb::optional_ptr<…>) global_csv_state.cpp:143
duckdb::ReadCSVInitLocal(duckdb::ExecutionContext &, duckdb::TableFunctionInitInput &, duckdb::GlobalTableFunctionState *) read_csv.cpp:212 -- TableFunction.init_local is invoked here; it calls global_state.Next() to obtain the CSV scanner for the current task
duckdb::TableScanLocalSourceState::TableScanLocalSourceState(duckdb::ExecutionContext &, duckdb::TableScanGlobalSourceState &, const duckdb::PhysicalTableScan &) physical_table_scan.cpp:75
duckdb::TableScanLocalSourceState::TableScanLocalSourceState(duckdb::ExecutionContext &, duckdb::TableScanGlobalSourceState &, const duckdb::PhysicalTableScan &) physical_table_scan.cpp:71
duckdb::make_uniq<…>(duckdb::ExecutionContext &, duckdb::TableScanGlobalSourceState &, const duckdb::PhysicalTableScan &) helper.hpp:65
duckdb::PhysicalTableScan::GetLocalSourceState(duckdb::ExecutionContext &, duckdb::GlobalSourceState &) const physical_table_scan.cpp:84 -- every Source/Sink in a PipelineExecutor gets its own local state
duckdb::PipelineExecutor::PipelineExecutor(duckdb::ClientContext &, duckdb::Pipeline &) pipeline_executor.cpp:27
duckdb::PipelineExecutor::PipelineExecutor(duckdb::ClientContext &, duckdb::Pipeline &) pipeline_executor.cpp:14
duckdb::make_uniq<…>(duckdb::ClientContext &, duckdb::Pipeline &) helper.hpp:65
duckdb::PipelineTask::ExecuteTask(duckdb::TaskExecutionMode) pipeline.cpp:34
duckdb::ExecutorTask::Execute(duckdb::TaskExecutionMode) executor_task.cpp:44
duckdb::TaskScheduler::ExecuteForever(std::__1::atomic<…> *) task_scheduler.cpp:189
duckdb::ThreadExecuteTasks(duckdb::TaskScheduler *, std::__1::atomic<…> *) task_scheduler.cpp:279
CSVGlobalState:
- file_scans: vector<shared_ptr<CSVFileScan>> -- one CSVFileScan per CSV file
  - file_path
  - file_size: size of the file
  - buffer_manager: manages the file's buffers
    - cached_buffers: vector<shared_ptr<CSVBuffer>> -- each CSVBuffer corresponds to one PipelineExecutor
      - buffer_idx
      - actual_buffer_size
      - handle -- points to the buffer data
Each time CSVGlobalState.Next() hands out the next StringValueScanner, it allocates an 8 MB buffer and reads the file data into it; the later scan then reads from this in-memory buffer.
- This part of the code is quite convoluted.
- table_function.pushdown_complex_filter -- optional
Called during the Optimizer phase.
- Before this there must already be a stage that processed bind_data, producing the table's types, names, and so on.
[ ] Put together a call-sequence diagram for table_function to understand the value of each optional callback (a registration sketch follows below).
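A sketch of how the optional callbacks are wired together at registration time; MyScanBind, MyScanFunction, and MyLocalState are the sketches above, the init callbacks are no-op placeholders, and read_asdf is a made-up name.
#include "duckdb/main/extension_util.hpp"

// Optional global-state callback: one instance shared by all threads of the scan.
static unique_ptr<GlobalTableFunctionState> MyScanInitGlobal(ClientContext &context,
                                                             TableFunctionInitInput &input) {
    return make_uniq<GlobalTableFunctionState>();
}

// Optional local-state callback: one instance per PipelineExecutor / thread.
static unique_ptr<LocalTableFunctionState> MyScanInitLocal(ExecutionContext &context,
                                                           TableFunctionInitInput &input,
                                                           GlobalTableFunctionState *global_state) {
    return make_uniq<MyLocalState>();
}

static void RegisterMyScan(DatabaseInstance &db) {
    TableFunction func("read_asdf", {LogicalType::VARCHAR}, MyScanFunction, MyScanBind,
                       MyScanInitGlobal, MyScanInitLocal);
    // Further optional hooks such as pushdown_complex_filter can be assigned on func here.
    ExtensionUtil::RegisterFunction(db, func);
}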
Having gone through the above, we can start evaluating how to write our own table function. The scenario we need next is:
- Read business data in Java via JDBC and write the ResultSet into a DataChunk (this requires an API for writing a data chunk).
- Create a table function that returns the data in the DataChunk to DuckDB.
DataChunk chunk = new DataChunk();
// provide a registration API (hypothetical sketch)
connection.registerDataChunk("asdf", chunk);
connection.executeQuery("select * from read_asdf");
Alternatively: JDBC ResultSet -> arrow.vector.VectorSchemaRoot -> ArrowStreamReader -> arrow.c.ArrowArrayStream -> duckdb
import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;

public class ArrowConversionExample {
    // Serialize the VectorSchemaRoot into the Arrow IPC stream format, re-open the
    // bytes with an ArrowStreamReader, and export that reader through the Arrow C
    // data interface as an ArrowArrayStream.
    public static ArrowArrayStream convertToArrowArrayStream(VectorSchemaRoot root,
                                                             BufferAllocator allocator) throws IOException {
        // Write the VectorSchemaRoot to an in-memory IPC stream (this is the copy).
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
            writer.start();
            writer.writeBatch();
            writer.end();
        }
        // Re-read the serialized bytes; the exported stream takes ownership of the
        // reader, so the reader must not be closed separately.
        ArrowStreamReader reader =
                new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator);
        ArrowArrayStream arrayStream = ArrowArrayStream.allocateNew(allocator);
        Data.exportArrayStream(allocator, reader, arrayStream);
        return arrayStream;
    }

    public static void main(String[] args) throws IOException {
        // Example usage
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
            Schema schema = new Schema(Collections.emptyList()); // define your schema fields here
            try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
                // Populate the VectorSchemaRoot with data
                // ...
                try (ArrowArrayStream arrayStream = convertToArrowArrayStream(root, allocator)) {
                    // Use the ArrowArrayStream, e.g. pass it to
                    // DuckDBConnection.registerArrowStream("asdf", arrayStream)
                }
            }
        }
    }
}
This approach involves a copy; we need to consider how to reduce the copying.
- https://arrow.apache.org/docs/java/jdbc.html
- https://duckdb.org/docs/api/java#arrow-import
- ArrowReader
- Field(name, nullable, type, ...)
- Schema(fields, metadata)
- batches: List[ArrowRecordBatch]
- ArrowRecordBatch(length, nodes, buffers) // each field is encoded in its own buffers
This area needs some study; the overall cost is probably lower than implementing it all ourselves.
- Pipeline::Schedule is responsible for creating PipelineTasks and submitting them for scheduling. For a TableScan it calls source_state->MaxThreads() to get the maximum number of threads and creates the corresponding PipelineTasks; this generally delegates to the operator's global state's MaxThreads(). For a CSV file that is file size / 8 MB (see the sketch below).
- TODO: understand PipelineEvent
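As a small illustration of the MaxThreads contract mentioned above (MyGlobalState is a made-up type, not the actual CSV global state), a file-backed scan can report file size / 8 MB as its available parallelism:
#include "duckdb.hpp"

// Hypothetical global state mirroring the "file size / 8 MB" rule described above.
struct MyGlobalState : public duckdb::GlobalTableFunctionState {
    duckdb::idx_t file_size;

    explicit MyGlobalState(duckdb::idx_t file_size_p) : file_size(file_size_p) {
    }
    // Pipeline::Schedule consults this value to decide how many PipelineTasks to create.
    duckdb::idx_t MaxThreads() const override {
        return duckdb::MaxValue<duckdb::idx_t>(1, file_size / (8 * 1024 * 1024));
    }
};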