From ec3a8b13d852ef5915d1ac4ce4b1485f5615f194 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 3 Oct 2024 17:12:11 +0200 Subject: [PATCH] improve delta snapshot sharing --- duckdb | 2 +- scripts/generate_test_data.py | 13 ++++++--- src/functions/delta_scan.cpp | 33 +++++++++++++++-------- src/include/functions/delta_scan.hpp | 15 ++++++----- src/include/storage/delta_table_entry.hpp | 2 +- src/storage/delta_schema_entry.cpp | 13 +++++---- src/storage/delta_table_entry.cpp | 4 +-- 7 files changed, 50 insertions(+), 32 deletions(-) diff --git a/duckdb b/duckdb index 6d777d8..c9d5b4f 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit 6d777d83776d814283d125987320366056d6e549 +Subproject commit c9d5b4f24f82aef27d29445604f455f506c41a52 diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py index b817faa..e3ab444 100644 --- a/scripts/generate_test_data.py +++ b/scripts/generate_test_data.py @@ -32,7 +32,7 @@ def generate_test_data_delta_rs_multi(path, init, tables, splits = 1): os.makedirs(f"{generated_path}") - # First we write a DuckDB file TODO: this should go in 10 appends as well? + # First we write a DuckDB file TODO: this should go in N appends as well? con = duckdb.connect(f"{generated_path}/duckdb.db") con.sql(init) @@ -40,14 +40,21 @@ def generate_test_data_delta_rs_multi(path, init, tables, splits = 1): # Then we write the parquet files for table in tables: total_count = con.sql(f"select count(*) from ({table['query']})").fetchall()[0][0] - tuples_per_file = math.ceil(total_count / splits) + # At least 1 tuple per file + if total_count < splits: + splits = total_count + tuples_per_file = total_count // splits + remainder = total_count % splits file_no = 0 + write_from = 0 while file_no < splits: os.makedirs(f"{generated_path}/{table['name']}/parquet", exist_ok=True) # Write DuckDB's reference data - con.sql(f"COPY ({table['query']} where rowid >= {(file_no) * tuples_per_file} and rowid < {(file_no+1) * tuples_per_file}) to '{generated_path}/{table['name']}/parquet/data_{file_no}.parquet' (FORMAT parquet)") + write_to = write_from + tuples_per_file + (1 if file_no < remainder else 0) + con.sql(f"COPY ({table['query']} where rowid >= {write_from} and rowid < {write_to}) to '{generated_path}/{table['name']}/parquet/data_{file_no}.parquet' (FORMAT parquet)") file_no += 1 + write_from = write_to for table in tables: con = duckdb.connect(f"{generated_path}/duckdb.db") diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index 0c109dc..20378e7 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -389,6 +389,12 @@ string DeltaSnapshot::ToDeltaPath(const string &raw_path) { } void DeltaSnapshot::Bind(vector &return_types, vector &names) { + if (have_bound) { + names = this->names; + return_types = this->types; + return; + } + if (!initialized) { InitializeFiles(); } @@ -405,7 +411,9 @@ void DeltaSnapshot::Bind(vector &return_types, vector &name return_types.push_back(field.second); } // Store the bound names for resolving the complex filter pushdown later + have_bound = true; this->names = names; + this->types = return_types; } string DeltaSnapshot::GetFile(idx_t i) { @@ -473,12 +481,19 @@ void DeltaSnapshot::InitializeFiles() { unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, MultiFilePushdownInfo &info, vector> &filters) { FilterCombiner combiner(context); + + // TODO: can we avoid constructing 2 scans for scans with filter pushdown? + if (filters.empty()) { + return nullptr; + } + for (const auto &filter : filters) { combiner.AddFilter(filter->Copy()); } auto filterstmp = combiner.GenerateTableScanFilters(info.column_ids); // TODO: can/should we figure out if this filtered anything? + auto filtered_list = make_uniq(context, paths[0]); filtered_list->table_filters = std::move(filterstmp); filtered_list->names = names; @@ -548,8 +563,7 @@ unique_ptr DeltaMultiFileReader::CreateInstance(const TableFunc auto result = make_uniq(); if (table_function.function_info) { - result->kernel_snapshot = table_function.function_info->Cast().snapshot; - result->kernel_snapshot_path = table_function.function_info->Cast().expected_path; + result->snapshot = table_function.function_info->Cast().snapshot; } return std::move(result); @@ -640,21 +654,18 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio } } -unique_ptr DeltaMultiFileReader::CreateFileList(ClientContext &context, const vector& paths, FileGlobOptions options) { +shared_ptr DeltaMultiFileReader::CreateFileList(ClientContext &context, const vector& paths, FileGlobOptions options) { if (paths.size() != 1) { throw BinderException("'delta_scan' only supports single path as input"); } - if (kernel_snapshot) { - if (kernel_snapshot_path != paths[0]) { - throw InternalException("Expected path from injected function info did not match! '%s' expected to match '%s'", kernel_snapshot_path, paths[0]); - } + + if (snapshot) { + // TODO: somehow assert that we are querying the same path as this injected snapshot // This takes the kernel snapshot from the delta snapshot and ensures we use that snapshot for reading - if (kernel_snapshot) { - auto snapshot = make_uniq(context, paths[0]); - snapshot->snapshot = kernel_snapshot; - return std::move(snapshot); + if (snapshot) { + return snapshot; } } diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index 8dfe77b..a732c3a 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -12,9 +12,10 @@ #include "duckdb/common/multi_file_reader.hpp" namespace duckdb { +struct DeltaSnapshot; struct DeltaFunctionInfo : public TableFunctionInfo { - shared_ptr snapshot; + shared_ptr snapshot; string expected_path; }; @@ -84,6 +85,8 @@ struct DeltaSnapshot : public MultiFileList { //! Names vector names; + vector types; + bool have_bound = false; //! Metadata map for files vector> metadata; @@ -111,7 +114,7 @@ struct DeltaMultiFileReaderGlobalState : public MultiFileReaderGlobalState { struct DeltaMultiFileReader : public MultiFileReader { static unique_ptr CreateInstance(const TableFunction &table_function); //! Return a DeltaSnapshot - unique_ptr CreateFileList(ClientContext &context, const vector &paths, + shared_ptr CreateFileList(ClientContext &context, const vector &paths, FileGlobOptions options) override; //! Override the regular parquet bind using the MultiFileReader Bind. The bind from these are what DuckDB's file @@ -148,11 +151,9 @@ struct DeltaMultiFileReader : public MultiFileReader { bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, ClientContext &context) override; - // For the TableFunction used on attached delta tables, the kernel snapshot is injected into the MultiFileReader to - // ensure the table is read at the correct timestamp - shared_ptr kernel_snapshot; - // Path for when kernel_snapshot is set: this multifilereader can then only be used to scan this path - string kernel_snapshot_path; + // A snapshot can be injected into the multifilereader, this ensures the GetMultiFileList can return this snapshot + // (note that the path should match the one passed to CreateFileList) + shared_ptr snapshot; }; } // namespace duckdb diff --git a/src/include/storage/delta_table_entry.hpp b/src/include/storage/delta_table_entry.hpp index 8ec10db..c131694 100644 --- a/src/include/storage/delta_table_entry.hpp +++ b/src/include/storage/delta_table_entry.hpp @@ -30,7 +30,7 @@ class DeltaTableEntry : public TableCatalogEntry { ClientContext &context) override; public: - shared_ptr snapshot; + shared_ptr snapshot; }; } // namespace duckdb diff --git a/src/storage/delta_schema_entry.cpp b/src/storage/delta_schema_entry.cpp index 2b4bc4b..2c0a906 100644 --- a/src/storage/delta_schema_entry.cpp +++ b/src/storage/delta_schema_entry.cpp @@ -127,7 +127,7 @@ static unique_ptr CreateTableEntry(ClientContext &context, Delt void DeltaSchemaEntry::Scan(ClientContext &context, CatalogType type, const std::function &callback) { if (!CatalogTypeIsSupported(type)) { - CatalogTransaction transaction(this->catalog, context); + auto transaction = catalog.GetCatalogTransaction(context); auto default_table = GetEntry(transaction, type, DEFAULT_DELTA_TABLE); if (default_table) { callback(*default_table); @@ -151,11 +151,11 @@ optional_ptr DeltaSchemaEntry::GetEntry(CatalogTransaction transac auto &context = transaction.GetContext(); if (type == CatalogType::TABLE_ENTRY && name == DEFAULT_DELTA_TABLE) { - auto &transaction = context.ActiveTransaction().GetTransaction(this->catalog.GetAttached()).Cast(); + auto &delta_transaction = GetDeltaTransaction(transaction); auto &delta_catalog = catalog.Cast(); - if (transaction.table_entry) { - return *transaction.table_entry; + if (delta_transaction.table_entry) { + return *delta_transaction.table_entry; } if (delta_catalog.UseCachedSnapshot()) { @@ -166,9 +166,8 @@ optional_ptr DeltaSchemaEntry::GetEntry(CatalogTransaction transac return *cached_table; } - unique_lock l(lock); - transaction.table_entry = CreateTableEntry(context, delta_catalog, *this); - return *transaction.table_entry; + delta_transaction.table_entry = CreateTableEntry(context, delta_catalog, *this); + return *delta_transaction.table_entry; } return nullptr; diff --git a/src/storage/delta_table_entry.cpp b/src/storage/delta_table_entry.cpp index 88ca780..f82caa4 100644 --- a/src/storage/delta_table_entry.cpp +++ b/src/storage/delta_table_entry.cpp @@ -45,8 +45,8 @@ TableFunction DeltaTableEntry::GetScanFunction(ClientContext &context, unique_pt // Copy over the internal kernel snapshot auto function_info = make_shared_ptr(); - function_info->snapshot = this->snapshot->snapshot; - function_info->expected_path = delta_catalog.GetDBPath(); + + function_info->snapshot = this->snapshot; delta_scan_function.function_info = std::move(function_info); vector inputs = {delta_catalog.GetDBPath()};