diff --git a/CMakeLists.txt b/CMakeLists.txt index 75dbe1f..a04b147 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,7 @@ set(EXTENSION_SOURCES src/delta_utils.cpp src/functions/delta_scan.cpp src/functions/expression_functions.cpp + src/storage/delta_insert.cpp src/storage/delta_catalog.cpp src/storage/delta_schema_entry.cpp src/storage/delta_table_entry.cpp diff --git a/duckdb b/duckdb index 89bcc3e..05870d5 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit 89bcc3e2ce739b1b470afa79818ee03c8cf96fe8 +Subproject commit 05870d5f44ef88d1c6f7710f43d0225d368dcc85 diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py index 001b9b2..3f08136 100644 --- a/scripts/generate_test_data.py +++ b/scripts/generate_test_data.py @@ -188,6 +188,11 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'") generate_test_data_pyspark('simple_sf1_with_dv', 'simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0") +## really simple +con = duckdb.connect() +con.query(f"COPY (SELECT i FROM range(0,10) tbl(i)) TO '{TMP_PATH}/really_simple.parquet'") +generate_test_data_pyspark('really_simple', 'really_simple', f'{TMP_PATH}/really_simple.parquet') + ## Lineitem SF0.01 with deletion vector con = duckdb.connect() con.query(f"call dbgen(sf=0.01); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'") diff --git a/src/include/delta_kernel_ffi.hpp b/src/include/delta_kernel_ffi.hpp index 464c176..adac183 100644 --- a/src/include/delta_kernel_ffi.hpp +++ b/src/include/delta_kernel_ffi.hpp @@ -502,6 +502,9 @@ struct im_an_unused_struct_that_tricks_msvc_into_compilation { ExternResult> field9; ExternResult> field10; ExternResult field11; + ExternResult> field12; + ExternResult> field13; + ExternResult field14; }; /// An `Event` can generally be thought 
of a "log message". It contains all the relevant bits such diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 5bf4e1d..931ee26 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -210,6 +210,12 @@ struct UniqueKernelPointer { } } + KernelType *release() { + auto copy = ptr; + ptr = nullptr; + return copy; + } + KernelType *get() const { return ptr; } @@ -231,6 +237,8 @@ typedef TemplatedUniqueKernelPointer typedef TemplatedUniqueKernelPointer KernelScan; typedef TemplatedUniqueKernelPointer KernelGlobalScanState; typedef TemplatedUniqueKernelPointer KernelScanDataIterator; +typedef TemplatedUniqueKernelPointer KernelExclusiveTransaction; +typedef TemplatedUniqueKernelPointer KernelEngineData; template struct SharedKernelPointer; diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index 86edd2e..a4c8bbe 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -70,11 +70,13 @@ struct DeltaSnapshot : public MultiFileList { void InitializeSnapshot(); void InitializeScan(); +public: template T TryUnpackKernelResult(ffi::ExternResult result) { return KernelUtils::UnpackResult( result, StringUtil::Format("While trying to read from delta table: '%s'", paths[0])); } +protected: static void VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data, const struct ffi::KernelBoolSlice selection_vec); @@ -89,8 +91,9 @@ struct DeltaSnapshot : public MultiFileList { //! 
Delta Kernel Structures shared_ptr snapshot; - +public: KernelExternEngine extern_engine; +protected: KernelScan scan; KernelGlobalScanState global_state; KernelScanDataIterator scan_data_iterator; diff --git a/src/include/storage/delta_insert.hpp b/src/include/storage/delta_insert.hpp new file mode 100644 index 0000000..92d1970 --- /dev/null +++ b/src/include/storage/delta_insert.hpp @@ -0,0 +1,70 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// storage/delta_insert.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/execution/operator/persistent/physical_copy_to_file.hpp" + +#include "duckdb/execution/physical_operator.hpp" +#include "duckdb/common/index_vector.hpp" + +namespace duckdb { + +class DeltaInsert : public PhysicalCopyToFile { +public: + DeltaInsert(LogicalOperator &op, TableCatalogEntry &table_p, physical_index_vector_t column_index_map_p, + vector types, CopyFunction function_p, unique_ptr bind_data, idx_t estimated_cardinality); + + DeltaInsert(LogicalOperator &op, SchemaCatalogEntry &schema_p, unique_ptr info, + vector types, CopyFunction function_p, unique_ptr bind_data, idx_t estimated_cardinality); + + //! The table to insert into + optional_ptr table; + //! Table schema, in case of CREATE TABLE AS + optional_ptr schema; + //! Create table info, in case of CREATE TABLE AS + unique_ptr info; + //! column_index_map + physical_index_vector_t column_index_map; + //! 
The physical copy used internally by this insert + unique_ptr physical_copy_to_file; + +public: + // // Source interface + // SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; + // + // bool IsSource() const override { + // return true; + // } + +public: + // Sink interface + // SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const override; + // SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override; + SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context, + OperatorSinkFinalizeInput &input) const override; + // unique_ptr GetLocalSinkState(ExecutionContext &context) const override; + // unique_ptr GetGlobalSinkState(ClientContext &context) const override; + + // bool IsSink() const override { + // return true; + // } + // + // bool ParallelSink() const override { + // return true; + // } + // + // bool SinkOrderDependent() const override { + // return true; + // } + + string GetName() const override; + InsertionOrderPreservingMap ParamsToString() const override; +}; + +} // namespace duckdb diff --git a/src/include/storage/delta_transaction.hpp b/src/include/storage/delta_transaction.hpp index b9d369c..10411a5 100644 --- a/src/include/storage/delta_transaction.hpp +++ b/src/include/storage/delta_transaction.hpp @@ -8,6 +8,7 @@ #pragma once +#include "delta_utils.hpp" #include "duckdb/transaction/transaction.hpp" namespace duckdb { @@ -27,15 +28,16 @@ class DeltaTransaction : public Transaction { void Commit(); void Rollback(); + void Append(const vector &append_files); + static DeltaTransaction &Get(ClientContext &context, Catalog &catalog); AccessMode GetAccessMode() const; - void SetReadWrite() override { - throw NotImplementedException("Can not start read-write transaction"); - }; - public: unique_ptr table_entry; + vector outstanding_appends; + + KernelExclusiveTransaction 
kernel_transaction; private: // DeltaConnection connection; diff --git a/src/storage/delta_catalog.cpp b/src/storage/delta_catalog.cpp index fb301e2..a7ed247 100644 --- a/src/storage/delta_catalog.cpp +++ b/src/storage/delta_catalog.cpp @@ -89,14 +89,6 @@ DatabaseSize DeltaCatalog::GetDatabaseSize(ClientContext &context) { return size; } -unique_ptr DeltaCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, - unique_ptr plan) { - throw NotImplementedException("DeltaCatalog does not support inserts"); -} -unique_ptr DeltaCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, - unique_ptr plan) { - throw NotImplementedException("DeltaCatalog does not support creating new tables"); -} unique_ptr DeltaCatalog::PlanDelete(ClientContext &context, LogicalDelete &op, unique_ptr plan) { throw NotImplementedException("DeltaCatalog does not support deletes"); diff --git a/src/storage/delta_insert.cpp b/src/storage/delta_insert.cpp new file mode 100644 index 0000000..89bf3f2 --- /dev/null +++ b/src/storage/delta_insert.cpp @@ -0,0 +1,179 @@ +#include "storage/delta_insert.hpp" + +#include "duckdb/catalog/catalog_entry/copy_function_catalog_entry.hpp" +#include "duckdb/main/client_data.hpp" +#include "duckdb/main/extension_util.hpp" +#include "duckdb/planner/operator/logical_copy_to_file.hpp" +#include "functions/delta_scan.hpp" +#include "duckdb/execution/physical_operator_states.hpp" + +#include "storage/delta_catalog.hpp" +#include "storage/delta_transaction.hpp" +#include "duckdb/planner/operator/logical_insert.hpp" +#include "duckdb/planner/operator/logical_create_table.hpp" +#include "storage/delta_table_entry.hpp" +#include "duckdb/planner/parsed_data/bound_create_table_info.hpp" +#include "duckdb/execution/operator/projection/physical_projection.hpp" +#include "duckdb/execution/operator/scan/physical_table_scan.hpp" +#include "duckdb/planner/expression/bound_cast_expression.hpp" +#include 
"duckdb/planner/expression/bound_reference_expression.hpp" + +namespace duckdb { + +DeltaInsert::DeltaInsert(LogicalOperator &op, TableCatalogEntry &table_p, physical_index_vector_t column_index_map_p, + vector types, CopyFunction function_p, unique_ptr bind_data, idx_t estimated_cardinality) : + PhysicalCopyToFile(std::move(types), std::move(function_p), std::move(bind_data), estimated_cardinality), table(&table_p), schema(nullptr), column_index_map(std::move(column_index_map_p)) { + + type = PhysicalOperatorType::EXTENSION; +} + +DeltaInsert::DeltaInsert(LogicalOperator &op, SchemaCatalogEntry &schema_p, unique_ptr info, + vector types, CopyFunction function_p, unique_ptr bind_data, idx_t estimated_cardinality) : + PhysicalCopyToFile(std::move(types), std::move(function_p), std::move(bind_data), estimated_cardinality), table(nullptr), schema(&schema_p), info(std::move(info)) { + + type = PhysicalOperatorType::EXTENSION; +} + +//===--------------------------------------------------------------------===// +// States +//===--------------------------------------------------------------------===// +class DeltaInsertGlobalState : public GlobalSinkState { +public: + explicit DeltaInsertGlobalState(ClientContext &context, DeltaTableEntry &table, + const vector &varchar_types) + : table(table), insert_count(0) { + } + + DeltaTableEntry &table; + idx_t insert_count; +}; + +// unique_ptr DeltaInsert::GetGlobalSinkState(ClientContext &context) const { +// return physical_copy_to_file->GetGlobalSinkState(context); +// } +// +// unique_ptr DeltaInsert::GetLocalSinkState(ExecutionContext &context) const { +// return physical_copy_to_file->GetLocalSinkState(context); +// } + +//===--------------------------------------------------------------------===// +// Sink +//===--------------------------------------------------------------------===// +// SinkResultType DeltaInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { +// return 
physical_copy_to_file->Sink(context, chunk, input); +// } +// +// SinkCombineResultType DeltaInsert::Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const { +// return physical_copy_to_file->Combine(context, input); +// } + +//===--------------------------------------------------------------------===// +// Finalize +//===--------------------------------------------------------------------===// +SinkFinalizeType DeltaInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, + OperatorSinkFinalizeInput &input) const { + // Call base class to finalize the PhysicalCopy + auto res = PhysicalCopyToFile::Finalize(pipeline, event, context, input); + + if (res != SinkFinalizeType::READY) { + throw NotImplementedException("Unknown SinkFinalizeType in DeltaInsert::Finalize: %s", EnumUtil::ToString(res)); + } + + auto ©_global_state = input.global_state.Cast(); + + auto &transaction = DeltaTransaction::Get(context, table->catalog); + vector filenames; + for (const auto& filename : copy_global_state.file_names) { + filenames.push_back(filename.ToString()); + } + transaction.Append(filenames); + + return res; +} + +//===--------------------------------------------------------------------===// +// GetData +//===--------------------------------------------------------------------===// + + +//===--------------------------------------------------------------------===// +// Helpers +//===--------------------------------------------------------------------===// +string DeltaInsert::GetName() const { + return table ? "DELTA_INSERT" : "DELTA_CREATE_TABLE_AS"; +} + +InsertionOrderPreservingMap DeltaInsert::ParamsToString() const { + InsertionOrderPreservingMap result; + result["Table Name"] = table ? 
table->name : info->Base().table; + return result; +} + +//===--------------------------------------------------------------------===// +// Plan +//===--------------------------------------------------------------------===// +static optional_ptr TryGetCopyFunction(DatabaseInstance &db, const string &name) { + D_ASSERT(!name.empty()); + auto &system_catalog = Catalog::GetSystemCatalog(db); + auto data = CatalogTransaction::GetSystemTransaction(db); + auto &schema = system_catalog.GetSchema(data, DEFAULT_SCHEMA); + return schema.GetEntry(data, CatalogType::COPY_FUNCTION_ENTRY, name)->Cast(); +} + +unique_ptr DeltaCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, + unique_ptr plan) { + if (op.return_chunk) { + throw BinderException("RETURNING clause not yet supported for insertion into Delta table"); + } + if (op.action_type != OnConflictAction::THROW) { + throw BinderException("ON CONFLICT clause not yet supported for insertion into Delta table"); + } + + string delta_path = op.table.Cast().snapshot->GetPaths()[0]; // TODO unsafe? 
+ + // Create Copy Info + auto info = make_uniq(); + info->file_path = delta_path; + info->format = "parquet"; + info->is_from = false; + + // Get Parquet Copy function + auto copy_fun = TryGetCopyFunction(*context.db, "parquet"); + if (!copy_fun) { + throw MissingExtensionException("Did not find parquet copy function required to write to delta table"); + } + + // Bind Copy Function + auto &columns = op.table.Cast().GetColumns(); + CopyFunctionBindInput bind_input(*info); + auto function_data = copy_fun->function.copy_to_bind(context, bind_input, columns.GetColumnNames(), columns.GetColumnTypes()); + + auto insert = make_uniq(op, op.table, op.column_index_map, + GetCopyFunctionReturnLogicalTypes(CopyFunctionReturnType::CHANGED_ROWS), copy_fun->function, std::move(function_data), op.estimated_cardinality); + + insert->use_tmp_file = false; + insert->file_path = delta_path; + insert->filename_pattern.SetFilenamePattern("duckdb_data_file_{uuid}"); + insert->file_extension = "parquet"; + insert->overwrite_mode = CopyOverwriteMode::COPY_OVERWRITE_OR_IGNORE; + insert->per_thread_output = true; + insert->rotate = false; + insert->return_type = CopyFunctionReturnType::CHANGED_ROWS; + insert->partition_output = false; + insert->write_partition_columns = false; + insert->names = {}; + insert->expected_types = columns.GetColumnTypes(); + insert->children.push_back(std::move(plan)); + + return std::move(insert); +} + +unique_ptr DeltaCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, + unique_ptr plan) { + throw NotImplementedException("DeltaCatalog::PlanCreateTableAs"); + // auto insert = make_uniq(op, op.schema, std::move(op.info)); + // insert->children.push_back(std::move(plan)); + // return std::move(insert); +} + +} // namespace duckdb diff --git a/src/storage/delta_transaction.cpp b/src/storage/delta_transaction.cpp index 2af1a46..fc81bc2 100644 --- a/src/storage/delta_transaction.cpp +++ b/src/storage/delta_transaction.cpp @@ -1,7 +1,8 @@ 
#include "storage/delta_transaction.hpp"
#include "storage/delta_catalog.hpp"
#include "duckdb/main/client_properties.hpp"
#include "duckdb/common/arrow/arrow_converter.hpp"
#include "duckdb/common/arrow/arrow_appender.hpp"
#include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp"
#include "functions/delta_scan.hpp"
#include "storage/delta_table_entry.hpp"

void DeltaTransaction::Start() {
	transaction_state = DeltaTransactionState::TRANSACTION_NOT_YET_STARTED;
}

//! Allocation callback handed to the kernel: copies the slice into a heap-allocated string.
//! The caller owns the returned pointer and must delete it as a `string`.
static void *allocate_string(const struct ffi::KernelStringSlice slice) {
	return new string(slice.ptr, slice.len);
}

//! Exports `chunk` together with its column types and names as an Arrow array + schema pair
static ffi::ArrowFFIData ExportChunkToArrow(DataChunk &chunk, const vector<LogicalType> &types,
                                            const vector<string> &names) {
	ffi::ArrowFFIData ffi_data;
	unordered_map<idx_t, const shared_ptr<ArrowTypeExtensionData>> extension_types;
	ClientProperties props("UTC", ArrowOffsetSize::REGULAR, false, false, false, nullptr);
	ArrowConverter::ToArrowArray(chunk, reinterpret_cast<ArrowArray *>(&ffi_data.array), props, extension_types);
	ArrowConverter::ToArrowSchema(reinterpret_cast<ArrowSchema *>(&ffi_data.schema), types, names, props);
	return ffi_data;
}

//! Single-column buffer ("engineCommitInfo": MAP(VARCHAR, VARCHAR)) holding the commit info rows
struct CommitInfo {
	static vector<LogicalType> GetTypes() {
		return {LogicalType::MAP(LogicalType::VARCHAR, LogicalType::VARCHAR)};
	};
	static vector<string> GetNames() {
		return {"engineCommitInfo"};
	};

	CommitInfo() {
		buffer.Initialize(Allocator::DefaultAllocator(), GetTypes());
	}

	//! Appends one commit info map as a new row
	void Append(Value commit_info_map) {
		idx_t current_size = buffer.size();
		idx_t current_capacity = buffer.GetCapacity();

		// NOTE(review): confirm that SetCapacity also grows the underlying vectors; if it only
		// updates the bookkeeping, rows past the initial capacity are written out of bounds
		if (current_size == current_capacity) {
			buffer.SetCapacity(2 * current_capacity);
		}

		buffer.SetValue(0, current_size, commit_info_map);
		// SetValue does not change the row count: without this the chunk stays empty, every
		// append overwrites row 0 and ToArrow exports zero rows
		buffer.SetCardinality(current_size + 1);
	}

	ffi::ArrowFFIData ToArrow() {
		return ExportChunkToArrow(buffer, GetTypes(), GetNames());
	}

	DataChunk buffer;
};

//! Buffer with one row per written data file, using the column layout
//! (path, partitionValues, size, modificationTime, dataChange)
struct WriteMetaData {
	static vector<LogicalType> GetTypes() {
		return {
		    LogicalType::VARCHAR, LogicalType::MAP(LogicalType::VARCHAR, LogicalType::VARCHAR),
		    LogicalType::BIGINT,  LogicalType::BIGINT,
		    LogicalType::BOOLEAN,
		};
	};
	static vector<string> GetNames() {
		return {"path", "partitionValues", "size", "modificationTime", "dataChange"};
	};

	WriteMetaData() {
		buffer.Initialize(Allocator::DefaultAllocator(), GetTypes());
	}

	//! Appends the metadata of one written file as a new row
	void Append(const string &path, Value partition_values, idx_t size, idx_t modification_time, bool data_change) {
		idx_t current_size = buffer.size();
		idx_t current_capacity = buffer.GetCapacity();

		// NOTE(review): same capacity caveat as CommitInfo::Append - confirm SetCapacity reallocates
		if (current_size == current_capacity) {
			buffer.SetCapacity(2 * current_capacity);
		}

		buffer.SetValue(0, current_size, Value(path));
		buffer.SetValue(1, current_size, partition_values);
		buffer.SetValue(2, current_size, Value::BIGINT(size));
		buffer.SetValue(3, current_size, Value::BIGINT(modification_time));
		buffer.SetValue(4, current_size, Value::BOOLEAN(data_change));
		// Make the row visible; SetValue alone leaves the cardinality untouched
		buffer.SetCardinality(current_size + 1);
	}

	ffi::ArrowFFIData ToArrow() {
		return ExportChunkToArrow(buffer, GetTypes(), GetNames());
	}

	DataChunk buffer;
};

//! Commits the transaction. When files were appended, this attaches the commit info, registers
//! the write metadata of every appended file and commits through the kernel.
//! Kernel errors, including a failed commit, are raised through TryUnpackKernelResult.
void DeltaTransaction::Commit() {
	if (transaction_state != DeltaTransactionState::TRANSACTION_STARTED) {
		return;
	}
	transaction_state = DeltaTransactionState::TRANSACTION_FINISHED;

	if (outstanding_appends.empty()) {
		return;
	}

	auto &snapshot = *table_entry->snapshot;

	// Create commit info
	CommitInfo commit_info;
	commit_info.Append(Value::MAP(LogicalType::VARCHAR, LogicalType::VARCHAR, {Value("engineInfo")},
	                              {Value("default engine")}));
	auto commit_info_arrow = commit_info.ToArrow();

	// Convert arrow to Engine Data
	KernelEngineData commit_info_engine_data =
	    snapshot.TryUnpackKernelResult(ffi::get_engine_data(&commit_info_arrow, snapshot.extern_engine.get()));

	// with_commit_info consumes both handles and returns the transaction to continue with
	KernelExclusiveTransaction transaction_with_info =
	    ffi::with_commit_info(kernel_transaction.release(), commit_info_engine_data.release());

	// NOTE(review): the write context and write schema handles are never released here -
	// check whether the kernel FFI expects them to be freed
	auto write_context = ffi::get_write_context(transaction_with_info.get());
	auto write_schema = ffi::get_write_schema(write_context);
	(void)write_schema;
	// TODO use write path? For now only take ownership so that the allocation is freed
	unique_ptr<string> write_path(static_cast<string *>(ffi::get_write_path(write_context, allocate_string)));

	// Delta stores modificationTime in milliseconds since the epoch, while timestamp_t counts
	// microseconds
	const auto modification_time_ms = static_cast<idx_t>(Timestamp::GetEpochMs(Timestamp::GetCurrentTimestamp()));

	WriteMetaData meta_data;
	for (const auto &file : outstanding_appends) {
		// TODO: how to figure out how many tuples we've written? (size is a placeholder)
		meta_data.Append(file, Value(), 1, modification_time_ms, true);
	}

	auto write_metadata_ffi = meta_data.ToArrow();
	KernelEngineData write_info_engine_data =
	    snapshot.TryUnpackKernelResult(ffi::get_engine_data(&write_metadata_ffi, snapshot.extern_engine.get()));
	ffi::add_write_metadata(transaction_with_info.get(), write_info_engine_data.release());

	// Unpack the result so that a failed commit raises instead of being silently ignored
	snapshot.TryUnpackKernelResult(ffi::commit(transaction_with_info.release(), snapshot.extern_engine.get()));
}

//! Registers newly written data files with this transaction. The first call starts the kernel
//! transaction; throws InvalidInputException when the transaction is read-only.
void DeltaTransaction::Append(const vector<string> &append_files) {
	if (transaction_state == DeltaTransactionState::TRANSACTION_NOT_YET_STARTED) {
		if (access_mode == AccessMode::READ_ONLY) {
			throw InvalidInputException("Can not append to a read only table");
		}

		// Start the kernel transaction
		auto &snapshot = *table_entry->snapshot;
		string path = snapshot.GetPaths()[0];
		auto path_slice = KernelUtils::ToDeltaString(path);
		kernel_transaction =
		    snapshot.TryUnpackKernelResult(ffi::transaction(path_slice, snapshot.extern_engine.get()));

		// Only mark the transaction as started once the kernel transaction exists
		transaction_state = DeltaTransactionState::TRANSACTION_STARTED;
	}

	// Append the newly inserted data
	outstanding_appends.insert(outstanding_appends.end(), append_files.begin(), append_files.end());
}

DeltaTransaction &DeltaTransaction::Get(ClientContext &context, Catalog &catalog) {
	return Transaction::Get(context, catalog).Cast<DeltaTransaction>();
}