diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml index 3449ca6..45d26e7 100644 --- a/.github/workflows/MainDistributionPipeline.yml +++ b/.github/workflows/MainDistributionPipeline.yml @@ -16,14 +16,12 @@ jobs: name: Build extension binaries uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main with: - # pip install duckdb==1.1.4.dev2005 - duckdb_version: b470dea7ee + duckdb_version: main ci_tools_version: main extension_name: delta enable_rust: true - exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw' + exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw;linux_amd64_musl' extra_toolchains: 'python3' - vcpkg_commit: c82f74667287d3dc386bce81e44964370c91a289 duckdb-stable-deploy: name: Deploy extension binaries @@ -32,6 +30,6 @@ jobs: secrets: inherit with: extension_name: delta + duckdb_version: main ci_tools_version: main - duckdb_version: b470dea7ee - exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw' + exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw;linux_amd64_musl' diff --git a/Makefile b/Makefile index 4a361b3..7eb8376 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ test_debug: export DELTA_KERNEL_TESTS_PATH=./build/debug/rust/src/delta_kernel/k test_debug: export DAT_PATH=./build/debug/rust/src/delta_kernel/acceptance/tests/dat # Core extensions that we need for testing -#CORE_EXTENSIONS='tpcds;tpch;aws;azure;httpfs' +CORE_EXTENSIONS='tpcds;tpch;aws;azure;httpfs' # Set this flag during building to enable the benchmark runner ifeq (${BUILD_BENCHMARK}, 1) diff --git a/duckdb b/duckdb index b470dea..89bcc3e 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit b470dea7ee47dc2debcc37a4e94976f8eff6670c +Subproject commit 89bcc3e2ce739b1b470afa79818ee03c8cf96fe8 diff --git a/extension-ci-tools b/extension-ci-tools 
index 916d4ef..4317e39 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 916d4ef4371068ca98a007378b52582c3e46b4e5 +Subproject commit 4317e39099f4b71d614f00d044aaec651bec6fc9 diff --git a/src/delta_extension.cpp b/src/delta_extension.cpp index 50ce93d..dac8888 100644 --- a/src/delta_extension.cpp +++ b/src/delta_extension.cpp @@ -2,6 +2,7 @@ #include "delta_extension.hpp" +#include "delta_utils.hpp" #include "delta_functions.hpp" #include "duckdb.hpp" #include "duckdb/common/exception.hpp" @@ -9,6 +10,7 @@ #include "duckdb/storage/storage_extension.hpp" #include "storage/delta_catalog.hpp" #include "storage/delta_transaction_manager.hpp" +#include "duckdb/main/config.hpp" namespace duckdb { @@ -59,9 +61,17 @@ static void LoadInternal(DatabaseInstance &instance) { config.storage_extensions["delta"] = make_uniq(); config.AddExtensionOption("delta_scan_explain_files_filtered", - "Adds the filtered files to the explain output. Warning: this may change performance of " + "Adds the filtered files to the explain output. Warning: this may impact performance of " "delta scan during explain analyze queries.", LogicalType::BOOLEAN, Value(true)); + + config.AddExtensionOption( + "delta_kernel_logging", + "Forwards the internal logging of the Delta Kernel to the duckdb logger. 
Warning: this may impact " + "performance even with DuckDB logging disabled.", + LogicalType::BOOLEAN, Value(false), LoggerCallback::DuckDBSettingCallBack); + + LoggerCallback::Initialize(instance); } void DeltaExtension::Load(DuckDB &db) { diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index d47d99f..9cf8337 100644 --- a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -4,6 +4,7 @@ #include "duckdb.hpp" #include "duckdb/main/extension_util.hpp" +#include "duckdb/main/database.hpp" #include #include @@ -171,7 +172,7 @@ void ExpressionVisitor::VisitPrimitiveLiteralDouble(void *state, uintptr_t sibli } void ExpressionVisitor::VisitTimestampLiteral(void *state, uintptr_t sibling_list_id, int64_t value) { - auto expression = make_uniq(Value::TIMESTAMPTZ(static_cast(value))); + auto expression = make_uniq(Value::TIMESTAMPTZ(timestamp_tz_t(value))); static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); } @@ -634,4 +635,73 @@ uintptr_t PredicateVisitor::VisitFilter(const string &col_name, const TableFilte } } +void LoggerCallback::Initialize(DatabaseInstance &db_p) { + auto &instance = GetInstance(); + unique_lock lck(instance.lock); + if (instance.db.expired()) { + instance.db = db_p.shared_from_this(); + } +} + +void LoggerCallback::CallbackEvent(ffi::Event event) { + auto &instance = GetInstance(); + auto db_locked = instance.db.lock(); + if (db_locked) { + auto transformed_log_level = GetDuckDBLogLevel(event.level); + string constructed_log_message; + Logger::Log("delta.Kernel", *db_locked, transformed_log_level, [&]() { + auto log_type = KernelUtils::FromDeltaString(event.target); + auto message = KernelUtils::FromDeltaString(event.message); + auto file = KernelUtils::FromDeltaString(event.file); + if (!file.empty()) { + constructed_log_message = StringUtil::Format("[%s] %s@%u : %s ", log_type, file, event.line, message); + } else { + constructed_log_message = message; + } + + return constructed_log_message.c_str(); + }); + } +} + 
+LogLevel LoggerCallback::GetDuckDBLogLevel(ffi::Level level) { + switch (level) { + case ffi::Level::TRACE: + return LogLevel::LOG_TRACE; + case ffi::Level::DEBUGGING: + return LogLevel::LOG_DEBUG; + case ffi::Level::INFO: + return LogLevel::LOG_INFO; + case ffi::Level::WARN: + return LogLevel::LOG_WARN; + case ffi::Level::ERROR: + return LogLevel::LOG_ERROR; + } +} + +LoggerCallback &LoggerCallback::GetInstance() { + static LoggerCallback instance; + return instance; +} + +void LoggerCallback::DuckDBSettingCallBack(ClientContext &context, SetScope scope, Value &parameter) { + Value current_setting; + auto res = context.TryGetCurrentSetting("delta_kernel_logging", current_setting); + + if (res.GetScope() == SettingScope::INVALID) { + throw InternalException("Failed to find setting 'delta_kernel_logging'"); + } + + if (current_setting.GetValue() && !parameter.GetValue()) { + throw InvalidInputException("Can not disable 'delta_kernel_logging' after enabling it. You can disable DuckDB " + "logging with SET enable_logging=false, but there will still be some performance " + "overhead from 'delta_kernel_logging'" + "that can only be mitigated by restarting DuckDB"); + } + + if (!current_setting.GetValue() && parameter.GetValue()) { + ffi::enable_event_tracing(LoggerCallback::CallbackEvent, ffi::Level::TRACE); + } +} + }; // namespace duckdb diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 540bdb6..5bf4e1d 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -15,6 +15,7 @@ // TODO: clean up this file as we go namespace duckdb { +class DatabaseInstance; class ExpressionVisitor : public ffi::EngineExpressionVisitor { using FieldList = vector>; @@ -336,4 +337,25 @@ class PredicateVisitor : public ffi::EnginePredicate { uintptr_t VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState *state); }; +// Singleton class to forward logs to DuckDB +class LoggerCallback { +public: + //! 
The Callback for the DuckDB setting to hook up Delta Kernel Logging to the DuckDB logger + static void DuckDBSettingCallBack(ClientContext &context, SetScope scope, Value &parameter); + + //! Singleton GetInstance + static LoggerCallback &GetInstance(); + static void Initialize(DatabaseInstance &db); + static void CallbackEvent(ffi::Event log_line); + + static LogLevel GetDuckDBLogLevel(ffi::Level); + +protected: + LoggerCallback() { + } + + mutex lock; + weak_ptr db; +}; + } // namespace duckdb diff --git a/src/storage/delta_catalog.cpp b/src/storage/delta_catalog.cpp index 5ecba54..fb301e2 100644 --- a/src/storage/delta_catalog.cpp +++ b/src/storage/delta_catalog.cpp @@ -60,22 +60,22 @@ bool DeltaCatalog::UseCachedSnapshot() { optional_idx DeltaCatalog::GetCatalogVersion(ClientContext &context) { auto &delta_transaction = DeltaTransaction::Get(context, *this); - idx_t version = DConstants::INVALID_INDEX; + idx_t version = DConstants::INVALID_INDEX; // Option 1: snapshot is cached table-wide auto cached_snapshot = main_schema->GetCachedTable(); if (cached_snapshot) { - version = cached_snapshot->snapshot->GetVersion(); + version = cached_snapshot->snapshot->GetVersion(); } // Option 2: snapshot is cached in transaction if (delta_transaction.table_entry) { - version = delta_transaction.table_entry->snapshot->GetVersion(); + version = delta_transaction.table_entry->snapshot->GetVersion(); } - if (version != DConstants::INVALID_INDEX) { - return version; - } + if (version != DConstants::INVALID_INDEX) { + return version; + } return optional_idx::Invalid(); } diff --git a/test/sql/delta_kernel_rs/logging.test b/test/sql/delta_kernel_rs/logging.test new file mode 100644 index 0000000..68fd5b1 --- /dev/null +++ b/test/sql/delta_kernel_rs/logging.test @@ -0,0 +1,44 @@ +# name: test/sql/delta_kernel_rs/logging.test +# description: test that delta kernel log entries are properly propagated to the DuckDB logger +# group: [delta_kernel_rs] + +require parquet + +require delta + 
+require-env DELTA_KERNEL_TESTS_PATH + +statement ok +set enable_logging=true; + +statement ok +SELECT * FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/basic_partitioned') + +# No kernel logging available yet: we need to set delta_kernel_logging=true +query I +SELECT count(*) FROM duckdb_logs WHERE starts_with(type, 'delta.Kernel') +---- +0 + +statement ok +set delta_kernel_logging=true; + +statement ok +set logging_level = 'TRACE'; + +statement ok +SELECT * FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/basic_partitioned') + +# Now we have logs! +query I +SELECT count(*) > 50 FROM duckdb_logs WHERE starts_with(type, 'delta.Kernel') +---- +true + +statement ok +set delta_kernel_logging=true; + +statement error +set delta_kernel_logging=false; +---- +Invalid Input Error: Can not disable 'delta_kernel_logging' after enabling it \ No newline at end of file