From 751d1a47fcc1a4ece529d511dd8b09d169c7775a Mon Sep 17 00:00:00 2001
From: akvlad <2798590+akvlad@users.noreply.github.com>
Date: Fri, 11 Oct 2024 19:34:47 +0300
Subject: [PATCH] Feature: read_parquet_mergetree (#13)

* parquet ordered scan / MergeTree-like feature

Co-authored-by: Lorenzo Mangani
---
 CMakeLists.txt                                 |  23 +-
 Makefile                                       |   2 +-
 chsql/CMakeLists.txt                           |  34 +++
 chsql/extension_config.cmake                   |  22 ++
 {src => chsql/src}/chsql_extension.cpp         |   3 +-
 .../src}/include/chsql_extension.hpp           |   3 +-
 chsql/src/parquet_ordered_scan.cpp             | 286 ++++++++++++++++++
 chsql/src/silly_btree_store.cpp                |  85 ++++++
 {test => chsql/test}/README.md                 |   0
 {test => chsql/test}/sql/chsql.test            |  19 ++
 vcpkg.json => chsql/vcpkg.json                 |   0
 extension_config.cmake                         |  10 -
 12 files changed, 454 insertions(+), 33 deletions(-)
 create mode 100644 chsql/CMakeLists.txt
 create mode 100644 chsql/extension_config.cmake
 rename {src => chsql/src}/chsql_extension.cpp (99%)
 rename {src => chsql/src}/include/chsql_extension.hpp (67%)
 create mode 100644 chsql/src/parquet_ordered_scan.cpp
 create mode 100644 chsql/src/silly_btree_store.cpp
 rename {test => chsql/test}/README.md (100%)
 rename {test => chsql/test}/sql/chsql.test (88%)
 rename vcpkg.json => chsql/vcpkg.json (100%)
 delete mode 100644 extension_config.cmake

diff --git a/CMakeLists.txt b/CMakeLists.txt
index fc06a4c..f750372 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,30 +1,13 @@
 cmake_minimum_required(VERSION 3.5)
-
 # Set extension name here
 set(TARGET_NAME chsql)
-
 # DuckDB's extension distribution supports vcpkg. As such, dependencies can be added in ./vcpkg.json and then
 # used in cmake with find_package. Feel free to remove or replace with other dependencies.
 # Note that it should also be removed from vcpkg.json to prevent needlessly installing it..
 find_package(OpenSSL REQUIRED)
-
 set(EXTENSION_NAME ${TARGET_NAME}_extension)
 set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)
-
 project(${TARGET_NAME})
-include_directories(src/include)
-
-set(EXTENSION_SOURCES src/chsql_extension.cpp)
-
-build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})
-build_loadable_extension(${TARGET_NAME} " " ${EXTENSION_SOURCES})
-
-# Link OpenSSL in both the static library as the loadable extension
-target_link_libraries(${EXTENSION_NAME} OpenSSL::SSL OpenSSL::Crypto)
-target_link_libraries(${LOADABLE_EXTENSION_NAME} OpenSSL::SSL OpenSSL::Crypto)
-
-install(
-  TARGETS ${EXTENSION_NAME}
-  EXPORT "${DUCKDB_EXPORT_SET}"
-  LIBRARY DESTINATION "${INSTALL_LIB_DIR}"
-  ARCHIVE DESTINATION "${INSTALL_LIB_DIR}")
+set(EXT_NAME chsql)
+set(DUCKDB_EXTENSION_CONFIGS ../chsql/extension_config.cmake)
+add_subdirectory(./duckdb)

diff --git a/Makefile b/Makefile
index d1b6c95..acd730b 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-PROJ_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
+PROJ_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))chsql/

 # Configuration of extension
 EXT_NAME=chsql
diff --git a/chsql/CMakeLists.txt b/chsql/CMakeLists.txt
new file mode 100644
index 0000000..06de4a9
--- /dev/null
+++ b/chsql/CMakeLists.txt
@@ -0,0 +1,34 @@
+cmake_minimum_required(VERSION 3.5)
+# Set extension name here
+set(TARGET_NAME chsql)
+# DuckDB's extension distribution supports vcpkg. As such, dependencies can be added in ./vcpkg.json and then
+# used in cmake with find_package. Feel free to remove or replace with other dependencies.
+# Note that it should also be removed from vcpkg.json to prevent needlessly installing it.
+find_package(OpenSSL REQUIRED)
+set(EXTENSION_NAME ${TARGET_NAME}_extension)
+set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)
+project(${TARGET_NAME})
+
+include_directories(
+    ./src/include
+    ./src
+    ${CMAKE_CURRENT_SOURCE_DIR}/../duckdb/extension/parquet/include
+    ../duckdb/third_party/lz4
+    ../duckdb/third_party/parquet
+    ../duckdb/third_party/thrift
+    ../duckdb/third_party/snappy
+    ../duckdb/third_party/zstd/include
+    ../duckdb/third_party/mbedtls
+    ../duckdb/third_party/mbedtls/include
+    ../duckdb/third_party/brotli/include)
+set(EXTENSION_SOURCES src/chsql_extension.cpp)
+build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})
+build_loadable_extension(${TARGET_NAME} " " ${EXTENSION_SOURCES})
+# Link OpenSSL in both the static library and the loadable extension
+target_link_libraries(${EXTENSION_NAME} OpenSSL::SSL OpenSSL::Crypto)
+target_link_libraries(${LOADABLE_EXTENSION_NAME} OpenSSL::SSL OpenSSL::Crypto)
+install(
+  TARGETS ${EXTENSION_NAME}
+  EXPORT "${DUCKDB_EXPORT_SET}"
+  LIBRARY DESTINATION "${INSTALL_LIB_DIR}"
+  ARCHIVE DESTINATION "${INSTALL_LIB_DIR}")

diff --git a/chsql/extension_config.cmake b/chsql/extension_config.cmake
new file mode 100644
index 0000000..776f13b
--- /dev/null
+++ b/chsql/extension_config.cmake
@@ -0,0 +1,22 @@
+# This file is included by DuckDB's build system. It specifies which extension to load
+
+include_directories(
+    ./src/include
+    ${CMAKE_CURRENT_SOURCE_DIR}/../duckdb/extension/parquet/include
+    ../duckdb/third_party/lz4
+    ../duckdb/third_party/parquet
+    ../duckdb/third_party/thrift
+    ../duckdb/third_party/snappy
+    ../duckdb/third_party/zstd/include
+    ../duckdb/third_party/mbedtls
+    ../duckdb/third_party/mbedtls/include
+    ../duckdb/third_party/brotli/include)
+
+# Extension from this repo
+duckdb_extension_load(chsql
+    SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
+    LOAD_TESTS
+)
+
+# Any extra extensions that should be built
+# e.g.: duckdb_extension_load(json)
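Before the source changes, one note on wiring: chsql_extension.cpp below registers the new scanner inside LoadInternal and pulls it in with #include "parquet_ordered_scan.cpp", a textual include, since EXTENSION_SOURCES lists only chsql_extension.cpp. For orientation, the minimal shape of a DuckDB table function registration looks roughly like this; the My* names are hypothetical and not part of this patch:

    #include "duckdb.hpp"
    #include "duckdb/main/extension_util.hpp"

    namespace duckdb {

    // Bind runs once at planning time: it declares the output schema and can
    // parse arguments into a FunctionData handed to the later callbacks.
    static unique_ptr<FunctionData> MyBind(ClientContext &context, TableFunctionBindInput &input,
                                           vector<LogicalType> &return_types, vector<string> &names) {
        return_types.push_back(LogicalType::BIGINT);
        names.push_back("n");
        return make_uniq<TableFunctionData>();
    }

    // Scan is called repeatedly; an output chunk of cardinality 0 ends the scan.
    static void MyScan(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
        output.SetCardinality(0);
    }

    static void RegisterMyScan(DatabaseInstance &instance) {
        TableFunction tf("my_scan", {LogicalType::VARCHAR}, MyScan, MyBind);
        ExtensionUtil::RegisterFunction(instance, tf);
    }

    } // namespace duckdb

The patch follows exactly this pattern, with an additional init-local callback that opens the parquet readers once per scan.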
diff --git a/src/chsql_extension.cpp b/chsql/src/chsql_extension.cpp
similarity index 99%
rename from src/chsql_extension.cpp
rename to chsql/src/chsql_extension.cpp
index 6756dea..5d55613 100644
--- a/src/chsql_extension.cpp
+++ b/chsql/src/chsql_extension.cpp
@@ -12,7 +12,7 @@
 // OpenSSL linked through vcpkg
 #include <openssl/opensslv.h>
-
+#include "parquet_ordered_scan.cpp"
 namespace duckdb {

 // To add a new scalar SQL macro, add a new macro to this array!
@@ -188,6 +188,7 @@ static void LoadInternal(DatabaseInstance &instance) {
     auto table_info = DefaultTableFunctionGenerator::CreateTableMacroInfo(chsql_table_macros[index]);
     ExtensionUtil::RegisterFunction(instance, *table_info);
   }
+  ExtensionUtil::RegisterFunction(instance, ReadParquetOrderedFunction());
 }

 void ChsqlExtension::Load(DuckDB &db) {

diff --git a/src/include/chsql_extension.hpp b/chsql/src/include/chsql_extension.hpp
similarity index 67%
rename from src/include/chsql_extension.hpp
rename to chsql/src/include/chsql_extension.hpp
index 57abf17..33e2c6f 100644
--- a/src/include/chsql_extension.hpp
+++ b/chsql/src/include/chsql_extension.hpp
@@ -10,5 +10,6 @@ class ChsqlExtension : public Extension {
   std::string Name() override;
   std::string Version() const override;
 };
-
+duckdb::TableFunction ReadParquetOrderedFunction();
+static void RegisterSillyBTreeStore(DatabaseInstance &instance);
 } // namespace duckdb
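The new file below is the heart of the patch. It assumes every input parquet file is already sorted on the order-by column (the way ClickHouse MergeTree parts are) and produces globally ordered output by k-way merging the per-file streams instead of re-sorting. Reduced to plain integers, the merge is the classic pick-the-smallest-head loop; a self-contained sketch of the idea:

    #include <cstddef>
    #include <iostream>
    #include <vector>

    // Emit the smallest head element across k sorted inputs until all are
    // drained. read_parquet_mergetree does the same with per-file DataChunks
    // and duckdb::Value comparisons instead of int vectors.
    static std::vector<int> KWayMerge(const std::vector<std::vector<int>> &inputs) {
        std::vector<std::size_t> pos(inputs.size(), 0);
        std::vector<int> out;
        while (true) {
            int winner = -1;
            for (std::size_t i = 0; i < inputs.size(); i++) {
                if (pos[i] >= inputs[i].size()) {
                    continue; // this input is exhausted
                }
                if (winner < 0 || inputs[i][pos[i]] < inputs[winner][pos[winner]]) {
                    winner = static_cast<int>(i);
                }
            }
            if (winner < 0) {
                break; // every input is exhausted
            }
            out.push_back(inputs[winner][pos[winner]++]);
        }
        return out;
    }

    int main() {
        for (int v : KWayMerge({{0, 2, 4}, {1, 3, 5}})) {
            std::cout << v << ' '; // prints: 0 1 2 3 4 5
        }
        std::cout << '\n';
    }

The real implementation adds two complications on top of this loop: readers refill their chunks as they drain, and a "winner group" avoids comparing readers whose chunks cannot yet contribute.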
diff --git a/chsql/src/parquet_ordered_scan.cpp b/chsql/src/parquet_ordered_scan.cpp
new file mode 100644
index 0000000..be3c617
--- /dev/null
+++ b/chsql/src/parquet_ordered_scan.cpp
@@ -0,0 +1,286 @@
+#include <duckdb.hpp>
+#include "duckdb/common/exception.hpp"
+#include <parquet_reader.hpp>
+#include "chsql_extension.hpp"
+#include <algorithm>
+
+namespace duckdb {
+
+    struct ReaderSet {
+        unique_ptr<ParquetReader> reader;
+        idx_t orderByIdx;
+        unique_ptr<DataChunk> chunk;
+        unique_ptr<ParquetReaderScanState> scanState;
+        vector<idx_t> columnMap;
+        idx_t result_idx;
+    };
+
+    struct OrderedReadFunctionData : FunctionData {
+        string orderBy;
+        vector<unique_ptr<ReaderSet>> sets;
+        vector<string> files;
+        vector<LogicalType> returnTypes;
+        vector<string> names;
+        unique_ptr<FunctionData> Copy() const override {
+            throw std::runtime_error("not implemented");
+        }
+        static bool EqualStrArrays(const vector<string> &a, const vector<string> &b) {
+            if (a.size() != b.size()) {
+                return false;
+            }
+            for (idx_t i = 0; i < a.size(); i++) {
+                if (a[i] != b[i]) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        bool Equals(const FunctionData &other) const override {
+            const auto &o = other.Cast<OrderedReadFunctionData>();
+            if (!EqualStrArrays(o.files, files)) {
+                return false;
+            }
+            return this->orderBy == o.orderBy;
+        }
+    };
+
+    struct OrderedReadLocalState : LocalTableFunctionState {
+        vector<unique_ptr<ReaderSet>> sets;
+        vector<idx_t> winner_group;
+        void RecalculateWinnerGroup() {
+            winner_group.clear();
+            if (sets.empty()) {
+                return;
+            }
+            idx_t winner_idx = 0;
+            for (idx_t i = 1; i < sets.size(); i++) {
+                const auto &s = sets[i];
+                const auto &w = sets[winner_idx];
+                if (s->chunk->GetValue(s->orderByIdx, s->result_idx) <
+                    w->chunk->GetValue(w->orderByIdx, w->result_idx)) {
+                    winner_idx = i;
+                }
+            }
+            winner_group.push_back(winner_idx);
+            auto &w = sets[winner_idx];
+            const auto &wLast = w->chunk->GetValue(w->orderByIdx, w->chunk->size() - 1);
+            for (idx_t i = 0; i < sets.size(); i++) {
+                if (i == winner_idx) {
+                    continue;
+                }
+                auto &s = sets[i];
+                const auto &sFirst = s->chunk->GetValue(s->orderByIdx, s->result_idx);
+                if (sFirst <= wLast) {
+                    winner_group.push_back(i);
+                }
+            }
+        }
+        void RemoveSetGracefully(const idx_t idx) {
+            if (idx != sets.size() - 1) {
+                sets[idx].reset(sets[sets.size() - 1].release());
+            }
+            sets.pop_back();
+        }
+    };
+
+    static unique_ptr<FunctionData> OrderedParquetScanBind(ClientContext &context, TableFunctionBindInput &input,
+                                                           vector<LogicalType> &return_types, vector<string> &names) {
+        Connection conn(*context.db);
+        auto res = make_uniq<OrderedReadFunctionData>();
+        auto files = ListValue::GetChildren(input.inputs[0]);
+        vector<string> fileNames;
+        for (auto &file : files) {
+            fileNames.push_back(file.ToString());
+        }
+        GlobMultiFileList fileList(context, fileNames, FileGlobOptions::ALLOW_EMPTY);
+        string filename;
+        MultiFileListScanData it;
+        fileList.InitializeScan(it);
+        vector<string> unglobbedFileList;
+        while (fileList.Scan(it, filename)) {
+            unglobbedFileList.push_back(filename);
+        }
+
+        res->orderBy = input.inputs[1].GetValue<string>();
+        for (auto &file : unglobbedFileList) {
+            auto set = make_uniq<ReaderSet>();
+            res->files.push_back(file);
+            ParquetOptions po;
+            po.binary_as_string = true;
+            ParquetReader reader(context, file, po, nullptr);
+            set->columnMap = vector<idx_t>();
+            for (auto &el : reader.metadata->metadata->schema) {
+                if (el.num_children != 0) {
+                    continue;
+                }
+                auto name_it = std::find(names.begin(), names.end(), el.name);
+                auto return_type = LogicalType::ANY;
+                switch (el.type) {
+                case Type::INT32:
+                    return_type = LogicalType::INTEGER;
+                    break;
+                case Type::INT64:
+                    return_type = LogicalType::BIGINT;
+                    break;
+                case Type::DOUBLE:
+                    return_type = LogicalType::DOUBLE;
+                    break;
+                case Type::FLOAT:
+                    return_type = LogicalType::FLOAT;
+                    break;
+                case Type::BYTE_ARRAY:
+                case Type::FIXED_LEN_BYTE_ARRAY:
+                    return_type = LogicalType::VARCHAR;
+                    break;
+                case Type::BOOLEAN:
+                    return_type = LogicalType::TINYINT;
+                    break;
+                default:
+                    break;
+                }
+                set->columnMap.push_back(name_it - names.begin());
+                if (el.name == res->orderBy) {
+                    set->orderByIdx = name_it - names.begin();
+                }
+                if (name_it != names.end()) {
+                    if (return_types[name_it - names.begin()] != return_type) {
+                        throw std::runtime_error("incompatible schema");
+                    }
+                    continue;
+                }
+                return_types.push_back(return_type);
+                names.push_back(el.name);
+            }
+            res->sets.push_back(std::move(set));
+        }
+        res->returnTypes = return_types;
+        res->names = names;
+        return std::move(res);
+    }
+
+    static unique_ptr<LocalTableFunctionState>
+    ParquetScanInitLocal(ExecutionContext &context, TableFunctionInitInput &input, GlobalTableFunctionState *gstate_p) {
+        auto res = make_uniq<OrderedReadLocalState>();
+        const auto &bindData = input.bind_data->Cast<OrderedReadFunctionData>();
+        ParquetOptions po;
+        po.binary_as_string = true;
+        for (idx_t i = 0; i < bindData.files.size(); i++) {
+            auto set = make_uniq<ReaderSet>();
+            set->reader = make_uniq<ParquetReader>(context.client, bindData.files[i], po, nullptr);
+            set->scanState = make_uniq<ParquetReaderScanState>();
+            idx_t col_idx = 0;
+            for (auto &el : set->reader->metadata->metadata->schema) {
+                if (el.num_children != 0) {
+                    continue;
+                }
+                set->reader->reader_data.column_ids.push_back(col_idx);
+                col_idx++;
+            }
+            set->columnMap = bindData.sets[i]->columnMap;
+            set->reader->reader_data.column_mapping = set->columnMap;
+            vector<idx_t> rgs(set->reader->metadata->metadata->row_groups.size(), 0);
+            for (idx_t rg = 0; rg < rgs.size(); rg++) {
+                rgs[rg] = rg;
+            }
+            set->reader->InitializeScan(context.client, *set->scanState, rgs);
+            set->chunk = make_uniq<DataChunk>();
+            set->orderByIdx = bindData.sets[i]->orderByIdx;
+            set->result_idx = 0;
+            auto ltypes = vector<LogicalType>();
+            for (const auto idx : set->columnMap) {
+                ltypes.push_back(bindData.returnTypes[idx]);
+            }
+            set->chunk->Initialize(context.client, ltypes);
+            set->reader->Scan(*set->scanState, *set->chunk);
+            res->sets.push_back(std::move(set));
+        }
+        res->RecalculateWinnerGroup();
+        return std::move(res);
+    }
+
+    static void ParquetOrderedScanImplementation(ClientContext &context, duckdb::TableFunctionInput &data_p,
+                                                 DataChunk &output) {
+        auto &loc_state = data_p.local_state->Cast<OrderedReadLocalState>();
+        const auto &fieldNames = data_p.bind_data->Cast<OrderedReadFunctionData>().names;
+        bool toRecalc = false;
+        for (int i = loc_state.sets.size() - 1; i >= 0; i--) {
+            if (loc_state.sets[i]->result_idx >= loc_state.sets[i]->chunk->size()) {
+                auto &set = loc_state.sets[i];
+                set->chunk->Reset();
+                set->reader->Scan(*set->scanState, *set->chunk);
+                set->result_idx = 0;
+                if (set->chunk->size() == 0) {
+                    loc_state.RemoveSetGracefully(i);
+                }
+                toRecalc = true;
+            }
+        }
+        if (loc_state.sets.empty()) {
+            return;
+        }
+        if (toRecalc) {
+            loc_state.RecalculateWinnerGroup();
+        }
+        idx_t cap = 1024;
+        output.Reset();
+        output.SetCapacity(cap);
+        idx_t j = 0;
+        if (loc_state.winner_group.size() == 1) {
+            auto &set = loc_state.sets[loc_state.winner_group[0]];
+            set->chunk->Slice(set->result_idx, set->chunk->size() - set->result_idx);
+            output.Append(*set->chunk, true);
+            output.SetCardinality(set->chunk->size());
+            set->result_idx = set->chunk->size();
+            return;
+        }
+        while (true) {
+            auto winnerSet = &loc_state.sets[loc_state.winner_group[0]];
+            Value winner_val = (*winnerSet)->chunk->GetValue((*winnerSet)->orderByIdx, (*winnerSet)->result_idx);
+            for (idx_t k = 1; k < loc_state.winner_group.size(); k++) {
+                const auto i = loc_state.winner_group[k];
+                const auto &set = loc_state.sets[i];
+                const Value &val = set->chunk->GetValue(set->orderByIdx, set->result_idx);
+                if (val < winner_val) {
+                    winnerSet = &loc_state.sets[i];
+                    winner_val = (*winnerSet)->chunk->GetValue(set->orderByIdx, set->result_idx);
+                }
+            }
+            for (idx_t i = 0; i < fieldNames.size(); i++) {
+                const auto &val = (*winnerSet)->chunk->GetValue(i, (*winnerSet)->result_idx);
+                output.SetValue(i, j, val);
+            }
+            j++;
+            (*winnerSet)->result_idx++;
+            if ((*winnerSet)->result_idx >= (*winnerSet)->chunk->size() || j >= 2048) {
+                output.SetCardinality(j);
+                return;
+            }
+            if (j >= cap) {
+                cap *= 2;
+                output.SetCapacity(cap);
+            }
+        }
+    }
+
+    TableFunction ReadParquetOrderedFunction() {
+        TableFunction tf = duckdb::TableFunction(
+            "read_parquet_mergetree",
+            {LogicalType::LIST(LogicalType::VARCHAR), LogicalType::VARCHAR},
+            ParquetOrderedScanImplementation,
+            OrderedParquetScanBind,
+            nullptr,
+            ParquetScanInitLocal);
+        return tf;
+    }
+}
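Two details of the implementation above are worth spelling out. RecalculateWinnerGroup prunes the merge: a reader takes part in the row-by-row comparison only while its first pending value is less than or equal to the last value of the current winner's chunk; readers whose chunks start later cannot contribute yet. When the group shrinks to a single reader, the fast path slices and appends its whole remaining chunk in one go. The pruning rule in isolation, with plain ints standing in for duckdb::Value and chunks assumed non-empty (as in the scanner, where exhausted readers are removed first):

    #include <cstddef>
    #include <vector>

    // Each cursor models one parquet reader: a loaded chunk plus the index of
    // the next unread row.
    struct Cursor {
        std::vector<int> chunk;
        std::size_t next = 0;
    };

    // A cursor joins the winner group only if its first pending value can fall
    // inside the winner's current chunk; all other cursors are provably later
    // in the global order and are skipped until the next recalculation.
    static std::vector<std::size_t> WinnerGroup(const std::vector<Cursor> &cursors) {
        std::size_t winner = 0;
        for (std::size_t i = 1; i < cursors.size(); i++) {
            if (cursors[i].chunk[cursors[i].next] < cursors[winner].chunk[cursors[winner].next]) {
                winner = i;
            }
        }
        std::vector<std::size_t> group {winner};
        const int winner_last = cursors[winner].chunk.back();
        for (std::size_t i = 0; i < cursors.size(); i++) {
            if (i != winner && cursors[i].chunk[cursors[i].next] <= winner_last) {
                group.push_back(i);
            }
        }
        return group;
    }

    int main() {
        std::vector<Cursor> cursors = {{{1, 3, 9}, 0}, {{2, 4}, 0}, {{50, 60}, 0}};
        auto group = WinnerGroup(cursors); // {0, 1}: the third cursor starts past 9
        return group.size() == 2 ? 0 : 1;
    }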
diff --git a/chsql/src/silly_btree_store.cpp b/chsql/src/silly_btree_store.cpp
new file mode 100644
index 0000000..3def1f3
--- /dev/null
+++ b/chsql/src/silly_btree_store.cpp
@@ -0,0 +1,85 @@
+#include <map>
+#include <unordered_map>
+#include <mutex>
+#include <duckdb.hpp>
+
+namespace duckdb {
+    struct SillyTreeKey {
+        // TODO: key half of the key-value pair, wrapping duckdb::Value
+    };
+    struct SillyTreeValue {
+        // TODO: value half of the key-value pair, wrapping duckdb::Value
+    };
+    static std::unordered_map<idx_t, std::map<SillyTreeKey, SillyTreeValue>> silly_btree_store;
+    static std::unordered_map<idx_t, unique_ptr<std::mutex>> silly_btree_store_mutex;
+    static std::mutex silly_btree_store_mutex_lock;
+
+    struct _lock {
+        std::mutex &_m;
+        explicit _lock(std::mutex &m) : _m(m) {
+            _m.lock();
+        }
+        ~_lock() {
+            _m.unlock();
+        }
+    };
+
+    struct AppendTreeData : FunctionData {
+        unique_ptr<FunctionData> Copy() const override {
+            throw std::runtime_error("not implemented");
+        }
+        bool Equals(const FunctionData &other) const override {
+            return false;
+        }
+        idx_t tree_id;
+        Value key;
+        Value value;
+    };
+
+    static unique_ptr<FunctionData> AppendTreeBind(ClientContext &context, TableFunctionBindInput &input,
+                                                   vector<LogicalType> &return_types, vector<string> &names) {
+        auto res = make_uniq<AppendTreeData>();
+        res->tree_id = input.inputs[0].GetValue<idx_t>();
+        res->key = input.inputs[1];
+        res->value = input.inputs[2];
+        return_types.clear();
+        return_types.push_back(LogicalType::TINYINT);
+        names.clear();
+        names.push_back("ok");
+        return res;
+    }
+
+    static unique_ptr<LocalTableFunctionState> AppendTreeLocalState(
+        ClientContext &context, TableFunctionBindInput &input,
+        vector<LogicalType> &return_types, vector<string> &names) {
+        return nullptr;
+    }
+
+    struct SillyTree {
+        std::map<SillyTreeKey, SillyTreeValue> &values;
+        std::unique_ptr<_lock> lock;
+        SillyTree(std::map<SillyTreeKey, SillyTreeValue> &values, std::unique_ptr<_lock> lock)
+            : values(values), lock(std::move(lock)) {}
+    };
+
+    static unique_ptr<SillyTree> GetTree(idx_t tree_id) {
+        _lock l(silly_btree_store_mutex_lock);
+        auto it = silly_btree_store.find(tree_id);
+        if (it == silly_btree_store.end()) {
+            silly_btree_store[tree_id] = std::map<SillyTreeKey, SillyTreeValue>();
+            silly_btree_store_mutex[tree_id] = make_uniq<std::mutex>();
+        }
+        // TODO: take the per-tree mutex and return a SillyTree over the stored map
+        return nullptr;
+    }
+
+    static void AppendTreeImplementation(ClientContext &context, duckdb::TableFunctionInput &data_p,
+                                         DataChunk &output) {
+        // TODO: append the bound key/value pair to the tree selected by tree_id
+    }
+
+    static void RegisterSillyBTreeStore(DatabaseInstance &instance) {
+        // TODO: register store functions
+    }
+}
\ No newline at end of file
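silly_btree_store.cpp is an unfinished stub: it is neither compiled (only chsql_extension.cpp is in EXTENSION_SOURCES) nor included anywhere, and RegisterSillyBTreeStore is never called. The shape it reaches for is a process-global registry of ordered trees, each guarded by its own mutex so appends to different trees do not contend. A standalone sketch of that pattern, with int keys and values standing in for the TODO SillyTreeKey/SillyTreeValue types and std::unique_lock in place of the hand-rolled _lock guard:

    #include <map>
    #include <memory>
    #include <mutex>
    #include <unordered_map>

    // Model of the intended design: the registry mutex only guards tree
    // creation and lookup; each tree's own mutex is held for the lifetime of
    // the returned handle.
    using Tree = std::map<int, int>;

    static std::unordered_map<int, Tree> store;
    static std::unordered_map<int, std::unique_ptr<std::mutex>> store_mutex;
    static std::mutex registry_mutex;

    struct TreeHandle {
        Tree &values;
        std::unique_lock<std::mutex> lock; // released when the handle is destroyed
    };

    static TreeHandle GetTree(int tree_id) {
        std::unique_lock<std::mutex> registry(registry_mutex);
        if (store.find(tree_id) == store.end()) {
            store.emplace(tree_id, Tree());
            store_mutex.emplace(tree_id, std::make_unique<std::mutex>());
        }
        return TreeHandle{store[tree_id], std::unique_lock<std::mutex>(*store_mutex[tree_id])};
    }

    int main() {
        auto handle = GetTree(1);
        handle.values[42] = 7; // mutate under the per-tree lock
        return 0;
    }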
diff --git a/test/README.md b/chsql/test/README.md
similarity index 100%
rename from test/README.md
rename to chsql/test/README.md

diff --git a/test/sql/chsql.test b/chsql/test/sql/chsql.test
similarity index 88%
rename from test/sql/chsql.test
rename to chsql/test/sql/chsql.test
index d753ece..84c2527 100644
--- a/test/sql/chsql.test
+++ b/chsql/test/sql/chsql.test
@@ -11,6 +11,8 @@ Catalog Error: Scalar Function with name chsql does not exist!
 # Require statement will ensure this test is run with this extension loaded
 require chsql

+require parquet
+
 # Confirm the extension works
 query I
 SELECT chsql('Works');
@@ -386,3 +388,20 @@ query I
 SELECT bitCount(7) -- Binary: 111
 ----
 3
+
+# read_mergetree
+statement ok
+copy (select number * 2 as n from numbers(100000)) TO '__TEST_DIR__/1.parquet';
+
+statement ok
+copy (select number * 2 + 1 as n from numbers(100000)) TO '__TEST_DIR__/2.parquet';
+
+query I
+select count() as c from read_parquet_mergetree(ARRAY['__TEST_DIR__/1.parquet', '__TEST_DIR__/2.parquet'], 'n');
+----
+200000
+
+query I
+select count() as c from (select n - lag(n) over () as diff from read_parquet_mergetree(ARRAY['__TEST_DIR__/1.parquet', '__TEST_DIR__/2.parquet'], 'n')) where diff < 0;
+----
+0

diff --git a/vcpkg.json b/chsql/vcpkg.json
similarity index 100%
rename from vcpkg.json
rename to chsql/vcpkg.json

diff --git a/extension_config.cmake b/extension_config.cmake
deleted file mode 100644
index 85d494c..0000000
--- a/extension_config.cmake
+++ /dev/null
@@ -1,10 +0,0 @@
-# This file is included by DuckDB's build system. It specifies which extension to load
-
-# Extension from this repo
-duckdb_extension_load(chsql
-    SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
-    LOAD_TESTS
-)
-
-# Any extra extensions that should be built
-# e.g.: duckdb_extension_load(json)
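The new tests double as usage documentation: the two parquet files hold even and odd numbers respectively, each already sorted on n, and the queries check that the merged stream has all 200000 rows and never decreases (no negative lag() delta). The same smoke test can be run from embedded DuckDB; a sketch that assumes a build with the chsql extension available and uses placeholder file names:

    #include "duckdb.hpp"
    #include <iostream>

    int main() {
        duckdb::DuckDB db(nullptr);
        duckdb::Connection con(db);
        con.Query("LOAD chsql;"); // no-op if the extension is statically linked in
        auto res = con.Query("SELECT count(*) FROM read_parquet_mergetree("
                             "['part1.parquet', 'part2.parquet'], 'n')");
        if (res->HasError()) {
            std::cerr << res->GetError() << '\n';
            return 1;
        }
        res->Print(); // expect the summed row count of the two sorted inputs
        return 0;
    }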