diff --git a/.github/workflows/LocalTesting.yml b/.github/workflows/LocalTesting.yml index 34457eb..5c425b8 100644 --- a/.github/workflows/LocalTesting.yml +++ b/.github/workflows/LocalTesting.yml @@ -294,11 +294,12 @@ jobs: run: | python ./duckdb/scripts/regression/test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/tpcds_sf1_local.csv --verbose --threads=2 --root-dir=. - - name: Regression Test Micro - if: always() - shell: bash - run: | - python ./duckdb/scripts/regression/test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/micro.csv --verbose --threads=2 --root-dir=. + # FIXME: re-enable +# - name: Regression Test Micro +# if: always() +# shell: bash +# run: | +# python ./duckdb/scripts/regression/test_runner.py --old=duckdb_delta/build/release/benchmark/benchmark_runner --new=build/release/benchmark/benchmark_runner --benchmarks=.github/regression/micro.csv --verbose --threads=2 --root-dir=. 
- name: Test benchmark makefile shell: bash diff --git a/CMakeLists.txt b/CMakeLists.txt index da8dfd2..13ce508 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,6 +37,7 @@ elseif(UNIX) elseif(WIN32) set(PLATFORM_LIBS ntdll + crypt32 ncrypt secur32 ws2_32 @@ -119,11 +120,8 @@ set(RUST_UNSET_ENV_VARS --unset=CC --unset=CXX --unset=LD) set(DELTA_KERNEL_LIBNAME "${CMAKE_STATIC_LIBRARY_PREFIX}delta_kernel_ffi${CMAKE_STATIC_LIBRARY_SUFFIX}" ) -set(DELTA_KERNEL_LIBPATH_DEBUG - "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/debug/${DELTA_KERNEL_LIBNAME}" -) -set(DELTA_KERNEL_LIBPATH_RELEASE - "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/release/${DELTA_KERNEL_LIBNAME}" +set(DELTA_KERNEL_LIBPATH + "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/$<IF:$<CONFIG:Debug>,debug,release>/${DELTA_KERNEL_LIBNAME}" ) set(DELTA_KERNEL_FFI_HEADER_PATH "${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/ffi-headers") @@ -142,7 +140,7 @@ ExternalProject_Add( # the c++ headers. 
Currently, when bumping the kernel version, the produced # header in ./src/include/delta_kernel_ffi.hpp should be also bumped, applying # the fix - GIT_TAG v0.5.0 + GIT_TAG v0.6.0 # Prints the env variables passed to the cargo build to the terminal, useful # in debugging because passing them through CMake is an error-prone mess CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} @@ -152,19 +150,13 @@ ExternalProject_Add( # Build debug build BUILD_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build - --package delta_kernel_ffi --workspace --all-features ${RUST_PLATFORM_PARAM} - # Build release build - COMMAND - ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build - --package delta_kernel_ffi --workspace --all-features --release - ${RUST_PLATFORM_PARAM} + --package delta_kernel_ffi --workspace $<$<CONFIG:Release>:--release> --all-features ${RUST_PLATFORM_PARAM} # Build DATs COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build --manifest-path=${CMAKE_BINARY_DIR}/rust/src/delta_kernel/acceptance/Cargo.toml # Define the byproducts, required for building with Ninja - BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH_DEBUG}" - BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH_RELEASE}" + BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH}" BUILD_BYPRODUCTS "${DELTA_KERNEL_FFI_HEADER_C}" BUILD_BYPRODUCTS "${DELTA_KERNEL_FFI_HEADER_CXX}" INSTALL_COMMAND "" @@ -186,14 +178,12 @@ add_compile_definitions(DEFINE_DEFAULT_ENGINE) # Link delta-kernal-rs to static lib target_link_libraries( - ${EXTENSION_NAME} debug ${DELTA_KERNEL_LIBPATH_DEBUG} optimized - ${DELTA_KERNEL_LIBPATH_RELEASE} ${PLATFORM_LIBS}) + ${EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH} ${PLATFORM_LIBS}) add_dependencies(${EXTENSION_NAME} delta_kernel) # Link delta-kernal-rs to dynamic lib target_link_libraries( - ${LOADABLE_EXTENSION_NAME} debug ${DELTA_KERNEL_LIBPATH_DEBUG} optimized - ${DELTA_KERNEL_LIBPATH_RELEASE} ${PLATFORM_LIBS}) + ${LOADABLE_EXTENSION_NAME} 
${DELTA_KERNEL_LIBPATH} ${PLATFORM_LIBS}) add_dependencies(${LOADABLE_EXTENSION_NAME} delta_kernel) install( diff --git a/Makefile b/Makefile index 8cc8bc9..4a361b3 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,14 @@ PROJ_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) EXT_NAME=deltatable EXT_CONFIG=${PROJ_DIR}extension_config.cmake +ifeq ($(SANITIZER_MODE), thread) + EXT_DEBUG_FLAGS:=-DENABLE_THREAD_SANITIZER=1 +endif + +ifneq ("${CUSTOM_LINKER}", "") + EXT_DEBUG_FLAGS:=${EXT_DEBUG_FLAGS} -DCUSTOM_LINKER=${CUSTOM_LINKER} +endif + # Set test paths test_release: export DELTA_KERNEL_TESTS_PATH=./build/release/rust/src/delta_kernel/kernel/tests/data test_release: export DAT_PATH=./build/release/rust/src/delta_kernel/acceptance/tests/dat @@ -12,7 +20,7 @@ test_debug: export DELTA_KERNEL_TESTS_PATH=./build/debug/rust/src/delta_kernel/k test_debug: export DAT_PATH=./build/debug/rust/src/delta_kernel/acceptance/tests/dat # Core extensions that we need for testing -CORE_EXTENSIONS='tpcds;tpch;aws;azure;httpfs' +#CORE_EXTENSIONS='tpcds;tpch;aws;azure;httpfs' # Set this flag during building to enable the benchmark runner ifeq (${BUILD_BENCHMARK}, 1) diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index 6b3c03c..93a84ca 100644 --- a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -372,18 +372,15 @@ uintptr_t SchemaVisitor::MakeFieldListImpl(uintptr_t capacity_hint) { void SchemaVisitor::AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType &&child) { auto it = inflight_lists.find(id); if (it == inflight_lists.end()) { - // TODO... some error... 
- throw InternalException("WEIRD SHIT"); - } else { - it->second->emplace_back(std::make_pair(string(name.ptr, name.len), std::move(child))); + throw InternalException("Unhandled error in SchemaVisitor::AppendToList child"); } + it->second->emplace_back(std::make_pair(string(name.ptr, name.len), std::move(child))); } unique_ptr SchemaVisitor::TakeFieldList(uintptr_t id) { auto it = inflight_lists.find(id); if (it == inflight_lists.end()) { - // TODO: Raise some kind of error. - throw InternalException("WEIRD SHIT 2"); + throw InternalException("Unhandled error in SchemaVisitor::TakeFieldList"); } auto rval = std::move(it->second); inflight_lists.erase(it); @@ -438,10 +435,12 @@ string DuckDBEngineError::KernelErrorEnumToString(ffi::KernelError err) { "MissingCommitInfo", "UnsupportedError", "ParseIntervalError", - "ChangeDataFeedUnsupported" + "ChangeDataFeedUnsupported", + "ChangeDataFeedIncompatibleSchema", + "InvalidCheckpoint" }; - static_assert(sizeof(KERNEL_ERROR_ENUM_STRINGS) / sizeof(char *) - 1 == (int)ffi::KernelError::ChangeDataFeedUnsupported, + static_assert(sizeof(KERNEL_ERROR_ENUM_STRINGS) / sizeof(char *) - 1 == (int)ffi::KernelError::InvalidCheckpoint, "KernelErrorEnumStrings mismatched with kernel"); if ((int)err < sizeof(KERNEL_ERROR_ENUM_STRINGS) / sizeof(char *)) { diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index fb4bbe4..0018923 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -1,4 +1,5 @@ #include "functions/delta_scan.hpp" +#include "storage/delta_catalog.hpp" #include "delta_functions.hpp" #include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" @@ -16,13 +17,9 @@ #include "duckdb/planner/binder.hpp" #include "duckdb/planner/operator/logical_get.hpp" #include "duckdb/main/query_profiler.hpp" +#include "duckdb/main/client_data.hpp" -#include -#include #include -#include -#include -#include namespace duckdb { @@ -49,7 +46,7 @@ string url_decode(string input) { return 
result; } -static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, +void DeltaSnapshot::VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, const ffi::Stats *stats, const ffi::DvInfo *dv_info, const struct ffi::CStringMap *partition_values) { auto context = (DeltaSnapshot *)engine_context; @@ -94,9 +91,9 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel context->metadata.back()->partition_map = std::move(constant_map); } -static void visit_data(void *engine_context, ffi::ExclusiveEngineData *engine_data, +void DeltaSnapshot::VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data, const struct ffi::KernelBoolSlice selection_vec) { - ffi::visit_scan_data(engine_data, selection_vec, engine_context, visit_callback); + ffi::visit_scan_data(engine_data, selection_vec, engine_context, VisitCallback); } string ParseAccountNameFromEndpoint(const string &endpoint) { @@ -241,50 +238,52 @@ static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &p // Here you would need to add the logic for setting the builder options for Azure // This is just a placeholder and will need to be replaced with the actual logic if (secret_type == "s3" || secret_type == "gcs" || secret_type == "r2") { - - string key_id, secret, session_token, region, endpoint, url_style; - bool use_ssl = true; - secret_reader.TryGetSecretKey("key_id", key_id); - secret_reader.TryGetSecretKey("secret", secret); - secret_reader.TryGetSecretKey("session_token", session_token); - secret_reader.TryGetSecretKey("region", region); - secret_reader.TryGetSecretKey("endpoint", endpoint); - secret_reader.TryGetSecretKey("url_style", url_style); - secret_reader.TryGetSecretKey("use_ssl", use_ssl); - - if (key_id.empty() && secret.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"), - 
KernelUtils::ToDeltaString("true")); - } - - if (!key_id.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"), - KernelUtils::ToDeltaString(key_id)); - } - if (!secret.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"), - KernelUtils::ToDeltaString(secret)); - } - if (!session_token.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"), - KernelUtils::ToDeltaString(session_token)); - } - if (!endpoint.empty() && endpoint != "s3.amazonaws.com") { - if (!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) { - if (use_ssl) { - endpoint = "https://" + endpoint; - } else { - endpoint = "http://" + endpoint; - } - } - - if (StringUtil::StartsWith(endpoint, "http://")) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"), - KernelUtils::ToDeltaString("true")); - } - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), - KernelUtils::ToDeltaString(endpoint)); - } + string key_id, secret, session_token, region, endpoint, url_style; + bool use_ssl = true; + secret_reader.TryGetSecretKey("key_id", key_id); + secret_reader.TryGetSecretKey("secret", secret); + secret_reader.TryGetSecretKey("session_token", session_token); + secret_reader.TryGetSecretKey("region", region); + secret_reader.TryGetSecretKey("endpoint", endpoint); + secret_reader.TryGetSecretKey("url_style", url_style); + secret_reader.TryGetSecretKey("use_ssl", use_ssl); + + if (key_id.empty() && secret.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"), + KernelUtils::ToDeltaString("true")); + } + + if (!key_id.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"), + KernelUtils::ToDeltaString(key_id)); + } + if (!secret.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"), + 
KernelUtils::ToDeltaString(secret)); + } + if (!session_token.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"), + KernelUtils::ToDeltaString(session_token)); + } + if (!endpoint.empty() && endpoint != "s3.amazonaws.com") { + if (!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) { + if (use_ssl) { + endpoint = "https://" + endpoint; + } else { + endpoint = "http://" + endpoint; + } + } + + if (StringUtil::StartsWith(endpoint, "http://")) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"), + KernelUtils::ToDeltaString("true")); + } + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), + KernelUtils::ToDeltaString(endpoint)); + } else if (StringUtil::StartsWith(path, "gs://") || StringUtil::StartsWith(path, "gcs://")) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), + KernelUtils::ToDeltaString("https://storage.googleapis.com")); + } ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_region"), KernelUtils::ToDeltaString(region)); @@ -386,7 +385,7 @@ DeltaSnapshot::DeltaSnapshot(ClientContext &context_p, const string &path) : MultiFileList({ToDeltaPath(path)}, FileGlobOptions::ALLOW_EMPTY), context(context_p) { } -string DeltaSnapshot::GetPath() { +string DeltaSnapshot::GetPath() const { return GetPaths()[0]; } @@ -416,6 +415,8 @@ string DeltaSnapshot::ToDeltaPath(const string &raw_path) { } void DeltaSnapshot::Bind(vector &return_types, vector &names) { + unique_lock lck(lock); + if (have_bound) { names = this->names; return_types = this->types; @@ -443,7 +444,7 @@ void DeltaSnapshot::Bind(vector &return_types, vector &name this->types = return_types; } -string DeltaSnapshot::GetFile(idx_t i) { +string DeltaSnapshot::GetFileInternal(idx_t i) { if (!initialized_snapshot) { InitializeSnapshot(); } @@ -462,7 +463,7 @@ string DeltaSnapshot::GetFile(idx_t i) { } while (i >= 
resolved_files.size()) { - auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), this, visit_data); + auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), this, VisitData); auto have_scan_data = TryUnpackKernelResult(have_scan_data_res); @@ -473,14 +474,15 @@ string DeltaSnapshot::GetFile(idx_t i) { } } - // The kernel scan visitor should have resolved a file OR returned - if (i >= resolved_files.size()) { - throw IOException("Delta Kernel seems to have failed to resolve a new file"); - } - return resolved_files[i]; } +string DeltaSnapshot::GetFile(idx_t i) { + // TODO: profile this: we should be able to use atomics here to optimize + unique_lock lck(lock); + return GetFileInternal(i); +} + void DeltaSnapshot::InitializeSnapshot() { auto path_slice = KernelUtils::ToDeltaString(paths[0]); @@ -535,13 +537,17 @@ unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &co filtered_list->names = names; // Copy over the snapshot, this avoids reparsing metadata - filtered_list->snapshot = snapshot; + { + unique_lock lck(lock); + filtered_list->snapshot = snapshot; + } auto &profiler = QueryProfiler::Get(context); // Note: this is potentially quite expensive: we are creating 2 scans of the snapshot and fully materializing both // file lists Therefore this is only done when profile is enabled. 
This is enable by default in debug mode or for // EXPLAIN ANALYZE queries + // TODO: check locking behaviour below if (profiler.IsEnabled()) { Value result; if (!context.TryGetCurrentSetting("delta_scan_explain_files_filtered", result)) { @@ -589,9 +595,10 @@ unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &co } vector DeltaSnapshot::GetAllFiles() { + unique_lock lck(lock); idx_t i = resolved_files.size(); // TODO: this can probably be improved - while (!GetFile(i).empty()) { + while (!GetFileInternal(i).empty()) { i++; } return resolved_files; @@ -606,9 +613,9 @@ FileExpandResult DeltaSnapshot::GetExpandResult() { } idx_t DeltaSnapshot::GetTotalFileCount() { - // TODO: this can probably be improved + unique_lock lck(lock); idx_t i = resolved_files.size(); - while (!GetFile(i).empty()) { + while (!GetFileInternal(i).empty()) { i++; } return resolved_files.size(); @@ -618,6 +625,9 @@ unique_ptr DeltaSnapshot::GetCardinality(ClientContext &context) // This also ensures all files are expanded auto total_file_count = DeltaSnapshot::GetTotalFileCount(); + // TODO: internalize above + unique_lock lck(lock); + if (total_file_count == 0) { return make_uniq(0, 0); } @@ -638,6 +648,17 @@ unique_ptr DeltaSnapshot::GetCardinality(ClientContext &context) return nullptr; } + +idx_t DeltaSnapshot::GetVersion() { + unique_lock lck(lock); + return version; +} + +DeltaFileMetaData &DeltaSnapshot::GetMetaData(idx_t index) const { + unique_lock lck(lock); + return *metadata[index]; +} + unique_ptr DeltaMultiFileReader::CreateInstance(const TableFunction &table_function) { auto result = make_uniq(); @@ -716,16 +737,16 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio // Get the metadata for this file D_ASSERT(global_state->file_list); const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &file_metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()]; + auto &file_metadata = 
snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); - if (!file_metadata->partition_map.empty()) { + if (!file_metadata.partition_map.empty()) { for (idx_t i = 0; i < global_column_ids.size(); i++) { column_t col_id = global_column_ids[i].GetPrimaryIndex(); if (IsRowIdColumnId(col_id)) { continue; } - auto col_partition_entry = file_metadata->partition_map.find(global_names[col_id]); - if (col_partition_entry != file_metadata->partition_map.end()) { + auto col_partition_entry = file_metadata.partition_map.find(global_names[col_id]); + if (col_partition_entry != file_metadata.partition_map.end()) { auto &current_type = global_types[col_id]; if (current_type == LogicalType::BLOB) { reader_data.constant_map.emplace_back(i, Value::BLOB_RAW(col_partition_entry->second)); @@ -770,7 +791,6 @@ static SelectionVector DuckSVFromDeltaSV(const ffi::KernelBoolSlice &dv, Vector for (idx_t i = 0; i < count; i++) { auto row_id = row_ids[data.sel->get_index(i)]; - // TODO: why are deletion vectors not spanning whole data? 
if (row_id >= dv.len || dv.ptr[row_id]) { result.data()[current_select] = i; current_select++; @@ -977,15 +997,15 @@ void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFile // Get the metadata for this file const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()]; + auto &metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); - if (metadata->selection_vector.ptr && chunk.size() != 0) { + if (metadata.selection_vector.ptr && chunk.size() != 0) { D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX); auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx]; // Construct the selection vector using the file_row_number column and the raw selection vector from delta idx_t select_count; - auto sv = DuckSVFromDeltaSV(metadata->selection_vector, file_row_number_column, chunk.size(), select_count); + auto sv = DuckSVFromDeltaSV(metadata.selection_vector, file_row_number_column, chunk.size(), select_count); chunk.Slice(sv, select_count); } diff --git a/src/include/delta_kernel_ffi.hpp b/src/include/delta_kernel_ffi.hpp index 3b6a615..ec9db0c 100644 --- a/src/include/delta_kernel_ffi.hpp +++ b/src/include/delta_kernel_ffi.hpp @@ -9,56 +9,99 @@ namespace ffi { enum class KernelError { - UnknownError, - FFIError, + UnknownError, + FFIError, #if (defined(DEFINE_DEFAULT_ENGINE) || defined(DEFINE_SYNC_ENGINE)) - ArrowError, + ArrowError, #endif - EngineDataTypeError, - ExtractError, - GenericError, - IOErrorError, + EngineDataTypeError, + ExtractError, + GenericError, + IOErrorError, #if (defined(DEFINE_DEFAULT_ENGINE) || defined(DEFINE_SYNC_ENGINE)) - ParquetError, + ParquetError, #endif #if defined(DEFINE_DEFAULT_ENGINE) - ObjectStoreError, + ObjectStoreError, #endif #if defined(DEFINE_DEFAULT_ENGINE) - ObjectStorePathError, + ObjectStorePathError, #endif #if defined(DEFINE_DEFAULT_ENGINE) - ReqwestError, + 
ReqwestError, #endif - FileNotFoundError, - MissingColumnError, - UnexpectedColumnTypeError, - MissingDataError, - MissingVersionError, - DeletionVectorError, - InvalidUrlError, - MalformedJsonError, - MissingMetadataError, - MissingProtocolError, - InvalidProtocolError, - MissingMetadataAndProtocolError, - ParseError, - JoinFailureError, - Utf8Error, - ParseIntError, - InvalidColumnMappingModeError, - InvalidTableLocationError, - InvalidDecimalError, - InvalidStructDataError, - InternalError, - InvalidExpression, - InvalidLogPath, - InvalidCommitInfo, - FileAlreadyExists, - MissingCommitInfo, - UnsupportedError, - ParseIntervalError, - ChangeDataFeedUnsupported, + FileNotFoundError, + MissingColumnError, + UnexpectedColumnTypeError, + MissingDataError, + MissingVersionError, + DeletionVectorError, + InvalidUrlError, + MalformedJsonError, + MissingMetadataError, + MissingProtocolError, + InvalidProtocolError, + MissingMetadataAndProtocolError, + ParseError, + JoinFailureError, + Utf8Error, + ParseIntError, + InvalidColumnMappingModeError, + InvalidTableLocationError, + InvalidDecimalError, + InvalidStructDataError, + InternalError, + InvalidExpression, + InvalidLogPath, + InvalidCommitInfo, + FileAlreadyExists, + MissingCommitInfo, + UnsupportedError, + ParseIntervalError, + ChangeDataFeedUnsupported, + ChangeDataFeedIncompatibleSchema, + InvalidCheckpoint, +}; + +/// Definitions of level verbosity. Verbose Levels are "greater than" less verbose ones. So +/// Level::ERROR is the lowest, and Level::TRACE the highest. +enum class Level { + ERROR = 0, + WARN = 1, + INFO = 2, + DEBUGGING = 3, + TRACE = 4, +}; + +/// Format to use for log lines. These correspond to the formats from [`tracing_subscriber` +/// formats](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/format/index.html). +enum class LogLineFormat { + /// The default formatter. 
This emits human-readable, single-line logs for each event that + /// occurs, with the context displayed before the formatted representation of the event. + /// Example: + /// `2022-02-15T18:40:14.289898Z INFO fmt: preparing to shave yaks number_of_yaks=3` + FULL, + /// A variant of the FULL formatter, optimized for short line lengths. Fields from the context + /// are appended to the fields of the formatted event, and targets are not shown. + /// Example: + /// `2022-02-17T19:51:05.809287Z INFO fmt_compact: preparing to shave yaks number_of_yaks=3` + COMPACT, + /// Emits excessively pretty, multi-line logs, optimized for human readability. This is + /// primarily intended to be used in local development and debugging, or for command-line + /// applications, where automated analysis and compact storage of logs is less of a priority + /// than readability and visual appeal. + /// Example: + /// ```ignore + /// 2022-02-15T18:44:24.535324Z INFO fmt_pretty: preparing to shave yaks, number_of_yaks: 3 + /// at examples/examples/fmt-pretty.rs:16 on main + /// ``` + PRETTY, + /// Outputs newline-delimited JSON logs. This is intended for production use with systems where + /// structured logs are consumed as JSON by analysis and viewing tools. The JSON output is not + /// optimized for human readability. + /// Example: + /// `{"timestamp":"2022-02-15T18:47:10.821315Z","level":"INFO","fields":{"message":"preparing to shave yaks","number_of_yaks":3},"target":"fmt_json"}` + JSON, }; struct CStringMap; @@ -100,15 +143,15 @@ struct StringSliceIterator; /// receives a `KernelBoolSlice` as a return value from a kernel method, engine is responsible /// to free that slice, by calling [super::free_bool_slice] exactly once. struct KernelBoolSlice { - bool *ptr; - uintptr_t len; + bool *ptr; + uintptr_t len; }; /// An owned slice of u64 row indexes allocated by the kernel. The engine is responsible for /// freeing this slice by calling [super::free_row_indexes] once. 
struct KernelRowIndexArray { - uint64_t *ptr; - uintptr_t len; + uint64_t *ptr; + uintptr_t len; }; /// Represents an object that crosses the FFI boundary and which outlives the scope that created @@ -143,8 +186,8 @@ struct KernelRowIndexArray { /// NOTE: Because the underlying type is always [`Sync`], multi-threaded external code can /// freely access shared (non-mutable) handles. /// -template -using Handle = H *; +template +using Handle = H*; /// An error that can be returned to the engine. Engines that wish to associate additional /// information can define and use any type that is [pointer @@ -153,31 +196,31 @@ using Handle = H *; /// of a [standard layout](https://en.cppreference.com/w/cpp/language/data_members#Standard-layout) /// class. struct EngineError { - KernelError etype; + KernelError etype; }; /// Semantics: Kernel will always immediately return the leaked engine error to the engine (if it /// allocated one at all), and engine is responsible for freeing it. -template +template struct ExternResult { - enum class Tag { - Ok, - Err, - }; - - struct Ok_Body { - T _0; - }; - - struct Err_Body { - EngineError *_0; - }; - - Tag tag; - union { - Ok_Body ok; - Err_Body err; - }; + enum class Tag { + Ok, + Err, + }; + + struct Ok_Body { + T _0; + }; + + struct Err_Body { + EngineError *_0; + }; + + Tag tag; + union { + Ok_Body ok; + Err_Body err; + }; }; /// A non-owned slice of a UTF8 string, intended for arg-passing between kernel and engine. The @@ -203,17 +246,17 @@ struct ExternResult { /// Meanwhile, the callee must assume that the slice is only valid until the function returns, and /// must not retain any references to the slice or its data that might outlive the function call. 
struct KernelStringSlice { - const char *ptr; - uintptr_t len; + const char *ptr; + uintptr_t len; }; -using AllocateErrorFn = EngineError *(*)(KernelError etype, KernelStringSlice msg); +using AllocateErrorFn = EngineError*(*)(KernelError etype, KernelStringSlice msg); -using NullableCvoid = void *; +using NullableCvoid = void*; /// Allow engines to allocate strings of their own type. the contract of calling a passed allocate /// function is that `kernel_str` is _only_ valid until the return from this function -using AllocateStringFn = NullableCvoid (*)(KernelStringSlice kernel_str); +using AllocateStringFn = NullableCvoid(*)(KernelStringSlice kernel_str); /// ABI-compatible struct for ArrowArray from C Data Interface /// See @@ -226,16 +269,16 @@ using AllocateStringFn = NullableCvoid (*)(KernelStringSlice kernel_str); /// } /// ``` struct FFI_ArrowArray { - int64_t length; - int64_t null_count; - int64_t offset; - int64_t n_buffers; - int64_t n_children; - const void **buffers; - FFI_ArrowArray **children; - FFI_ArrowArray *dictionary; - void (*release)(FFI_ArrowArray *arg1); - void *private_data; + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void **buffers; + FFI_ArrowArray **children; + FFI_ArrowArray *dictionary; + void (*release)(FFI_ArrowArray *arg1); + void *private_data; }; /// ABI-compatible struct for `ArrowSchema` from C Data Interface @@ -250,16 +293,16 @@ struct FFI_ArrowArray { /// ``` /// struct FFI_ArrowSchema { - const char *format; - const char *name; - const char *metadata; - /// Refer to [Arrow Flags](https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.flags) - int64_t flags; - int64_t n_children; - FFI_ArrowSchema **children; - FFI_ArrowSchema *dictionary; - void (*release)(FFI_ArrowSchema *arg1); - void *private_data; + const char *format; + const char *name; + const char *metadata; + /// Refer to [Arrow 
Flags](https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.flags) + int64_t flags; + int64_t n_children; + FFI_ArrowSchema **children; + FFI_ArrowSchema *dictionary; + void (*release)(FFI_ArrowSchema *arg1); + void *private_data; }; #if defined(DEFINE_DEFAULT_ENGINE) @@ -267,35 +310,35 @@ struct FFI_ArrowSchema { /// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and /// the schema. struct ArrowFFIData { - FFI_ArrowArray array; - FFI_ArrowSchema schema; + FFI_ArrowArray array; + FFI_ArrowSchema schema; }; #endif struct FileMeta { - KernelStringSlice path; - int64_t last_modified; - uintptr_t size; + KernelStringSlice path; + int64_t last_modified; + uintptr_t size; }; /// Model iterators. This allows an engine to specify iteration however it likes, and we simply wrap /// the engine functions. The engine retains ownership of the iterator. struct EngineIterator { - void *data; - /// A function that should advance the iterator and return the next time from the data - /// If the iterator is complete, it should return null. It should be safe to - /// call `get_next()` multiple times if it returns null. - const void *(*get_next)(void *data); + void *data; + /// A function that should advance the iterator and return the next time from the data + /// If the iterator is complete, it should return null. It should be safe to + /// call `get_next()` multiple times if it returns null. 
+ const void *(*get_next)(void *data); }; -template -using VisitLiteralFn = void (*)(void *data, uintptr_t sibling_list_id, T value); +template +using VisitLiteralFn = void(*)(void *data, uintptr_t sibling_list_id, T value); -using VisitVariadicFn = void (*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); +using VisitVariadicFn = void(*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); -using VisitUnaryFn = void (*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); +using VisitUnaryFn = void(*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); -using VisitBinaryOpFn = void (*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); +using VisitBinaryOpFn = void(*)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); /// The [`EngineExpressionVisitor`] defines a visitor system to allow engines to build their own /// representation of a kernel expression. @@ -328,126 +371,154 @@ using VisitBinaryOpFn = void (*)(void *data, uintptr_t sibling_list_id, uintptr_ /// visitor. Note that struct literals are currently in flux, and may change significantly. Here is the relevant /// issue: https://github.com/delta-io/delta-kernel-rs/issues/412 struct EngineExpressionVisitor { - /// An opaque engine state pointer - void *data; - /// Creates a new expression list, optionally reserving capacity up front - uintptr_t (*make_field_list)(void *data, uintptr_t reserve); - /// Visit a 32bit `integer` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_int; - /// Visit a 64bit `long` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_long; - /// Visit a 16bit `short` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_short; - /// Visit an 8bit `byte` belonging to the list identified by `sibling_list_id`. 
- VisitLiteralFn visit_literal_byte; - /// Visit a 32bit `float` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_float; - /// Visit a 64bit `double` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_double; - /// Visit a `string` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_string; - /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_bool; - /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`. - /// The timestamp is microsecond precision and adjusted to UTC. - VisitLiteralFn visit_literal_timestamp; - /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`. - /// The timestamp is microsecond precision with no timezone. - VisitLiteralFn visit_literal_timestamp_ntz; - /// Visit a 32bit intger `date` representing days since UNIX epoch 1970-01-01. The `date` belongs - /// to the list identified by `sibling_list_id`. - VisitLiteralFn visit_literal_date; - /// Visit binary data at the `buffer` with length `len` belonging to the list identified by - /// `sibling_list_id`. - void (*visit_literal_binary)(void *data, uintptr_t sibling_list_id, const uint8_t *buffer, uintptr_t len); - /// Visit a 128bit `decimal` value with the given precision and scale. The 128bit integer - /// is split into the most significant 64 bits in `value_ms`, and the least significant 64 - /// bits in `value_ls`. The `decimal` belongs to the list identified by `sibling_list_id`. - void (*visit_literal_decimal)(void *data, uintptr_t sibling_list_id, uint64_t value_ms, uint64_t value_ls, - uint8_t precision, uint8_t scale); - /// Visit a struct literal belonging to the list identified by `sibling_list_id`. - /// The field names of the struct are in a list identified by `child_field_list_id`. - /// The values of the struct are in a list identified by `child_value_list_id`. 
- void (*visit_literal_struct)(void *data, uintptr_t sibling_list_id, uintptr_t child_field_list_id, - uintptr_t child_value_list_id); - /// Visit an array literal belonging to the list identified by `sibling_list_id`. - /// The values of the array are in a list identified by `child_list_id`. - void (*visit_literal_array)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); - /// Visits a null value belonging to the list identified by `sibling_list_id. - void (*visit_literal_null)(void *data, uintptr_t sibling_list_id); - /// Visits an `and` expression belonging to the list identified by `sibling_list_id`. - /// The sub-expressions of the array are in a list identified by `child_list_id` - VisitVariadicFn visit_and; - /// Visits an `or` expression belonging to the list identified by `sibling_list_id`. - /// The sub-expressions of the array are in a list identified by `child_list_id` - VisitVariadicFn visit_or; - /// Visits a `not` expression belonging to the list identified by `sibling_list_id`. - /// The sub-expression will be in a _one_ item list identified by `child_list_id` - VisitUnaryFn visit_not; - /// Visits a `is_null` expression belonging to the list identified by `sibling_list_id`. - /// The sub-expression will be in a _one_ item list identified by `child_list_id` - VisitUnaryFn visit_is_null; - /// Visits the `LessThan` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_lt; - /// Visits the `LessThanOrEqual` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_le; - /// Visits the `GreaterThan` binary operator belonging to the list identified by `sibling_list_id`. 
- /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_gt; - /// Visits the `GreaterThanOrEqual` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_ge; - /// Visits the `Equal` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_eq; - /// Visits the `NotEqual` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_ne; - /// Visits the `Distinct` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_distinct; - /// Visits the `In` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_in; - /// Visits the `NotIn` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_not_in; - /// Visits the `Add` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_add; - /// Visits the `Minus` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_minus; - /// Visits the `Multiply` binary operator belonging to the list identified by `sibling_list_id`. 
- /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_multiply; - /// Visits the `Divide` binary operator belonging to the list identified by `sibling_list_id`. - /// The operands will be in a _two_ item list identified by `child_list_id` - VisitBinaryOpFn visit_divide; - /// Visits the `column` belonging to the list identified by `sibling_list_id`. - void (*visit_column)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visits a `StructExpression` belonging to the list identified by `sibling_list_id`. - /// The sub-expressions of the `StructExpression` are in a list identified by `child_list_id` - void (*visit_struct_expr)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); + /// An opaque engine state pointer + void *data; + /// Creates a new expression list, optionally reserving capacity up front + uintptr_t (*make_field_list)(void *data, uintptr_t reserve); + /// Visit a 32bit `integer` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_int; + /// Visit a 64bit `long` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_long; + /// Visit a 16bit `short` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_short; + /// Visit an 8bit `byte` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_byte; + /// Visit a 32bit `float` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_float; + /// Visit a 64bit `double` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_double; + /// Visit a `string` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_string; + /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_bool; + /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`. 
+ /// The timestamp is microsecond precision and adjusted to UTC. + VisitLiteralFn visit_literal_timestamp; + /// Visit a 64bit timestamp belonging to the list identified by `sibling_list_id`. + /// The timestamp is microsecond precision with no timezone. + VisitLiteralFn visit_literal_timestamp_ntz; + /// Visit a 32bit intger `date` representing days since UNIX epoch 1970-01-01. The `date` belongs + /// to the list identified by `sibling_list_id`. + VisitLiteralFn visit_literal_date; + /// Visit binary data at the `buffer` with length `len` belonging to the list identified by + /// `sibling_list_id`. + void (*visit_literal_binary)(void *data, + uintptr_t sibling_list_id, + const uint8_t *buffer, + uintptr_t len); + /// Visit a 128bit `decimal` value with the given precision and scale. The 128bit integer + /// is split into the most significant 64 bits in `value_ms`, and the least significant 64 + /// bits in `value_ls`. The `decimal` belongs to the list identified by `sibling_list_id`. + void (*visit_literal_decimal)(void *data, + uintptr_t sibling_list_id, + uint64_t value_ms, + uint64_t value_ls, + uint8_t precision, + uint8_t scale); + /// Visit a struct literal belonging to the list identified by `sibling_list_id`. + /// The field names of the struct are in a list identified by `child_field_list_id`. + /// The values of the struct are in a list identified by `child_value_list_id`. + void (*visit_literal_struct)(void *data, + uintptr_t sibling_list_id, + uintptr_t child_field_list_id, + uintptr_t child_value_list_id); + /// Visit an array literal belonging to the list identified by `sibling_list_id`. + /// The values of the array are in a list identified by `child_list_id`. + void (*visit_literal_array)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); + /// Visits a null value belonging to the list identified by `sibling_list_id. 
+ void (*visit_literal_null)(void *data, uintptr_t sibling_list_id); + /// Visits an `and` expression belonging to the list identified by `sibling_list_id`. + /// The sub-expressions of the array are in a list identified by `child_list_id` + VisitVariadicFn visit_and; + /// Visits an `or` expression belonging to the list identified by `sibling_list_id`. + /// The sub-expressions of the array are in a list identified by `child_list_id` + VisitVariadicFn visit_or; + /// Visits a `not` expression belonging to the list identified by `sibling_list_id`. + /// The sub-expression will be in a _one_ item list identified by `child_list_id` + VisitUnaryFn visit_not; + /// Visits a `is_null` expression belonging to the list identified by `sibling_list_id`. + /// The sub-expression will be in a _one_ item list identified by `child_list_id` + VisitUnaryFn visit_is_null; + /// Visits the `LessThan` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_lt; + /// Visits the `LessThanOrEqual` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_le; + /// Visits the `GreaterThan` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_gt; + /// Visits the `GreaterThanOrEqual` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_ge; + /// Visits the `Equal` binary operator belonging to the list identified by `sibling_list_id`. 
+ /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_eq; + /// Visits the `NotEqual` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_ne; + /// Visits the `Distinct` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_distinct; + /// Visits the `In` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_in; + /// Visits the `NotIn` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_not_in; + /// Visits the `Add` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_add; + /// Visits the `Minus` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_minus; + /// Visits the `Multiply` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_multiply; + /// Visits the `Divide` binary operator belonging to the list identified by `sibling_list_id`. + /// The operands will be in a _two_ item list identified by `child_list_id` + VisitBinaryOpFn visit_divide; + /// Visits the `column` belonging to the list identified by `sibling_list_id`. 
+ void (*visit_column)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visits a `StructExpression` belonging to the list identified by `sibling_list_id`. + /// The sub-expressions of the `StructExpression` are in a list identified by `child_list_id` + void (*visit_struct_expr)(void *data, uintptr_t sibling_list_id, uintptr_t child_list_id); }; // This trickery is from https://github.com/mozilla/cbindgen/issues/402#issuecomment-578680163 struct im_an_unused_struct_that_tricks_msvc_into_compilation { - ExternResult field; - ExternResult field2; - ExternResult field3; - ExternResult> field4; - ExternResult> field5; - ExternResult field6; - ExternResult field7; - ExternResult> field8; - ExternResult> field9; - ExternResult> field10; - ExternResult field11; + ExternResult field; + ExternResult field2; + ExternResult field3; + ExternResult> field4; + ExternResult> field5; + ExternResult field6; + ExternResult field7; + ExternResult> field8; + ExternResult> field9; + ExternResult> field10; + ExternResult field11; }; +/// An `Event` can generally be thought of a "log message". It contains all the relevant bits such +/// that an engine can generate a log message in its format +struct Event { + /// The log message associated with the event + KernelStringSlice message; + /// Level that the event was emitted at + Level level; + /// A string that specifies in what part of the system the event occurred + KernelStringSlice target; + /// source file line number where the event occurred, or 0 (zero) if unknown + uint32_t line; + /// file where the event occurred. If unknown the slice `ptr` will be null and the len will be 0 + KernelStringSlice file; +}; + +using TracingEventFn = void(*)(Event event); + +using TracingLogLineFn = void(*)(KernelStringSlice line); + /// A predicate that can be used to skip data when scanning. 
/// /// When invoking [`scan::scan`], The engine provides a pointer to the (engine's native) predicate, @@ -459,21 +530,25 @@ struct im_an_unused_struct_that_tricks_msvc_into_compilation { /// kernel each retain ownership of their respective objects, with no need to coordinate memory /// lifetimes with the other. struct EnginePredicate { - void *predicate; - uintptr_t (*visitor)(void *predicate, KernelExpressionVisitorState *state); + void *predicate; + uintptr_t (*visitor)(void *predicate, KernelExpressionVisitorState *state); }; /// Give engines an easy way to consume stats struct Stats { - /// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the - /// `num_records` statistic must be present and accurate, and must equal the number of records - /// in the data file. In the presence of Deletion Vectors the statistics may be somewhat - /// outdated, i.e. not reflecting deleted rows yet. - uint64_t num_records; + /// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the + /// `num_records` statistic must be present and accurate, and must equal the number of records + /// in the data file. In the presence of Deletion Vectors the statistics may be somewhat + /// outdated, i.e. not reflecting deleted rows yet. + uint64_t num_records; }; -using CScanCallback = void (*)(NullableCvoid engine_context, KernelStringSlice path, int64_t size, const Stats *stats, - const DvInfo *dv_info, const CStringMap *partition_map); +using CScanCallback = void(*)(NullableCvoid engine_context, + KernelStringSlice path, + int64_t size, + const Stats *stats, + const DvInfo *dv_info, + const CStringMap *partition_map); /// The `EngineSchemaVisitor` defines a visitor system to allow engines to build their own /// representation of a schema from a particular schema within kernel. @@ -501,49 +576,61 @@ using CScanCallback = void (*)(NullableCvoid engine_context, KernelStringSlice p /// that element's (already-visited) children. 
/// 4. The [`visit_schema`] method returns the id of the list of top-level columns struct EngineSchemaVisitor { - /// opaque state pointer - void *data; - /// Creates a new field list, optionally reserving capacity up front - uintptr_t (*make_field_list)(void *data, uintptr_t reserve); - /// Indicate that the schema contains a `Struct` type. The top level of a Schema is always a - /// `Struct`. The fields of the `Struct` are in the list identified by `child_list_id`. - void (*visit_struct)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, uintptr_t child_list_id); - /// Indicate that the schema contains an Array type. `child_list_id` will be a _one_ item list - /// with the array's element type - void (*visit_array)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, bool contains_null, - uintptr_t child_list_id); - /// Indicate that the schema contains an Map type. `child_list_id` will be a _two_ item list - /// where the first element is the map's key type and the second element is the - /// map's value type - void (*visit_map)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, bool value_contains_null, - uintptr_t child_list_id); - /// visit a `decimal` with the specified `precision` and `scale` - void (*visit_decimal)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, uint8_t precision, - uint8_t scale); - /// Visit a `string` belonging to the list identified by `sibling_list_id`. - void (*visit_string)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `long` belonging to the list identified by `sibling_list_id`. - void (*visit_long)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit an `integer` belonging to the list identified by `sibling_list_id`. - void (*visit_integer)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `short` belonging to the list identified by `sibling_list_id`. 
- void (*visit_short)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `byte` belonging to the list identified by `sibling_list_id`. - void (*visit_byte)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `float` belonging to the list identified by `sibling_list_id`. - void (*visit_float)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `double` belonging to the list identified by `sibling_list_id`. - void (*visit_double)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. - void (*visit_boolean)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit `binary` belonging to the list identified by `sibling_list_id`. - void (*visit_binary)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `date` belonging to the list identified by `sibling_list_id`. - void (*visit_date)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `timestamp` belonging to the list identified by `sibling_list_id`. - void (*visit_timestamp)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `timestamp` with no timezone belonging to the list identified by `sibling_list_id`. - void (*visit_timestamp_ntz)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// opaque state pointer + void *data; + /// Creates a new field list, optionally reserving capacity up front + uintptr_t (*make_field_list)(void *data, uintptr_t reserve); + /// Indicate that the schema contains a `Struct` type. The top level of a Schema is always a + /// `Struct`. The fields of the `Struct` are in the list identified by `child_list_id`. + void (*visit_struct)(void *data, + uintptr_t sibling_list_id, + KernelStringSlice name, + uintptr_t child_list_id); + /// Indicate that the schema contains an Array type. 
`child_list_id` will be a _one_ item list + /// with the array's element type + void (*visit_array)(void *data, + uintptr_t sibling_list_id, + KernelStringSlice name, + bool contains_null, + uintptr_t child_list_id); + /// Indicate that the schema contains an Map type. `child_list_id` will be a _two_ item list + /// where the first element is the map's key type and the second element is the + /// map's value type + void (*visit_map)(void *data, + uintptr_t sibling_list_id, + KernelStringSlice name, + bool value_contains_null, + uintptr_t child_list_id); + /// visit a `decimal` with the specified `precision` and `scale` + void (*visit_decimal)(void *data, + uintptr_t sibling_list_id, + KernelStringSlice name, + uint8_t precision, + uint8_t scale); + /// Visit a `string` belonging to the list identified by `sibling_list_id`. + void (*visit_string)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `long` belonging to the list identified by `sibling_list_id`. + void (*visit_long)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit an `integer` belonging to the list identified by `sibling_list_id`. + void (*visit_integer)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `short` belonging to the list identified by `sibling_list_id`. + void (*visit_short)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `byte` belonging to the list identified by `sibling_list_id`. + void (*visit_byte)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `float` belonging to the list identified by `sibling_list_id`. + void (*visit_float)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `double` belonging to the list identified by `sibling_list_id`. + void (*visit_double)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. 
+ void (*visit_boolean)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit `binary` belonging to the list identified by `sibling_list_id`. + void (*visit_binary)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `date` belonging to the list identified by `sibling_list_id`. + void (*visit_date)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `timestamp` belonging to the list identified by `sibling_list_id`. + void (*visit_timestamp)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `timestamp` with no timezone belonging to the list identified by `sibling_list_id`. + void (*visit_timestamp_ntz)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); }; extern "C" { @@ -572,7 +659,8 @@ void free_engine_data(Handle engine_data); /// /// # Safety /// Caller is responsible for passing a valid path pointer. -ExternResult get_engine_builder(KernelStringSlice path, AllocateErrorFn allocate_error); +ExternResult get_engine_builder(KernelStringSlice path, + AllocateErrorFn allocate_error); #endif #if defined(DEFINE_DEFAULT_ENGINE) @@ -599,7 +687,8 @@ ExternResult> builder_build(EngineBuilder *builder); /// # Safety /// /// Caller is responsible for passing a valid path pointer. -ExternResult> get_default_engine(KernelStringSlice path, AllocateErrorFn allocate_error); +ExternResult> get_default_engine(KernelStringSlice path, + AllocateErrorFn allocate_error); #endif #if defined(DEFINE_SYNC_ENGINE) @@ -619,7 +708,8 @@ void free_engine(Handle engine); /// # Safety /// /// Caller is responsible for passing valid handles and path pointer. 
-ExternResult> snapshot(KernelStringSlice path, Handle engine); +ExternResult> snapshot(KernelStringSlice path, + Handle engine); /// # Safety /// @@ -645,7 +735,8 @@ NullableCvoid snapshot_table_root(Handle snapshot, AllocateStrin /// /// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by /// [kernel_scan_data_free]. The visitor function pointer must be non-null. -bool string_slice_next(Handle data, NullableCvoid engine_context, +bool string_slice_next(Handle data, + NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, KernelStringSlice slice)); /// # Safety @@ -676,7 +767,8 @@ void *get_raw_engine_data(Handle data); /// # Safety /// data_handle must be a valid ExclusiveEngineData as read by the /// [`delta_kernel::engine::default::DefaultEngine`] obtained from `get_default_engine`. -ExternResult get_raw_arrow_data(Handle data, Handle engine); +ExternResult get_raw_arrow_data(Handle data, + Handle engine); #endif /// Call the engine back with the next `EngingeData` batch read by Parquet/Json handler. The @@ -688,7 +780,8 @@ ExternResult get_raw_arrow_data(Handle data /// /// The iterator must be valid (returned by [`read_parquet_file`]) and not yet freed by /// [`free_read_result_iter`]. The visitor function pointer must be non-null. 
-ExternResult read_result_next(Handle data, NullableCvoid engine_context, +ExternResult read_result_next(Handle data, + NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, Handle engine_data)); @@ -703,8 +796,9 @@ void free_read_result_iter(Handle data); /// /// # Safety /// Caller is responsible for calling with a valid `ExternEngineHandle` and `FileMeta` -ExternResult> -read_parquet_file(Handle engine, const FileMeta *file, Handle physical_schema); +ExternResult> read_parquet_file(Handle engine, + const FileMeta *file, + Handle physical_schema); uintptr_t visit_expression_and(KernelExpressionVisitorState *state, EngineIterator *children); @@ -720,7 +814,8 @@ uintptr_t visit_expression_eq(KernelExpressionVisitorState *state, uintptr_t a, /// # Safety /// The string slice must be valid -ExternResult visit_expression_column(KernelExpressionVisitorState *state, KernelStringSlice name, +ExternResult visit_expression_column(KernelExpressionVisitorState *state, + KernelStringSlice name, AllocateErrorFn allocate_error); uintptr_t visit_expression_not(KernelExpressionVisitorState *state, uintptr_t inner_expr); @@ -729,7 +824,8 @@ uintptr_t visit_expression_is_null(KernelExpressionVisitorState *state, uintptr_ /// # Safety /// The string slice must be valid -ExternResult visit_expression_literal_string(KernelExpressionVisitorState *state, KernelStringSlice value, +ExternResult visit_expression_literal_string(KernelExpressionVisitorState *state, + KernelStringSlice value, AllocateErrorFn allocate_error); uintptr_t visit_expression_literal_int(KernelExpressionVisitorState *state, int32_t value); @@ -761,7 +857,84 @@ void free_kernel_predicate(Handle data); /// # Safety /// /// The caller must pass a valid SharedExpression Handle and expression visitor -uintptr_t visit_expression(const Handle *expression, EngineExpressionVisitor *visitor); +uintptr_t visit_expression(const Handle *expression, + EngineExpressionVisitor *visitor); + +/// Enable 
getting called back for tracing (logging) events in the kernel. `max_level` specifies +/// that only events `<=` to the specified level should be reported. More verbose Levels are "greater +/// than" less verbose ones. So Level::ERROR is the lowest, and Level::TRACE the highest. +/// +/// Note that setting up such a call back can only be done ONCE. Calling any of +/// `enable_event_tracing`, `enable_log_line_tracing`, or `enable_formatted_log_line_tracing` more +/// than once is a no-op. +/// +/// Returns `true` if the callback was setup successfully, false on failure (i.e. if called a second +/// time) +/// +/// [`event`] based tracing gives an engine maximal flexibility in formatting event log +/// lines. Kernel can also format events for the engine. If this is desired call +/// [`enable_log_line_tracing`] instead of this method. +/// +/// # Safety +/// Caller must pass a valid function pointer for the callback +bool enable_event_tracing(TracingEventFn callback, + Level max_level); + +/// Enable getting called back with log lines in the kernel using default settings: +/// - FULL format +/// - include ansi color +/// - include timestamps +/// - include level +/// - include target +/// +/// `max_level` specifies that only logs `<=` to the specified level should be reported. More +/// verbose Levels are "greater than" less verbose ones. So Level::ERROR is the lowest, and +/// Level::TRACE the highest. +/// +/// Log lines passed to the callback will already have a newline at the end. +/// +/// Note that setting up such a call back can only be done ONCE. Calling any of +/// `enable_event_tracing`, `enable_log_line_tracing`, or `enable_formatted_log_line_tracing` more +/// than once is a no-op. +/// +/// Returns `true` if the callback was setup successfully, false on failure (i.e. 
if called a second +/// time) +/// +/// Log line based tracing is simple for an engine as it can just log the passed string, but does +/// not provide flexibility for an engine to format events. If the engine wants to use a specific +/// format for events it should call [`enable_event_tracing`] instead of this function. +/// +/// # Safety +/// Caller must pass a valid function pointer for the callback +bool enable_log_line_tracing(TracingLogLineFn callback, Level max_level); + +/// Enable getting called back with log lines in the kernel. This variant allows specifying +/// formatting options for the log lines. See [`enable_log_line_tracing`] for general info on +/// getting called back for log lines. +/// +/// Note that setting up such a call back can only be done ONCE. Calling any of +/// `enable_event_tracing`, `enable_log_line_tracing`, or `enable_formatted_log_line_tracing` more +/// than once is a no-op. +/// +/// Returns `true` if the callback was setup successfully, false on failure (i.e. if called a second +/// time) +/// +/// Options that can be set: +/// - `format`: see [`LogLineFormat`] +/// - `ansi`: should the formatter use ansi escapes for color +/// - `with_time`: should the formatter include a timestamp in the log message +/// - `with_level`: should the formatter include the level in the log message +/// - `with_target`: should the formatter include what part of the system the event occurred +/// +/// # Safety +/// Caller must pass a valid function pointer for the callback +bool enable_formatted_log_line_tracing(TracingLogLineFn callback, + Level max_level, + LogLineFormat format, + bool ansi, + bool with_time, + bool with_level, + bool with_target); /// Drops a scan. 
/// # Safety @@ -774,7 +947,8 @@ void free_scan(Handle scan); /// # Safety /// /// Caller is responsible for passing a valid snapshot pointer, and engine pointer -ExternResult> scan(Handle snapshot, Handle engine, +ExternResult> scan(Handle snapshot, + Handle engine, EnginePredicate *predicate); /// Get the global state for a scan. See the docs for [`delta_kernel::scan::state::GlobalScanState`] @@ -832,7 +1006,8 @@ ExternResult> kernel_scan_data_init(Handle kernel_scan_data_next(Handle data, NullableCvoid engine_context, +ExternResult kernel_scan_data_next(Handle data, + NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, Handle engine_data, KernelBoolSlice selection_vector)); @@ -850,20 +1025,24 @@ void free_kernel_scan_data(Handle data); /// # Safety /// /// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`] -NullableCvoid get_from_map(const CStringMap *map, KernelStringSlice key, AllocateStringFn allocate_fn); +NullableCvoid get_from_map(const CStringMap *map, + KernelStringSlice key, + AllocateStringFn allocate_fn); /// Get a selection vector out of a [`DvInfo`] struct /// /// # Safety /// Engine is responsible for providing valid pointers for each argument -ExternResult selection_vector_from_dv(const DvInfo *dv_info, Handle engine, +ExternResult selection_vector_from_dv(const DvInfo *dv_info, + Handle engine, Handle state); /// Get a vector of row indexes out of a [`DvInfo`] struct /// /// # Safety /// Engine is responsible for providing valid pointers for each argument -ExternResult row_indexes_from_dv(const DvInfo *dv_info, Handle engine, +ExternResult row_indexes_from_dv(const DvInfo *dv_info, + Handle engine, Handle state); /// Shim for ffi to call visit_scan_data. 
This will generally be called when iterating through scan @@ -871,7 +1050,9 @@ ExternResult row_indexes_from_dv(const DvInfo *dv_info, Han /// /// # Safety /// engine is responsbile for passing a valid [`ExclusiveEngineData`] and selection vector. -void visit_scan_data(Handle data, KernelBoolSlice selection_vec, NullableCvoid engine_context, +void visit_scan_data(Handle data, + KernelBoolSlice selection_vec, + NullableCvoid engine_context, CScanCallback callback); /// Visit the schema of the passed `SnapshotHandle`, using the provided `visitor`. See the @@ -892,6 +1073,6 @@ uintptr_t visit_schema(Handle snapshot, EngineSchemaVisitor *vis /// [`free_kernel_predicate`], or [`Handle::drop_handle`] Handle get_testing_kernel_expression(); -} // extern "C" +} // extern "C" -} // namespace ffi +} // namespace ffi diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 5332bd8..5fa1fdf 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -301,11 +301,11 @@ struct KernelUtils { if (result.err._0) { auto error_cast = static_cast(result.err._0); error_cast->Throw(from_where); - } else { - throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error, but error was nullptr", - from_where.c_str()); } - } else if (result.tag == ffi::ExternResult::Tag::Ok) { + throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error, but error was nullptr", + from_where.c_str()); + } + if (result.tag == ffi::ExternResult::Tag::Ok) { return result.ok._0; } throw IOException("Invalid error ExternResult tag found!"); diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index e9e89da..fe842d3 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -43,7 +43,7 @@ struct DeltaFileMetaData { //! 
The DeltaSnapshot implements the MultiFileList API to allow injecting it into the regular DuckDB parquet scan struct DeltaSnapshot : public MultiFileList { DeltaSnapshot(ClientContext &context, const string &path); - string GetPath(); + string GetPath() const; static string ToDuckDBPath(const string &raw_path); static string ToDeltaPath(const string &raw_path); @@ -58,12 +58,15 @@ struct DeltaSnapshot : public MultiFileList { idx_t GetTotalFileCount() override; unique_ptr GetCardinality(ClientContext &context) override; + idx_t GetVersion(); + DeltaFileMetaData &GetMetaData(idx_t index) const; protected: //! Get the i-th expanded file string GetFile(idx_t i) override; protected: + string GetFileInternal(idx_t i); void InitializeSnapshot(); void InitializeScan(); @@ -73,8 +76,15 @@ struct DeltaSnapshot : public MultiFileList { result, StringUtil::Format("While trying to read from delta table: '%s'", paths[0])); } - // TODO: change back to protected -public: + static void VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data, + const struct ffi::KernelBoolSlice selection_vec); + static void VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, + const ffi::Stats *stats, const ffi::DvInfo *dv_info, + const struct ffi::CStringMap *partition_values); + +protected: + mutable mutex lock; + idx_t version; //! 
Delta Kernel Structures diff --git a/src/storage/delta_catalog.cpp b/src/storage/delta_catalog.cpp index 53b1195..44e03e7 100644 --- a/src/storage/delta_catalog.cpp +++ b/src/storage/delta_catalog.cpp @@ -64,12 +64,12 @@ optional_idx DeltaCatalog::GetCatalogVersion(ClientContext &context) { // Option 1: snapshot is cached table-wide auto cached_snapshot = main_schema->GetCachedTable(); if (cached_snapshot) { - return cached_snapshot->snapshot->version; + return cached_snapshot->snapshot->GetVersion(); } // Option 2: snapshot is cached in transaction if (delta_transaction.table_entry) { - return delta_transaction.table_entry->snapshot->version; + return delta_transaction.table_entry->snapshot->GetVersion(); } return {}; diff --git a/test/sql/cloud/gcs.test b/test/sql/cloud/gcs.test new file mode 100644 index 0000000..5a12aff --- /dev/null +++ b/test/sql/cloud/gcs.test @@ -0,0 +1,22 @@ +# name: test/sql/cloud/gcs.test +# description: test delta extension with GCS +# group: [aws] + +require httpfs + +require parquet + +require delta + +statement ok +CREATE SECRET ( + TYPE GCS, + KEY_ID 'SOME_KEY', + SECRET 'SOME SECRET' +); + +statement error +select col1, col2 from delta_scan('gcs://some-bucket/some-table'); +---- +403 + diff --git a/test/sql/generated/attach_parallel.test b/test/sql/generated/attach_parallel.test new file mode 100644 index 0000000..37a5fbb --- /dev/null +++ b/test/sql/generated/attach_parallel.test @@ -0,0 +1,100 @@ +# name: test/sql/generated/attach_parallel.test +# description: Test attaching a delta table and reading from it in parallel +# group: [dat] + +require parquet + +require delta + +require-env GENERATED_DATA_AVAILABLE + +statement ok +pragma threads=10; + +statement ok +ATTACH 'data/generated/simple_partitioned/delta_lake/' as dt (TYPE delta) + +statement ok +ATTACH 'data/generated/simple_partitioned/delta_lake/' as dt_pinned (TYPE delta, PIN_SNAPSHOT) + +concurrentloop threadid 0 20 + +query I +WITH RECURSIVE 
ctename AS ( + SELECT *, 1 as recursiondepth + FROM dt + UNION ALL + SELECT * EXCLUDE (c2.recursiondepth), c2.recursiondepth + 1 as recursiondepth + FROM ctename as c2 + WHERE c2.recursiondepth < 8 +) +SELECT count(i) FROM ctename; +---- +80 + +query I +WITH RECURSIVE ctename AS ( + SELECT *, 1 as recursiondepth + FROM dt_pinned + UNION ALL + SELECT * EXCLUDE (c2.recursiondepth), c2.recursiondepth + 1 as recursiondepth + FROM ctename as c2 + WHERE c2.recursiondepth < 8 +) +SELECT count(i) FROM ctename; +---- +80 + +endloop + +concurrentloop threadid 0 20 + +query I +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt UNION ALL +SELECT count(i) FROM dt +---- +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 + +query I +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned UNION ALL +SELECT count(i) FROM dt_pinned +---- +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 + + + +endloop \ No newline at end of file