Skip to content

Commit

Permalink
wip delta appends
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Jan 14, 2025
1 parent 3d9c464 commit 71620b3
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 17 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ set(EXTENSION_SOURCES
src/delta_utils.cpp
src/functions/delta_scan.cpp
src/functions/expression_functions.cpp
src/storage/delta_insert.cpp
src/storage/delta_catalog.cpp
src/storage/delta_schema_entry.cpp
src/storage/delta_table_entry.cpp
Expand Down
5 changes: 5 additions & 0 deletions scripts/generate_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'")
generate_test_data_pyspark('simple_sf1_with_dv', 'simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0")

## really simple: one column `i` holding the integers 0 through 9, written to parquet and
## passed to generate_test_data_pyspark without a delete predicate
con = duckdb.connect()
con.query(f"COPY (SELECT i FROM range(0,10) tbl(i)) TO '{TMP_PATH}/really_simple.parquet'")
generate_test_data_pyspark('really_simple', 'really_simple', f'{TMP_PATH}/really_simple.parquet')

## Lineitem SF0.01 with deletion vector
con = duckdb.connect()
con.query(f"call dbgen(sf=0.01); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'")
Expand Down
3 changes: 3 additions & 0 deletions src/include/delta_kernel_ffi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,9 @@ struct im_an_unused_struct_that_tricks_msvc_into_compilation {
ExternResult<Handle<SharedScan>> field9;
ExternResult<Handle<ExclusiveFileReadResultIterator>> field10;
ExternResult<KernelRowIndexArray> field11;
ExternResult<Handle<ExclusiveEngineData>> field12;
ExternResult<Handle<ExclusiveTransaction>> field13;
ExternResult<uint64_t> field14;
};

/// An `Event` can generally be thought of a "log message". It contains all the relevant bits such
Expand Down
8 changes: 8 additions & 0 deletions src/include/delta_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ struct UniqueKernelPointer {
}
}

//! Hands back the stored pointer and resets this wrapper to nullptr.
//! NOTE(review): presumably the caller becomes responsible for freeing the returned pointer - confirm
KernelType *release() {
	KernelType *released = ptr;
	ptr = nullptr;
	return released;
}

//! Returns the stored pointer; the wrapper itself is left unchanged
KernelType *get() const {
return ptr;
}
Expand All @@ -231,6 +237,8 @@ typedef TemplatedUniqueKernelPointer<ffi::SharedExternEngine, ffi::free_engine>
typedef TemplatedUniqueKernelPointer<ffi::SharedScan, ffi::free_scan> KernelScan;
typedef TemplatedUniqueKernelPointer<ffi::SharedGlobalScanState, ffi::free_global_scan_state> KernelGlobalScanState;
typedef TemplatedUniqueKernelPointer<ffi::SharedScanDataIterator, ffi::free_kernel_scan_data> KernelScanDataIterator;
// Wrappers for the exclusive transaction and engine data handles. Each typedef pairs an ffi handle type with the
// ffi free function passed as the second template argument (presumably used as its deleter - TODO confirm).
typedef TemplatedUniqueKernelPointer<ffi::ExclusiveTransaction, ffi::free_transaction> KernelExclusiveTransaction;
typedef TemplatedUniqueKernelPointer<ffi::ExclusiveEngineData, ffi::free_engine_data> KernelEngineData;

template <typename KernelType, void (*DeleteFunction)(KernelType *)>
struct SharedKernelPointer;
Expand Down
5 changes: 4 additions & 1 deletion src/include/functions/delta_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ struct DeltaSnapshot : public MultiFileList {
void InitializeSnapshot();
void InitializeScan();

public:
//! Unpacks a kernel ExternResult through KernelUtils::UnpackResult, passing along an error message
//! that names the first path of this snapshot.
template <class T>
T TryUnpackKernelResult(ffi::ExternResult<T> result) {
	auto error_context = StringUtil::Format("While trying to read from delta table: '%s'", paths[0]);
	return KernelUtils::UnpackResult<T>(result, error_context);
}
protected:

static void VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data,
const struct ffi::KernelBoolSlice selection_vec);
Expand All @@ -89,8 +91,9 @@ struct DeltaSnapshot : public MultiFileList {

//! Delta Kernel Structures
shared_ptr<SharedKernelSnapshot> snapshot;

public:
KernelExternEngine extern_engine;
protected:
KernelScan scan;
KernelGlobalScanState global_state;
KernelScanDataIterator scan_data_iterator;
Expand Down
70 changes: 70 additions & 0 deletions src/include/storage/delta_insert.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// storage/delta_insert.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/execution/operator/persistent/physical_copy_to_file.hpp"

#include "duckdb/execution/physical_operator.hpp"
#include "duckdb/common/index_vector.hpp"

namespace duckdb {

//! Physical operator used for inserting into a Delta table. It derives from PhysicalCopyToFile and currently
//! only overrides Finalize, GetName and ParamsToString; the other source/sink overrides below are commented out.
class DeltaInsert : public PhysicalCopyToFile {
public:
//! INSERT INTO an existing table: sets `table` and `column_index_map`, leaves `schema` null
DeltaInsert(LogicalOperator &op, TableCatalogEntry &table_p, physical_index_vector_t<idx_t> column_index_map_p,
vector<LogicalType> types, CopyFunction function_p, unique_ptr<FunctionData> bind_data, idx_t estimated_cardinality);

//! CREATE TABLE AS: sets `schema` and `info`, leaves `table` null
DeltaInsert(LogicalOperator &op, SchemaCatalogEntry &schema_p, unique_ptr<BoundCreateTableInfo> info,
vector<LogicalType> types, CopyFunction function_p, unique_ptr<FunctionData> bind_data, idx_t estimated_cardinality);

//! The table to insert into (null in case of CREATE TABLE AS)
optional_ptr<TableCatalogEntry> table;
//! Table schema, in case of CREATE TABLE AS
optional_ptr<SchemaCatalogEntry> schema;
//! Create table info, in case of CREATE TABLE AS
unique_ptr<BoundCreateTableInfo> info;
//! column_index_map
physical_index_vector_t<idx_t> column_index_map;
//! The physical copy used internally by this insert
//! NOTE(review): only referenced from commented-out code in delta_insert.cpp; neither constructor sets it
unique_ptr<PhysicalOperator> physical_copy_to_file;

public:
// Source interface (disabled: no override is declared here)
// SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
//
// bool IsSource() const override {
// return true;
// }

public:
// Sink interface (only Finalize is overridden; the commented-out members are not declared by this class)
// SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const override;
// SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override;
//! Runs the base class Finalize, then passes the written file names to the DeltaTransaction
SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
OperatorSinkFinalizeInput &input) const override;
// unique_ptr<LocalSinkState> GetLocalSinkState(ExecutionContext &context) const override;
// unique_ptr<GlobalSinkState> GetGlobalSinkState(ClientContext &context) const override;

// bool IsSink() const override {
// return true;
// }
//
// bool ParallelSink() const override {
// return true;
// }
//
// bool SinkOrderDependent() const override {
// return true;
// }

//! "DELTA_INSERT" when a target table is set, "DELTA_CREATE_TABLE_AS" otherwise
string GetName() const override;
//! Returns a single "Table Name" entry
InsertionOrderPreservingMap<string> ParamsToString() const override;
};

} // namespace duckdb
10 changes: 6 additions & 4 deletions src/include/storage/delta_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#pragma once

#include "delta_utils.hpp"
#include "duckdb/transaction/transaction.hpp"

namespace duckdb {
Expand All @@ -27,15 +28,16 @@ class DeltaTransaction : public Transaction {
void Commit();
void Rollback();

void Append(const vector<string> &append_files);

static DeltaTransaction &Get(ClientContext &context, Catalog &catalog);
AccessMode GetAccessMode() const;

//! Read-write transactions are rejected: this override always throws NotImplementedException.
//! NOTE(review): this class also declares Append(); confirm that rejecting read-write is still intended
void SetReadWrite() override {
	throw NotImplementedException("Can not start read-write transaction");
}

public:
unique_ptr<DeltaTableEntry> table_entry;
vector<string> outstanding_appends;

KernelExclusiveTransaction kernel_transaction;

private:
// DeltaConnection connection;
Expand Down
8 changes: 0 additions & 8 deletions src/storage/delta_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ DatabaseSize DeltaCatalog::GetDatabaseSize(ClientContext &context) {
return size;
}

//! Insert planning stub: unconditionally throws NotImplementedException
//! (one of the 8 lines this diff removes from src/storage/delta_catalog.cpp)
unique_ptr<PhysicalOperator> DeltaCatalog::PlanInsert(ClientContext &context, LogicalInsert &op,
unique_ptr<PhysicalOperator> plan) {
throw NotImplementedException("DeltaCatalog does not support inserts");
}
//! CREATE TABLE AS planning stub: unconditionally throws NotImplementedException
//! (one of the 8 lines this diff removes from src/storage/delta_catalog.cpp)
unique_ptr<PhysicalOperator> DeltaCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op,
unique_ptr<PhysicalOperator> plan) {
throw NotImplementedException("DeltaCatalog does not support creating new tables");
}
unique_ptr<PhysicalOperator> DeltaCatalog::PlanDelete(ClientContext &context, LogicalDelete &op,
unique_ptr<PhysicalOperator> plan) {
throw NotImplementedException("DeltaCatalog does not support deletes");
Expand Down
179 changes: 179 additions & 0 deletions src/storage/delta_insert.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#include "storage/delta_insert.hpp"

#include "duckdb/catalog/catalog_entry/copy_function_catalog_entry.hpp"
#include "duckdb/main/client_data.hpp"
#include "duckdb/main/extension_util.hpp"
#include "duckdb/planner/operator/logical_copy_to_file.hpp"
#include "functions/delta_scan.hpp"
#include "duckdb/execution/physical_operator_states.hpp"

#include "storage/delta_catalog.hpp"
#include "storage/delta_transaction.hpp"
#include "duckdb/planner/operator/logical_insert.hpp"
#include "duckdb/planner/operator/logical_create_table.hpp"
#include "storage/delta_table_entry.hpp"
#include "duckdb/planner/parsed_data/bound_create_table_info.hpp"
#include "duckdb/execution/operator/projection/physical_projection.hpp"
#include "duckdb/execution/operator/scan/physical_table_scan.hpp"
#include "duckdb/planner/expression/bound_cast_expression.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"

namespace duckdb {

//! INSERT INTO constructor: forwards the copy configuration to PhysicalCopyToFile and records the target table
//! together with its column index map. `schema` stays null; `op` is not used.
DeltaInsert::DeltaInsert(LogicalOperator &op, TableCatalogEntry &table_p,
                         physical_index_vector_t<idx_t> column_index_map_p, vector<LogicalType> types,
                         CopyFunction function_p, unique_ptr<FunctionData> bind_data, idx_t estimated_cardinality)
    : PhysicalCopyToFile(std::move(types), std::move(function_p), std::move(bind_data), estimated_cardinality),
      table(&table_p),
      schema(nullptr),
      column_index_map(std::move(column_index_map_p)) {
	// Override the operator type set by the base class
	type = PhysicalOperatorType::EXTENSION;
}

//! CREATE TABLE AS constructor: forwards the copy configuration to PhysicalCopyToFile and records the target
//! schema plus the bound create-table info. `table` stays null; `op` is not used.
DeltaInsert::DeltaInsert(LogicalOperator &op, SchemaCatalogEntry &schema_p, unique_ptr<BoundCreateTableInfo> info,
                         vector<LogicalType> types, CopyFunction function_p, unique_ptr<FunctionData> bind_data,
                         idx_t estimated_cardinality)
    : PhysicalCopyToFile(std::move(types), std::move(function_p), std::move(bind_data), estimated_cardinality),
      table(nullptr),
      schema(&schema_p),
      // the parameter shadows the member: this moves the argument into the member
      info(std::move(info)) {
	// Override the operator type set by the base class
	type = PhysicalOperatorType::EXTENSION;
}

//===--------------------------------------------------------------------===//
// States
//===--------------------------------------------------------------------===//
//! Global sink state holding the target table and a row counter that starts at zero.
//! The `context` and `varchar_types` constructor arguments are accepted but not used.
class DeltaInsertGlobalState : public GlobalSinkState {
public:
	explicit DeltaInsertGlobalState(ClientContext &context, DeltaTableEntry &table,
	                                const vector<LogicalType> &varchar_types)
	    : table(table) {
	}

	//! Table being inserted into
	DeltaTableEntry &table;
	//! Insert counter
	idx_t insert_count = 0;
};

// unique_ptr<GlobalSinkState> DeltaInsert::GetGlobalSinkState(ClientContext &context) const {
// return physical_copy_to_file->GetGlobalSinkState(context);
// }
//
// unique_ptr<LocalSinkState> DeltaInsert::GetLocalSinkState(ExecutionContext &context) const {
// return physical_copy_to_file->GetLocalSinkState(context);
// }

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
// SinkResultType DeltaInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
// return physical_copy_to_file->Sink(context, chunk, input);
// }
//
// SinkCombineResultType DeltaInsert::Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const {
// return physical_copy_to_file->Combine(context, input);
// }

//===--------------------------------------------------------------------===//
// Finalize
//===--------------------------------------------------------------------===//
//! Finalizes the underlying PhysicalCopyToFile and hands the names of the written files to the
//! DeltaTransaction of the target table's catalog.
//! \return the SinkFinalizeType reported by the base class (always READY when this returns)
//! \throws NotImplementedException when the base class reports anything other than READY
//! \throws InternalException when no target table is set (the CREATE TABLE AS constructor leaves it null)
SinkFinalizeType DeltaInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                       OperatorSinkFinalizeInput &input) const {
	// Call base class to finalize the PhysicalCopy
	auto res = PhysicalCopyToFile::Finalize(pipeline, event, context, input);

	if (res != SinkFinalizeType::READY) {
		throw NotImplementedException("Unknown SinkFinalizeType in DeltaInsert::Finalize: %s", EnumUtil::ToString(res));
	}

	// NOTE(review): the MacOS (osx_amd64) extension build reports "use of undeclared identifier
	// 'CopyToFunctionGlobalState'" for this statement; the declaration of that type has to be made
	// visible to this translation unit before this builds.
	auto &copy_global_state = input.global_state.Cast<CopyToFunctionGlobalState>();

	// Fail with a clear message instead of dereferencing an unset optional_ptr below
	if (!table) {
		throw InternalException("DeltaInsert::Finalize requires a target table");
	}
	auto &transaction = DeltaTransaction::Get(context, table->catalog);

	// Convert the recorded file names to strings and register them as appends
	vector<string> filenames;
	filenames.reserve(copy_global_state.file_names.size());
	for (const auto &filename : copy_global_state.file_names) {
		filenames.push_back(filename.ToString());
	}
	transaction.Append(filenames);

	return res;
}

//===--------------------------------------------------------------------===//
// GetData
//===--------------------------------------------------------------------===//


//===--------------------------------------------------------------------===//
// Helpers
//===--------------------------------------------------------------------===//
//! Operator name: "DELTA_INSERT" when a target table is set, "DELTA_CREATE_TABLE_AS" otherwise
string DeltaInsert::GetName() const {
	if (table) {
		return "DELTA_INSERT";
	}
	return "DELTA_CREATE_TABLE_AS";
}

//! Returns a single "Table Name" parameter: the target table's name when one is set,
//! otherwise the table name stored in the create-table info
InsertionOrderPreservingMap<string> DeltaInsert::ParamsToString() const {
	InsertionOrderPreservingMap<string> params;
	if (table) {
		params["Table Name"] = table->name;
	} else {
		params["Table Name"] = info->Base().table;
	}
	return params;
}

//===--------------------------------------------------------------------===//
// Plan
//===--------------------------------------------------------------------===//
//! Looks up the copy function called `name` in the default schema of the system catalog.
//! \param db   database instance whose system catalog is searched
//! \param name name of the copy function; must not be empty
//! \return the catalog entry, or nullptr when the lookup yields no entry, so that the caller's own
//!         "function not found" handling is reachable
static optional_ptr<CopyFunctionCatalogEntry> TryGetCopyFunction(DatabaseInstance &db, const string &name) {
	D_ASSERT(!name.empty());
	auto &system_catalog = Catalog::GetSystemCatalog(db);
	auto data = CatalogTransaction::GetSystemTransaction(db);
	auto &schema = system_catalog.GetSchema(data, DEFAULT_SCHEMA);
	auto entry = schema.GetEntry(data, CatalogType::COPY_FUNCTION_ENTRY, name);
	if (!entry) {
		// Casting through an unset optional_ptr would raise here rather than report "not found"
		return nullptr;
	}
	return entry->Cast<CopyFunctionCatalogEntry>();
}

//! Plans an INSERT into a Delta table as a DeltaInsert operator that writes parquet files into the
//! first path of the table's snapshot, using the "parquet" copy function.
//! Throws BinderException for RETURNING / ON CONFLICT, and MissingExtensionException when the parquet
//! copy function cannot be found.
unique_ptr<PhysicalOperator> DeltaCatalog::PlanInsert(ClientContext &context, LogicalInsert &op,
                                                      unique_ptr<PhysicalOperator> plan) {
	// Reject the insert variants that are not supported yet
	if (op.return_chunk) {
		throw BinderException("RETURNING clause not yet supported for insertion into Delta table");
	}
	if (op.action_type != OnConflictAction::THROW) {
		throw BinderException("ON CONFLICT clause not yet supported for insertion into Delta table");
	}

	auto &delta_table = op.table.Cast<DeltaTableEntry>();
	string delta_path = delta_table.snapshot->GetPaths()[0]; // TODO unsafe?

	// Describe the copy: parquet output written to the table path
	auto copy_info = make_uniq<CopyInfo>();
	copy_info->file_path = delta_path;
	copy_info->format = "parquet";
	copy_info->is_from = false;

	// Look up the parquet copy function
	auto parquet_copy = TryGetCopyFunction(*context.db, "parquet");
	if (!parquet_copy) {
		throw MissingExtensionException("Did not find parquet copy function required to write to delta table");
	}

	// Bind the copy function against the table's columns
	auto &columns = delta_table.GetColumns();
	CopyFunctionBindInput bind_input(*copy_info);
	auto function_data = parquet_copy->function.copy_to_bind(context, bind_input, columns.GetColumnNames(),
	                                                         columns.GetColumnTypes());

	auto return_types = GetCopyFunctionReturnLogicalTypes(CopyFunctionReturnType::CHANGED_ROWS);
	auto insert = make_uniq<DeltaInsert>(op, op.table, op.column_index_map, std::move(return_types),
	                                     parquet_copy->function, std::move(function_data), op.estimated_cardinality);

	// Output location and file naming
	insert->file_path = delta_path;
	insert->filename_pattern.SetFilenamePattern("duckdb_data_file_{uuid}");
	insert->file_extension = "parquet";
	insert->use_tmp_file = false;
	insert->overwrite_mode = CopyOverwriteMode::COPY_OVERWRITE_OR_IGNORE;

	// Writing behaviour
	insert->per_thread_output = true;
	insert->rotate = false;
	insert->partition_output = false;
	insert->write_partition_columns = false;
	insert->return_type = CopyFunctionReturnType::CHANGED_ROWS;

	// Column metadata and input plan
	insert->names = {};
	insert->expected_types = columns.GetColumnTypes();
	insert->children.push_back(std::move(plan));

	return std::move(insert);
}

//! CREATE TABLE AS is not planned for Delta catalogs: this always throws NotImplementedException
unique_ptr<PhysicalOperator> DeltaCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op,
                                                             unique_ptr<PhysicalOperator> plan) {
	throw NotImplementedException("DeltaCatalog::PlanCreateTableAs");
	// Disabled sketch, kept for reference:
	//   auto insert = make_uniq<DeltaInsert>(op, op.schema, std::move(op.info));
	//   insert->children.push_back(std::move(plan));
	//   return std::move(insert);
}

} // namespace duckdb
Loading

0 comments on commit 71620b3

Please sign in to comment.