Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Dec 19, 2024
1 parent da26428 commit 252bad8
Show file tree
Hide file tree
Showing 9 changed files with 806 additions and 841 deletions.
11 changes: 6 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ ExternalProject_Add(
# Build debug build
BUILD_COMMAND
${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build
--package delta_kernel_ffi --workspace $<$<CONFIG:Release>:--release> --all-features ${RUST_PLATFORM_PARAM}
--package delta_kernel_ffi --workspace $<$<CONFIG:Release>:--release>
--all-features ${RUST_PLATFORM_PARAM}
# Build DATs
COMMAND
${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build
Expand All @@ -177,13 +178,13 @@ set(CMAKE_OSX_DEPLOYMENT_TARGET
add_compile_definitions(DEFINE_DEFAULT_ENGINE)

# Link delta-kernel-rs to static lib
target_link_libraries(
${EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH} ${PLATFORM_LIBS})
target_link_libraries(${EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH}

Check warning on line 181 in CMakeLists.txt

View workflow job for this annotation

GitHub Actions / Build extension binaries / MacOS (osx_amd64, x86_64, x64-osx)

Not disabling vptr sanitizer on M1 Macbook - set DISABLE_VPTR_SANITIZER

Check warning on line 181 in CMakeLists.txt

View workflow job for this annotation

GitHub Actions / Build extension binaries / MacOS (osx_arm64, arm64, arm64-osx)

Not disabling vptr sanitizer on M1 Macbook - set DISABLE_VPTR_SANITIZER
${PLATFORM_LIBS})
add_dependencies(${EXTENSION_NAME} delta_kernel)

# Link delta-kernel-rs to dynamic lib
target_link_libraries(
${LOADABLE_EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH} ${PLATFORM_LIBS})
target_link_libraries(${LOADABLE_EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH}
${PLATFORM_LIBS})
add_dependencies(${LOADABLE_EXTENSION_NAME} delta_kernel)

install(
Expand Down
18 changes: 9 additions & 9 deletions src/delta_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ class DeltaStorageExtension : public StorageExtension {
};

static void LoadInternal(DatabaseInstance &instance) {
// Load Table functions
for (const auto &function : DeltaFunctions::GetTableFunctions(instance)) {
ExtensionUtil::RegisterFunction(instance, function);
}

// Load Scalar functions
for (const auto &function : DeltaFunctions::GetScalarFunctions(instance)) {
ExtensionUtil::RegisterFunction(instance, function);
}
// Load Table functions
for (const auto &function : DeltaFunctions::GetTableFunctions(instance)) {
ExtensionUtil::RegisterFunction(instance, function);
}

// Load Scalar functions
for (const auto &function : DeltaFunctions::GetScalarFunctions(instance)) {
ExtensionUtil::RegisterFunction(instance, function);
}

// Register the "single table" delta catalog (to ATTACH a single delta table)
auto &config = DBConfig::GetConfig(instance);
Expand Down
9 changes: 4 additions & 5 deletions src/delta_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ vector<TableFunctionSet> DeltaFunctions::GetTableFunctions(DatabaseInstance &ins
}

vector<ScalarFunctionSet> DeltaFunctions::GetScalarFunctions(DatabaseInstance &instance) {
vector<ScalarFunctionSet> functions;
vector<ScalarFunctionSet> functions;

functions.push_back(GetExpressionFunction(instance));
functions.push_back(GetExpressionFunction(instance));

return functions;
return functions;
}


};
}; // namespace duckdb
502 changes: 256 additions & 246 deletions src/delta_utils.cpp

Large diffs are not rendered by default.

139 changes: 69 additions & 70 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ string url_decode(string input) {
}

void DeltaSnapshot::VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size,
const ffi::Stats *stats, const ffi::DvInfo *dv_info,
const struct ffi::CStringMap *partition_values) {
const ffi::Stats *stats, const ffi::DvInfo *dv_info,
const struct ffi::CStringMap *partition_values) {
auto context = (DeltaSnapshot *)engine_context;
auto path_string = context->GetPath();
StringUtil::RTrim(path_string, "/");
Expand Down Expand Up @@ -92,7 +92,7 @@ void DeltaSnapshot::VisitCallback(ffi::NullableCvoid engine_context, struct ffi:
}

void DeltaSnapshot::VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data,
const struct ffi::KernelBoolSlice selection_vec) {
const struct ffi::KernelBoolSlice selection_vec) {
ffi::visit_scan_data(engine_data, selection_vec, engine_context, VisitCallback);
}

Expand Down Expand Up @@ -238,52 +238,52 @@ static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &p
// Here you would need to add the logic for setting the builder options for Azure
// This is just a placeholder and will need to be replaced with the actual logic
if (secret_type == "s3" || secret_type == "gcs" || secret_type == "r2") {
string key_id, secret, session_token, region, endpoint, url_style;
bool use_ssl = true;
secret_reader.TryGetSecretKey("key_id", key_id);
secret_reader.TryGetSecretKey("secret", secret);
secret_reader.TryGetSecretKey("session_token", session_token);
secret_reader.TryGetSecretKey("region", region);
secret_reader.TryGetSecretKey("endpoint", endpoint);
secret_reader.TryGetSecretKey("url_style", url_style);
secret_reader.TryGetSecretKey("use_ssl", use_ssl);

if (key_id.empty() && secret.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"),
KernelUtils::ToDeltaString("true"));
}

if (!key_id.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"),
KernelUtils::ToDeltaString(key_id));
}
if (!secret.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"),
KernelUtils::ToDeltaString(secret));
}
if (!session_token.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"),
KernelUtils::ToDeltaString(session_token));
}
if (!endpoint.empty() && endpoint != "s3.amazonaws.com") {
if (!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) {
if (use_ssl) {
endpoint = "https://" + endpoint;
} else {
endpoint = "http://" + endpoint;
}
}

if (StringUtil::StartsWith(endpoint, "http://")) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"),
KernelUtils::ToDeltaString("true"));
}
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"),
KernelUtils::ToDeltaString(endpoint));
} else if (StringUtil::StartsWith(path, "gs://") || StringUtil::StartsWith(path, "gcs://")) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"),
KernelUtils::ToDeltaString("https://storage.googleapis.com"));
}
string key_id, secret, session_token, region, endpoint, url_style;
bool use_ssl = true;
secret_reader.TryGetSecretKey("key_id", key_id);
secret_reader.TryGetSecretKey("secret", secret);
secret_reader.TryGetSecretKey("session_token", session_token);
secret_reader.TryGetSecretKey("region", region);
secret_reader.TryGetSecretKey("endpoint", endpoint);
secret_reader.TryGetSecretKey("url_style", url_style);
secret_reader.TryGetSecretKey("use_ssl", use_ssl);

if (key_id.empty() && secret.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"),
KernelUtils::ToDeltaString("true"));
}

if (!key_id.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"),
KernelUtils::ToDeltaString(key_id));
}
if (!secret.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"),
KernelUtils::ToDeltaString(secret));
}
if (!session_token.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"),
KernelUtils::ToDeltaString(session_token));
}
if (!endpoint.empty() && endpoint != "s3.amazonaws.com") {
if (!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) {
if (use_ssl) {
endpoint = "https://" + endpoint;
} else {
endpoint = "http://" + endpoint;
}
}

if (StringUtil::StartsWith(endpoint, "http://")) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"),
KernelUtils::ToDeltaString("true"));
}
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"),
KernelUtils::ToDeltaString(endpoint));
} else if (StringUtil::StartsWith(path, "gs://") || StringUtil::StartsWith(path, "gcs://")) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"),
KernelUtils::ToDeltaString("https://storage.googleapis.com"));
}

ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_region"), KernelUtils::ToDeltaString(region));

Expand Down Expand Up @@ -415,7 +415,7 @@ string DeltaSnapshot::ToDeltaPath(const string &raw_path) {
}

void DeltaSnapshot::Bind(vector<LogicalType> &return_types, vector<string> &names) {
unique_lock<mutex> lck(lock);
unique_lock<mutex> lck(lock);

if (have_bound) {
names = this->names;
Expand Down Expand Up @@ -478,9 +478,9 @@ string DeltaSnapshot::GetFileInternal(idx_t i) {
}

string DeltaSnapshot::GetFile(idx_t i) {
// TODO: profile this: we should be able to use atomics here to optimize
unique_lock<mutex> lck(lock);
return GetFileInternal(i);
// TODO: profile this: we should be able to use atomics here to optimize
unique_lock<mutex> lck(lock);
return GetFileInternal(i);
}

void DeltaSnapshot::InitializeSnapshot() {
Expand Down Expand Up @@ -537,17 +537,17 @@ unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &co
filtered_list->names = names;

// Copy over the snapshot, this avoids reparsing metadata
{
unique_lock<mutex> lck(lock);
filtered_list->snapshot = snapshot;
}
{
unique_lock<mutex> lck(lock);
filtered_list->snapshot = snapshot;
}

auto &profiler = QueryProfiler::Get(context);

// Note: this is potentially quite expensive: we are creating 2 scans of the snapshot and fully materializing both
// file lists. Therefore this is only done when the profiler is enabled. This is enabled by default in debug mode or
// for EXPLAIN ANALYZE queries.
// TODO: check locking behaviour below
// TODO: check locking behaviour below
if (profiler.IsEnabled()) {
Value result;
if (!context.TryGetCurrentSetting("delta_scan_explain_files_filtered", result)) {
Expand Down Expand Up @@ -595,7 +595,7 @@ unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &co
}

vector<string> DeltaSnapshot::GetAllFiles() {
unique_lock<mutex> lck(lock);
unique_lock<mutex> lck(lock);
idx_t i = resolved_files.size();
// TODO: this can probably be improved
while (!GetFileInternal(i).empty()) {
Expand All @@ -613,7 +613,7 @@ FileExpandResult DeltaSnapshot::GetExpandResult() {
}

idx_t DeltaSnapshot::GetTotalFileCount() {
unique_lock<mutex> lck(lock);
unique_lock<mutex> lck(lock);
idx_t i = resolved_files.size();
while (!GetFileInternal(i).empty()) {
i++;
Expand All @@ -625,8 +625,8 @@ unique_ptr<NodeStatistics> DeltaSnapshot::GetCardinality(ClientContext &context)
// This also ensures all files are expanded
auto total_file_count = DeltaSnapshot::GetTotalFileCount();

// TODO: internalize above
unique_lock<mutex> lck(lock);
// TODO: internalize above
unique_lock<mutex> lck(lock);

if (total_file_count == 0) {
return make_uniq<NodeStatistics>(0, 0);
Expand All @@ -648,15 +648,14 @@ unique_ptr<NodeStatistics> DeltaSnapshot::GetCardinality(ClientContext &context)
return nullptr;
}


idx_t DeltaSnapshot::GetVersion() {
unique_lock<mutex> lck(lock);
return version;
unique_lock<mutex> lck(lock);
return version;
}

DeltaFileMetaData &DeltaSnapshot::GetMetaData(idx_t index) const {
unique_lock<mutex> lck(lock);
return *metadata[index];
unique_lock<mutex> lck(lock);
return *metadata[index];
}

unique_ptr<MultiFileReader> DeltaMultiFileReader::CreateInstance(const TableFunction &table_function) {
Expand Down Expand Up @@ -737,7 +736,7 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio
// Get the metadata for this file
D_ASSERT(global_state->file_list);
const auto &snapshot = dynamic_cast<const DeltaSnapshot &>(*global_state->file_list);
auto &file_metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex());
auto &file_metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex());

if (!file_metadata.partition_map.empty()) {
for (idx_t i = 0; i < global_column_ids.size(); i++) {
Expand Down Expand Up @@ -997,15 +996,15 @@ void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFile

// Get the metadata for this file
const auto &snapshot = dynamic_cast<const DeltaSnapshot &>(*global_state->file_list);
auto &metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex());
auto &metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex());

if (metadata.selection_vector.ptr && chunk.size() != 0) {
D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX);
auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx];

// Construct the selection vector using the file_row_number column and the raw selection vector from delta
idx_t select_count;
auto sv = DuckSVFromDeltaSV(metadata.selection_vector, file_row_number_column, chunk.size(), select_count);
auto sv = DuckSVFromDeltaSV(metadata.selection_vector, file_row_number_column, chunk.size(), select_count);
chunk.Slice(sv, select_count);
}

Expand Down
12 changes: 6 additions & 6 deletions src/include/delta_functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ namespace duckdb {

class DeltaFunctions {
public:
static vector<TableFunctionSet> GetTableFunctions(DatabaseInstance &instance);
static vector<ScalarFunctionSet> GetScalarFunctions(DatabaseInstance &instance);
static vector<TableFunctionSet> GetTableFunctions(DatabaseInstance &instance);
static vector<ScalarFunctionSet> GetScalarFunctions(DatabaseInstance &instance);

private:
//! Table Functions
static TableFunctionSet GetDeltaScanFunction(DatabaseInstance &instance);
//! Table Functions
static TableFunctionSet GetDeltaScanFunction(DatabaseInstance &instance);

//! Scalar Functions
static ScalarFunctionSet GetExpressionFunction(DatabaseInstance &instance);
//! Scalar Functions
static ScalarFunctionSet GetExpressionFunction(DatabaseInstance &instance);
};
} // namespace duckdb
Loading

0 comments on commit 252bad8

Please sign in to comment.