Skip to content

Commit

Permalink
optimized attach single table catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Oct 16, 2024
1 parent 91d15e8 commit b24cc31
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 21 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/LocalTesting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ jobs:
run: |
python ./duckdb/scripts/regression_test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/tpcds_sf1_local.csv --verbose --threads=2 --root-dir=.
- name: Regression Test Micro
if: always()
shell: bash
run: |
python ./duckdb/scripts/regression_test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/micro.csv --verbose --threads=2 --root-dir=.
- name: Test benchmark makefile
shell: bash
run: |
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ test/python/__pycache__/
data/generated
__azurite*__.json
__blobstorage__
.venv
venv
.vscode
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ test_debug: export DELTA_KERNEL_TESTS_PATH=./build/debug/rust/src/delta_kernel/k
test_debug: export DAT_PATH=./build/debug/rust/src/delta_kernel/acceptance/tests/dat

# Core extensions that we need for testing
CORE_EXTENSIONS='tpcds;tpch;aws;azure;httpfs;sqlite_scanner'
CORE_EXTENSIONS='tpcds;tpch;aws;azure;httpfs'

# Set this flag during building to enable the benchmark runner
ifeq (${BUILD_BENCHMARK}, 1)
Expand Down
7 changes: 7 additions & 0 deletions benchmark/benchmark.Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,10 @@ bench-run-tpcds-sf1-duckdb: bench-output-dir

# COMPARES TPCDS SF1 on parquet file vs on delta files
bench-run-tpcds-sf1: bench-run-tpcds-sf1-delta bench-run-tpcds-sf1-parquet bench-run-tpcds-sf1-duckdb

###
# MICRO
###

bench-run-snapshot-performance: bench-output-dir
./build/release/benchmark/benchmark_runner --root-dir './' 'benchmark/micro/snapshot_performance/.*' 2>&1 | tee benchmark_results/snapshot-performance.csv
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 240 files
34 changes: 23 additions & 11 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p
}

DeltaSnapshot::DeltaSnapshot(ClientContext &context_p, const string &path) : MultiFileList({ToDeltaPath(path)}, FileGlobOptions::ALLOW_EMPTY), context(context_p) {
// printf("created snapshot\n");
}

string DeltaSnapshot::GetPath() {
Expand Down Expand Up @@ -390,15 +391,15 @@ string DeltaSnapshot::ToDeltaPath(const string &raw_path) {

void DeltaSnapshot::Bind(vector<LogicalType> &return_types, vector<string> &names) {
if (have_bound) {
printf("Bind DeltaSnapshot Cached\n");
// printf("Bind DeltaSnapshot Cached\n");
names = this->names;
return_types = this->types;
return;
}
printf("Bind DeltaSnapshot Uncached\n");
// printf("Bind DeltaSnapshot Uncached\n");

if (!initialized) {
InitializeFiles();
if (!initialized_snapshot) {
InitializeSnapshot();
}

unique_ptr<SchemaVisitor::FieldList> schema;
Expand All @@ -419,9 +420,14 @@ void DeltaSnapshot::Bind(vector<LogicalType> &return_types, vector<string> &name
}

string DeltaSnapshot::GetFile(idx_t i) {
if (!initialized) {
InitializeFiles();
if (!initialized_snapshot) {
InitializeSnapshot();
}

if(!initialized_scan) {
InitializeScan();
}

// We already have this file
if (i < resolved_files.size()) {
return resolved_files[i];
Expand Down Expand Up @@ -451,18 +457,23 @@ string DeltaSnapshot::GetFile(idx_t i) {
return resolved_files[i];
}

void DeltaSnapshot::InitializeFiles() {
printf("InitializeFiles\n");
void DeltaSnapshot::InitializeSnapshot() {
auto path_slice = KernelUtils::ToDeltaString(paths[0]);

// Register engine
auto interface_builder = CreateBuilder(context, paths[0]);
extern_engine = TryUnpackKernelResult( ffi::builder_build(interface_builder));

if (!snapshot) {
// printf("InitializeFiles uncached\n");
snapshot = make_shared_ptr<SharedKernelSnapshot>(TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get())));
}

initialized_snapshot = true;
}

void DeltaSnapshot::InitializeScan() {
// printf("InitializeScan\n");

auto snapshot_ref = snapshot->GetLockingRef();

// Create Scan
Expand All @@ -478,7 +489,7 @@ void DeltaSnapshot::InitializeFiles() {
// Create scan data iterator
scan_data_iterator = TryUnpackKernelResult(ffi::kernel_scan_data_init(extern_engine.get(), scan.get()));

initialized = true;
initialized_scan = true;
}

unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, MultiFilePushdownInfo &info,
Expand All @@ -496,7 +507,6 @@ unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &co
auto filterstmp = combiner.GenerateTableScanFilters(info.column_ids);

// TODO: can/should we figure out if this filtered anything?

auto filtered_list = make_uniq<DeltaSnapshot>(context, paths[0]);
filtered_list->table_filters = std::move(filterstmp);
filtered_list->names = names;
Expand All @@ -517,6 +527,8 @@ vector<string> DeltaSnapshot::GetAllFiles() {
}

FileExpandResult DeltaSnapshot::GetExpandResult() {
return FileExpandResult::MULTIPLE_FILES;

// GetFile(1) will ensure at least the first 2 files are expanded if they are available
GetFile(1);

Expand Down
8 changes: 5 additions & 3 deletions src/include/functions/delta_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ struct DeltaSnapshot : public MultiFileList {
string GetFile(idx_t i) override;

protected:
// TODO: How to guarantee we only call this after the filter pushdown?
void InitializeFiles();
void InitializeSnapshot();
void InitializeScan();

template <class T>
T TryUnpackKernelResult(ffi::ExternResult<T> result) {
Expand Down Expand Up @@ -92,7 +92,9 @@ struct DeltaSnapshot : public MultiFileList {
vector<unique_ptr<DeltaFileMetaData>> metadata;

//! Current file list resolution state
bool initialized = false;
bool initialized_snapshot = false;
bool initialized_scan = false;

bool files_exhausted = false;
vector<string> resolved_files;
TableFilterSet table_filters;
Expand Down
6 changes: 3 additions & 3 deletions src/storage/delta_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ optional_idx DeltaCatalog::GetCatalogVersion(ClientContext &context) {
// Option 1: snapshot is cached table-wide
auto cached_snapshot = main_schema->GetCachedTable();
if (cached_snapshot) {
printf("Returned Cached Table version: %lld\n", cached_snapshot->snapshot->version);
// printf("Returned Cached Table version: %lld\n", cached_snapshot->snapshot->version);
return cached_snapshot->snapshot->version;
}

// Option 2: snapshot is cached in transaction
if (delta_transaction.table_entry) {
printf("Returned transaction Table version: %lld\n", delta_transaction.table_entry->snapshot->version);
// printf("Returned transaction Table version: %lld\n", delta_transaction.table_entry->snapshot->version);
return delta_transaction.table_entry->snapshot->version;
}

printf("No catalog version!\n");
// printf("No catalog version!\n");
return optional_idx::Invalid();
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/delta_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace duckdb {
DeltaTransaction::DeltaTransaction(DeltaCatalog &delta_catalog, TransactionManager &manager, ClientContext &context)
: Transaction(manager, context), access_mode(delta_catalog.access_mode) {
// connection = DeltaConnection::Open(delta_catalog.path);
printf("Started Delta Transaction\n");
// printf("Started Delta Transaction\n");
}

DeltaTransaction::~DeltaTransaction() {
Expand Down

0 comments on commit b24cc31

Please sign in to comment.