Skip to content

Commit

Permalink
improve delta snapshot sharing
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Oct 3, 2024
1 parent 45eab7a commit ec3a8b1
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 32 deletions.
13 changes: 10 additions & 3 deletions scripts/generate_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,29 @@ def generate_test_data_delta_rs_multi(path, init, tables, splits = 1):

os.makedirs(f"{generated_path}")

# First we write a DuckDB file TODO: this should go in 10 appends as well?
# First we write a DuckDB file TODO: this should go in N appends as well?
con = duckdb.connect(f"{generated_path}/duckdb.db")

con.sql(init)

# Then we write the parquet files
for table in tables:
total_count = con.sql(f"select count(*) from ({table['query']})").fetchall()[0][0]
tuples_per_file = math.ceil(total_count / splits)
# At least 1 tuple per file
if total_count < splits:
splits = total_count
tuples_per_file = total_count // splits
remainder = total_count % splits

file_no = 0
write_from = 0
while file_no < splits:
os.makedirs(f"{generated_path}/{table['name']}/parquet", exist_ok=True)
# Write DuckDB's reference data
con.sql(f"COPY ({table['query']} where rowid >= {(file_no) * tuples_per_file} and rowid < {(file_no+1) * tuples_per_file}) to '{generated_path}/{table['name']}/parquet/data_{file_no}.parquet' (FORMAT parquet)")
write_to = write_from + tuples_per_file + (1 if file_no < remainder else 0)
con.sql(f"COPY ({table['query']} where rowid >= {write_from} and rowid < {write_to}) to '{generated_path}/{table['name']}/parquet/data_{file_no}.parquet' (FORMAT parquet)")
file_no += 1
write_from = write_to

for table in tables:
con = duckdb.connect(f"{generated_path}/duckdb.db")
Expand Down
33 changes: 22 additions & 11 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,12 @@ string DeltaSnapshot::ToDeltaPath(const string &raw_path) {
}

void DeltaSnapshot::Bind(vector<LogicalType> &return_types, vector<string> &names) {
if (have_bound) {
names = this->names;
return_types = this->types;
return;
}

if (!initialized) {
InitializeFiles();
}
Expand All @@ -405,7 +411,9 @@ void DeltaSnapshot::Bind(vector<LogicalType> &return_types, vector<string> &name
return_types.push_back(field.second);
}
// Store the bound names for resolving the complex filter pushdown later
have_bound = true;
this->names = names;
this->types = return_types;
}

string DeltaSnapshot::GetFile(idx_t i) {
Expand Down Expand Up @@ -473,12 +481,19 @@ void DeltaSnapshot::InitializeFiles() {
unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, MultiFilePushdownInfo &info,
vector<unique_ptr<Expression>> &filters) {
FilterCombiner combiner(context);

// TODO: can we avoid constructing 2 scans for scans with filter pushdown?
if (filters.empty()) {
return nullptr;
}

for (const auto &filter : filters) {
combiner.AddFilter(filter->Copy());
}
auto filterstmp = combiner.GenerateTableScanFilters(info.column_ids);

// TODO: can/should we figure out if this filtered anything?

auto filtered_list = make_uniq<DeltaSnapshot>(context, paths[0]);
filtered_list->table_filters = std::move(filterstmp);
filtered_list->names = names;
Expand Down Expand Up @@ -548,8 +563,7 @@ unique_ptr<MultiFileReader> DeltaMultiFileReader::CreateInstance(const TableFunc
auto result = make_uniq<DeltaMultiFileReader>();

if (table_function.function_info) {
result->kernel_snapshot = table_function.function_info->Cast<DeltaFunctionInfo>().snapshot;
result->kernel_snapshot_path = table_function.function_info->Cast<DeltaFunctionInfo>().expected_path;
result->snapshot = table_function.function_info->Cast<DeltaFunctionInfo>().snapshot;
}

return std::move(result);
Expand Down Expand Up @@ -640,21 +654,18 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio
}
}

unique_ptr<MultiFileList> DeltaMultiFileReader::CreateFileList(ClientContext &context, const vector<string>& paths, FileGlobOptions options) {
shared_ptr<MultiFileList> DeltaMultiFileReader::CreateFileList(ClientContext &context, const vector<string>& paths, FileGlobOptions options) {
if (paths.size() != 1) {
throw BinderException("'delta_scan' only supports single path as input");
}

if (kernel_snapshot) {
if (kernel_snapshot_path != paths[0]) {
throw InternalException("Expected path from injected function info did not match! '%s' expected to match '%s'", kernel_snapshot_path, paths[0]);
}

if (snapshot) {
// TODO: somehow assert that we are querying the same path as this injected snapshot

// This takes the kernel snapshot from the delta snapshot and ensures we use that snapshot for reading
if (kernel_snapshot) {
auto snapshot = make_uniq<DeltaSnapshot>(context, paths[0]);
snapshot->snapshot = kernel_snapshot;
return std::move(snapshot);
if (snapshot) {
return snapshot;
}
}

Expand Down
15 changes: 8 additions & 7 deletions src/include/functions/delta_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
#include "duckdb/common/multi_file_reader.hpp"

namespace duckdb {
struct DeltaSnapshot;

struct DeltaFunctionInfo : public TableFunctionInfo {
shared_ptr<SharedKernelSnapshot> snapshot;
shared_ptr<DeltaSnapshot> snapshot;
string expected_path;
};

Expand Down Expand Up @@ -84,6 +85,8 @@ struct DeltaSnapshot : public MultiFileList {

//! Names
vector<string> names;
vector<LogicalType> types;
bool have_bound = false;

//! Metadata map for files
vector<unique_ptr<DeltaFileMetaData>> metadata;
Expand Down Expand Up @@ -111,7 +114,7 @@ struct DeltaMultiFileReaderGlobalState : public MultiFileReaderGlobalState {
struct DeltaMultiFileReader : public MultiFileReader {
static unique_ptr<MultiFileReader> CreateInstance(const TableFunction &table_function);
//! Return a DeltaSnapshot
unique_ptr<MultiFileList> CreateFileList(ClientContext &context, const vector<string> &paths,
shared_ptr<MultiFileList> CreateFileList(ClientContext &context, const vector<string> &paths,

Check failure on line 117 in src/include/functions/delta_scan.hpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / MacOS (osx_arm64, arm64, arm64-osx)

virtual function 'CreateFileList' has a different return type ('shared_ptr<MultiFileList>') than the function it overrides (which has return type 'unique_ptr<MultiFileList>')

Check failure on line 117 in src/include/functions/delta_scan.hpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / Linux (linux_amd64, ubuntu:18.04, x64-linux)

invalid covariant return type for 'virtual duckdb::shared_ptr<duckdb::MultiFileList, true> duckdb::DeltaMultiFileReader::CreateFileList(duckdb::ClientContext&, const duckdb::vector<std::__cxx11::basic_string<char> >&, duckdb::FileGlobOptions)'

Check failure on line 117 in src/include/functions/delta_scan.hpp

View workflow job for this annotation

GitHub Actions / Build extension binaries / Linux (linux_amd64, ubuntu:18.04, x64-linux)

invalid covariant return type for 'virtual duckdb::shared_ptr<duckdb::MultiFileList, true> duckdb::DeltaMultiFileReader::CreateFileList(duckdb::ClientContext&, const duckdb::vector<std::__cxx11::basic_string<char> >&, duckdb::FileGlobOptions)'
FileGlobOptions options) override;

//! Override the regular parquet bind using the MultiFileReader Bind. The bind from these are what DuckDB's file
Expand Down Expand Up @@ -148,11 +151,9 @@ struct DeltaMultiFileReader : public MultiFileReader {
bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options,
ClientContext &context) override;

// For the TableFunction used on attached delta tables, the kernel snapshot is injected into the MultiFileReader to
// ensure the table is read at the correct timestamp
shared_ptr<SharedKernelSnapshot> kernel_snapshot;
// Path for when kernel_snapshot is set: this multifilereader can then only be used to scan this path
string kernel_snapshot_path;
// A snapshot can be injected into the multifilereader; this ensures that GetMultiFileList can return this snapshot
// (note that the path should match the one passed to CreateFileList)
shared_ptr<DeltaSnapshot> snapshot;
};

} // namespace duckdb
2 changes: 1 addition & 1 deletion src/include/storage/delta_table_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DeltaTableEntry : public TableCatalogEntry {
ClientContext &context) override;

public:
shared_ptr<const DeltaSnapshot> snapshot;
shared_ptr<DeltaSnapshot> snapshot;
};

} // namespace duckdb
13 changes: 6 additions & 7 deletions src/storage/delta_schema_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ static unique_ptr<DeltaTableEntry> CreateTableEntry(ClientContext &context, Delt
void DeltaSchemaEntry::Scan(ClientContext &context, CatalogType type,
const std::function<void(CatalogEntry &)> &callback) {
if (!CatalogTypeIsSupported(type)) {
CatalogTransaction transaction(this->catalog, context);
auto transaction = catalog.GetCatalogTransaction(context);
auto default_table = GetEntry(transaction, type, DEFAULT_DELTA_TABLE);
if (default_table) {
callback(*default_table);
Expand All @@ -151,11 +151,11 @@ optional_ptr<CatalogEntry> DeltaSchemaEntry::GetEntry(CatalogTransaction transac
auto &context = transaction.GetContext();

if (type == CatalogType::TABLE_ENTRY && name == DEFAULT_DELTA_TABLE) {
auto &transaction = context.ActiveTransaction().GetTransaction(this->catalog.GetAttached()).Cast<DeltaTransaction>();
auto &delta_transaction = GetDeltaTransaction(transaction);
auto &delta_catalog = catalog.Cast<DeltaCatalog>();

if (transaction.table_entry) {
return *transaction.table_entry;
if (delta_transaction.table_entry) {
return *delta_transaction.table_entry;
}

if (delta_catalog.UseCachedSnapshot()) {
Expand All @@ -166,9 +166,8 @@ optional_ptr<CatalogEntry> DeltaSchemaEntry::GetEntry(CatalogTransaction transac
return *cached_table;
}

unique_lock<mutex> l(lock);
transaction.table_entry = CreateTableEntry(context, delta_catalog, *this);
return *transaction.table_entry;
delta_transaction.table_entry = CreateTableEntry(context, delta_catalog, *this);
return *delta_transaction.table_entry;
}

return nullptr;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/delta_table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ TableFunction DeltaTableEntry::GetScanFunction(ClientContext &context, unique_pt

// Copy over the internal kernel snapshot
auto function_info = make_shared_ptr<DeltaFunctionInfo>();
function_info->snapshot = this->snapshot->snapshot;
function_info->expected_path = delta_catalog.GetDBPath();

function_info->snapshot = this->snapshot;
delta_scan_function.function_info = std::move(function_info);

vector<Value> inputs = {delta_catalog.GetDBPath()};
Expand Down

0 comments on commit ec3a8b1

Please sign in to comment.