Skip to content

Commit

Permalink
first version of read-only atomicity for delta tables
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Sep 23, 2024
1 parent 7fb17d3 commit 03ed79b
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
2 changes: 1 addition & 1 deletion duckdb
26 changes: 17 additions & 9 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,15 @@ unique_ptr<NodeStatistics> DeltaSnapshot::GetCardinality(ClientContext &context)
return nullptr;
}

unique_ptr<MultiFileReader> DeltaMultiFileReader::CreateInstance() {
return std::move(make_uniq<DeltaMultiFileReader>());
unique_ptr<MultiFileReader> DeltaMultiFileReader::CreateInstance(const TableFunction &table_function) {
auto result = make_uniq<DeltaMultiFileReader>();

if (table_function.function_info) {
result->kernel_snapshot = table_function.function_info->Cast<DeltaFunctionInfo>().snapshot;
result->kernel_snapshot_path = table_function.function_info->Cast<DeltaFunctionInfo>().expected_path;
}

return std::move(result);
}

bool DeltaMultiFileReader::Bind(MultiFileReaderOptions &options, MultiFileList &files,
Expand Down Expand Up @@ -638,14 +645,15 @@ unique_ptr<MultiFileList> DeltaMultiFileReader::CreateFileList(ClientContext &co
throw BinderException("'delta_scan' only supports single path as input");
}

// TODO: this is techinically incorrect for `select * from attach_delta union all from delta_scan('../some/path')
// since the first one should scan the snapshot from the transaction whereas the second one should scan the current state
for (auto& transaction : context.ActiveTransaction().OpenedTransactions()) {
auto & catalog = transaction.get().GetCatalog();
if (catalog.GetCatalogType() == "delta" && catalog.GetDBPath() == paths[0]) {
auto snapshot = make_uniq<DeltaSnapshot>(context, paths[0]);
snapshot->snapshot =
if (kernel_snapshot) {
if (kernel_snapshot_path != paths[0]) {
throw InternalException("Expected path from injected function info did not match! '%s' expected to match '%s'", kernel_snapshot_path, paths[0]);
}

// This takes the kernel snapshot from the delta snapshot and ensures we use that snapshot for reading
if (kernel_snapshot) {
auto snapshot = make_uniq<DeltaSnapshot>(context, paths[0]);
snapshot->snapshot = kernel_snapshot;
return std::move(snapshot);
}
}
Expand Down
13 changes: 12 additions & 1 deletion src/include/functions/delta_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

namespace duckdb {

struct DeltaFunctionInfo : public TableFunctionInfo {
shared_ptr<SharedKernelSnapshot> snapshot;
string expected_path;
};

struct DeltaFileMetaData {
DeltaFileMetaData() {};

Expand Down Expand Up @@ -104,7 +109,7 @@ struct DeltaMultiFileReaderGlobalState : public MultiFileReaderGlobalState {
};

struct DeltaMultiFileReader : public MultiFileReader {
static unique_ptr<MultiFileReader> CreateInstance();
static unique_ptr<MultiFileReader> CreateInstance(const TableFunction &table_function);
//! Return a DeltaSnapshot
unique_ptr<MultiFileList> CreateFileList(ClientContext &context, const vector<string> &paths,
FileGlobOptions options) override;
Expand Down Expand Up @@ -142,6 +147,12 @@ struct DeltaMultiFileReader : public MultiFileReader {
//! Override the ParseOption call to parse delta_scan specific options
bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options,
ClientContext &context) override;

// For the TableFunction used on attached delta tables, the kernel snapshot is injected into the MultiFileReader to
// ensure the table is read at the correct timestamp
shared_ptr<SharedKernelSnapshot> kernel_snapshot;
// Path for when kernel_snapshot is set: this multifilereader can then only be used to scan this path
string kernel_snapshot_path;
};

} // namespace duckdb
13 changes: 13 additions & 0 deletions src/storage/delta_table_entry.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#include "storage/delta_catalog.hpp"
#include "storage/delta_schema_entry.hpp"
#include "storage/delta_table_entry.hpp"

#include "delta_utils.hpp"
#include "functions/delta_scan.hpp"

#include "storage/delta_transaction.hpp"
#include "duckdb/storage/statistics/base_statistics.hpp"
#include "duckdb/storage/table_storage_info.hpp"
Expand All @@ -11,6 +15,8 @@
#include "duckdb/parser/tableref/table_function_ref.hpp"
#include "../../duckdb/third_party/catch/catch.hpp"

#include <functional>

namespace duckdb {

DeltaTableEntry::DeltaTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info)
Expand All @@ -34,12 +40,19 @@ TableFunction DeltaTableEntry::GetScanFunction(ClientContext &context, unique_pt
auto delta_scan_function = delta_function_set.functions.GetFunctionByArguments(context, {LogicalType::VARCHAR});
auto &delta_catalog = catalog.Cast<DeltaCatalog>();

// Copy over the internal kernel snapshot
auto function_info = make_shared_ptr<DeltaFunctionInfo>();
function_info->snapshot = this->snapshot->snapshot;
function_info->expected_path = delta_catalog.GetDBPath();
delta_scan_function.function_info = std::move(function_info);

vector<Value> inputs = {delta_catalog.GetDBPath()};
named_parameter_map_t param_map;
vector<LogicalType> return_types;
vector<string> names;
TableFunctionRef empty_ref;


TableFunctionBindInput bind_input(inputs, param_map, return_types, names, nullptr, nullptr, delta_scan_function,
empty_ref);

Expand Down

0 comments on commit 03ed79b

Please sign in to comment.