Skip to content

Commit

Permalink
PoC delta attach
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Sep 23, 2024
1 parent 6feb423 commit 7fb17d3
Show file tree
Hide file tree
Showing 16 changed files with 762 additions and 8 deletions.
8 changes: 7 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ set(EXTENSION_SOURCES
src/delta_extension.cpp
src/delta_functions.cpp
src/delta_utils.cpp
src/functions/delta_scan.cpp)
src/functions/delta_scan.cpp
src/storage/delta_catalog.cpp
src/storage/delta_schema_entry.cpp
src/storage/delta_table_entry.cpp
src/storage/delta_transaction.cpp
src/storage/delta_transaction_manager.cpp
)

### Custom config
# TODO: figure out if we really need this?
Expand Down
3 changes: 2 additions & 1 deletion scripts/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
parser = argparse.ArgumentParser(description='Plot the results in ./benchmark_results')
parser.add_argument('-p','--pattern', help='Pattern to match result csv files to', required=False, default='*.csv')
parser.add_argument('-w','--width', help='Width of graph, adjust to fit data', required=False, default=20)
parser.add_argument('-n','--name', help='name of the graph ', required=False, default='')
args = vars(parser.parse_args())

### Parse Query Results
Expand Down Expand Up @@ -34,5 +35,5 @@
import numpy as np

plt.rcParams["figure.figsize"] = [int(args['width']), 5]
fig = benchmark_results.pivot(index='benchmark', columns='config', values='timing').plot(kind='bar', title='', ylabel='runtime [s]').get_figure()
fig = benchmark_results.pivot(index='benchmark', columns='config', values='timing').plot(kind='bar', title=args['name'], ylabel='runtime [s]').get_figure()
fig.savefig('benchmark_results/result.png')
27 changes: 27 additions & 0 deletions src/delta_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,41 @@
#include "duckdb.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/main/extension_util.hpp"
#include "duckdb/storage/storage_extension.hpp"
#include "storage/delta_catalog.hpp"
#include "storage/delta_transaction_manager.hpp"

namespace duckdb {

// Attach callback installed on the delta storage extension.
// Creates the catalog for the database being attached from `db`, the attach path (`info.path`) and the requested
// access mode. `storage_info`, `context` and `name` are accepted to satisfy the callback signature but are not used.
static unique_ptr<Catalog> DeltaCatalogAttach(StorageExtensionInfo *storage_info, ClientContext &context,
                                              AttachedDatabase &db, const string &name, AttachInfo &info,
                                              AccessMode access_mode) {
    unique_ptr<Catalog> delta_catalog = make_uniq<DeltaCatalog>(db, info.path, access_mode);
    return delta_catalog;
}

// Transaction-manager callback installed on the delta storage extension.
// `catalog` is cast to DeltaCatalog and handed, together with `db`, to a new DeltaTransactionManager.
// `storage_info` is not used.
static unique_ptr<TransactionManager> CreateTransactionManager(StorageExtensionInfo *storage_info, AttachedDatabase &db,
                                                               Catalog &catalog) {
    return make_uniq<DeltaTransactionManager>(db, catalog.Cast<DeltaCatalog>());
}

class DeltaStorageExtension : public StorageExtension {
public:
DeltaStorageExtension() {
attach = DeltaCatalogAttach;
create_transaction_manager = CreateTransactionManager;
}
};

// Registers everything the extension provides on `instance`: its table functions and the "delta" storage extension.
static void LoadInternal(DatabaseInstance &instance) {
    // Table functions exposed by the extension
    auto &&table_functions = DeltaFunctions::GetTableFunctions(instance);
    for (const auto &table_function : table_functions) {
        ExtensionUtil::RegisterFunction(instance, table_function);
    }

    // The "single table" delta catalog (to ATTACH a single delta table) is registered under the key "delta"
    auto &db_config = DBConfig::GetConfig(instance);
    db_config.storage_extensions["delta"] = make_uniq<DeltaStorageExtension>();
}

void DeltaExtension::Load(DuckDB &db) {
Expand Down
37 changes: 32 additions & 5 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
#include <string>
#include <numeric>
#include <regex>
#include <duckdb/main/attached_database.hpp>
#include <duckdb/main/client_data.hpp>
#include <storage/delta_catalog.hpp>

namespace duckdb {

Expand Down Expand Up @@ -390,7 +392,14 @@ void DeltaSnapshot::Bind(vector<LogicalType> &return_types, vector<string> &name
if (!initialized) {
InitializeFiles();
}
auto schema = SchemaVisitor::VisitSnapshotSchema(snapshot.get());

unique_ptr<SchemaVisitor::FieldList> schema;

{
auto snapshot_ref = snapshot->GetLockingRef();
schema = SchemaVisitor::VisitSnapshotSchema(snapshot_ref.GetPtr());
}

for (const auto &field: *schema) {
names.push_back(field.first);
return_types.push_back(field.second);
Expand Down Expand Up @@ -439,18 +448,21 @@ void DeltaSnapshot::InitializeFiles() {
auto interface_builder = CreateBuilder(context, paths[0]);
extern_engine = TryUnpackKernelResult( ffi::builder_build(interface_builder));

// Initialize Snapshot
snapshot = TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get()));
if (!snapshot) {
snapshot = make_shared_ptr<SharedKernelSnapshot>(TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get())));
}

auto snapshot_ref = snapshot->GetLockingRef();

// Create Scan
PredicateVisitor visitor(names, &table_filters);
scan = TryUnpackKernelResult(ffi::scan(snapshot.get(), extern_engine.get(), &visitor));
scan = TryUnpackKernelResult(ffi::scan(snapshot_ref.GetPtr(), extern_engine.get(), &visitor));

// Create GlobalState
global_state = ffi::get_global_scan_state(scan.get());

// Set version
this->version = ffi::version(snapshot.get());
this->version = ffi::version(snapshot_ref.GetPtr());

// Create scan data iterator
scan_data_iterator = TryUnpackKernelResult(ffi::kernel_scan_data_init(extern_engine.get(), scan.get()));
Expand All @@ -471,6 +483,9 @@ unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &co
filtered_list->table_filters = std::move(filterstmp);
filtered_list->names = names;

// Copy over the snapshot, this avoids reparsing metadata
filtered_list->snapshot = snapshot;

return std::move(filtered_list);
}

Expand Down Expand Up @@ -623,6 +638,18 @@ unique_ptr<MultiFileList> DeltaMultiFileReader::CreateFileList(ClientContext &co
throw BinderException("'delta_scan' only supports single path as input");
}

// TODO: this is technically incorrect for `select * from attach_delta union all from delta_scan('../some/path')`
// since the first one should scan the snapshot from the transaction whereas the second one should scan the current state
for (auto& transaction : context.ActiveTransaction().OpenedTransactions()) {
auto & catalog = transaction.get().GetCatalog();
if (catalog.GetCatalogType() == "delta" && catalog.GetDBPath() == paths[0]) {
auto snapshot = make_uniq<DeltaSnapshot>(context, paths[0]);
snapshot->snapshot =

return std::move(snapshot);

Check failure on line 649 in src/functions/delta_scan.cpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / MacOS (osx_arm64, arm64, arm64-osx)

expected expression

Check failure on line 649 in src/functions/delta_scan.cpp

View workflow job for this annotation

GitHub Actions / Performance Regression Tests

expected primary-expression before ‘return’

Check failure on line 649 in src/functions/delta_scan.cpp

View workflow job for this annotation

GitHub Actions / Minio (local S3 test server) tests (Linux)

expected primary-expression before ‘return’

Check failure on line 649 in src/functions/delta_scan.cpp

View workflow job for this annotation

GitHub Actions / Minio (local S3 test server) tests (Linux)

expected primary-expression before ‘return’

Check failure on line 649 in src/functions/delta_scan.cpp

View workflow job for this annotation

GitHub Actions / Generated Tests (Linux)

expected primary-expression before ‘return’

Check failure on line 649 in src/functions/delta_scan.cpp

View workflow job for this annotation

GitHub Actions / Generated Tests (Linux)

expected primary-expression before ‘return’

Check failure on line 649 in src/functions/delta_scan.cpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / Linux (linux_amd64, ubuntu:18.04, x64-linux)

expected primary-expression before 'return'

Check failure on line 649 in src/functions/delta_scan.cpp

View workflow job for this annotation

GitHub Actions / Azure tests (Linux)

expected primary-expression before ‘return’

Check failure on line 649 in src/functions/delta_scan.cpp

View workflow job for this annotation

GitHub Actions / Azurite (local azure test server) tests (Linux)

expected primary-expression before ‘return’
}
}

return make_uniq<DeltaSnapshot>(context, paths[0]);
}

Expand Down
55 changes: 55 additions & 0 deletions src/include/delta_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,61 @@ typedef TemplatedUniqueKernelPointer<ffi::SharedScan, ffi::free_scan> KernelScan
typedef TemplatedUniqueKernelPointer<ffi::SharedGlobalScanState, ffi::free_global_scan_state> KernelGlobalScanState;
typedef TemplatedUniqueKernelPointer<ffi::SharedScanDataIterator, ffi::free_kernel_scan_data> KernelScanDataIterator;

template <typename KernelType, void (*DeleteFunction)(KernelType*)>
struct SharedKernelPointer;

// A reference to a SharedKernelPointer, only 1 can be handed out at the same time.
//
// Creating the ref locks the owning SharedKernelPointer and destroying it unlocks it again. The lock is held through
// a std::unique_lock instead of a hand-written destructor: that makes the ref move-only, so an accidental copy (or a
// non-elided return by value) can no longer unlock the same mutex twice.
// NOTE(review): assumes the owner's `lock` member is a std::mutex - confirm against the project's `mutex` alias.
template <typename KernelType, void (*DeleteFunction)(KernelType*)>
struct SharedKernelRef {
    friend struct SharedKernelPointer<KernelType, DeleteFunction>;
public:
    // Returns the raw kernel pointer held by the owning SharedKernelPointer; ownership stays with the owner
    KernelType* GetPtr() {
        return owning_pointer.kernel_ptr.get();
    }

protected:
    // Only reachable by the owning SharedKernelPointer (friend). Blocks until the owner's lock can be acquired.
    SharedKernelRef(SharedKernelPointer<KernelType, DeleteFunction>& owning_pointer_p)
        : owning_pointer(owning_pointer_p), guard(owning_pointer_p.lock) {
    }

protected:
    // The pointer that owns this ref
    SharedKernelPointer<KernelType, DeleteFunction>& owning_pointer;
    // Keeps the owner locked for the lifetime of this ref; released automatically on destruction
    std::unique_lock<std::mutex> guard;
};

// Wrapper around ffi objects to share between threads
template <typename KernelType, void (*DeleteFunction)(KernelType*)>
struct SharedKernelPointer {
friend struct SharedKernelRef<KernelType, DeleteFunction>;
public:
SharedKernelPointer(TemplatedUniqueKernelPointer<KernelType, DeleteFunction> unique_kernel_ptr) : kernel_ptr(unique_kernel_ptr) {}
SharedKernelPointer(KernelType* ptr) : kernel_ptr(ptr){}
SharedKernelPointer(){}

SharedKernelPointer(SharedKernelPointer&& other) : SharedKernelPointer() {
other.lock.lock();
lock.lock();
kernel_ptr = std::move(other.kernel_ptr);
lock.lock();
other.lock.lock();
}

// Returns a reference to the underlying kernel object. The SharedKernelPointer to this object will be locked for the
// lifetime of this reference
SharedKernelRef<KernelType, DeleteFunction> GetLockingRef() {
return SharedKernelRef<KernelType, DeleteFunction>(*this);
}

protected:
TemplatedUniqueKernelPointer<KernelType, DeleteFunction> kernel_ptr;
mutex lock;
};

typedef SharedKernelPointer<ffi::SharedSnapshot, ffi::free_snapshot> SharedKernelSnapshot;

struct KernelUtils {
static ffi::KernelStringSlice ToDeltaString(const string &str);
static string FromDeltaString(const struct ffi::KernelStringSlice slice);
Expand Down
3 changes: 2 additions & 1 deletion src/include/functions/delta_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ struct DeltaSnapshot : public MultiFileList {
idx_t version;

//! Delta Kernel Structures
KernelSnapshot snapshot;
shared_ptr<SharedKernelSnapshot> snapshot;

KernelExternEngine extern_engine;
KernelScan scan;
KernelGlobalScanState global_state;
Expand Down
79 changes: 79 additions & 0 deletions src/include/storage/delta_catalog.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// storage/delta_catalog.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/catalog/catalog.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/common/enums/access_mode.hpp"

namespace duckdb {
class DeltaSchemaEntry;

// Plain bundle of connection settings (endpoint, token, AWS region).
// NOTE(review): nothing else in this header refers to this struct - confirm whether it is still needed.
struct DeltaCredentials {
string endpoint;
string token;

// Not really part of the credentials, but required to query s3 tables
string aws_region;
};

// Table function type declared next to the delta catalog; its constructor and callback are defined outside this
// header.
class DeltaClearCacheFunction : public TableFunction {
public:
DeltaClearCacheFunction();

// NOTE(review): the signature (context, scope, new value) suggests a callback invoked when a setting changes -
// confirm against the definition.
static void ClearCacheOnSetting(ClientContext &context, SetScope scope, Value &parameter);
};

// Catalog implementation that reports the catalog type "delta".
// All overrides are only declared here; their definitions live outside this header.
class DeltaCatalog : public Catalog {
public:
explicit DeltaCatalog(AttachedDatabase &db_p, const string &internal_name, AccessMode access_mode);
~DeltaCatalog();

// NOTE(review): presumably the location of the attached delta table - the constructor body is not visible here,
// confirm how this is populated.
string path;
// Access mode associated with this catalog
AccessMode access_mode;

public:
void Initialize(bool load_builtin) override;
// Type identifier of this catalog
string GetCatalogType() override {
return "delta";
}

optional_ptr<CatalogEntry> CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) override;

void ScanSchemas(ClientContext &context, std::function<void(SchemaCatalogEntry &)> callback) override;

optional_ptr<SchemaCatalogEntry> GetSchema(CatalogTransaction transaction, const string &schema_name,
OnEntryNotFound if_not_found,
QueryErrorContext error_context = QueryErrorContext()) override;

// Planning hooks for INSERT / CREATE TABLE AS / DELETE / UPDATE and index creation
unique_ptr<PhysicalOperator> PlanInsert(ClientContext &context, LogicalInsert &op,
unique_ptr<PhysicalOperator> plan) override;
unique_ptr<PhysicalOperator> PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op,
unique_ptr<PhysicalOperator> plan) override;
unique_ptr<PhysicalOperator> PlanDelete(ClientContext &context, LogicalDelete &op,
unique_ptr<PhysicalOperator> plan) override;
unique_ptr<PhysicalOperator> PlanUpdate(ClientContext &context, LogicalUpdate &op,
unique_ptr<PhysicalOperator> plan) override;
unique_ptr<LogicalOperator> BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table,
unique_ptr<LogicalOperator> plan) override;

DatabaseSize GetDatabaseSize(ClientContext &context) override;

bool InMemory() override;
string GetDBPath() override;

private:
void DropSchema(ClientContext &context, DropInfo &info) override;

private:
// The only schema object owned by this catalog.
// NOTE(review): presumably the sole schema of a delta catalog - confirm.
unique_ptr<DeltaSchemaEntry> main_schema;
// NOTE(review): presumably the name of the schema used when none is given - confirm.
string default_schema;
};

} // namespace duckdb
52 changes: 52 additions & 0 deletions src/include/storage/delta_schema_entry.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// storage/delta_schema_entry.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
#include "storage/delta_table_entry.hpp"

// NOTE(review): presumably the name under which the single table of a delta catalog is exposed - confirm at the
// use sites.
#define DEFAULT_DELTA_TABLE "delta_table"

namespace duckdb {
class DeltaTransaction;

// Schema entry of a delta catalog. It owns at most one table entry (see `table`).
// All overrides are only declared here; their definitions live outside this header.
class DeltaSchemaEntry : public SchemaCatalogEntry {
public:
DeltaSchemaEntry(Catalog &catalog, CreateSchemaInfo &info);
~DeltaSchemaEntry() override;

// NOTE(review): presumably populates `table` for the given transaction - confirm against the definition.
void LoadTable(CatalogTransaction& transaction);

public:
// Entry-creation hooks required by SchemaCatalogEntry
optional_ptr<CatalogEntry> CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) override;
optional_ptr<CatalogEntry> CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) override;
optional_ptr<CatalogEntry> CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info,
TableCatalogEntry &table) override;
optional_ptr<CatalogEntry> CreateView(CatalogTransaction transaction, CreateViewInfo &info) override;
optional_ptr<CatalogEntry> CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) override;
optional_ptr<CatalogEntry> CreateTableFunction(CatalogTransaction transaction,
CreateTableFunctionInfo &info) override;
optional_ptr<CatalogEntry> CreateCopyFunction(CatalogTransaction transaction,
CreateCopyFunctionInfo &info) override;
optional_ptr<CatalogEntry> CreatePragmaFunction(CatalogTransaction transaction,
CreatePragmaFunctionInfo &info) override;
optional_ptr<CatalogEntry> CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) override;
optional_ptr<CatalogEntry> CreateType(CatalogTransaction transaction, CreateTypeInfo &info) override;
void Alter(CatalogTransaction transaction, AlterInfo &info) override;
// Invoke `callback` for entries of the given catalog type, with and without a client context
void Scan(ClientContext &context, CatalogType type, const std::function<void(CatalogEntry &)> &callback) override;
void Scan(CatalogType type, const std::function<void(CatalogEntry &)> &callback) override;
void DropEntry(ClientContext &context, DropInfo &info) override;
optional_ptr<CatalogEntry> GetEntry(CatalogTransaction transaction, CatalogType type, const string &name) override;

private:
// There is only 1 table in a delta catalog.
unique_ptr<DeltaTableEntry> table;
};

} // namespace duckdb
35 changes: 35 additions & 0 deletions src/include/storage/delta_table_entry.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// storage/delta_table_entry.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/parser/parsed_data/create_table_info.hpp"

namespace duckdb {
struct DeltaSnapshot;

// Table catalog entry for a delta table. It carries a shared handle to a DeltaSnapshot.
// All overrides are only declared here; their definitions live outside this header.
class DeltaTableEntry : public TableCatalogEntry {
public:
DeltaTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info);

public:
unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, column_t column_id) override;

// Returns the table function used to scan this table; `bind_data` is passed by reference so the implementation
// can fill it in
TableFunction GetScanFunction(ClientContext &context, unique_ptr<FunctionData> &bind_data) override;

TableStorageInfo GetStorageInfo(ClientContext &context) override;

void BindUpdateConstraints(Binder &binder, LogicalGet &get, LogicalProjection &proj, LogicalUpdate &update,
ClientContext &context) override;

public:
// NOTE(review): presumably the snapshot this entry scans, held by shared_ptr so it can be shared with scans -
// confirm where it is assigned.
shared_ptr<DeltaSnapshot> snapshot;
};

} // namespace duckdb
Loading

0 comments on commit 7fb17d3

Please sign in to comment.