Skip to content

Commit

Permalink
Merge pull request #128 from samansmink/thread-safety
Browse files Browse the repository at this point in the history
Thread safety
  • Loading branch information
samansmink authored Dec 19, 2024
2 parents 8a7721e + f945b66 commit 7140762
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 27 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/LocalTesting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,12 @@ jobs:
run: |
python ./duckdb/scripts/regression/test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/tpcds_sf1_local.csv --verbose --threads=2 --root-dir=.
- name: Regression Test Micro
if: always()
shell: bash
run: |
python ./duckdb/scripts/regression/test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/micro.csv --verbose --threads=2 --root-dir=.
# FIXME: re-enable
# - name: Regression Test Micro
# if: always()
# shell: bash
# run: |
# python ./duckdb/scripts/regression/test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/micro.csv --verbose --threads=2 --root-dir=.

- name: Test benchmark makefile
shell: bash
Expand Down
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ PROJ_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
EXT_NAME=deltatable
EXT_CONFIG=${PROJ_DIR}extension_config.cmake

# When SANITIZER_MODE is set to `thread`, define ENABLE_THREAD_SANITIZER=1 through EXT_DEBUG_FLAGS.
# NOTE(review): EXT_DEBUG_FLAGS is presumably picked up by the shared extension build rules for debug builds -- confirm.
ifeq ($(SANITIZER_MODE), thread)
EXT_DEBUG_FLAGS:=-DENABLE_THREAD_SANITIZER=1
endif

# When CUSTOM_LINKER is non-empty, append -DCUSTOM_LINKER=<value> to whatever EXT_DEBUG_FLAGS already holds.
ifneq ("${CUSTOM_LINKER}", "")
EXT_DEBUG_FLAGS:=${EXT_DEBUG_FLAGS} -DCUSTOM_LINKER=${CUSTOM_LINKER}
endif

# Set test paths
test_release: export DELTA_KERNEL_TESTS_PATH=./build/release/rust/src/delta_kernel/kernel/tests/data
test_release: export DAT_PATH=./build/release/rust/src/delta_kernel/acceptance/tests/dat
Expand Down
61 changes: 44 additions & 17 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ string url_decode(string input) {
return result;
}

static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size,
void DeltaSnapshot::VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size,
const ffi::Stats *stats, const ffi::DvInfo *dv_info,
const struct ffi::CStringMap *partition_values) {
auto context = (DeltaSnapshot *)engine_context;
Expand Down Expand Up @@ -94,9 +94,9 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel
context->metadata.back()->partition_map = std::move(constant_map);
}

static void visit_data(void *engine_context, ffi::ExclusiveEngineData *engine_data,
void DeltaSnapshot::VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data,
const struct ffi::KernelBoolSlice selection_vec) {
ffi::visit_scan_data(engine_data, selection_vec, engine_context, visit_callback);
ffi::visit_scan_data(engine_data, selection_vec, engine_context, VisitCallback);
}

string ParseAccountNameFromEndpoint(const string &endpoint) {
Expand Down Expand Up @@ -386,7 +386,7 @@ DeltaSnapshot::DeltaSnapshot(ClientContext &context_p, const string &path)
: MultiFileList({ToDeltaPath(path)}, FileGlobOptions::ALLOW_EMPTY), context(context_p) {
}

string DeltaSnapshot::GetPath() {
string DeltaSnapshot::GetPath() const {
return GetPaths()[0];
}

Expand Down Expand Up @@ -416,6 +416,8 @@ string DeltaSnapshot::ToDeltaPath(const string &raw_path) {
}

void DeltaSnapshot::Bind(vector<LogicalType> &return_types, vector<string> &names) {
unique_lock<mutex> lck(lock);

if (have_bound) {
names = this->names;
return_types = this->types;
Expand Down Expand Up @@ -443,7 +445,7 @@ void DeltaSnapshot::Bind(vector<LogicalType> &return_types, vector<string> &name
this->types = return_types;
}

string DeltaSnapshot::GetFile(idx_t i) {
string DeltaSnapshot::GetFileInternal(idx_t i) {
if (!initialized_snapshot) {
InitializeSnapshot();
}
Expand All @@ -462,7 +464,7 @@ string DeltaSnapshot::GetFile(idx_t i) {
}

while (i >= resolved_files.size()) {
auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), this, visit_data);
auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), this, VisitData);

auto have_scan_data = TryUnpackKernelResult(have_scan_data_res);

Expand All @@ -481,6 +483,12 @@ string DeltaSnapshot::GetFile(idx_t i) {
return resolved_files[i];
}

//! Locking wrapper around GetFileInternal: takes `lock` for the duration of the call and returns the i-th file.
//! Code paths that already hold `lock` (e.g. GetAllFiles and GetTotalFileCount) call GetFileInternal directly
//! instead of going through this function.
string DeltaSnapshot::GetFile(idx_t i) {
// TODO: profile this: we should be able to use atomics here to optimize
// Held until return, so the whole lookup in GetFileInternal runs under the lock
unique_lock<mutex> lck(lock);
return GetFileInternal(i);
}

void DeltaSnapshot::InitializeSnapshot() {
auto path_slice = KernelUtils::ToDeltaString(paths[0]);

Expand Down Expand Up @@ -535,13 +543,17 @@ unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &co
filtered_list->names = names;

// Copy over the snapshot, this avoids reparsing metadata
filtered_list->snapshot = snapshot;
{
unique_lock<mutex> lck(lock);
filtered_list->snapshot = snapshot;
}

auto &profiler = QueryProfiler::Get(context);

// Note: this is potentially quite expensive: we are creating 2 scans of the snapshot and fully materializing both
// file lists. Therefore this is only done when profiling is enabled. This is enabled by default in debug mode or for
// EXPLAIN ANALYZE queries
// TODO: check locking behaviour below
if (profiler.IsEnabled()) {
Value result;
if (!context.TryGetCurrentSetting("delta_scan_explain_files_filtered", result)) {
Expand Down Expand Up @@ -589,9 +601,10 @@ unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &co
}

vector<string> DeltaSnapshot::GetAllFiles() {
unique_lock<mutex> lck(lock);
idx_t i = resolved_files.size();
// TODO: this can probably be improved
while (!GetFile(i).empty()) {
while (!GetFileInternal(i).empty()) {
i++;
}
return resolved_files;
Expand All @@ -606,9 +619,9 @@ FileExpandResult DeltaSnapshot::GetExpandResult() {
}

idx_t DeltaSnapshot::GetTotalFileCount() {
// TODO: this can probably be improved
unique_lock<mutex> lck(lock);
idx_t i = resolved_files.size();
while (!GetFile(i).empty()) {
while (!GetFileInternal(i).empty()) {
i++;
}
return resolved_files.size();
Expand All @@ -618,6 +631,9 @@ unique_ptr<NodeStatistics> DeltaSnapshot::GetCardinality(ClientContext &context)
// This also ensures all files are expanded
auto total_file_count = DeltaSnapshot::GetTotalFileCount();

// TODO: internalize above
unique_lock<mutex> lck(lock);

if (total_file_count == 0) {
return make_uniq<NodeStatistics>(0, 0);
}
Expand All @@ -638,6 +654,17 @@ unique_ptr<NodeStatistics> DeltaSnapshot::GetCardinality(ClientContext &context)
return nullptr;
}


//! Returns the `version` member of this snapshot; the read is performed while holding `lock`.
idx_t DeltaSnapshot::GetVersion() {
unique_lock<mutex> lck(lock);
return version;
}

//! Returns a reference to the metadata entry stored at position `index` in `metadata`.
//! `lock` is held only while the entry is looked up; it is released when this function returns, so the caller
//! uses the returned reference without the lock held.
//! NOTE(review): `index` is not bounds-checked here; presumably callers only pass indices of files that were
//! already resolved -- confirm.
DeltaFileMetaData &DeltaSnapshot::GetMetaData(idx_t index) const {
// `lock` is declared mutable, which is what allows locking it from this const method
unique_lock<mutex> lck(lock);
return *metadata[index];
}

unique_ptr<MultiFileReader> DeltaMultiFileReader::CreateInstance(const TableFunction &table_function) {
auto result = make_uniq<DeltaMultiFileReader>();

Expand Down Expand Up @@ -716,16 +743,16 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio
// Get the metadata for this file
D_ASSERT(global_state->file_list);
const auto &snapshot = dynamic_cast<const DeltaSnapshot &>(*global_state->file_list);
auto &file_metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()];
auto &file_metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex());

if (!file_metadata->partition_map.empty()) {
if (!file_metadata.partition_map.empty()) {
for (idx_t i = 0; i < global_column_ids.size(); i++) {
column_t col_id = global_column_ids[i].GetPrimaryIndex();
if (IsRowIdColumnId(col_id)) {
continue;
}
auto col_partition_entry = file_metadata->partition_map.find(global_names[col_id]);
if (col_partition_entry != file_metadata->partition_map.end()) {
auto col_partition_entry = file_metadata.partition_map.find(global_names[col_id]);
if (col_partition_entry != file_metadata.partition_map.end()) {
auto &current_type = global_types[col_id];
if (current_type == LogicalType::BLOB) {
reader_data.constant_map.emplace_back(i, Value::BLOB_RAW(col_partition_entry->second));
Expand Down Expand Up @@ -977,15 +1004,15 @@ void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFile

// Get the metadata for this file
const auto &snapshot = dynamic_cast<const DeltaSnapshot &>(*global_state->file_list);
auto &metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()];
auto &metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex());

if (metadata->selection_vector.ptr && chunk.size() != 0) {
if (metadata.selection_vector.ptr && chunk.size() != 0) {
D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX);
auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx];

// Construct the selection vector using the file_row_number column and the raw selection vector from delta
idx_t select_count;
auto sv = DuckSVFromDeltaSV(metadata->selection_vector, file_row_number_column, chunk.size(), select_count);
auto sv = DuckSVFromDeltaSV(metadata.selection_vector, file_row_number_column, chunk.size(), select_count);
chunk.Slice(sv, select_count);
}

Expand Down
16 changes: 13 additions & 3 deletions src/include/functions/delta_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct DeltaFileMetaData {
//! The DeltaSnapshot implements the MultiFileList API to allow injecting it into the regular DuckDB parquet scan
struct DeltaSnapshot : public MultiFileList {
DeltaSnapshot(ClientContext &context, const string &path);
string GetPath();
string GetPath() const;
static string ToDuckDBPath(const string &raw_path);
static string ToDeltaPath(const string &raw_path);

Expand All @@ -58,12 +58,15 @@ struct DeltaSnapshot : public MultiFileList {
idx_t GetTotalFileCount() override;

unique_ptr<NodeStatistics> GetCardinality(ClientContext &context) override;
idx_t GetVersion();
DeltaFileMetaData &GetMetaData(idx_t index) const;

protected:
//! Get the i-th expanded file
string GetFile(idx_t i) override;

protected:
string GetFileInternal(idx_t i);
void InitializeSnapshot();
void InitializeScan();

Expand All @@ -73,8 +76,15 @@ struct DeltaSnapshot : public MultiFileList {
result, StringUtil::Format("While trying to read from delta table: '%s'", paths[0]));
}

// TODO: change back to protected
public:
static void VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data,
const struct ffi::KernelBoolSlice selection_vec);
static void VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size,
const ffi::Stats *stats, const ffi::DvInfo *dv_info,
const struct ffi::CStringMap *partition_values);

protected:
mutable mutex lock;

idx_t version;

//! Delta Kernel Structures
Expand Down
4 changes: 2 additions & 2 deletions src/storage/delta_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ optional_idx DeltaCatalog::GetCatalogVersion(ClientContext &context) {
// Option 1: snapshot is cached table-wide
auto cached_snapshot = main_schema->GetCachedTable();
if (cached_snapshot) {
return cached_snapshot->snapshot->version;
return cached_snapshot->snapshot->GetVersion();
}

// Option 2: snapshot is cached in transaction
if (delta_transaction.table_entry) {
return delta_transaction.table_entry->snapshot->version;
return delta_transaction.table_entry->snapshot->GetVersion();
}

return {};
Expand Down
100 changes: 100 additions & 0 deletions test/sql/generated/attach_parallel.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# name: test/sql/generated/attach_parallel.test
# description: Test attaching a delta table and reading from it in parallel
# group: [dat]

require parquet

require delta

require-env GENERATED_DATA_AVAILABLE

statement ok
pragma threads=10;

statement ok
ATTACH 'data/generated/simple_partitioned/delta_lake/' as dt (TYPE delta)

statement ok
ATTACH 'data/generated/simple_partitioned/delta_lake/' as dt_pinned (TYPE delta, PIN_SNAPSHOT)

# Run the two recursive-CTE queries below from concurrent loop iterations, once against the regular attach (dt)
# and once against the pinned-snapshot attach (dt_pinned).
# NOTE(review): presumably `0 20` yields 20 iterations (end exclusive) -- confirm against the test runner docs.
concurrentloop threadid 0 20

# The base case reads the table at recursiondepth 1; each recursive step re-emits the rows up to depth 8.
# Expected 80 = 10 non-NULL values of i (see the plain count(i) queries later in this file) x 8 depths.
query I
WITH RECURSIVE ctename AS (
SELECT *, 1 as recursiondepth
FROM dt
UNION ALL
SELECT * EXCLUDE (c2.recursiondepth), c2.recursiondepth + 1 as recursiondepth
FROM ctename as c2
WHERE c2.recursiondepth < 8
)
SELECT count(i) FROM ctename;
----
80

# Same query against the attach that was created with PIN_SNAPSHOT
query I
WITH RECURSIVE ctename AS (
SELECT *, 1 as recursiondepth
FROM dt_pinned
UNION ALL
SELECT * EXCLUDE (c2.recursiondepth), c2.recursiondepth + 1 as recursiondepth
FROM ctename as c2
WHERE c2.recursiondepth < 8
)
SELECT count(i) FROM ctename;
----
80

endloop

# Second concurrent section: each query scans the same attached table ten times within a single statement
# (ten UNION ALL branches), and every branch is expected to return count(i) = 10.
concurrentloop threadid 0 20

# Ten scans of the regular attach (dt)
query I
SELECT count(i) FROM dt UNION ALL
SELECT count(i) FROM dt UNION ALL
SELECT count(i) FROM dt UNION ALL
SELECT count(i) FROM dt UNION ALL
SELECT count(i) FROM dt UNION ALL
SELECT count(i) FROM dt UNION ALL
SELECT count(i) FROM dt UNION ALL
SELECT count(i) FROM dt UNION ALL
SELECT count(i) FROM dt UNION ALL
SELECT count(i) FROM dt
----
10
10
10
10
10
10
10
10
10
10

# Ten scans of the attach that was created with PIN_SNAPSHOT (dt_pinned)
query I
SELECT count(i) FROM dt_pinned UNION ALL
SELECT count(i) FROM dt_pinned UNION ALL
SELECT count(i) FROM dt_pinned UNION ALL
SELECT count(i) FROM dt_pinned UNION ALL
SELECT count(i) FROM dt_pinned UNION ALL
SELECT count(i) FROM dt_pinned UNION ALL
SELECT count(i) FROM dt_pinned UNION ALL
SELECT count(i) FROM dt_pinned UNION ALL
SELECT count(i) FROM dt_pinned UNION ALL
SELECT count(i) FROM dt_pinned
----
10
10
10
10
10
10
10
10
10
10



endloop

0 comments on commit 7140762

Please sign in to comment.