From 7fb17d36315d75fe2f41593e4bcd061a4051285a Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Mon, 23 Sep 2024 17:32:08 +0200 Subject: [PATCH] PoC delta attach --- CMakeLists.txt | 8 +- scripts/plot.py | 3 +- src/delta_extension.cpp | 27 +++ src/functions/delta_scan.cpp | 37 +++- src/include/delta_utils.hpp | 55 ++++++ src/include/functions/delta_scan.hpp | 3 +- src/include/storage/delta_catalog.hpp | 79 ++++++++ src/include/storage/delta_schema_entry.hpp | 52 ++++++ src/include/storage/delta_table_entry.hpp | 35 ++++ src/include/storage/delta_transaction.hpp | 45 +++++ .../storage/delta_transaction_manager.hpp | 33 ++++ src/storage/delta_catalog.cpp | 83 +++++++++ src/storage/delta_schema_entry.cpp | 170 ++++++++++++++++++ src/storage/delta_table_entry.cpp | 58 ++++++ src/storage/delta_transaction.cpp | 42 +++++ src/storage/delta_transaction_manager.cpp | 40 +++++ 16 files changed, 762 insertions(+), 8 deletions(-) create mode 100644 src/include/storage/delta_catalog.hpp create mode 100644 src/include/storage/delta_schema_entry.hpp create mode 100644 src/include/storage/delta_table_entry.hpp create mode 100644 src/include/storage/delta_transaction.hpp create mode 100644 src/include/storage/delta_transaction_manager.hpp create mode 100644 src/storage/delta_catalog.cpp create mode 100644 src/storage/delta_schema_entry.cpp create mode 100644 src/storage/delta_table_entry.cpp create mode 100644 src/storage/delta_transaction.cpp create mode 100644 src/storage/delta_transaction_manager.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f4f9267..fd9255d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,7 +15,13 @@ set(EXTENSION_SOURCES src/delta_extension.cpp src/delta_functions.cpp src/delta_utils.cpp - src/functions/delta_scan.cpp) + src/functions/delta_scan.cpp + src/storage/delta_catalog.cpp + src/storage/delta_schema_entry.cpp + src/storage/delta_table_entry.cpp + src/storage/delta_transaction.cpp + src/storage/delta_transaction_manager.cpp +) ### Custom config # TODO: figure out if we really need this? diff --git a/scripts/plot.py b/scripts/plot.py index 9090f3f..2ca01bc 100644 --- a/scripts/plot.py +++ b/scripts/plot.py @@ -5,6 +5,7 @@ parser = argparse.ArgumentParser(description='Plot the results in ./benchmark_results') parser.add_argument('-p','--pattern', help='Pattern to match result csv files to', required=False, default='*.csv') parser.add_argument('-w','--width', help='Width of graph, adjust to fit data', required=False, default=20) +parser.add_argument('-n','--name', help='name of the graph ', required=False, default='') args = vars(parser.parse_args()) ### Parse Query Results @@ -34,5 +35,5 @@ import numpy as np plt.rcParams["figure.figsize"] = [int(args['width']), 5] -fig = benchmark_results.pivot(index='benchmark', columns='config', values='timing').plot(kind='bar', title='', ylabel='runtime [s]').get_figure() +fig = benchmark_results.pivot(index='benchmark', columns='config', values='timing').plot(kind='bar', title=args['name'], ylabel='runtime [s]').get_figure() fig.savefig('benchmark_results/result.png') \ No newline at end of file diff --git a/src/delta_extension.cpp b/src/delta_extension.cpp index 1a316d9..b463a3f 100644 --- a/src/delta_extension.cpp +++ b/src/delta_extension.cpp @@ -6,14 +6,41 @@ #include "duckdb.hpp" #include "duckdb/common/exception.hpp" #include "duckdb/main/extension_util.hpp" +#include "duckdb/storage/storage_extension.hpp" +#include "storage/delta_catalog.hpp" +#include "storage/delta_transaction_manager.hpp" namespace duckdb { +static unique_ptr DeltaCatalogAttach(StorageExtensionInfo *storage_info, ClientContext &context, + AttachedDatabase &db, const string &name, AttachInfo &info, + AccessMode access_mode) { + return make_uniq(db, info.path, access_mode); +} + +static unique_ptr CreateTransactionManager(StorageExtensionInfo *storage_info, AttachedDatabase &db, + Catalog &catalog) { + auto &uc_catalog = catalog.Cast(); + return make_uniq(db, uc_catalog); +} + +class DeltaStorageExtension : public StorageExtension { +public: + DeltaStorageExtension() { + attach = DeltaCatalogAttach; + create_transaction_manager = CreateTransactionManager; + } +}; + static void LoadInternal(DatabaseInstance &instance) { // Load functions for (const auto &function : DeltaFunctions::GetTableFunctions(instance)) { ExtensionUtil::RegisterFunction(instance, function); } + + // Register the "single table" delta catalog (to ATTACH a single delta table) + auto &config = DBConfig::GetConfig(instance); + config.storage_extensions["delta"] = make_uniq(); } void DeltaExtension::Load(DuckDB &db) { diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index 65eb34f..afadaca 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -19,7 +19,9 @@ #include #include #include +#include #include +#include namespace duckdb { @@ -390,7 +392,14 @@ void DeltaSnapshot::Bind(vector &return_types, vector &name if (!initialized) { InitializeFiles(); } - auto schema = SchemaVisitor::VisitSnapshotSchema(snapshot.get()); + + unique_ptr schema; + + { + auto snapshot_ref = snapshot->GetLockingRef(); + schema = SchemaVisitor::VisitSnapshotSchema(snapshot_ref.GetPtr()); + } + for (const auto &field: *schema) { names.push_back(field.first); return_types.push_back(field.second); @@ -439,18 +448,21 @@ void DeltaSnapshot::InitializeFiles() { auto interface_builder = CreateBuilder(context, paths[0]); extern_engine = TryUnpackKernelResult( ffi::builder_build(interface_builder)); - // Initialize Snapshot - snapshot = TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get())); + if (!snapshot) { + snapshot = make_shared_ptr(TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get()))); + } + + auto snapshot_ref = snapshot->GetLockingRef(); // Create Scan PredicateVisitor visitor(names, &table_filters); - scan = TryUnpackKernelResult(ffi::scan(snapshot.get(), extern_engine.get(), &visitor)); + scan = TryUnpackKernelResult(ffi::scan(snapshot_ref.GetPtr(), extern_engine.get(), &visitor)); // Create GlobalState global_state = ffi::get_global_scan_state(scan.get()); // Set version - this->version = ffi::version(snapshot.get()); + this->version = ffi::version(snapshot_ref.GetPtr()); // Create scan data iterator scan_data_iterator = TryUnpackKernelResult(ffi::kernel_scan_data_init(extern_engine.get(), scan.get())); @@ -471,6 +483,9 @@ unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &co filtered_list->table_filters = std::move(filterstmp); filtered_list->names = names; + // Copy over the snapshot, this avoids reparsing metadata + filtered_list->snapshot = snapshot; + return std::move(filtered_list); } @@ -623,6 +638,18 @@ unique_ptr DeltaMultiFileReader::CreateFileList(ClientContext &co throw BinderException("'delta_scan' only supports single path as input"); } + // TODO: this is techinically incorrect for `select * from attach_delta union all from delta_scan('../some/path') + // since the first one should scan the snapshot from the transaction whereas the second one should scan the current state + for (auto& transaction : context.ActiveTransaction().OpenedTransactions()) { + auto & catalog = transaction.get().GetCatalog(); + if (catalog.GetCatalogType() == "delta" && catalog.GetDBPath() == paths[0]) { + auto snapshot = make_uniq(context, paths[0]); + snapshot->snapshot = + + return std::move(snapshot); + } + } + return make_uniq(context, paths[0]); } diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 9b33c5c..7b14386 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -108,6 +108,61 @@ typedef TemplatedUniqueKernelPointer KernelScan typedef TemplatedUniqueKernelPointer KernelGlobalScanState; typedef TemplatedUniqueKernelPointer KernelScanDataIterator; +template +struct SharedKernelPointer; + +// A reference to a SharedKernelPointer, only 1 can be handed out at the same time +template +struct SharedKernelRef { + friend struct SharedKernelPointer; +public: + KernelType* GetPtr() { + return owning_pointer.kernel_ptr.get(); + } + ~SharedKernelRef() { + owning_pointer.lock.unlock(); + } + +protected: + SharedKernelRef(SharedKernelPointer& owning_pointer_p) : owning_pointer(owning_pointer_p) { + owning_pointer.lock.lock(); + } + +protected: + // The pointer that owns this ref + SharedKernelPointer& owning_pointer; +}; + +// Wrapper around ffi objects to share between threads +template +struct SharedKernelPointer { + friend struct SharedKernelRef; +public: + SharedKernelPointer(TemplatedUniqueKernelPointer unique_kernel_ptr) : kernel_ptr(unique_kernel_ptr) {} + SharedKernelPointer(KernelType* ptr) : kernel_ptr(ptr){} + SharedKernelPointer(){} + + SharedKernelPointer(SharedKernelPointer&& other) : SharedKernelPointer() { + other.lock.lock(); + lock.lock(); + kernel_ptr = std::move(other.kernel_ptr); + lock.lock(); + other.lock.lock(); + } + + // Returns a reference to the underlying kernel object. The SharedKernelPointer to this object will be locked for the + // lifetime of this reference + SharedKernelRef GetLockingRef() { + return SharedKernelRef(*this); + } + +protected: + TemplatedUniqueKernelPointer kernel_ptr; + mutex lock; +}; + +typedef SharedKernelPointer SharedKernelSnapshot; + struct KernelUtils { static ffi::KernelStringSlice ToDeltaString(const string &str); static string FromDeltaString(const struct ffi::KernelStringSlice slice); diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index aac35cc..f491076 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -70,7 +70,8 @@ struct DeltaSnapshot : public MultiFileList { idx_t version; //! Delta Kernel Structures - KernelSnapshot snapshot; + shared_ptr snapshot; + KernelExternEngine extern_engine; KernelScan scan; KernelGlobalScanState global_state; diff --git a/src/include/storage/delta_catalog.hpp b/src/include/storage/delta_catalog.hpp new file mode 100644 index 0000000..40a4835 --- /dev/null +++ b/src/include/storage/delta_catalog.hpp @@ -0,0 +1,79 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// storage/delta_catalog.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/catalog/catalog.hpp" +#include "duckdb/function/table_function.hpp" +#include "duckdb/common/enums/access_mode.hpp" + +namespace duckdb { +class DeltaSchemaEntry; + +struct DeltaCredentials { + string endpoint; + string token; + + // Not really part of the credentials, but required to query s3 tables + string aws_region; +}; + +class DeltaClearCacheFunction : public TableFunction { +public: + DeltaClearCacheFunction(); + + static void ClearCacheOnSetting(ClientContext &context, SetScope scope, Value ¶meter); +}; + +class DeltaCatalog : public Catalog { +public: + explicit DeltaCatalog(AttachedDatabase &db_p, const string &internal_name, AccessMode access_mode); + ~DeltaCatalog(); + + string path; + AccessMode access_mode; + +public: + void Initialize(bool load_builtin) override; + string GetCatalogType() override { + return "delta"; + } + + optional_ptr CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) override; + + void ScanSchemas(ClientContext &context, std::function callback) override; + + optional_ptr GetSchema(CatalogTransaction transaction, const string &schema_name, + OnEntryNotFound if_not_found, + QueryErrorContext error_context = QueryErrorContext()) override; + + unique_ptr PlanInsert(ClientContext &context, LogicalInsert &op, + unique_ptr plan) override; + unique_ptr PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, + unique_ptr plan) override; + unique_ptr PlanDelete(ClientContext &context, LogicalDelete &op, + unique_ptr plan) override; + unique_ptr PlanUpdate(ClientContext &context, LogicalUpdate &op, + unique_ptr plan) override; + unique_ptr BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, + unique_ptr plan) override; + + DatabaseSize GetDatabaseSize(ClientContext &context) override; + + bool InMemory() override; + string GetDBPath() override; + +private: + void DropSchema(ClientContext &context, DropInfo &info) override; + +private: + unique_ptr main_schema; + string default_schema; +}; + +} // namespace duckdb diff --git a/src/include/storage/delta_schema_entry.hpp b/src/include/storage/delta_schema_entry.hpp new file mode 100644 index 0000000..ab4eb33 --- /dev/null +++ b/src/include/storage/delta_schema_entry.hpp @@ -0,0 +1,52 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// storage/delta_schema_entry.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp" +#include "storage/delta_table_entry.hpp" + +#define DEFAULT_DELTA_TABLE "delta_table" + +namespace duckdb { +class DeltaTransaction; + +class DeltaSchemaEntry : public SchemaCatalogEntry { +public: + DeltaSchemaEntry(Catalog &catalog, CreateSchemaInfo &info); + ~DeltaSchemaEntry() override; + + void LoadTable(CatalogTransaction& transaction); + +public: + optional_ptr CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) override; + optional_ptr CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) override; + optional_ptr CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, + TableCatalogEntry &table) override; + optional_ptr CreateView(CatalogTransaction transaction, CreateViewInfo &info) override; + optional_ptr CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) override; + optional_ptr CreateTableFunction(CatalogTransaction transaction, + CreateTableFunctionInfo &info) override; + optional_ptr CreateCopyFunction(CatalogTransaction transaction, + CreateCopyFunctionInfo &info) override; + optional_ptr CreatePragmaFunction(CatalogTransaction transaction, + CreatePragmaFunctionInfo &info) override; + optional_ptr CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) override; + optional_ptr CreateType(CatalogTransaction transaction, CreateTypeInfo &info) override; + void Alter(CatalogTransaction transaction, AlterInfo &info) override; + void Scan(ClientContext &context, CatalogType type, const std::function &callback) override; + void Scan(CatalogType type, const std::function &callback) override; + void DropEntry(ClientContext &context, DropInfo &info) override; + optional_ptr GetEntry(CatalogTransaction transaction, CatalogType type, const string &name) override; + +private: + // There is only 1 table in a delta catalog. + unique_ptr table; +}; + +} // namespace duckdb diff --git a/src/include/storage/delta_table_entry.hpp b/src/include/storage/delta_table_entry.hpp new file mode 100644 index 0000000..5d36421 --- /dev/null +++ b/src/include/storage/delta_table_entry.hpp @@ -0,0 +1,35 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// storage/delta_table_entry.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" + +namespace duckdb { +struct DeltaSnapshot; + +class DeltaTableEntry : public TableCatalogEntry { +public: + DeltaTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info); + +public: + unique_ptr GetStatistics(ClientContext &context, column_t column_id) override; + + TableFunction GetScanFunction(ClientContext &context, unique_ptr &bind_data) override; + + TableStorageInfo GetStorageInfo(ClientContext &context) override; + + void BindUpdateConstraints(Binder &binder, LogicalGet &get, LogicalProjection &proj, LogicalUpdate &update, + ClientContext &context) override; + +public: + shared_ptr snapshot; +}; + +} // namespace duckdb diff --git a/src/include/storage/delta_transaction.hpp b/src/include/storage/delta_transaction.hpp new file mode 100644 index 0000000..c0bd82b --- /dev/null +++ b/src/include/storage/delta_transaction.hpp @@ -0,0 +1,45 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// storage/delta_transaction.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/transaction/transaction.hpp" + +namespace duckdb { +class DeltaCatalog; +class DeltaSchemaEntry; +class DeltaTableEntry; +struct DeltaSnapshot; + +enum class DeltaTransactionState { TRANSACTION_NOT_YET_STARTED, TRANSACTION_STARTED, TRANSACTION_FINISHED }; + +class DeltaTransaction : public Transaction { +public: + DeltaTransaction(DeltaCatalog &delta_catalog, TransactionManager &manager, ClientContext &context); + ~DeltaTransaction() override; + + void Start(); + void Commit(); + void Rollback(); + + static DeltaTransaction &Get(ClientContext &context, Catalog &catalog); + AccessMode GetAccessMode() const; + + void SetReadWrite() override { + throw NotImplementedException("Can not start read-write transaction"); + }; +public: + shared_ptr snapshot; + +private: + // DeltaConnection connection; + DeltaTransactionState transaction_state; + AccessMode access_mode; +}; + +} // namespace duckdb diff --git a/src/include/storage/delta_transaction_manager.hpp b/src/include/storage/delta_transaction_manager.hpp new file mode 100644 index 0000000..3957982 --- /dev/null +++ b/src/include/storage/delta_transaction_manager.hpp @@ -0,0 +1,33 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// storage/delta_transaction_manager.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/transaction/transaction_manager.hpp" +#include "storage/delta_catalog.hpp" +#include "storage/delta_transaction.hpp" + +namespace duckdb { + +class DeltaTransactionManager : public TransactionManager { +public: + DeltaTransactionManager(AttachedDatabase &db_p, DeltaCatalog &delta_catalog); + + Transaction &StartTransaction(ClientContext &context) override; + ErrorData CommitTransaction(ClientContext &context, Transaction &transaction) override; + void RollbackTransaction(Transaction &transaction) override; + + void Checkpoint(ClientContext &context, bool force = false) override; + +private: + DeltaCatalog &delta_catalog; + mutex transaction_lock; + reference_map_t> transactions; +}; + +} // namespace duckdb diff --git a/src/storage/delta_catalog.cpp b/src/storage/delta_catalog.cpp new file mode 100644 index 0000000..ccb2c78 --- /dev/null +++ b/src/storage/delta_catalog.cpp @@ -0,0 +1,83 @@ +#include "storage/delta_catalog.hpp" +#include "storage/delta_schema_entry.hpp" +#include "storage/delta_transaction.hpp" +#include "duckdb/storage/database_size.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/parser/parsed_data/create_schema_info.hpp" +#include "duckdb/main/attached_database.hpp" + +namespace duckdb { + +DeltaCatalog::DeltaCatalog(AttachedDatabase &db_p, const string &path, AccessMode access_mode) + : Catalog(db_p), path(path), access_mode(access_mode) { +} + +DeltaCatalog::~DeltaCatalog() = default; + +void DeltaCatalog::Initialize(bool load_builtin) { + CreateSchemaInfo info; + main_schema = make_uniq(*this, info); +} + +optional_ptr DeltaCatalog::CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) { + throw BinderException("Delta tables do not support creating new schemas"); +} + +void DeltaCatalog::DropSchema(ClientContext &context, DropInfo &info) { + throw BinderException("Delta tables do not support dropping schemas"); +} + +void DeltaCatalog::ScanSchemas(ClientContext &context, std::function callback) { + callback(*main_schema); +} + +optional_ptr DeltaCatalog::GetSchema(CatalogTransaction transaction, const string &schema_name, + OnEntryNotFound if_not_found, QueryErrorContext error_context) { + if (schema_name == DEFAULT_SCHEMA || schema_name == INVALID_SCHEMA) { + return main_schema.get(); + } + if (if_not_found == OnEntryNotFound::RETURN_NULL) { + return nullptr; + } + return nullptr; +} + +bool DeltaCatalog::InMemory() { + return false; +} + +string DeltaCatalog::GetDBPath() { + return path; +} + +DatabaseSize DeltaCatalog::GetDatabaseSize(ClientContext &context) { + if (default_schema.empty()) { + throw InvalidInputException("Attempting to fetch the database size - but no database was provided " + "in the connection string"); + } + DatabaseSize size; + return size; +} + +unique_ptr DeltaCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, + unique_ptr plan) { + throw NotImplementedException("DeltaCatalog PlanInsert"); +} +unique_ptr DeltaCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, + unique_ptr plan) { + throw NotImplementedException("DeltaCatalog PlanCreateTableAs"); +} +unique_ptr DeltaCatalog::PlanDelete(ClientContext &context, LogicalDelete &op, + unique_ptr plan) { + throw NotImplementedException("DeltaCatalog PlanDelete"); +} +unique_ptr DeltaCatalog::PlanUpdate(ClientContext &context, LogicalUpdate &op, + unique_ptr plan) { + throw NotImplementedException("DeltaCatalog PlanUpdate"); +} +unique_ptr DeltaCatalog::BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, + unique_ptr plan) { + throw NotImplementedException("DeltaCatalog BindCreateIndex"); +} + +} // namespace duckdb diff --git a/src/storage/delta_schema_entry.cpp b/src/storage/delta_schema_entry.cpp new file mode 100644 index 0000000..3612bfa --- /dev/null +++ b/src/storage/delta_schema_entry.cpp @@ -0,0 +1,170 @@ +#include "storage/delta_schema_entry.hpp" + +#include +#include + +#include "storage/delta_table_entry.hpp" +#include "storage/delta_transaction.hpp" +#include "duckdb/parser/parsed_data/create_view_info.hpp" +#include "duckdb/parser/parsed_data/create_index_info.hpp" +#include "duckdb/planner/parsed_data/bound_create_table_info.hpp" +#include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/parser/constraints/list.hpp" +#include "duckdb/common/unordered_set.hpp" +#include "duckdb/parser/parsed_data/alter_info.hpp" +#include "duckdb/parser/parsed_data/alter_table_info.hpp" +#include "duckdb/parser/parsed_expression_iterator.hpp" + + +namespace duckdb { + +DeltaSchemaEntry::DeltaSchemaEntry(Catalog &catalog, CreateSchemaInfo &info) + : SchemaCatalogEntry(catalog, info) { +} + +DeltaSchemaEntry::~DeltaSchemaEntry() { +} + +DeltaTransaction &GetDeltaTransaction(CatalogTransaction transaction) { + if (!transaction.transaction) { + throw InternalException("No transaction!?"); + } + return transaction.transaction->Cast(); +} + +optional_ptr DeltaSchemaEntry::CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) { + throw BinderException("Delta tables do not support creating tables"); +} + +optional_ptr DeltaSchemaEntry::CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) { + throw BinderException("Delta tables do not support creating functions"); +} + +void DeltaUnqualifyColumnRef(ParsedExpression &expr) { + if (expr.type == ExpressionType::COLUMN_REF) { + auto &colref = expr.Cast(); + auto name = std::move(colref.column_names.back()); + colref.column_names = {std::move(name)}; + return; + } + ParsedExpressionIterator::EnumerateChildren(expr, DeltaUnqualifyColumnRef); +} + +optional_ptr DeltaSchemaEntry::CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, + TableCatalogEntry &table) { + throw NotImplementedException("CreateIndex"); +} + +string GetDeltaCreateView(CreateViewInfo &info) { + throw NotImplementedException("GetCreateView"); +} + +optional_ptr DeltaSchemaEntry::CreateView(CatalogTransaction transaction, CreateViewInfo &info) { + throw BinderException("Delta tables do not support creating views"); +} + +optional_ptr DeltaSchemaEntry::CreateType(CatalogTransaction transaction, CreateTypeInfo &info) { + throw BinderException("Delta databases do not support creating types"); +} + +optional_ptr DeltaSchemaEntry::CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) { + throw BinderException("Delta databases do not support creating sequences"); +} + +optional_ptr DeltaSchemaEntry::CreateTableFunction(CatalogTransaction transaction, + CreateTableFunctionInfo &info) { + throw BinderException("Delta databases do not support creating table functions"); +} + +optional_ptr DeltaSchemaEntry::CreateCopyFunction(CatalogTransaction transaction, + CreateCopyFunctionInfo &info) { + throw BinderException("Delta databases do not support creating copy functions"); +} + +optional_ptr DeltaSchemaEntry::CreatePragmaFunction(CatalogTransaction transaction, + CreatePragmaFunctionInfo &info) { + throw BinderException("Delta databases do not support creating pragma functions"); +} + +optional_ptr DeltaSchemaEntry::CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) { + throw BinderException("Delta databases do not support creating collations"); +} + +void DeltaSchemaEntry::Alter(CatalogTransaction transaction, AlterInfo &info) { + throw NotImplementedException("Delta tables do not support altering"); +} + +bool CatalogTypeIsSupported(CatalogType type) { + switch (type) { + case CatalogType::TABLE_ENTRY: + return true; + default: + return false; + } +} + +void DeltaSchemaEntry::Scan(ClientContext &context, CatalogType type, + const std::function &callback) { + if (!CatalogTypeIsSupported(type)) { + return; + } + + // LoadTable(context); + + callback(*table); +} +void DeltaSchemaEntry::Scan(CatalogType type, const std::function &callback) { + throw NotImplementedException("Scan without context not supported"); +} + +void DeltaSchemaEntry::DropEntry(ClientContext &context, DropInfo &info) { + throw NotImplementedException("Delta tables do not support dropping"); +} + +optional_ptr DeltaSchemaEntry::GetEntry(CatalogTransaction transaction, CatalogType type, + const string &name) { + D_ASSERT(type == CatalogType::TABLE_ENTRY); + if (type == CatalogType::TABLE_ENTRY && name == DEFAULT_DELTA_TABLE) { + + LoadTable(transaction); + + return *table; + } + return nullptr; +} + +void DeltaSchemaEntry::LoadTable(CatalogTransaction &transaction) { + if (table) { +#ifdef DEBUG + // Confirm that the transaction is looking at the same version of the table + auto snapshot = GetDeltaTransaction(transaction).snapshot; + if (snapshot && table->snapshot) { + D_ASSERT(snapshot.get() == table->snapshot.get()); + } +#endif + return; + } + + auto& delta_transaction = GetDeltaTransaction(transaction); + + // This is the first time we fetch the snapshot during this transaction: we will store this in the transaction + // making sure that any subsequent reads of the delta table will see the same snapshot + if (!delta_transaction.snapshot) { + auto &delta_catalog = catalog.Cast(); + delta_transaction.snapshot = make_shared_ptr(transaction.GetContext(), delta_catalog.GetDBPath()); + } + + // Get the names and types from the delta snapshot + vector return_types; + vector names; + delta_transaction.snapshot->Bind(return_types, names); + + CreateTableInfo table_info; + for (idx_t i = 0; i < return_types.size(); i++) { + table_info.columns.AddColumn(ColumnDefinition(names[i], return_types[i])); + } + table = make_uniq(catalog, *this, table_info); + table->snapshot = delta_transaction.snapshot; +} + +} // namespace duckdb diff --git a/src/storage/delta_table_entry.cpp b/src/storage/delta_table_entry.cpp new file mode 100644 index 0000000..1389344 --- /dev/null +++ b/src/storage/delta_table_entry.cpp @@ -0,0 +1,58 @@ +#include "storage/delta_catalog.hpp" +#include "storage/delta_schema_entry.hpp" +#include "storage/delta_table_entry.hpp" +#include "storage/delta_transaction.hpp" +#include "duckdb/storage/statistics/base_statistics.hpp" +#include "duckdb/storage/table_storage_info.hpp" +#include "duckdb/main/extension_util.hpp" +#include "duckdb/main/database.hpp" +#include "duckdb/main/secret/secret_manager.hpp" +#include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" +#include "duckdb/parser/tableref/table_function_ref.hpp" +#include "../../duckdb/third_party/catch/catch.hpp" + +namespace duckdb { + +DeltaTableEntry::DeltaTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info) + : TableCatalogEntry(catalog, schema, info) { + this->internal = false; +} + +unique_ptr DeltaTableEntry::GetStatistics(ClientContext &context, column_t column_id) { + return nullptr; +} + +void DeltaTableEntry::BindUpdateConstraints(Binder &binder, LogicalGet &, LogicalProjection &, LogicalUpdate &, + ClientContext &) { + throw NotImplementedException("BindUpdateConstraints for delta table"); +} + +TableFunction DeltaTableEntry::GetScanFunction(ClientContext &context, unique_ptr &bind_data) { + auto &db = DatabaseInstance::GetDatabase(context); + auto &delta_function_set = ExtensionUtil::GetTableFunction(db, "delta_scan"); + + auto delta_scan_function = delta_function_set.functions.GetFunctionByArguments(context, {LogicalType::VARCHAR}); + auto &delta_catalog = catalog.Cast(); + + vector inputs = {delta_catalog.GetDBPath()}; + named_parameter_map_t param_map; + vector return_types; + vector names; + TableFunctionRef empty_ref; + + TableFunctionBindInput bind_input(inputs, param_map, return_types, names, nullptr, nullptr, delta_scan_function, + empty_ref); + + auto result = delta_scan_function.bind(context, bind_input, return_types, names); + bind_data = std::move(result); + + return delta_scan_function; +} + +TableStorageInfo DeltaTableEntry::GetStorageInfo(ClientContext &context) { + TableStorageInfo result; + // TODO fill info + return result; +} + +} // namespace duckdb diff --git a/src/storage/delta_transaction.cpp b/src/storage/delta_transaction.cpp new file mode 100644 index 0000000..1286f63 --- /dev/null +++ b/src/storage/delta_transaction.cpp @@ -0,0 +1,42 @@ +#include "storage/delta_transaction.hpp" +#include "storage/delta_catalog.hpp" +#include "duckdb/parser/parsed_data/create_view_info.hpp" +#include "duckdb/catalog/catalog_entry/index_catalog_entry.hpp" +#include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp" +#include "functions/delta_scan.hpp" + +namespace duckdb { + +DeltaTransaction::DeltaTransaction(DeltaCatalog &delta_catalog, TransactionManager &manager, ClientContext &context) + : Transaction(manager, context), access_mode(delta_catalog.access_mode) { + // connection = DeltaConnection::Open(delta_catalog.path); +} + +DeltaTransaction::~DeltaTransaction() { +} + +void DeltaTransaction::Start() { + transaction_state = DeltaTransactionState::TRANSACTION_NOT_YET_STARTED; +} +void DeltaTransaction::Commit() { + if (transaction_state == DeltaTransactionState::TRANSACTION_STARTED) { + transaction_state = DeltaTransactionState::TRANSACTION_FINISHED; + // NOP: we only support read-only transactions currently + } +} +void DeltaTransaction::Rollback() { + if (transaction_state == DeltaTransactionState::TRANSACTION_STARTED) { + transaction_state = DeltaTransactionState::TRANSACTION_FINISHED; + // NOP: we only support read-only transactions currently + } +} + +DeltaTransaction &DeltaTransaction::Get(ClientContext &context, Catalog &catalog) { + return Transaction::Get(context, catalog).Cast(); +} + +AccessMode DeltaTransaction::GetAccessMode() const { + return access_mode; +} + +} // namespace duckdb diff --git a/src/storage/delta_transaction_manager.cpp b/src/storage/delta_transaction_manager.cpp new file mode 100644 index 0000000..f06e104 --- /dev/null +++ b/src/storage/delta_transaction_manager.cpp @@ -0,0 +1,40 @@ +#include "storage/delta_transaction_manager.hpp" +#include "duckdb/main/attached_database.hpp" + +namespace duckdb { + +DeltaTransactionManager::DeltaTransactionManager(AttachedDatabase &db_p, DeltaCatalog &delta_catalog) + : TransactionManager(db_p), delta_catalog(delta_catalog) { +} + +Transaction &DeltaTransactionManager::StartTransaction(ClientContext &context) { + auto transaction = make_uniq(delta_catalog, *this, context); + transaction->Start(); + auto &result = *transaction; + lock_guard l(transaction_lock); + transactions[result] = std::move(transaction); + return result; +} + +ErrorData DeltaTransactionManager::CommitTransaction(ClientContext &context, Transaction &transaction) { + auto &delta_transaction = transaction.Cast(); + delta_transaction.Commit(); + lock_guard l(transaction_lock); + transactions.erase(transaction); + return ErrorData(); +} + +void DeltaTransactionManager::RollbackTransaction(Transaction &transaction) { + auto &delta_transaction = transaction.Cast(); + delta_transaction.Rollback(); + lock_guard l(transaction_lock); + transactions.erase(transaction); +} + +void DeltaTransactionManager::Checkpoint(ClientContext &context, bool force) { + // auto &transaction = DeltaTransaction::Get(context, db.GetCatalog()); + // auto &db = transaction.GetConnection(); + // db.Execute("CHECKPOINT"); +} + +} // namespace duckdb