Skip to content

Commit

Permalink
Apply duckdb's shared_ptr.patch
Browse files Browse the repository at this point in the history
  • Loading branch information
carlopi committed May 13, 2024
1 parent 5d69e32 commit ec0207c
Show file tree
Hide file tree
Showing 9 changed files with 465 additions and 411 deletions.
87 changes: 44 additions & 43 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,56 +7,55 @@ set(EXTENSION_NAME ${TARGET_NAME}_extension)
project(${TARGET_NAME})
include_directories(src/include)

set(EXTENSION_SOURCES
src/arrow_extension.cpp
src/arrow_stream_buffer.cpp
src/arrow_scan_ipc.cpp
src/arrow_to_ipc.cpp)
set(EXTENSION_SOURCES src/arrow_extension.cpp src/arrow_stream_buffer.cpp
src/arrow_scan_ipc.cpp src/arrow_to_ipc.cpp)

if(NOT "${OSX_BUILD_ARCH}" STREQUAL "")
set(OSX_ARCH_FLAG -DCMAKE_OSX_ARCHITECTURES=${OSX_BUILD_ARCH})
set(OSX_ARCH_FLAG -DCMAKE_OSX_ARCHITECTURES=${OSX_BUILD_ARCH})
else()
set(OSX_ARCH_FLAG "")
set(OSX_ARCH_FLAG "")
endif()

# Building Arrow
include(ExternalProject)
ExternalProject_Add(
ARROW_EP
GIT_REPOSITORY "https://github.com/apache/arrow"
GIT_TAG ea6875fd2a3ac66547a9a33c5506da94f3ff07f2
PREFIX "${CMAKE_BINARY_DIR}/third_party/arrow"
INSTALL_DIR "${CMAKE_BINARY_DIR}/third_party/arrow/install"
BUILD_BYPRODUCTS <INSTALL_DIR>/lib/libarrow.a
CONFIGURE_COMMAND
${CMAKE_COMMAND} -G${CMAKE_GENERATOR} ${OSX_ARCH_FLAG}
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/third_party/arrow/install
-DCMAKE_INSTALL_LIBDIR=lib -DARROW_BUILD_STATIC=ON -DARROW_BUILD_SHARED=OFF
-DARROW_NO_DEPRECATED_API=ON -DARROW_POSITION_INDEPENDENT_CODE=ON
-DARROW_SIMD_LEVEL=NONE -DARROW_ENABLE_TIMING_TESTS=OFF -DARROW_IPC=ON
-DARROW_JEMALLOC=OFF -DARROW_DEPENDENCY_SOURCE=BUNDLED
-DARROW_VERBOSE_THIRDPARTY_BUILD=OFF -DARROW_DEPENDENCY_USE_SHARED=OFF
-DARROW_BOOST_USE_SHARED=OFF -DARROW_BROTLI_USE_SHARED=OFF
-DARROW_BZ2_USE_SHARED=OFF -DARROW_GFLAGS_USE_SHARED=OFF
-DARROW_GRPC_USE_SHARED=OFF -DARROW_JEMALLOC_USE_SHARED=OFF
-DARROW_LZ4_USE_SHARED=OFF -DARROW_OPENSSL_USE_SHARED=OFF
-DARROW_PROTOBUF_USE_SHARED=OFF -DARROW_SNAPPY_USE_SHARED=OFF
-DARROW_THRIFT_USE_SHARED=OFF -DARROW_UTF8PROC_USE_SHARED=OFF
-DARROW_ZSTD_USE_SHARED=OFF -DARROW_USE_GLOG=OFF -DARROW_WITH_BACKTRACE=OFF
-DARROW_WITH_OPENTELEMETRY=OFF -DARROW_WITH_BROTLI=OFF -DARROW_WITH_BZ2=OFF
-DARROW_WITH_LZ4=OFF -DARROW_WITH_SNAPPY=OFF -DARROW_WITH_ZLIB=OFF
-DARROW_WITH_ZSTD=OFF -DARROW_WITH_UCX=OFF -DARROW_WITH_UTF8PROC=OFF
-DARROW_WITH_RE2=OFF <SOURCE_DIR>/cpp
CMAKE_ARGS -Wno-dev
UPDATE_COMMAND "")
ARROW_EP
GIT_REPOSITORY "https://github.com/apache/arrow"
GIT_TAG ea6875fd2a3ac66547a9a33c5506da94f3ff07f2
PREFIX "${CMAKE_BINARY_DIR}/third_party/arrow"
INSTALL_DIR "${CMAKE_BINARY_DIR}/third_party/arrow/install"
BUILD_BYPRODUCTS <INSTALL_DIR>/lib/libarrow.a
CONFIGURE_COMMAND
${CMAKE_COMMAND} -G${CMAKE_GENERATOR} ${OSX_ARCH_FLAG}
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/third_party/arrow/install
-DCMAKE_INSTALL_LIBDIR=lib -DARROW_BUILD_STATIC=ON -DARROW_BUILD_SHARED=OFF
-DARROW_NO_DEPRECATED_API=ON -DARROW_POSITION_INDEPENDENT_CODE=ON
-DARROW_SIMD_LEVEL=NONE -DARROW_ENABLE_TIMING_TESTS=OFF -DARROW_IPC=ON
-DARROW_JEMALLOC=OFF -DARROW_DEPENDENCY_SOURCE=BUNDLED
-DARROW_VERBOSE_THIRDPARTY_BUILD=OFF -DARROW_DEPENDENCY_USE_SHARED=OFF
-DARROW_BOOST_USE_SHARED=OFF -DARROW_BROTLI_USE_SHARED=OFF
-DARROW_BZ2_USE_SHARED=OFF -DARROW_GFLAGS_USE_SHARED=OFF
-DARROW_GRPC_USE_SHARED=OFF -DARROW_JEMALLOC_USE_SHARED=OFF
-DARROW_LZ4_USE_SHARED=OFF -DARROW_OPENSSL_USE_SHARED=OFF
-DARROW_PROTOBUF_USE_SHARED=OFF -DARROW_SNAPPY_USE_SHARED=OFF
-DARROW_THRIFT_USE_SHARED=OFF -DARROW_UTF8PROC_USE_SHARED=OFF
-DARROW_ZSTD_USE_SHARED=OFF -DARROW_USE_GLOG=OFF -DARROW_WITH_BACKTRACE=OFF
-DARROW_WITH_OPENTELEMETRY=OFF -DARROW_WITH_BROTLI=OFF -DARROW_WITH_BZ2=OFF
-DARROW_WITH_LZ4=OFF -DARROW_WITH_SNAPPY=OFF -DARROW_WITH_ZLIB=OFF
-DARROW_WITH_ZSTD=OFF -DARROW_WITH_UCX=OFF -DARROW_WITH_UTF8PROC=OFF
-DARROW_WITH_RE2=OFF <SOURCE_DIR>/cpp
CMAKE_ARGS -Wno-dev
UPDATE_COMMAND "")

ExternalProject_Get_Property(ARROW_EP install_dir)
add_library(arrow STATIC IMPORTED GLOBAL)
if(WIN32)
set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${install_dir}/lib/arrow_static.lib)
set_target_properties(arrow PROPERTIES IMPORTED_LOCATION
${install_dir}/lib/arrow_static.lib)
else()
set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${install_dir}/lib/libarrow.a)
set_target_properties(arrow PROPERTIES IMPORTED_LOCATION
${install_dir}/lib/libarrow.a)
endif()

# create static library
Expand All @@ -71,12 +70,14 @@ build_loadable_extension(${TARGET_NAME} ${PARAMETERS} ${EXTENSION_SOURCES})
add_dependencies(${TARGET_NAME}_loadable_extension ARROW_EP)
target_link_libraries(${TARGET_NAME}_loadable_extension arrow)
if(WIN32)
target_compile_definitions(${TARGET_NAME}_loadable_extension PUBLIC ARROW_STATIC)
target_compile_definitions(${TARGET_NAME}_loadable_extension
PUBLIC ARROW_STATIC)
endif()
target_include_directories(${TARGET_NAME}_loadable_extension PRIVATE ${install_dir}/include)
target_include_directories(${TARGET_NAME}_loadable_extension
PRIVATE ${install_dir}/include)

install(
TARGETS ${EXTENSION_NAME}
EXPORT "${DUCKDB_EXPORT_SET}"
LIBRARY DESTINATION "${INSTALL_LIB_DIR}"
ARCHIVE DESTINATION "${INSTALL_LIB_DIR}")
TARGETS ${EXTENSION_NAME}
EXPORT "${DUCKDB_EXPORT_SET}"
LIBRARY DESTINATION "${INSTALL_LIB_DIR}"
ARCHIVE DESTINATION "${INSTALL_LIB_DIR}")
17 changes: 7 additions & 10 deletions src/arrow_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,24 @@
namespace duckdb {

static void LoadInternal(DatabaseInstance &instance) {
ExtensionUtil::RegisterFunction(instance, ToArrowIPCFunction::GetFunction());
ExtensionUtil::RegisterFunction(instance, ArrowIPCTableFunction::GetFunction());
ExtensionUtil::RegisterFunction(instance, ToArrowIPCFunction::GetFunction());
ExtensionUtil::RegisterFunction(instance,
ArrowIPCTableFunction::GetFunction());
}

void ArrowExtension::Load(DuckDB &db) {
LoadInternal(*db.instance);
}
std::string ArrowExtension::Name() {
return "arrow";
}
void ArrowExtension::Load(DuckDB &db) { LoadInternal(*db.instance); }
std::string ArrowExtension::Name() { return "arrow"; }

} // namespace duckdb

extern "C" {

DUCKDB_EXTENSION_API void arrow_init(duckdb::DatabaseInstance &db) {
LoadInternal(db);
LoadInternal(db);
}

DUCKDB_EXTENSION_API const char *arrow_version() {
return duckdb::DuckDB::LibraryVersion();
return duckdb::DuckDB::LibraryVersion();
}
}

Expand Down
208 changes: 114 additions & 94 deletions src/arrow_scan_ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,111 +3,131 @@
namespace duckdb {

TableFunction ArrowIPCTableFunction::GetFunction() {
child_list_t <LogicalType> make_buffer_struct_children{{"ptr", LogicalType::UBIGINT},
{"size", LogicalType::UBIGINT}};

TableFunction scan_arrow_ipc_func(
"scan_arrow_ipc", {LogicalType::LIST(LogicalType::STRUCT(make_buffer_struct_children))},
ArrowIPCTableFunction::ArrowScanFunction, ArrowIPCTableFunction::ArrowScanBind,
ArrowTableFunction::ArrowScanInitGlobal, ArrowTableFunction::ArrowScanInitLocal);

scan_arrow_ipc_func.cardinality = ArrowTableFunction::ArrowScanCardinality;
scan_arrow_ipc_func.get_batch_index = nullptr; // TODO implement
scan_arrow_ipc_func.projection_pushdown = true;
scan_arrow_ipc_func.filter_pushdown = false;

return scan_arrow_ipc_func;
child_list_t<LogicalType> make_buffer_struct_children{
{"ptr", LogicalType::UBIGINT}, {"size", LogicalType::UBIGINT}};

TableFunction scan_arrow_ipc_func(
"scan_arrow_ipc",
{LogicalType::LIST(LogicalType::STRUCT(make_buffer_struct_children))},
ArrowIPCTableFunction::ArrowScanFunction,
ArrowIPCTableFunction::ArrowScanBind,
ArrowTableFunction::ArrowScanInitGlobal,
ArrowTableFunction::ArrowScanInitLocal);

scan_arrow_ipc_func.cardinality = ArrowTableFunction::ArrowScanCardinality;
scan_arrow_ipc_func.get_batch_index = nullptr; // TODO implement
scan_arrow_ipc_func.projection_pushdown = true;
scan_arrow_ipc_func.filter_pushdown = false;

return scan_arrow_ipc_func;
}

unique_ptr <FunctionData> ArrowIPCTableFunction::ArrowScanBind(ClientContext &context, TableFunctionBindInput &input,
vector <LogicalType> &return_types, vector <string> &names) {
auto stream_decoder = make_uniq<BufferingArrowIPCStreamDecoder>();
unique_ptr<FunctionData> ArrowIPCTableFunction::ArrowScanBind(
ClientContext &context, TableFunctionBindInput &input,
vector<LogicalType> &return_types, vector<string> &names) {
auto stream_decoder = make_uniq<BufferingArrowIPCStreamDecoder>();

// Decode buffer ptr list
auto buffer_ptr_list = ListValue::GetChildren(input.inputs[0]);
for (auto &buffer_ptr_struct: buffer_ptr_list) {
auto unpacked = StructValue::GetChildren(buffer_ptr_struct);
uint64_t ptr = unpacked[0].GetValue<uint64_t>();
uint64_t size = unpacked[1].GetValue<uint64_t>();
// Decode buffer ptr list
auto buffer_ptr_list = ListValue::GetChildren(input.inputs[0]);
for (auto &buffer_ptr_struct : buffer_ptr_list) {
auto unpacked = StructValue::GetChildren(buffer_ptr_struct);
uint64_t ptr = unpacked[0].GetValue<uint64_t>();
uint64_t size = unpacked[1].GetValue<uint64_t>();

// Feed stream into decoder
auto res = stream_decoder->Consume((const uint8_t *) ptr, size);
// Feed stream into decoder
auto res = stream_decoder->Consume((const uint8_t *)ptr, size);

if (!res.ok()) {
throw IOException("Invalid IPC stream");
}
if (!res.ok()) {
throw IOException("Invalid IPC stream");
}

if (!stream_decoder->buffer()->is_eos()) {
throw IOException("IPC buffers passed to arrow scan should contain entire stream");
}

if (!stream_decoder->buffer()->is_eos()) {
throw IOException(
"IPC buffers passed to arrow scan should contain entire stream");
}

// These are the params I need to produce from the ipc buffers using the
// WebDB.cc code
auto stream_factory_ptr = (uintptr_t)&stream_decoder->buffer();
auto stream_factory_produce =
(stream_factory_produce_t)&ArrowIPCStreamBufferReader::CreateStream;
auto stream_factory_get_schema =
(stream_factory_get_schema_t)&ArrowIPCStreamBufferReader::GetSchema;
auto res = make_uniq<ArrowIPCScanFunctionData>(stream_factory_produce,
stream_factory_ptr);

// Store decoder
res->stream_decoder = std::move(stream_decoder);

// TODO Everything below this is identical to the bind in
// duckdb/src/function/table/arrow.cpp
auto &data = *res;
stream_factory_get_schema((ArrowArrayStream *)stream_factory_ptr,
data.schema_root.arrow_schema);
for (idx_t col_idx = 0;
col_idx < (idx_t)data.schema_root.arrow_schema.n_children; col_idx++) {
auto &schema = *data.schema_root.arrow_schema.children[col_idx];
if (!schema.release) {
throw InvalidInputException("arrow_scan: released schema passed");
}

// These are the params I need to produce from the ipc buffers using the WebDB.cc code
auto stream_factory_ptr = (uintptr_t) & stream_decoder->buffer();
auto stream_factory_produce = (stream_factory_produce_t) & ArrowIPCStreamBufferReader::CreateStream;
auto stream_factory_get_schema = (stream_factory_get_schema_t) & ArrowIPCStreamBufferReader::GetSchema;
auto res = make_uniq<ArrowIPCScanFunctionData>(stream_factory_produce, stream_factory_ptr);

// Store decoder
res->stream_decoder = std::move(stream_decoder);

// TODO Everything below this is identical to the bind in duckdb/src/function/table/arrow.cpp
auto &data = *res;
stream_factory_get_schema((ArrowArrayStream *) stream_factory_ptr, data.schema_root.arrow_schema);
for (idx_t col_idx = 0; col_idx < (idx_t) data.schema_root.arrow_schema.n_children; col_idx++) {
auto &schema = *data.schema_root.arrow_schema.children[col_idx];
if (!schema.release) {
throw InvalidInputException("arrow_scan: released schema passed");
}
auto arrow_type = GetArrowLogicalType(schema);
if (schema.dictionary) {
auto dictionary_type = GetArrowLogicalType(*schema.dictionary);
return_types.emplace_back(dictionary_type->GetDuckType());
arrow_type->SetDictionary(std::move(dictionary_type));
} else {
return_types.emplace_back(arrow_type->GetDuckType());
}
res->arrow_table.AddColumn(col_idx, std::move(arrow_type));
auto format = string(schema.format);
auto name = string(schema.name);
if (name.empty()) {
name = string("v") + to_string(col_idx);
}
names.push_back(name);
auto arrow_type = GetArrowLogicalType(schema);
if (schema.dictionary) {
auto dictionary_type = GetArrowLogicalType(*schema.dictionary);
return_types.emplace_back(dictionary_type->GetDuckType());
arrow_type->SetDictionary(std::move(dictionary_type));
} else {
return_types.emplace_back(arrow_type->GetDuckType());
}
QueryResult::DeduplicateColumns(names);
return std::move(res);
res->arrow_table.AddColumn(col_idx, std::move(arrow_type));
auto format = string(schema.format);
auto name = string(schema.name);
if (name.empty()) {
name = string("v") + to_string(col_idx);
}
names.push_back(name);
}
QueryResult::DeduplicateColumns(names);
return std::move(res);
}

// Same as regular arrow scan, except ArrowToDuckDB call TODO: refactor to allow nicely overriding this
void ArrowIPCTableFunction::ArrowScanFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
if (!data_p.local_state) {
return;
}
auto &data = data_p.bind_data->CastNoConst<ArrowScanFunctionData>();
auto &state = data_p.local_state->Cast<ArrowScanLocalState>();
auto &global_state = data_p.global_state->Cast<ArrowScanGlobalState>();

//! Out of tuples in this chunk
if (state.chunk_offset >= (idx_t)state.chunk->arrow_array.length) {
if (!ArrowScanParallelStateNext(context, data_p.bind_data.get(), state, global_state)) {
return;
}
// Same as regular arrow scan, except ArrowToDuckDB call TODO: refactor to allow
// nicely overriding this
void ArrowIPCTableFunction::ArrowScanFunction(ClientContext &context,
TableFunctionInput &data_p,
DataChunk &output) {
if (!data_p.local_state) {
return;
}
auto &data = data_p.bind_data->CastNoConst<ArrowScanFunctionData>();
auto &state = data_p.local_state->Cast<ArrowScanLocalState>();
auto &global_state = data_p.global_state->Cast<ArrowScanGlobalState>();

//! Out of tuples in this chunk
if (state.chunk_offset >= (idx_t)state.chunk->arrow_array.length) {
if (!ArrowScanParallelStateNext(context, data_p.bind_data.get(), state,
global_state)) {
return;
}
int64_t output_size = MinValue<int64_t>(STANDARD_VECTOR_SIZE, state.chunk->arrow_array.length - state.chunk_offset);
data.lines_read += output_size;
if (global_state.CanRemoveFilterColumns()) {
state.all_columns.Reset();
state.all_columns.SetCardinality(output_size);
ArrowToDuckDB(state, data.arrow_table.GetColumns(), state.all_columns, data.lines_read - output_size, false);
output.ReferenceColumns(state.all_columns, global_state.projection_ids);
} else {
output.SetCardinality(output_size);
ArrowToDuckDB(state, data.arrow_table.GetColumns(), output, data.lines_read - output_size, false);
}

output.Verify();
state.chunk_offset += output.size();
}
int64_t output_size =
MinValue<int64_t>(STANDARD_VECTOR_SIZE,
state.chunk->arrow_array.length - state.chunk_offset);
data.lines_read += output_size;
if (global_state.CanRemoveFilterColumns()) {
state.all_columns.Reset();
state.all_columns.SetCardinality(output_size);
ArrowToDuckDB(state, data.arrow_table.GetColumns(), state.all_columns,
data.lines_read - output_size, false);
output.ReferenceColumns(state.all_columns, global_state.projection_ids);
} else {
output.SetCardinality(output_size);
ArrowToDuckDB(state, data.arrow_table.GetColumns(), output,
data.lines_read - output_size, false);
}

output.Verify();
state.chunk_offset += output.size();
}

} // namespace duckdb
Loading

0 comments on commit ec0207c

Please sign in to comment.