From ec0207c13426abe6dbf4498c9baed6e2f225aa7b Mon Sep 17 00:00:00 2001 From: Carlo Piovesan Date: Mon, 13 May 2024 21:40:33 +0200 Subject: [PATCH] Apply duckdb's shared_ptr.patch --- CMakeLists.txt | 87 ++++----- src/arrow_extension.cpp | 17 +- src/arrow_scan_ipc.cpp | 208 +++++++++++---------- src/arrow_stream_buffer.cpp | 125 +++++++------ src/arrow_to_ipc.cpp | 271 +++++++++++++++------------- src/include/arrow_extension.hpp | 4 +- src/include/arrow_scan_ipc.hpp | 19 +- src/include/arrow_stream_buffer.hpp | 101 +++++------ src/include/arrow_to_ipc.hpp | 44 +++-- 9 files changed, 465 insertions(+), 411 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 72b1370..c95486c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,56 +7,55 @@ set(EXTENSION_NAME ${TARGET_NAME}_extension) project(${TARGET_NAME}) include_directories(src/include) -set(EXTENSION_SOURCES - src/arrow_extension.cpp - src/arrow_stream_buffer.cpp - src/arrow_scan_ipc.cpp - src/arrow_to_ipc.cpp) +set(EXTENSION_SOURCES src/arrow_extension.cpp src/arrow_stream_buffer.cpp + src/arrow_scan_ipc.cpp src/arrow_to_ipc.cpp) if(NOT "${OSX_BUILD_ARCH}" STREQUAL "") - set(OSX_ARCH_FLAG -DCMAKE_OSX_ARCHITECTURES=${OSX_BUILD_ARCH}) + set(OSX_ARCH_FLAG -DCMAKE_OSX_ARCHITECTURES=${OSX_BUILD_ARCH}) else() - set(OSX_ARCH_FLAG "") + set(OSX_ARCH_FLAG "") endif() # Building Arrow include(ExternalProject) ExternalProject_Add( - ARROW_EP - GIT_REPOSITORY "https://github.com/apache/arrow" - GIT_TAG ea6875fd2a3ac66547a9a33c5506da94f3ff07f2 - PREFIX "${CMAKE_BINARY_DIR}/third_party/arrow" - INSTALL_DIR "${CMAKE_BINARY_DIR}/third_party/arrow/install" - BUILD_BYPRODUCTS /lib/libarrow.a - CONFIGURE_COMMAND - ${CMAKE_COMMAND} -G${CMAKE_GENERATOR} ${OSX_ARCH_FLAG} - -DCMAKE_BUILD_TYPE=Release - -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/third_party/arrow/install - -DCMAKE_INSTALL_LIBDIR=lib -DARROW_BUILD_STATIC=ON -DARROW_BUILD_SHARED=OFF - -DARROW_NO_DEPRECATED_API=ON -DARROW_POSITION_INDEPENDENT_CODE=ON - -DARROW_SIMD_LEVEL=NONE -DARROW_ENABLE_TIMING_TESTS=OFF -DARROW_IPC=ON - -DARROW_JEMALLOC=OFF -DARROW_DEPENDENCY_SOURCE=BUNDLED - -DARROW_VERBOSE_THIRDPARTY_BUILD=OFF -DARROW_DEPENDENCY_USE_SHARED=OFF - -DARROW_BOOST_USE_SHARED=OFF -DARROW_BROTLI_USE_SHARED=OFF - -DARROW_BZ2_USE_SHARED=OFF -DARROW_GFLAGS_USE_SHARED=OFF - -DARROW_GRPC_USE_SHARED=OFF -DARROW_JEMALLOC_USE_SHARED=OFF - -DARROW_LZ4_USE_SHARED=OFF -DARROW_OPENSSL_USE_SHARED=OFF - -DARROW_PROTOBUF_USE_SHARED=OFF -DARROW_SNAPPY_USE_SHARED=OFF - -DARROW_THRIFT_USE_SHARED=OFF -DARROW_UTF8PROC_USE_SHARED=OFF - -DARROW_ZSTD_USE_SHARED=OFF -DARROW_USE_GLOG=OFF -DARROW_WITH_BACKTRACE=OFF - -DARROW_WITH_OPENTELEMETRY=OFF -DARROW_WITH_BROTLI=OFF -DARROW_WITH_BZ2=OFF - -DARROW_WITH_LZ4=OFF -DARROW_WITH_SNAPPY=OFF -DARROW_WITH_ZLIB=OFF - -DARROW_WITH_ZSTD=OFF -DARROW_WITH_UCX=OFF -DARROW_WITH_UTF8PROC=OFF - -DARROW_WITH_RE2=OFF /cpp - CMAKE_ARGS -Wno-dev - UPDATE_COMMAND "") + ARROW_EP + GIT_REPOSITORY "https://github.com/apache/arrow" + GIT_TAG ea6875fd2a3ac66547a9a33c5506da94f3ff07f2 + PREFIX "${CMAKE_BINARY_DIR}/third_party/arrow" + INSTALL_DIR "${CMAKE_BINARY_DIR}/third_party/arrow/install" + BUILD_BYPRODUCTS /lib/libarrow.a + CONFIGURE_COMMAND + ${CMAKE_COMMAND} -G${CMAKE_GENERATOR} ${OSX_ARCH_FLAG} + -DCMAKE_BUILD_TYPE=Release + -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/third_party/arrow/install + -DCMAKE_INSTALL_LIBDIR=lib -DARROW_BUILD_STATIC=ON -DARROW_BUILD_SHARED=OFF + -DARROW_NO_DEPRECATED_API=ON -DARROW_POSITION_INDEPENDENT_CODE=ON + -DARROW_SIMD_LEVEL=NONE -DARROW_ENABLE_TIMING_TESTS=OFF -DARROW_IPC=ON + -DARROW_JEMALLOC=OFF -DARROW_DEPENDENCY_SOURCE=BUNDLED + -DARROW_VERBOSE_THIRDPARTY_BUILD=OFF -DARROW_DEPENDENCY_USE_SHARED=OFF + -DARROW_BOOST_USE_SHARED=OFF -DARROW_BROTLI_USE_SHARED=OFF + -DARROW_BZ2_USE_SHARED=OFF -DARROW_GFLAGS_USE_SHARED=OFF + -DARROW_GRPC_USE_SHARED=OFF -DARROW_JEMALLOC_USE_SHARED=OFF + -DARROW_LZ4_USE_SHARED=OFF -DARROW_OPENSSL_USE_SHARED=OFF + -DARROW_PROTOBUF_USE_SHARED=OFF -DARROW_SNAPPY_USE_SHARED=OFF + -DARROW_THRIFT_USE_SHARED=OFF -DARROW_UTF8PROC_USE_SHARED=OFF + -DARROW_ZSTD_USE_SHARED=OFF -DARROW_USE_GLOG=OFF -DARROW_WITH_BACKTRACE=OFF + -DARROW_WITH_OPENTELEMETRY=OFF -DARROW_WITH_BROTLI=OFF -DARROW_WITH_BZ2=OFF + -DARROW_WITH_LZ4=OFF -DARROW_WITH_SNAPPY=OFF -DARROW_WITH_ZLIB=OFF + -DARROW_WITH_ZSTD=OFF -DARROW_WITH_UCX=OFF -DARROW_WITH_UTF8PROC=OFF + -DARROW_WITH_RE2=OFF /cpp + CMAKE_ARGS -Wno-dev + UPDATE_COMMAND "") ExternalProject_Get_Property(ARROW_EP install_dir) add_library(arrow STATIC IMPORTED GLOBAL) if(WIN32) - set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${install_dir}/lib/arrow_static.lib) + set_target_properties(arrow PROPERTIES IMPORTED_LOCATION + ${install_dir}/lib/arrow_static.lib) else() - set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${install_dir}/lib/libarrow.a) + set_target_properties(arrow PROPERTIES IMPORTED_LOCATION + ${install_dir}/lib/libarrow.a) endif() # create static library @@ -71,12 +70,14 @@ build_loadable_extension(${TARGET_NAME} ${PARAMETERS} ${EXTENSION_SOURCES}) add_dependencies(${TARGET_NAME}_loadable_extension ARROW_EP) target_link_libraries(${TARGET_NAME}_loadable_extension arrow) if(WIN32) - target_compile_definitions(${TARGET_NAME}_loadable_extension PUBLIC ARROW_STATIC) + target_compile_definitions(${TARGET_NAME}_loadable_extension + PUBLIC ARROW_STATIC) endif() -target_include_directories(${TARGET_NAME}_loadable_extension PRIVATE ${install_dir}/include) +target_include_directories(${TARGET_NAME}_loadable_extension + PRIVATE ${install_dir}/include) install( - TARGETS ${EXTENSION_NAME} - EXPORT "${DUCKDB_EXPORT_SET}" - LIBRARY DESTINATION "${INSTALL_LIB_DIR}" - ARCHIVE DESTINATION "${INSTALL_LIB_DIR}") \ No newline at end of file + TARGETS ${EXTENSION_NAME} + EXPORT "${DUCKDB_EXPORT_SET}" + LIBRARY DESTINATION "${INSTALL_LIB_DIR}" + ARCHIVE DESTINATION "${INSTALL_LIB_DIR}") diff --git a/src/arrow_extension.cpp b/src/arrow_extension.cpp index e4daf26..6fadec0 100644 --- a/src/arrow_extension.cpp +++ b/src/arrow_extension.cpp @@ -18,27 +18,24 @@ namespace duckdb { static void LoadInternal(DatabaseInstance &instance) { - ExtensionUtil::RegisterFunction(instance, ToArrowIPCFunction::GetFunction()); - ExtensionUtil::RegisterFunction(instance, ArrowIPCTableFunction::GetFunction()); + ExtensionUtil::RegisterFunction(instance, ToArrowIPCFunction::GetFunction()); + ExtensionUtil::RegisterFunction(instance, + ArrowIPCTableFunction::GetFunction()); } -void ArrowExtension::Load(DuckDB &db) { - LoadInternal(*db.instance); -} -std::string ArrowExtension::Name() { - return "arrow"; -} +void ArrowExtension::Load(DuckDB &db) { LoadInternal(*db.instance); } +std::string ArrowExtension::Name() { return "arrow"; } } // namespace duckdb extern "C" { DUCKDB_EXTENSION_API void arrow_init(duckdb::DatabaseInstance &db) { - LoadInternal(db); + LoadInternal(db); } DUCKDB_EXTENSION_API const char *arrow_version() { - return duckdb::DuckDB::LibraryVersion(); + return duckdb::DuckDB::LibraryVersion(); } } diff --git a/src/arrow_scan_ipc.cpp b/src/arrow_scan_ipc.cpp index 7d5b2ff..a60d255 100644 --- a/src/arrow_scan_ipc.cpp +++ b/src/arrow_scan_ipc.cpp @@ -3,111 +3,131 @@ namespace duckdb { TableFunction ArrowIPCTableFunction::GetFunction() { - child_list_t make_buffer_struct_children{{"ptr", LogicalType::UBIGINT}, - {"size", LogicalType::UBIGINT}}; - - TableFunction scan_arrow_ipc_func( - "scan_arrow_ipc", {LogicalType::LIST(LogicalType::STRUCT(make_buffer_struct_children))}, - ArrowIPCTableFunction::ArrowScanFunction, ArrowIPCTableFunction::ArrowScanBind, - ArrowTableFunction::ArrowScanInitGlobal, ArrowTableFunction::ArrowScanInitLocal); - - scan_arrow_ipc_func.cardinality = ArrowTableFunction::ArrowScanCardinality; - scan_arrow_ipc_func.get_batch_index = nullptr; // TODO implement - scan_arrow_ipc_func.projection_pushdown = true; - scan_arrow_ipc_func.filter_pushdown = false; - - return scan_arrow_ipc_func; + child_list_t make_buffer_struct_children{ + {"ptr", LogicalType::UBIGINT}, {"size", LogicalType::UBIGINT}}; + + TableFunction scan_arrow_ipc_func( + "scan_arrow_ipc", + {LogicalType::LIST(LogicalType::STRUCT(make_buffer_struct_children))}, + ArrowIPCTableFunction::ArrowScanFunction, + ArrowIPCTableFunction::ArrowScanBind, + ArrowTableFunction::ArrowScanInitGlobal, + ArrowTableFunction::ArrowScanInitLocal); + + scan_arrow_ipc_func.cardinality = ArrowTableFunction::ArrowScanCardinality; + scan_arrow_ipc_func.get_batch_index = nullptr; // TODO implement + scan_arrow_ipc_func.projection_pushdown = true; + scan_arrow_ipc_func.filter_pushdown = false; + + return scan_arrow_ipc_func; } -unique_ptr ArrowIPCTableFunction::ArrowScanBind(ClientContext &context, TableFunctionBindInput &input, - vector &return_types, vector &names) { - auto stream_decoder = make_uniq(); +unique_ptr ArrowIPCTableFunction::ArrowScanBind( + ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names) { + auto stream_decoder = make_uniq(); - // Decode buffer ptr list - auto buffer_ptr_list = ListValue::GetChildren(input.inputs[0]); - for (auto &buffer_ptr_struct: buffer_ptr_list) { - auto unpacked = StructValue::GetChildren(buffer_ptr_struct); - uint64_t ptr = unpacked[0].GetValue(); - uint64_t size = unpacked[1].GetValue(); + // Decode buffer ptr list + auto buffer_ptr_list = ListValue::GetChildren(input.inputs[0]); + for (auto &buffer_ptr_struct : buffer_ptr_list) { + auto unpacked = StructValue::GetChildren(buffer_ptr_struct); + uint64_t ptr = unpacked[0].GetValue(); + uint64_t size = unpacked[1].GetValue(); - // Feed stream into decoder - auto res = stream_decoder->Consume((const uint8_t *) ptr, size); + // Feed stream into decoder + auto res = stream_decoder->Consume((const uint8_t *)ptr, size); - if (!res.ok()) { - throw IOException("Invalid IPC stream"); - } + if (!res.ok()) { + throw IOException("Invalid IPC stream"); } - - if (!stream_decoder->buffer()->is_eos()) { - throw IOException("IPC buffers passed to arrow scan should contain entire stream"); + } + + if (!stream_decoder->buffer()->is_eos()) { + throw IOException( + "IPC buffers passed to arrow scan should contain entire stream"); + } + + // These are the params I need to produce from the ipc buffers using the + // WebDB.cc code + auto stream_factory_ptr = (uintptr_t)&stream_decoder->buffer(); + auto stream_factory_produce = + (stream_factory_produce_t)&ArrowIPCStreamBufferReader::CreateStream; + auto stream_factory_get_schema = + (stream_factory_get_schema_t)&ArrowIPCStreamBufferReader::GetSchema; + auto res = make_uniq(stream_factory_produce, + stream_factory_ptr); + + // Store decoder + res->stream_decoder = std::move(stream_decoder); + + // TODO Everything below this is identical to the bind in + // duckdb/src/function/table/arrow.cpp + auto &data = *res; + stream_factory_get_schema((ArrowArrayStream *)stream_factory_ptr, + data.schema_root.arrow_schema); + for (idx_t col_idx = 0; + col_idx < (idx_t)data.schema_root.arrow_schema.n_children; col_idx++) { + auto &schema = *data.schema_root.arrow_schema.children[col_idx]; + if (!schema.release) { + throw InvalidInputException("arrow_scan: released schema passed"); } - - // These are the params I need to produce from the ipc buffers using the WebDB.cc code - auto stream_factory_ptr = (uintptr_t) & stream_decoder->buffer(); - auto stream_factory_produce = (stream_factory_produce_t) & ArrowIPCStreamBufferReader::CreateStream; - auto stream_factory_get_schema = (stream_factory_get_schema_t) & ArrowIPCStreamBufferReader::GetSchema; - auto res = make_uniq(stream_factory_produce, stream_factory_ptr); - - // Store decoder - res->stream_decoder = std::move(stream_decoder); - - // TODO Everything below this is identical to the bind in duckdb/src/function/table/arrow.cpp - auto &data = *res; - stream_factory_get_schema((ArrowArrayStream *) stream_factory_ptr, data.schema_root.arrow_schema); - for (idx_t col_idx = 0; col_idx < (idx_t) data.schema_root.arrow_schema.n_children; col_idx++) { - auto &schema = *data.schema_root.arrow_schema.children[col_idx]; - if (!schema.release) { - throw InvalidInputException("arrow_scan: released schema passed"); - } - auto arrow_type = GetArrowLogicalType(schema); - if (schema.dictionary) { - auto dictionary_type = GetArrowLogicalType(*schema.dictionary); - return_types.emplace_back(dictionary_type->GetDuckType()); - arrow_type->SetDictionary(std::move(dictionary_type)); - } else { - return_types.emplace_back(arrow_type->GetDuckType()); - } - res->arrow_table.AddColumn(col_idx, std::move(arrow_type)); - auto format = string(schema.format); - auto name = string(schema.name); - if (name.empty()) { - name = string("v") + to_string(col_idx); - } - names.push_back(name); + auto arrow_type = GetArrowLogicalType(schema); + if (schema.dictionary) { + auto dictionary_type = GetArrowLogicalType(*schema.dictionary); + return_types.emplace_back(dictionary_type->GetDuckType()); + arrow_type->SetDictionary(std::move(dictionary_type)); + } else { + return_types.emplace_back(arrow_type->GetDuckType()); } - QueryResult::DeduplicateColumns(names); - return std::move(res); + res->arrow_table.AddColumn(col_idx, std::move(arrow_type)); + auto format = string(schema.format); + auto name = string(schema.name); + if (name.empty()) { + name = string("v") + to_string(col_idx); + } + names.push_back(name); + } + QueryResult::DeduplicateColumns(names); + return std::move(res); } -// Same as regular arrow scan, except ArrowToDuckDB call TODO: refactor to allow nicely overriding this -void ArrowIPCTableFunction::ArrowScanFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) { - if (!data_p.local_state) { - return; - } - auto &data = data_p.bind_data->CastNoConst(); - auto &state = data_p.local_state->Cast(); - auto &global_state = data_p.global_state->Cast(); - - //! Out of tuples in this chunk - if (state.chunk_offset >= (idx_t)state.chunk->arrow_array.length) { - if (!ArrowScanParallelStateNext(context, data_p.bind_data.get(), state, global_state)) { - return; - } +// Same as regular arrow scan, except ArrowToDuckDB call TODO: refactor to allow +// nicely overriding this +void ArrowIPCTableFunction::ArrowScanFunction(ClientContext &context, + TableFunctionInput &data_p, + DataChunk &output) { + if (!data_p.local_state) { + return; + } + auto &data = data_p.bind_data->CastNoConst(); + auto &state = data_p.local_state->Cast(); + auto &global_state = data_p.global_state->Cast(); + + //! Out of tuples in this chunk + if (state.chunk_offset >= (idx_t)state.chunk->arrow_array.length) { + if (!ArrowScanParallelStateNext(context, data_p.bind_data.get(), state, + global_state)) { + return; } - int64_t output_size = MinValue(STANDARD_VECTOR_SIZE, state.chunk->arrow_array.length - state.chunk_offset); - data.lines_read += output_size; - if (global_state.CanRemoveFilterColumns()) { - state.all_columns.Reset(); - state.all_columns.SetCardinality(output_size); - ArrowToDuckDB(state, data.arrow_table.GetColumns(), state.all_columns, data.lines_read - output_size, false); - output.ReferenceColumns(state.all_columns, global_state.projection_ids); - } else { - output.SetCardinality(output_size); - ArrowToDuckDB(state, data.arrow_table.GetColumns(), output, data.lines_read - output_size, false); - } - - output.Verify(); - state.chunk_offset += output.size(); + } + int64_t output_size = + MinValue(STANDARD_VECTOR_SIZE, + state.chunk->arrow_array.length - state.chunk_offset); + data.lines_read += output_size; + if (global_state.CanRemoveFilterColumns()) { + state.all_columns.Reset(); + state.all_columns.SetCardinality(output_size); + ArrowToDuckDB(state, data.arrow_table.GetColumns(), state.all_columns, + data.lines_read - output_size, false); + output.ReferenceColumns(state.all_columns, global_state.projection_ids); + } else { + output.SetCardinality(output_size); + ArrowToDuckDB(state, data.arrow_table.GetColumns(), output, + data.lines_read - output_size, false); + } + + output.Verify(); + state.chunk_offset += output.size(); } } // namespace duckdb \ No newline at end of file diff --git a/src/arrow_stream_buffer.cpp b/src/arrow_stream_buffer.cpp index f097ca1..c9791e4 100644 --- a/src/arrow_stream_buffer.cpp +++ b/src/arrow_stream_buffer.cpp @@ -1,95 +1,108 @@ #include "arrow_stream_buffer.hpp" #include +#include /// File copied from /// https://github.com/duckdb/duckdb-wasm/blob/0ad10e7db4ef4025f5f4120be37addc4ebe29618/lib/src/arrow_stream_buffer.cc namespace duckdb { /// Constructor -ArrowIPCStreamBuffer::ArrowIPCStreamBuffer() : schema_(nullptr), batches_(), is_eos_(false) { -} +ArrowIPCStreamBuffer::ArrowIPCStreamBuffer() + : schema_(nullptr), batches_(), is_eos_(false) {} /// Decoded a schema -arrow::Status ArrowIPCStreamBuffer::OnSchemaDecoded(std::shared_ptr s) { - schema_ = s; - return arrow::Status::OK(); +arrow::Status +ArrowIPCStreamBuffer::OnSchemaDecoded(std::shared_ptr s) { + schema_ = s; + return arrow::Status::OK(); } /// Decoded a record batch -arrow::Status ArrowIPCStreamBuffer::OnRecordBatchDecoded(std::shared_ptr batch) { - batches_.push_back(batch); - return arrow::Status::OK(); +arrow::Status ArrowIPCStreamBuffer::OnRecordBatchDecoded( + std::shared_ptr batch) { + batches_.push_back(batch); + return arrow::Status::OK(); } /// Reached end of stream arrow::Status ArrowIPCStreamBuffer::OnEOS() { - is_eos_ = true; - return arrow::Status::OK(); + is_eos_ = true; + return arrow::Status::OK(); } /// Constructor -ArrowIPCStreamBufferReader::ArrowIPCStreamBufferReader(std::shared_ptr buffer) - : buffer_(buffer), next_batch_id_(0) { -} +ArrowIPCStreamBufferReader::ArrowIPCStreamBufferReader( + std::shared_ptr buffer) + : buffer_(buffer), next_batch_id_(0) {} /// Get the schema std::shared_ptr ArrowIPCStreamBufferReader::schema() const { - return buffer_->schema(); + return buffer_->schema(); } /// Read the next record batch in the stream. Return null for batch when /// reaching end of stream -arrow::Status ArrowIPCStreamBufferReader::ReadNext(std::shared_ptr *batch) { - if (next_batch_id_ >= buffer_->batches().size()) { - *batch = nullptr; - return arrow::Status::OK(); - } - *batch = buffer_->batches()[next_batch_id_++]; - return arrow::Status::OK(); +arrow::Status ArrowIPCStreamBufferReader::ReadNext( + std::shared_ptr *batch) { + if (next_batch_id_ >= buffer_->batches().size()) { + *batch = nullptr; + return arrow::Status::OK(); + } + *batch = buffer_->batches()[next_batch_id_++]; + return arrow::Status::OK(); } /// Arrow array stream factory function duckdb::unique_ptr -ArrowIPCStreamBufferReader::CreateStream(uintptr_t buffer_ptr, ArrowStreamParameters ¶meters) { - assert(buffer_ptr != 0); - auto buffer = reinterpret_cast *>(buffer_ptr); - auto reader = std::make_shared(*buffer); +ArrowIPCStreamBufferReader::CreateStream(uintptr_t buffer_ptr, + ArrowStreamParameters ¶meters) { + assert(buffer_ptr != 0); + auto buffer = + reinterpret_cast *>(buffer_ptr); + auto reader = std::make_shared(*buffer); - // Create arrow stream - auto stream_wrapper = duckdb::make_uniq(); - stream_wrapper->arrow_array_stream.release = nullptr; - auto maybe_ok = arrow::ExportRecordBatchReader(reader, &stream_wrapper->arrow_array_stream); - if (!maybe_ok.ok()) { - if (stream_wrapper->arrow_array_stream.release) { - stream_wrapper->arrow_array_stream.release(&stream_wrapper->arrow_array_stream); - } - return nullptr; - } + // Create arrow stream + auto stream_wrapper = duckdb::make_uniq(); + stream_wrapper->arrow_array_stream.release = nullptr; + auto maybe_ok = arrow::ExportRecordBatchReader( + reader, &stream_wrapper->arrow_array_stream); + if (!maybe_ok.ok()) { + if (stream_wrapper->arrow_array_stream.release) { + stream_wrapper->arrow_array_stream.release( + &stream_wrapper->arrow_array_stream); + } + return nullptr; + } - // Release the stream - return stream_wrapper; + // Release the stream + return stream_wrapper; } -void ArrowIPCStreamBufferReader::GetSchema(uintptr_t buffer_ptr, duckdb::ArrowSchemaWrapper &schema) { - assert(buffer_ptr != 0); - auto buffer = reinterpret_cast *>(buffer_ptr); - auto reader = std::make_shared(*buffer); +void ArrowIPCStreamBufferReader::GetSchema(uintptr_t buffer_ptr, + duckdb::ArrowSchemaWrapper &schema) { + assert(buffer_ptr != 0); + auto buffer = + reinterpret_cast *>(buffer_ptr); + auto reader = std::make_shared(*buffer); - // Create arrow stream - auto stream_wrapper = duckdb::make_uniq(); - stream_wrapper->arrow_array_stream.release = nullptr; - auto maybe_ok = arrow::ExportRecordBatchReader(reader, &stream_wrapper->arrow_array_stream); - if (!maybe_ok.ok()) { - if (stream_wrapper->arrow_array_stream.release) { - stream_wrapper->arrow_array_stream.release(&stream_wrapper->arrow_array_stream); - } - return; - } + // Create arrow stream + auto stream_wrapper = duckdb::make_uniq(); + stream_wrapper->arrow_array_stream.release = nullptr; + auto maybe_ok = arrow::ExportRecordBatchReader( + reader, &stream_wrapper->arrow_array_stream); + if (!maybe_ok.ok()) { + if (stream_wrapper->arrow_array_stream.release) { + stream_wrapper->arrow_array_stream.release( + &stream_wrapper->arrow_array_stream); + } + return; + } - // Pass ownership to caller - stream_wrapper->arrow_array_stream.get_schema(&stream_wrapper->arrow_array_stream, &schema.arrow_schema); + // Pass ownership to caller + stream_wrapper->arrow_array_stream.get_schema( + &stream_wrapper->arrow_array_stream, &schema.arrow_schema); } /// Constructor -BufferingArrowIPCStreamDecoder::BufferingArrowIPCStreamDecoder(std::shared_ptr buffer) - : arrow::ipc::StreamDecoder(buffer), buffer_(buffer) { -} +BufferingArrowIPCStreamDecoder::BufferingArrowIPCStreamDecoder( + std::shared_ptr buffer) + : arrow::ipc::StreamDecoder(buffer), buffer_(buffer) {} } // namespace duckdb diff --git a/src/arrow_to_ipc.cpp b/src/arrow_to_ipc.cpp index e282612..c316d85 100644 --- a/src/arrow_to_ipc.cpp +++ b/src/arrow_to_ipc.cpp @@ -15,6 +15,8 @@ #include "arrow/type_fwd.h" #include "arrow/c/bridge.h" +#include + #include "duckdb.hpp" #ifndef DUCKDB_AMALGAMATION #include "duckdb/common/arrow/result_arrow_wrapper.hpp" @@ -28,165 +30,180 @@ namespace duckdb { struct ToArrowIpcFunctionData : public TableFunctionData { - ToArrowIpcFunctionData() { - } - shared_ptr schema; - idx_t chunk_size; + ToArrowIpcFunctionData() {} + std::shared_ptr schema; + idx_t chunk_size; }; struct ToArrowIpcGlobalState : public GlobalTableFunctionState { - ToArrowIpcGlobalState() : sent_schema(false) { - } - atomic sent_schema; - mutex lock; + ToArrowIpcGlobalState() : sent_schema(false) {} + atomic sent_schema; + mutex lock; }; struct ToArrowIpcLocalState : public LocalTableFunctionState { - unique_ptr appender; - idx_t current_count = 0; - bool checked_schema = false; + unique_ptr appender; + idx_t current_count = 0; + bool checked_schema = false; }; - -unique_ptr ToArrowIPCFunction::InitLocal(ExecutionContext &context, TableFunctionInitInput &input, - GlobalTableFunctionState *global_state) { - return make_uniq(); +unique_ptr +ToArrowIPCFunction::InitLocal(ExecutionContext &context, + TableFunctionInitInput &input, + GlobalTableFunctionState *global_state) { + return make_uniq(); } -unique_ptr ToArrowIPCFunction::InitGlobal(ClientContext &context, - TableFunctionInitInput &input) { - return make_uniq(); +unique_ptr +ToArrowIPCFunction::InitGlobal(ClientContext &context, + TableFunctionInitInput &input) { + return make_uniq(); } -unique_ptr ToArrowIPCFunction::Bind(ClientContext &context, TableFunctionBindInput &input, - vector &return_types, vector &names) { - auto result = make_uniq(); +unique_ptr +ToArrowIPCFunction::Bind(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, + vector &names) { + auto result = make_uniq(); - result->chunk_size = DEFAULT_CHUNK_SIZE * STANDARD_VECTOR_SIZE; + result->chunk_size = DEFAULT_CHUNK_SIZE * STANDARD_VECTOR_SIZE; - // Set return schema - return_types.emplace_back(LogicalType::BLOB); - names.emplace_back("ipc"); - return_types.emplace_back(LogicalType::BOOLEAN); - names.emplace_back("header"); + // Set return schema + return_types.emplace_back(LogicalType::BLOB); + names.emplace_back("ipc"); + return_types.emplace_back(LogicalType::BOOLEAN); + names.emplace_back("header"); - // Create the Arrow schema - ArrowSchema schema; - ArrowConverter::ToArrowSchema(&schema, input.input_table_types, input.input_table_names, context.GetClientProperties()); - result->schema = arrow::ImportSchema(&schema).ValueOrDie(); + // Create the Arrow schema + ArrowSchema schema; + ArrowConverter::ToArrowSchema(&schema, input.input_table_types, + input.input_table_names, + context.GetClientProperties()); + result->schema = arrow::ImportSchema(&schema).ValueOrDie(); - return std::move(result); + return std::move(result); } -OperatorResultType ToArrowIPCFunction::Function(ExecutionContext &context, TableFunctionInput &data_p, DataChunk &input, - DataChunk &output) { - std::shared_ptr arrow_serialized_ipc_buffer; - auto &data = (ToArrowIpcFunctionData &)*data_p.bind_data; - auto &local_state = (ToArrowIpcLocalState &)*data_p.local_state; - auto &global_state = (ToArrowIpcGlobalState &)*data_p.global_state; - - bool sending_schema = false; - - bool caching_disabled = !PhysicalOperator::OperatorCachingAllowed(context); - - if (!local_state.checked_schema) { - if (!global_state.sent_schema) { - lock_guard init_lock(global_state.lock); - if (!global_state.sent_schema) { - // This run will send the schema, other threads can just send the buffers - global_state.sent_schema = true; - sending_schema = true; - } - } - local_state.checked_schema = true; +OperatorResultType ToArrowIPCFunction::Function(ExecutionContext &context, + TableFunctionInput &data_p, + DataChunk &input, + DataChunk &output) { + std::shared_ptr arrow_serialized_ipc_buffer; + auto &data = (ToArrowIpcFunctionData &)*data_p.bind_data; + auto &local_state = (ToArrowIpcLocalState &)*data_p.local_state; + auto &global_state = (ToArrowIpcGlobalState &)*data_p.global_state; + + bool sending_schema = false; + + bool caching_disabled = !PhysicalOperator::OperatorCachingAllowed(context); + + if (!local_state.checked_schema) { + if (!global_state.sent_schema) { + lock_guard init_lock(global_state.lock); + if (!global_state.sent_schema) { + // This run will send the schema, other threads can just send the + // buffers + global_state.sent_schema = true; + sending_schema = true; + } + } + local_state.checked_schema = true; + } + + if (sending_schema) { + auto result = arrow::ipc::SerializeSchema(*data.schema); + arrow_serialized_ipc_buffer = result.ValueOrDie(); + output.data[1].SetValue(0, Value::BOOLEAN(1)); + } else { + if (!local_state.appender) { + local_state.appender = + make_uniq(input.GetTypes(), data.chunk_size, + context.client.GetClientProperties()); } - if (sending_schema) { - auto result = arrow::ipc::SerializeSchema(*data.schema); - arrow_serialized_ipc_buffer = result.ValueOrDie(); - output.data[1].SetValue(0, Value::BOOLEAN(1)); + // Append input chunk + local_state.appender->Append(input, 0, input.size(), input.size()); + local_state.current_count += input.size(); + + // If chunk size is reached, we can flush to IPC blob + if (caching_disabled || local_state.current_count >= data.chunk_size) { + // Construct record batch from DataChunk + ArrowArray arr = local_state.appender->Finalize(); + auto record_batch = + arrow::ImportRecordBatch(&arr, data.schema).ValueOrDie(); + + // Serialize recordbatch + auto options = arrow::ipc::IpcWriteOptions::Defaults(); + auto result = arrow::ipc::SerializeRecordBatch(*record_batch, options); + arrow_serialized_ipc_buffer = result.ValueOrDie(); + + // Reset appender + local_state.appender.reset(); + local_state.current_count = 0; + + output.data[1].SetValue(0, Value::BOOLEAN(0)); } else { - if (!local_state.appender) { - local_state.appender = make_uniq(input.GetTypes(), data.chunk_size, context.client.GetClientProperties()); - } - - // Append input chunk - local_state.appender->Append(input, 0, input.size(), input.size()); - local_state.current_count += input.size(); - - // If chunk size is reached, we can flush to IPC blob - if (caching_disabled || local_state.current_count >= data.chunk_size) { - // Construct record batch from DataChunk - ArrowArray arr = local_state.appender->Finalize(); - auto record_batch = arrow::ImportRecordBatch(&arr, data.schema).ValueOrDie(); - - // Serialize recordbatch - auto options = arrow::ipc::IpcWriteOptions::Defaults(); - auto result = arrow::ipc::SerializeRecordBatch(*record_batch, options); - arrow_serialized_ipc_buffer = result.ValueOrDie(); - - // Reset appender - local_state.appender.reset(); - local_state.current_count = 0; - - output.data[1].SetValue(0, Value::BOOLEAN(0)); - } else { - return OperatorResultType::NEED_MORE_INPUT; - } + return OperatorResultType::NEED_MORE_INPUT; } + } + + // TODO clean up + auto wrapped_buffer = + make_buffer(arrow_serialized_ipc_buffer); + auto &vector = output.data[0]; + StringVector::AddBuffer(vector, wrapped_buffer); + auto data_ptr = (string_t *)vector.GetData(); + *data_ptr = string_t((const char *)arrow_serialized_ipc_buffer->data(), + arrow_serialized_ipc_buffer->size()); + output.SetCardinality(1); + + if (sending_schema) { + return OperatorResultType::HAVE_MORE_OUTPUT; + } else { + return OperatorResultType::NEED_MORE_INPUT; + } +} - // TODO clean up - auto wrapped_buffer = make_buffer(arrow_serialized_ipc_buffer); +OperatorFinalizeResultType ToArrowIPCFunction::FunctionFinal( + ExecutionContext &context, TableFunctionInput &data_p, DataChunk &output) { + auto &data = (ToArrowIpcFunctionData &)*data_p.bind_data; + auto &local_state = (ToArrowIpcLocalState &)*data_p.local_state; + std::shared_ptr arrow_serialized_ipc_buffer; + + // TODO clean up + if (local_state.appender) { + ArrowArray arr = local_state.appender->Finalize(); + auto record_batch = + arrow::ImportRecordBatch(&arr, data.schema).ValueOrDie(); + + // Serialize recordbatch + auto options = arrow::ipc::IpcWriteOptions::Defaults(); + auto result = arrow::ipc::SerializeRecordBatch(*record_batch, options); + arrow_serialized_ipc_buffer = result.ValueOrDie(); + + auto wrapped_buffer = + make_buffer(arrow_serialized_ipc_buffer); auto &vector = output.data[0]; StringVector::AddBuffer(vector, wrapped_buffer); auto data_ptr = (string_t *)vector.GetData(); - *data_ptr = string_t((const char *)arrow_serialized_ipc_buffer->data(), arrow_serialized_ipc_buffer->size()); + *data_ptr = string_t((const char *)arrow_serialized_ipc_buffer->data(), + arrow_serialized_ipc_buffer->size()); output.SetCardinality(1); + local_state.appender.reset(); + output.data[1].SetValue(0, Value::BOOLEAN(0)); + } - if (sending_schema) { - return OperatorResultType::HAVE_MORE_OUTPUT; - } else { - return OperatorResultType::NEED_MORE_INPUT; - } -} - -OperatorFinalizeResultType ToArrowIPCFunction::FunctionFinal(ExecutionContext &context, TableFunctionInput &data_p, - DataChunk &output) { - auto &data = (ToArrowIpcFunctionData &)*data_p.bind_data; - auto &local_state = (ToArrowIpcLocalState &)*data_p.local_state; - std::shared_ptr arrow_serialized_ipc_buffer; - - // TODO clean up - if (local_state.appender) { - ArrowArray arr = local_state.appender->Finalize(); - auto record_batch = arrow::ImportRecordBatch(&arr, data.schema).ValueOrDie(); - - // Serialize recordbatch - auto options = arrow::ipc::IpcWriteOptions::Defaults(); - auto result = arrow::ipc::SerializeRecordBatch(*record_batch, options); - arrow_serialized_ipc_buffer = result.ValueOrDie(); - - auto wrapped_buffer = make_buffer(arrow_serialized_ipc_buffer); - auto &vector = output.data[0]; - StringVector::AddBuffer(vector, wrapped_buffer); - auto data_ptr = (string_t *)vector.GetData(); - *data_ptr = string_t((const char *)arrow_serialized_ipc_buffer->data(), arrow_serialized_ipc_buffer->size()); - output.SetCardinality(1); - local_state.appender.reset(); - output.data[1].SetValue(0, Value::BOOLEAN(0)); - } - - return OperatorFinalizeResultType::FINISHED; + return OperatorFinalizeResultType::FINISHED; } - TableFunction ToArrowIPCFunction::GetFunction() { - TableFunction fun("to_arrow_ipc", {LogicalType::TABLE}, nullptr, ToArrowIPCFunction::Bind, - ToArrowIPCFunction::InitGlobal,ToArrowIPCFunction::InitLocal); - fun.in_out_function = ToArrowIPCFunction::Function; - fun.in_out_function_final = ToArrowIPCFunction::FunctionFinal; + TableFunction fun("to_arrow_ipc", {LogicalType::TABLE}, nullptr, + ToArrowIPCFunction::Bind, ToArrowIPCFunction::InitGlobal, + ToArrowIPCFunction::InitLocal); + fun.in_out_function = ToArrowIPCFunction::Function; + fun.in_out_function_final = ToArrowIPCFunction::FunctionFinal; - return fun; + return fun; } } // namespace duckdb \ No newline at end of file diff --git a/src/include/arrow_extension.hpp b/src/include/arrow_extension.hpp index 8ad174e..7d600d1 100644 --- a/src/include/arrow_extension.hpp +++ b/src/include/arrow_extension.hpp @@ -6,8 +6,8 @@ namespace duckdb { class ArrowExtension : public Extension { public: - void Load(DuckDB &db) override; - std::string Name() override; + void Load(DuckDB &db) override; + std::string Name() override; }; } // namespace duckdb diff --git a/src/include/arrow_scan_ipc.hpp b/src/include/arrow_scan_ipc.hpp index 4ec1b9f..66a7827 100644 --- a/src/include/arrow_scan_ipc.hpp +++ b/src/include/arrow_scan_ipc.hpp @@ -9,20 +9,23 @@ namespace duckdb { struct ArrowIPCScanFunctionData : public ArrowScanFunctionData { public: - using ArrowScanFunctionData::ArrowScanFunctionData; - unique_ptr stream_decoder = nullptr; + using ArrowScanFunctionData::ArrowScanFunctionData; + unique_ptr stream_decoder = nullptr; }; -// IPC Table scan is identical to ArrowTableFunction arrow scan except instead of CDataInterface header pointers, it -// takes a bunch of pointers pointing to buffers containing data in Arrow IPC format +// IPC Table scan is identical to ArrowTableFunction arrow scan except instead +// of CDataInterface header pointers, it takes a bunch of pointers pointing to +// buffers containing data in Arrow IPC format struct ArrowIPCTableFunction : public ArrowTableFunction { public: - static TableFunction GetFunction(); + static TableFunction GetFunction(); private: - static unique_ptr ArrowScanBind(ClientContext &context, TableFunctionBindInput &input, - vector &return_types, vector &names); - static void ArrowScanFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output); + static unique_ptr + ArrowScanBind(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names); + static void ArrowScanFunction(ClientContext &context, + TableFunctionInput &data_p, DataChunk &output); }; } // namespace duckdb diff --git a/src/include/arrow_stream_buffer.hpp b/src/include/arrow_stream_buffer.hpp index a4cbe97..e486c72 100644 --- a/src/include/arrow_stream_buffer.hpp +++ b/src/include/arrow_stream_buffer.hpp @@ -14,6 +14,7 @@ #include #include #include +#include /// File copied from /// https://github.com/duckdb/duckdb-wasm/blob/0ad10e7db4ef4025f5f4120be37addc4ebe29618/lib/include/duckdb/web/arrow_stream_buffer.h @@ -21,76 +22,72 @@ namespace duckdb { struct ArrowIPCStreamBuffer : public arrow::ipc::Listener { protected: - /// The schema - std::shared_ptr schema_; - /// The batches - std::vector> batches_; - /// Is eos? - bool is_eos_; + /// The schema + std::shared_ptr schema_; + /// The batches + std::vector> batches_; + /// Is eos? + bool is_eos_; - /// Decoded a record batch - arrow::Status OnSchemaDecoded(std::shared_ptr schema); - /// Decoded a record batch - arrow::Status OnRecordBatchDecoded(std::shared_ptr record_batch); - /// Reached end of stream - arrow::Status OnEOS(); + /// Decoded a record batch + arrow::Status OnSchemaDecoded(std::shared_ptr schema); + /// Decoded a record batch + arrow::Status + OnRecordBatchDecoded(std::shared_ptr record_batch); + /// Reached end of stream + arrow::Status OnEOS(); public: - /// Constructor - ArrowIPCStreamBuffer(); + /// Constructor + ArrowIPCStreamBuffer(); - /// Is end of stream? - bool is_eos() const { - return is_eos_; - } - /// Return the schema - std::shared_ptr &schema() { - return schema_; - } - /// Return the batches - std::vector> &batches() { - return batches_; - } + /// Is end of stream? + bool is_eos() const { return is_eos_; } + /// Return the schema + std::shared_ptr &schema() { return schema_; } + /// Return the batches + std::vector> &batches() { + return batches_; + } }; struct ArrowIPCStreamBufferReader : public arrow::RecordBatchReader { protected: - /// The buffer - std::shared_ptr buffer_; - /// The batch index - size_t next_batch_id_; + /// The buffer + std::shared_ptr buffer_; + /// The batch index + size_t next_batch_id_; public: - /// Constructor - ArrowIPCStreamBufferReader(std::shared_ptr buffer); - /// Destructor - ~ArrowIPCStreamBufferReader() = default; + /// Constructor + ArrowIPCStreamBufferReader(std::shared_ptr buffer); + /// Destructor + ~ArrowIPCStreamBufferReader() = default; - /// Get the schema - std::shared_ptr schema() const override; - /// Read the next record batch in the stream. Return null for batch when reaching end of stream - arrow::Status ReadNext(std::shared_ptr *batch) override; + /// Get the schema + std::shared_ptr schema() const override; + /// Read the next record batch in the stream. Return null for batch when + /// reaching end of stream + arrow::Status ReadNext(std::shared_ptr *batch) override; - /// Create arrow array stream wrapper - static duckdb::unique_ptr CreateStream(uintptr_t buffer_ptr, - ArrowStreamParameters ¶meters); - /// Create arrow array stream wrapper - static void GetSchema(uintptr_t buffer_ptr, ArrowSchemaWrapper &schema); + /// Create arrow array stream wrapper + static duckdb::unique_ptr + CreateStream(uintptr_t buffer_ptr, ArrowStreamParameters ¶meters); + /// Create arrow array stream wrapper + static void GetSchema(uintptr_t buffer_ptr, ArrowSchemaWrapper &schema); }; struct BufferingArrowIPCStreamDecoder : public arrow::ipc::StreamDecoder { protected: - /// The buffer - std::shared_ptr buffer_; + /// The buffer + std::shared_ptr buffer_; public: - /// Constructor - BufferingArrowIPCStreamDecoder( - std::shared_ptr buffer = std::make_shared()); - /// Get the buffer - std::shared_ptr &buffer() { - return buffer_; - } + /// Constructor + BufferingArrowIPCStreamDecoder(std::shared_ptr buffer = + std::make_shared()); + /// Get the buffer + std::shared_ptr &buffer() { return buffer_; } }; } // namespace duckdb diff --git a/src/include/arrow_to_ipc.hpp b/src/include/arrow_to_ipc.hpp index b4eb9d4..6c8995a 100644 --- a/src/include/arrow_to_ipc.hpp +++ b/src/include/arrow_to_ipc.hpp @@ -3,36 +3,42 @@ #include "arrow/buffer.h" #include "duckdb.hpp" +#include + namespace duckdb { class ArrowStringVectorBuffer : public VectorBuffer { public: - explicit ArrowStringVectorBuffer(std::shared_ptr buffer_p) - : VectorBuffer(VectorBufferType::OPAQUE_BUFFER), buffer(std::move(buffer_p)) { - } + explicit ArrowStringVectorBuffer(std::shared_ptr buffer_p) + : VectorBuffer(VectorBufferType::OPAQUE_BUFFER), + buffer(std::move(buffer_p)) {} private: - std::shared_ptr buffer; + std::shared_ptr buffer; }; - class ToArrowIPCFunction { public: - //! note: this is the number of vectors per chunk - static constexpr idx_t DEFAULT_CHUNK_SIZE = 120; + //! note: this is the number of vectors per chunk + static constexpr idx_t DEFAULT_CHUNK_SIZE = 120; - static TableFunction GetFunction(); + static TableFunction GetFunction(); private: - static unique_ptr InitLocal(ExecutionContext &context, TableFunctionInitInput &input, - GlobalTableFunctionState *global_state); - static unique_ptr InitGlobal(ClientContext &context, - TableFunctionInitInput &input); - static unique_ptr Bind(ClientContext &context, TableFunctionBindInput &input, - vector &return_types, vector &names); - static OperatorResultType Function(ExecutionContext &context, TableFunctionInput &data_p, DataChunk &input, - DataChunk &output); - static OperatorFinalizeResultType FunctionFinal(ExecutionContext &context, TableFunctionInput &data_p, - DataChunk &output); + static unique_ptr + InitLocal(ExecutionContext &context, TableFunctionInitInput &input, + GlobalTableFunctionState *global_state); + static unique_ptr + InitGlobal(ClientContext &context, TableFunctionInitInput &input); + static unique_ptr Bind(ClientContext &context, + TableFunctionBindInput &input, + vector &return_types, + vector &names); + static OperatorResultType Function(ExecutionContext &context, + TableFunctionInput &data_p, + DataChunk &input, DataChunk &output); + static OperatorFinalizeResultType FunctionFinal(ExecutionContext &context, + TableFunctionInput &data_p, + DataChunk &output); }; // namespace duckdb -} +} // namespace duckdb