From 4a32608dbf0f6dd956c61bfe767278268801f636 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 19 Dec 2024 11:25:38 +0100 Subject: [PATCH 1/2] add locking and parallel test --- Makefile | 8 ++ src/functions/delta_scan.cpp | 61 +++++++++++---- src/include/functions/delta_scan.hpp | 16 +++- src/storage/delta_catalog.cpp | 4 +- test/sql/generated/attach_parallel.test | 100 ++++++++++++++++++++++++ 5 files changed, 167 insertions(+), 22 deletions(-) create mode 100644 test/sql/generated/attach_parallel.test diff --git a/Makefile b/Makefile index 8cc8bc9..7eb8376 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,14 @@ PROJ_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) EXT_NAME=deltatable EXT_CONFIG=${PROJ_DIR}extension_config.cmake +ifeq ($(SANITIZER_MODE), thread) + EXT_DEBUG_FLAGS:=-DENABLE_THREAD_SANITIZER=1 +endif + +ifneq ("${CUSTOM_LINKER}", "") + EXT_DEBUG_FLAGS:=${EXT_DEBUG_FLAGS} -DCUSTOM_LINKER=${CUSTOM_LINKER} +endif + # Set test paths test_release: export DELTA_KERNEL_TESTS_PATH=./build/release/rust/src/delta_kernel/kernel/tests/data test_release: export DAT_PATH=./build/release/rust/src/delta_kernel/acceptance/tests/dat diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index fb4bbe4..9e1c36e 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -49,7 +49,7 @@ string url_decode(string input) { return result; } -static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, +void DeltaSnapshot::VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, const ffi::Stats *stats, const ffi::DvInfo *dv_info, const struct ffi::CStringMap *partition_values) { auto context = (DeltaSnapshot *)engine_context; @@ -94,9 +94,9 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel context->metadata.back()->partition_map = std::move(constant_map); } -static void visit_data(void *engine_context, 
ffi::ExclusiveEngineData *engine_data, +void DeltaSnapshot::VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data, const struct ffi::KernelBoolSlice selection_vec) { - ffi::visit_scan_data(engine_data, selection_vec, engine_context, visit_callback); + ffi::visit_scan_data(engine_data, selection_vec, engine_context, VisitCallback); } string ParseAccountNameFromEndpoint(const string &endpoint) { @@ -386,7 +386,7 @@ DeltaSnapshot::DeltaSnapshot(ClientContext &context_p, const string &path) : MultiFileList({ToDeltaPath(path)}, FileGlobOptions::ALLOW_EMPTY), context(context_p) { } -string DeltaSnapshot::GetPath() { +string DeltaSnapshot::GetPath() const { return GetPaths()[0]; } @@ -416,6 +416,8 @@ string DeltaSnapshot::ToDeltaPath(const string &raw_path) { } void DeltaSnapshot::Bind(vector &return_types, vector &names) { + unique_lock lck(lock); + if (have_bound) { names = this->names; return_types = this->types; @@ -443,7 +445,7 @@ void DeltaSnapshot::Bind(vector &return_types, vector &name this->types = return_types; } -string DeltaSnapshot::GetFile(idx_t i) { +string DeltaSnapshot::GetFileInternal(idx_t i) { if (!initialized_snapshot) { InitializeSnapshot(); } @@ -462,7 +464,7 @@ string DeltaSnapshot::GetFile(idx_t i) { } while (i >= resolved_files.size()) { - auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), this, visit_data); + auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), this, VisitData); auto have_scan_data = TryUnpackKernelResult(have_scan_data_res); @@ -481,6 +483,12 @@ string DeltaSnapshot::GetFile(idx_t i) { return resolved_files[i]; } +string DeltaSnapshot::GetFile(idx_t i) { + // TODO: profile this: we should be able to use atomics here to optimize + unique_lock lck(lock); + return GetFileInternal(i); +} + void DeltaSnapshot::InitializeSnapshot() { auto path_slice = KernelUtils::ToDeltaString(paths[0]); @@ -535,13 +543,17 @@ unique_ptr 
DeltaSnapshot::ComplexFilterPushdown(ClientContext &co filtered_list->names = names; // Copy over the snapshot, this avoids reparsing metadata - filtered_list->snapshot = snapshot; + { + unique_lock lck(lock); + filtered_list->snapshot = snapshot; + } auto &profiler = QueryProfiler::Get(context); // Note: this is potentially quite expensive: we are creating 2 scans of the snapshot and fully materializing both // file lists Therefore this is only done when profile is enabled. This is enable by default in debug mode or for // EXPLAIN ANALYZE queries + // TODO: check locking behaviour below if (profiler.IsEnabled()) { Value result; if (!context.TryGetCurrentSetting("delta_scan_explain_files_filtered", result)) { @@ -589,9 +601,10 @@ unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &co } vector DeltaSnapshot::GetAllFiles() { + unique_lock lck(lock); idx_t i = resolved_files.size(); // TODO: this can probably be improved - while (!GetFile(i).empty()) { + while (!GetFileInternal(i).empty()) { i++; } return resolved_files; @@ -606,9 +619,9 @@ FileExpandResult DeltaSnapshot::GetExpandResult() { } idx_t DeltaSnapshot::GetTotalFileCount() { - // TODO: this can probably be improved + unique_lock lck(lock); idx_t i = resolved_files.size(); - while (!GetFile(i).empty()) { + while (!GetFileInternal(i).empty()) { i++; } return resolved_files.size(); @@ -618,6 +631,9 @@ unique_ptr DeltaSnapshot::GetCardinality(ClientContext &context) // This also ensures all files are expanded auto total_file_count = DeltaSnapshot::GetTotalFileCount(); + // TODO: internalize above + unique_lock lck(lock); + if (total_file_count == 0) { return make_uniq(0, 0); } @@ -638,6 +654,17 @@ unique_ptr DeltaSnapshot::GetCardinality(ClientContext &context) return nullptr; } + +idx_t DeltaSnapshot::GetVersion() { + unique_lock lck(lock); + return version; +} + +DeltaFileMetaData &DeltaSnapshot::GetMetaData(idx_t index) const { + unique_lock lck(lock); + return *metadata[index]; +} + unique_ptr 
DeltaMultiFileReader::CreateInstance(const TableFunction &table_function) { auto result = make_uniq(); @@ -716,16 +743,16 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio // Get the metadata for this file D_ASSERT(global_state->file_list); const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &file_metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()]; + auto &file_metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); - if (!file_metadata->partition_map.empty()) { + if (!file_metadata.partition_map.empty()) { for (idx_t i = 0; i < global_column_ids.size(); i++) { column_t col_id = global_column_ids[i].GetPrimaryIndex(); if (IsRowIdColumnId(col_id)) { continue; } - auto col_partition_entry = file_metadata->partition_map.find(global_names[col_id]); - if (col_partition_entry != file_metadata->partition_map.end()) { + auto col_partition_entry = file_metadata.partition_map.find(global_names[col_id]); + if (col_partition_entry != file_metadata.partition_map.end()) { auto &current_type = global_types[col_id]; if (current_type == LogicalType::BLOB) { reader_data.constant_map.emplace_back(i, Value::BLOB_RAW(col_partition_entry->second)); @@ -977,15 +1004,15 @@ void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFile // Get the metadata for this file const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()]; + auto &metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); - if (metadata->selection_vector.ptr && chunk.size() != 0) { + if (metadata.selection_vector.ptr && chunk.size() != 0) { D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX); auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx]; // Construct the selection vector using the file_row_number column and the raw selection vector from delta idx_t select_count; - auto sv = 
DuckSVFromDeltaSV(metadata->selection_vector, file_row_number_column, chunk.size(), select_count); + auto sv = DuckSVFromDeltaSV(metadata.selection_vector, file_row_number_column, chunk.size(), select_count); chunk.Slice(sv, select_count); } diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index e9e89da..fe842d3 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -43,7 +43,7 @@ struct DeltaFileMetaData { //! The DeltaSnapshot implements the MultiFileList API to allow injecting it into the regular DuckDB parquet scan struct DeltaSnapshot : public MultiFileList { DeltaSnapshot(ClientContext &context, const string &path); - string GetPath(); + string GetPath() const; static string ToDuckDBPath(const string &raw_path); static string ToDeltaPath(const string &raw_path); @@ -58,12 +58,15 @@ struct DeltaSnapshot : public MultiFileList { idx_t GetTotalFileCount() override; unique_ptr GetCardinality(ClientContext &context) override; + idx_t GetVersion(); + DeltaFileMetaData &GetMetaData(idx_t index) const; protected: //! Get the i-th expanded file string GetFile(idx_t i) override; protected: + string GetFileInternal(idx_t i); void InitializeSnapshot(); void InitializeScan(); @@ -73,8 +76,15 @@ struct DeltaSnapshot : public MultiFileList { result, StringUtil::Format("While trying to read from delta table: '%s'", paths[0])); } - // TODO: change back to protected -public: + static void VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data, + const struct ffi::KernelBoolSlice selection_vec); + static void VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, + const ffi::Stats *stats, const ffi::DvInfo *dv_info, + const struct ffi::CStringMap *partition_values); + +protected: + mutable mutex lock; + idx_t version; //! 
Delta Kernel Structures diff --git a/src/storage/delta_catalog.cpp b/src/storage/delta_catalog.cpp index 53b1195..44e03e7 100644 --- a/src/storage/delta_catalog.cpp +++ b/src/storage/delta_catalog.cpp @@ -64,12 +64,12 @@ optional_idx DeltaCatalog::GetCatalogVersion(ClientContext &context) { // Option 1: snapshot is cached table-wide auto cached_snapshot = main_schema->GetCachedTable(); if (cached_snapshot) { - return cached_snapshot->snapshot->version; + return cached_snapshot->snapshot->GetVersion(); } // Option 2: snapshot is cached in transaction if (delta_transaction.table_entry) { - return delta_transaction.table_entry->snapshot->version; + return delta_transaction.table_entry->snapshot->GetVersion(); } return {}; diff --git a/test/sql/generated/attach_parallel.test b/test/sql/generated/attach_parallel.test new file mode 100644 index 0000000..37a5fbb --- /dev/null +++ b/test/sql/generated/attach_parallel.test @@ -0,0 +1,100 @@ +# name: test/sql/generated/attach_parallel.test +# description: Test attaching a delta table and reading from it in parallel +# group: [dat] + +require parquet + +require delta + +require-env GENERATED_DATA_AVAILABLE + +statement ok +pragma threads=10; + +statement ok +ATTACH 'data/generated/simple_partitioned/delta_lake/' as dt (TYPE delta) + +statement ok +ATTACH 'data/generated/simple_partitioned/delta_lake/' as dt_pinned (TYPE delta, PIN_SNAPSHOT) + +concurrentloop threadid 0 20 + +query I +WITH RECURSIVE ctename AS ( + SELECT *, 1 as recursiondepth + FROM dt + UNION ALL + SELECT * EXCLUDE (c2.recursiondepth), c2.recursiondepth + 1 as recursiondepth + FROM ctename as c2 + WHERE c2.recursiondepth < 8 +) +SELECT count(i) FROM ctename; +---- +80 + +query I +WITH RECURSIVE ctename AS ( + SELECT *, 1 as recursiondepth + FROM dt_pinned + UNION ALL + SELECT * EXCLUDE (c2.recursiondepth), c2.recursiondepth + 1 as recursiondepth + FROM ctename as c2 + WHERE c2.recursiondepth < 8 +) +SELECT count(i) FROM ctename; +---- +80 + +endloop + 
+concurrentloop threadid 0 20 + +query I +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt +---- +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 + +query I +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned +---- +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 + + + +endloop \ No newline at end of file From f945b663ca850e267a91912a152d2d74c42c15bc Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 19 Dec 2024 13:34:17 +0100 Subject: [PATCH 2/2] disable micro benchmarks for now --- .github/workflows/LocalTesting.yml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/.github/workflows/LocalTesting.yml b/.github/workflows/LocalTesting.yml index 34457eb..5c425b8 100644 --- a/.github/workflows/LocalTesting.yml +++ b/.github/workflows/LocalTesting.yml @@ -294,11 +294,12 @@ jobs: run: | python ./duckdb/scripts/regression/test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/tpcds_sf1_local.csv --verbose --threads=2 --root-dir=. 
- - name: Regression Test Micro - if: always() - shell: bash - run: | - python ./duckdb/scripts/regression/test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/micro.csv --verbose --threads=2 --root-dir=. + # FIXME: re-enable +# - name: Regression Test Micro +# if: always() +# shell: bash +# run: | +# python ./duckdb/scripts/regression/test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/micro.csv --verbose --threads=2 --root-dir=. - name: Test benchmark makefile shell: bash