diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9dabe4e8800..4d83cbd907c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -336,6 +336,7 @@ add_library( src/aggregation/result_cache.cpp src/ast/expression_parser.cpp src/ast/expressions.cpp + src/ast/operators.cpp src/binaryop/binaryop.cpp src/binaryop/compiled/ATan2.cu src/binaryop/compiled/Add.cu @@ -477,13 +478,13 @@ add_library( src/io/avro/reader_impl.cu src/io/comp/brotli_dict.cpp src/io/comp/comp.cpp + src/io/comp/comp.cu src/io/comp/cpu_unbz2.cpp src/io/comp/debrotli.cu src/io/comp/gpuinflate.cu src/io/comp/nvcomp_adapter.cpp src/io/comp/nvcomp_adapter.cu src/io/comp/snap.cu - src/io/comp/statistics.cu src/io/comp/uncomp.cpp src/io/comp/unsnap.cu src/io/csv/csv_gpu.cu diff --git a/cpp/include/cudf/ast/detail/expression_evaluator.cuh b/cpp/include/cudf/ast/detail/expression_evaluator.cuh index 9d8762555d7..001b604814c 100644 --- a/cpp/include/cudf/ast/detail/expression_evaluator.cuh +++ b/cpp/include/cudf/ast/detail/expression_evaluator.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -452,7 +452,7 @@ struct expression_evaluator { ++operator_index) { // Execute operator auto const op = plan.operators[operator_index]; - auto const arity = ast_operator_arity(op); + auto const arity = plan.operator_arities[operator_index]; if (arity == 1) { // Unary operator auto const& input = diff --git a/cpp/include/cudf/ast/detail/expression_parser.hpp b/cpp/include/cudf/ast/detail/expression_parser.hpp index b5973d0ace9..d2e8c1cd41f 100644 --- a/cpp/include/cudf/ast/detail/expression_parser.hpp +++ b/cpp/include/cudf/ast/detail/expression_parser.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -88,6 +89,7 @@ struct expression_device_view { device_span data_references; device_span literals; device_span operators; + device_span operator_arities; device_span operator_source_indices; cudf::size_type num_intermediates; }; @@ -229,39 +231,55 @@ class expression_parser { * @param[in] v The `std::vector` containing components (operators, literals, etc). * @param[in,out] sizes The `std::vector` containing the size of each data buffer. * @param[in,out] data_pointers The `std::vector` containing pointers to each data buffer. 
+ * @param[in,out] alignment The maximum alignment needed for all the extracted sizes and pointers */ template void extract_size_and_pointer(std::vector const& v, std::vector& sizes, - std::vector& data_pointers) + std::vector& data_pointers, + cudf::size_type& alignment) { + // sub-type alignment will only work provided the alignment is less than or equal to + // alignof(max_align_t) which is the maximum alignment provided by rmm's device buffers + static_assert(alignof(T) <= alignof(max_align_t)); auto const data_size = sizeof(T) * v.size(); sizes.push_back(data_size); data_pointers.push_back(v.data()); + alignment = std::max(alignment, static_cast(alignof(T))); } void move_to_device(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { std::vector sizes; std::vector data_pointers; + // use a minimum of 4-byte alignment + cudf::size_type buffer_alignment = 4; - extract_size_and_pointer(_data_references, sizes, data_pointers); - extract_size_and_pointer(_literals, sizes, data_pointers); - extract_size_and_pointer(_operators, sizes, data_pointers); - extract_size_and_pointer(_operator_source_indices, sizes, data_pointers); + extract_size_and_pointer(_data_references, sizes, data_pointers, buffer_alignment); + extract_size_and_pointer(_literals, sizes, data_pointers, buffer_alignment); + extract_size_and_pointer(_operators, sizes, data_pointers, buffer_alignment); + extract_size_and_pointer(_operator_arities, sizes, data_pointers, buffer_alignment); + extract_size_and_pointer(_operator_source_indices, sizes, data_pointers, buffer_alignment); // Create device buffer - auto const buffer_size = std::accumulate(sizes.cbegin(), sizes.cend(), 0); - auto buffer_offsets = std::vector(sizes.size()); - thrust::exclusive_scan(sizes.cbegin(), sizes.cend(), buffer_offsets.begin(), 0); + auto buffer_offsets = std::vector(sizes.size()); + thrust::exclusive_scan(sizes.cbegin(), + sizes.cend(), + buffer_offsets.begin(), + cudf::size_type{0}, + [buffer_alignment](auto a, auto b) { + // align each component of the AST program + return cudf::util::round_up_safe(a + b, buffer_alignment); + }); + + auto const buffer_size = buffer_offsets.empty() ? 0 : (buffer_offsets.back() + sizes.back());
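The exclusive scan above replaces the old `std::accumulate` sizing: every intermediate offset is rounded up to the largest component alignment, so each array packed into the single device buffer starts on a boundary that satisfies its `alignof`. A minimal host-only sketch of the same packing scheme (the `round_up` helper and toy component arrays are illustrative, not the `cudf::util::round_up_safe` API):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Round x up to the next multiple of align (align must be a power of two).
std::size_t round_up(std::size_t x, std::size_t align) { return (x + align - 1) & ~(align - 1); }

int main()
{
  std::vector<std::int32_t> arities{2, 2, 1};  // toy stand-in for _operator_arities
  std::vector<double> literals{1.5, 2.5};      // toy stand-in for _literals

  std::size_t const alignment = std::max(alignof(std::int32_t), alignof(double));
  std::size_t const sizes[]   = {arities.size() * sizeof(std::int32_t),
                                 literals.size() * sizeof(double)};
  void const* const ptrs[]    = {arities.data(), literals.data()};

  // Exclusive scan over the byte sizes, rounding each running offset up to
  // `alignment`, mirroring the thrust::exclusive_scan with the aligning op above.
  std::size_t offsets[2];
  std::size_t running = 0;
  for (int i = 0; i < 2; ++i) {
    offsets[i] = running;
    running    = round_up(running + sizes[i], alignment);
  }

  // One backing allocation; every component starts at an aligned offset.
  std::vector<std::byte> buffer(running);
  for (int i = 0; i < 2; ++i) {
    std::memcpy(buffer.data() + offsets[i], ptrs[i], sizes[i]);
  }
}
```

The `static_assert(alignof(T) <= alignof(max_align_t))` is what makes this sound: the device allocation itself is only guaranteed to be `max_align_t`-aligned, so no packed component may demand more.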
+ auto host_data_buffer = std::vector(buffer_size); - auto h_data_buffer = std::vector(buffer_size); for (unsigned int i = 0; i < data_pointers.size(); ++i) { - std::memcpy(h_data_buffer.data() + buffer_offsets[i], data_pointers[i], sizes[i]); + std::memcpy(host_data_buffer.data() + buffer_offsets[i], data_pointers[i], sizes[i]); } - _device_data_buffer = rmm::device_buffer(h_data_buffer.data(), buffer_size, stream, mr); - + _device_data_buffer = rmm::device_buffer(host_data_buffer.data(), buffer_size, stream, mr); stream.synchronize(); // Create device pointers to components of plan @@ -277,8 +295,11 @@ class expression_parser { device_expression_data.operators = device_span( reinterpret_cast(device_data_buffer_ptr + buffer_offsets[2]), _operators.size()); - device_expression_data.operator_source_indices = device_span( + device_expression_data.operator_arities = device_span( reinterpret_cast(device_data_buffer_ptr + buffer_offsets[3]), + _operators.size()); + device_expression_data.operator_source_indices = device_span( + reinterpret_cast(device_data_buffer_ptr + buffer_offsets[4]), _operator_source_indices.size()); device_expression_data.num_intermediates = _intermediate_counter.get_max_used(); shmem_per_thread = static_cast( @@ -322,6 +343,7 @@ class expression_parser { bool _has_nulls; std::vector _data_references; std::vector _operators; + std::vector _operator_arities; std::vector _operator_source_indices; std::vector _literals; }; diff --git a/cpp/include/cudf/ast/detail/operators.hpp b/cpp/include/cudf/ast/detail/operators.hpp index 46507700e21..db04e1fe989 100644 --- a/cpp/include/cudf/ast/detail/operators.hpp +++ b/cpp/include/cudf/ast/detail/operators.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,159 +69,111 @@ constexpr bool is_valid_unary_op = cuda::std::is_invocable_v; * @param args Forwarded arguments to `operator()` of `f`. */ template -CUDF_HOST_DEVICE inline constexpr void ast_operator_dispatcher(ast_operator op, F&& f, Ts&&... args) +CUDF_HOST_DEVICE inline constexpr decltype(auto) ast_operator_dispatcher(ast_operator op, + F&& f, + Ts&&...
args) { switch (op) { case ast_operator::ADD: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::SUB: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::MUL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::DIV: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::TRUE_DIV: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::FLOOR_DIV: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::MOD: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::PYMOD: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::POW: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NULL_EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NOT_EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LESS: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::GREATER: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LESS_EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::GREATER_EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::BITWISE_AND: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::BITWISE_OR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::BITWISE_XOR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LOGICAL_AND: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NULL_LOGICAL_AND: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LOGICAL_OR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NULL_LOGICAL_OR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::IDENTITY: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::IS_NULL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case 
ast_operator::SIN: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::COS: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::TAN: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCSIN: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCCOS: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCTAN: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::SINH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::COSH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::TANH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCSINH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCCOSH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCTANH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::EXP: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LOG: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::SQRT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CBRT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CEIL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::FLOOR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ABS: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::RINT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::BIT_INVERT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NOT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CAST_TO_INT64: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CAST_TO_UINT64: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CAST_TO_FLOAT64: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Invalid operator."); @@ -955,231 +907,6 @@ 
struct operator_functor { } }; -/** - * @brief Functor used to single-type-dispatch binary operators. - * - * This functor's `operator()` is templated to validate calls to its operators based on the input - * type, as determined by the `is_valid_binary_op` trait. This function assumes that both inputs are - * the same type, and dispatches based on the type of the left input. - * - * @tparam OperatorFunctor Binary operator functor. - */ -template -struct single_dispatch_binary_operator_types { - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(F&& f, Ts&&... args) - { - f.template operator()(std::forward(args)...); - } - - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(F&& f, Ts&&... args) - { -#ifndef __CUDA_ARCH__ - CUDF_FAIL("Invalid binary operation."); -#else - CUDF_UNREACHABLE("Invalid binary operation."); -#endif - } -}; - -/** - * @brief Functor performing a type dispatch for a binary operator. - * - * This functor performs single dispatch, which assumes lhs_type == rhs_type. This may not be true - * for all binary operators but holds for all currently implemented operators. - */ -struct type_dispatch_binary_op { - /** - * @brief Performs type dispatch for a binary operator. - * - * @tparam op AST operator. - * @tparam F Type of forwarded functor. - * @tparam Ts Parameter pack of forwarded arguments. - * @param lhs_type Type of left input data. - * @param rhs_type Type of right input data. - * @param f Forwarded functor to be called. - * @param args Forwarded arguments to `operator()` of `f`. - */ - template - CUDF_HOST_DEVICE inline void operator()(cudf::data_type lhs_type, - cudf::data_type rhs_type, - F&& f, - Ts&&... args) - { - // Single dispatch (assume lhs_type == rhs_type) - type_dispatcher( - lhs_type, - // Always dispatch to the non-null operator for the purpose of type determination. - detail::single_dispatch_binary_operator_types>{}, - std::forward(f), - std::forward(args)...); - } -}; - -/** - * @brief Dispatches a runtime binary operator to a templated type dispatcher. - * - * @tparam F Type of forwarded functor. - * @tparam Ts Parameter pack of forwarded arguments. - * @param lhs_type Type of left input data. - * @param rhs_type Type of right input data. - * @param f Forwarded functor to be called. - * @param args Forwarded arguments to `operator()` of `f`. - */ -template -CUDF_HOST_DEVICE inline constexpr void binary_operator_dispatcher( - ast_operator op, cudf::data_type lhs_type, cudf::data_type rhs_type, F&& f, Ts&&... args) -{ - ast_operator_dispatcher(op, - detail::type_dispatch_binary_op{}, - lhs_type, - rhs_type, - std::forward(f), - std::forward(args)...); -} - -/** - * @brief Functor used to type-dispatch unary operators. - * - * This functor's `operator()` is templated to validate calls to its operators based on the input - * type, as determined by the `is_valid_unary_op` trait. - * - * @tparam OperatorFunctor Unary operator functor. - */ -template -struct dispatch_unary_operator_types { - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(F&& f, Ts&&... args) - { - f.template operator()(std::forward(args)...); - } - - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(F&& f, Ts&&... args) - { -#ifndef __CUDA_ARCH__ - CUDF_FAIL("Invalid unary operation."); -#else - CUDF_UNREACHABLE("Invalid unary operation."); -#endif - } -}; - -/** - * @brief Functor performing a type dispatch for a unary operator. 
- */ -struct type_dispatch_unary_op { - template - CUDF_HOST_DEVICE inline void operator()(cudf::data_type input_type, F&& f, Ts&&... args) - { - type_dispatcher( - input_type, - // Always dispatch to the non-null operator for the purpose of type determination. - detail::dispatch_unary_operator_types>{}, - std::forward(f), - std::forward(args)...); - } -}; - -/** - * @brief Dispatches a runtime unary operator to a templated type dispatcher. - * - * @tparam F Type of forwarded functor. - * @tparam Ts Parameter pack of forwarded arguments. - * @param input_type Type of input data. - * @param f Forwarded functor to be called. - * @param args Forwarded arguments to `operator()` of `f`. - */ -template -CUDF_HOST_DEVICE inline constexpr void unary_operator_dispatcher(ast_operator op, - cudf::data_type input_type, - F&& f, - Ts&&... args) -{ - ast_operator_dispatcher(op, - detail::type_dispatch_unary_op{}, - input_type, - std::forward(f), - std::forward(args)...); -} - -/** - * @brief Functor to determine the return type of an operator from its input types. - */ -struct return_type_functor { - /** - * @brief Callable for binary operators to determine return type. - * - * @tparam OperatorFunctor Operator functor to perform. - * @tparam LHS Left input type. - * @tparam RHS Right input type. - * @param result Reference whose value is assigned to the result data type. - */ - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(cudf::data_type& result) - { - using Out = cuda::std::invoke_result_t; - result = cudf::data_type(cudf::type_to_id()); - } - - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(cudf::data_type& result) - { -#ifndef __CUDA_ARCH__ - CUDF_FAIL("Invalid binary operation. Return type cannot be determined."); -#else - CUDF_UNREACHABLE("Invalid binary operation. Return type cannot be determined."); -#endif - } - - /** - * @brief Callable for unary operators to determine return type. - * - * @tparam OperatorFunctor Operator functor to perform. - * @tparam T Input type. - * @param result Pointer whose value is assigned to the result data type. - */ - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(cudf::data_type& result) - { - using Out = cuda::std::invoke_result_t; - result = cudf::data_type(cudf::type_to_id()); - } - - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(cudf::data_type& result) - { -#ifndef __CUDA_ARCH__ - CUDF_FAIL("Invalid unary operation. Return type cannot be determined."); -#else - CUDF_UNREACHABLE("Invalid unary operation. Return type cannot be determined."); -#endif - } -}; - /** * @brief Gets the return type of an AST operator. * @@ -1187,34 +914,8 @@ struct return_type_functor { * @param operand_types Vector of input types to the operator. * @return cudf::data_type Return type of the operator. */ -inline cudf::data_type ast_operator_return_type(ast_operator op, - std::vector const& operand_types) -{ - auto result = cudf::data_type(cudf::type_id::EMPTY); - switch (operand_types.size()) { - case 1: - unary_operator_dispatcher(op, operand_types[0], detail::return_type_functor{}, result); - break; - case 2: - binary_operator_dispatcher( - op, operand_types[0], operand_types[1], detail::return_type_functor{}, result); - break; - default: CUDF_FAIL("Unsupported operator return type."); break; - } - return result; -} - -/** - * @brief Functor to determine the arity (number of operands) of an operator. 
- */ -struct arity_functor { - template - CUDF_HOST_DEVICE inline void operator()(cudf::size_type& result) - { - // Arity is not dependent on null handling, so just use the false implementation here. - result = operator_functor::arity; - } -}; +cudf::data_type ast_operator_return_type(ast_operator op, + std::vector const& operand_types); /** * @brief Gets the arity (number of operands) of an AST operator. @@ -1222,12 +923,7 @@ struct arity_functor { * @param op Operator used to determine arity. * @return Arity of the operator. */ -CUDF_HOST_DEVICE inline cudf::size_type ast_operator_arity(ast_operator op) -{ - auto result = cudf::size_type(0); - ast_operator_dispatcher(op, detail::arity_functor{}, result); - return result; -} +cudf::size_type ast_operator_arity(ast_operator op); } // namespace detail diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 163fa20806d..82f7761da2e 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -578,7 +578,7 @@ class orc_writer_options { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use - compression_type _compression = compression_type::AUTO; + compression_type _compression = compression_type::SNAPPY; // Specify frequency of statistics collection statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) @@ -733,7 +733,11 @@ class orc_writer_options { * * @param comp Compression type */ - void set_compression(compression_type comp) { _compression = comp; } + void set_compression(compression_type comp) + { + _compression = comp; + if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; } + }
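With `AUTO` resolved inside the setter, both the member default and an explicitly requested `AUTO` collapse to SNAPPY before the writer ever reads the option. A toy model of that normalization (standalone enum and class, not the real `cudf::io` types):

```cpp
#include <cassert>

enum class compression_type { AUTO, SNAPPY, ZSTD, NONE };  // toy subset

class writer_options {
  compression_type _compression = compression_type::SNAPPY;  // new default

 public:
  void set_compression(compression_type comp)
  {
    _compression = comp;
    // AUTO means "pick for me"; resolving it eagerly means no downstream
    // code ever has to special-case it.
    if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; }
  }
  compression_type compression() const { return _compression; }
};

int main()
{
  writer_options opts;
  opts.set_compression(compression_type::AUTO);
  assert(opts.compression() == compression_type::SNAPPY);
}
```

Routing the builders through `set_compression()` instead of assigning `options._compression` directly (the hunks below) keeps this normalization in one place.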
/** * @brief Choose granularity of statistics collection. @@ -865,7 +869,7 @@ class orc_writer_options_builder { */ orc_writer_options_builder& compression(compression_type comp) { - options._compression = comp; + options.set_compression(comp); return *this; } @@ -1026,7 +1030,7 @@ class chunked_orc_writer_options { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use - compression_type _compression = compression_type::AUTO; + compression_type _compression = compression_type::SNAPPY; // Specify granularity of statistics collection statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) @@ -1157,7 +1161,7 @@ class chunked_orc_writer_options { * * @param comp The compression type to use */ - void set_compression(compression_type comp) { _compression = comp; } + void set_compression(compression_type comp) + { + _compression = comp; + if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; } + } /** * @brief Choose granularity of statistics collection @@ -1279,7 +1287,7 @@ class chunked_orc_writer_options_builder { */ chunked_orc_writer_options_builder& compression(compression_type comp) { - options._compression = comp; + options.set_compression(comp); return *this; } diff --git a/cpp/src/ast/expression_parser.cpp b/cpp/src/ast/expression_parser.cpp index d0e4c59ca54..b2cc134d9fa 100644 --- a/cpp/src/ast/expression_parser.cpp +++ b/cpp/src/ast/expression_parser.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -161,6 +161,7 @@ cudf::size_type expression_parser::visit(operation const& expr) auto const op = expr.get_operator(); auto const data_type = cudf::ast::detail::ast_operator_return_type(op, operand_types); _operators.push_back(op); + _operator_arities.push_back(cudf::ast::detail::ast_operator_arity(op)); // Push data reference auto const output = [&]() { if (expression_index == 0) {
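Pushing the arity alongside each operator is what lets the device evaluator (see the expression_evaluator.cuh hunk above) replace its per-operator `ast_operator_arity(op)` dispatch with a plain table load. A simplified sketch of the host-side bookkeeping (the `op` enum and `arity_of` are toy stand-ins for `ast_operator` and `ast_operator_arity`):

```cpp
#include <cstdint>
#include <vector>

enum class op : std::uint8_t { add, sub, sin };  // toy operator set
constexpr std::int32_t arity_of(op o) { return o == op::sin ? 1 : 2; }

struct plan {
  std::vector<op> operators;
  std::vector<std::int32_t> operator_arities;  // kept parallel to `operators`
};

// Host side: resolve each operator's arity once, while building the plan.
void push_operator(plan& p, op o)
{
  p.operators.push_back(o);
  p.operator_arities.push_back(arity_of(o));
}

int main()
{
  plan p;
  push_operator(p, op::add);
  push_operator(p, op::sin);
  // The device side then becomes a plain load instead of a large switch:
  //   auto const arity = plan.operator_arities[operator_index];
  return p.operator_arities[1] == 1 ? 0 : 1;
}
```

This removes the last device-side use of the arity dispatch, which is what allows `ast_operator_arity` and `ast_operator_return_type` to move out of the header into the new host-only `operators.cpp` below.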
diff --git a/cpp/src/ast/operators.cpp b/cpp/src/ast/operators.cpp new file mode 100644 index 00000000000..b60a69a42d9 --- /dev/null +++ b/cpp/src/ast/operators.cpp @@ -0,0 +1,293 @@ +/* + * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +#include +#include + +#include + +namespace cudf { +namespace ast { +namespace detail { +namespace { + +struct arity_functor { + template + void operator()(cudf::size_type& result) + { + // Arity is not dependent on null handling, so just use the false implementation here. + result = operator_functor::arity; + } +}; + +/** + * @brief Functor to determine the return type of an operator from its input types. + */ +struct return_type_functor { + /** + * @brief Callable for binary operators to determine return type. + * + * @tparam OperatorFunctor Operator functor to perform. + * @tparam LHS Left input type. + * @tparam RHS Right input type. + * @param result Reference whose value is assigned to the result data type. + */ + template >* = nullptr> + void operator()(cudf::data_type& result) + { + using Out = cuda::std::invoke_result_t; + result = cudf::data_type{cudf::type_to_id()}; + } + + template >* = nullptr> + void operator()(cudf::data_type& result) + { +#ifndef __CUDA_ARCH__ + CUDF_FAIL("Invalid binary operation. Return type cannot be determined."); +#else + CUDF_UNREACHABLE("Invalid binary operation. Return type cannot be determined."); +#endif + result = cudf::data_type{cudf::type_id::EMPTY}; + } + + /** + * @brief Callable for unary operators to determine return type. + * + * @tparam OperatorFunctor Operator functor to perform. + * @tparam T Input type. + * @param result Reference whose value is assigned to the result data type. + */ + template >* = nullptr> + void operator()(cudf::data_type& result) + { + using Out = cuda::std::invoke_result_t; + result = cudf::data_type{cudf::type_to_id()}; + } + + template >* = nullptr> + void operator()(cudf::data_type& result) + { +#ifndef __CUDA_ARCH__ + CUDF_FAIL("Invalid unary operation. Return type cannot be determined."); +#else + CUDF_UNREACHABLE("Invalid unary operation. Return type cannot be determined."); +#endif + result = cudf::data_type{cudf::type_id::EMPTY}; + } +}; + +/** + * @brief Functor used to single-type-dispatch binary operators. + * + * This functor's `operator()` is templated to validate calls to its operators based on the input + * type, as determined by the `is_valid_binary_op` trait. This function assumes that both inputs are + * the same type, and dispatches based on the type of the left input. + * + * @tparam OperatorFunctor Binary operator functor. + */ +template +struct single_dispatch_binary_operator_types { + template >* = nullptr> + inline void operator()(F&& f, Ts&&... args) + { + f.template operator()(std::forward(args)...); + } + + template >* = nullptr> + inline void operator()(F&& f, Ts&&... args) + { +#ifndef __CUDA_ARCH__ + CUDF_FAIL("Invalid binary operation."); +#else + CUDF_UNREACHABLE("Invalid binary operation."); +#endif + } +}; + +/** + * @brief Functor performing a type dispatch for a binary operator. + * + * This functor performs single dispatch, which assumes lhs_type == rhs_type. This may not be true + * for all binary operators but holds for all currently implemented operators. + */ +struct type_dispatch_binary_op { + /** + * @brief Performs type dispatch for a binary operator. + * + * @tparam op AST operator. + * @tparam F Type of forwarded functor. + * @tparam Ts Parameter pack of forwarded arguments. + * @param lhs_type Type of left input data. + * @param rhs_type Type of right input data. + * @param f Forwarded functor to be called. + * @param args Forwarded arguments to `operator()` of `f`. + */ + template + inline void operator()(cudf::data_type lhs_type, cudf::data_type rhs_type, F&& f, Ts&&... args) + { + // Single dispatch (assume lhs_type == rhs_type) + type_dispatcher( + lhs_type, + // Always dispatch to the non-null operator for the purpose of type determination. + detail::single_dispatch_binary_operator_types>{}, + std::forward(f), + std::forward(args)...); + } +}; + +/** + * @brief Dispatches a runtime binary operator to a templated type dispatcher. + * + * @tparam F Type of forwarded functor. + * @tparam Ts Parameter pack of forwarded arguments. + * @param lhs_type Type of left input data. + * @param rhs_type Type of right input data.
+ * @param f Forwarded functor to be called. + * @param args Forwarded arguments to `operator()` of `f`. + */ +template +inline constexpr void binary_operator_dispatcher( + ast_operator op, cudf::data_type lhs_type, cudf::data_type rhs_type, F&& f, Ts&&... args) +{ + ast_operator_dispatcher(op, + detail::type_dispatch_binary_op{}, + lhs_type, + rhs_type, + std::forward(f), + std::forward(args)...); +} + +/** + * @brief Functor used to type-dispatch unary operators. + * + * This functor's `operator()` is templated to validate calls to its operators based on the input + * type, as determined by the `is_valid_unary_op` trait. + * + * @tparam OperatorFunctor Unary operator functor. + */ +template +struct dispatch_unary_operator_types { + template >* = nullptr> + inline void operator()(F&& f, Ts&&... args) + { + f.template operator()(std::forward(args)...); + } + + template >* = nullptr> + inline void operator()(F&& f, Ts&&... args) + { +#ifndef __CUDA_ARCH__ + CUDF_FAIL("Invalid unary operation."); +#else + CUDF_UNREACHABLE("Invalid unary operation."); +#endif + } +}; + +/** + * @brief Functor performing a type dispatch for a unary operator. + */ +struct type_dispatch_unary_op { + template + inline void operator()(cudf::data_type input_type, F&& f, Ts&&... args) + { + type_dispatcher( + input_type, + // Always dispatch to the non-null operator for the purpose of type determination. + detail::dispatch_unary_operator_types>{}, + std::forward(f), + std::forward(args)...); + } +}; + +/** + * @brief Dispatches a runtime unary operator to a templated type dispatcher. + * + * @tparam F Type of forwarded functor. + * @tparam Ts Parameter pack of forwarded arguments. + * @param input_type Type of input data. + * @param f Forwarded functor to be called. + * @param args Forwarded arguments to `operator()` of `f`. + */ +template +inline constexpr void unary_operator_dispatcher(ast_operator op, + cudf::data_type input_type, + F&& f, + Ts&&... args) +{ + ast_operator_dispatcher(op, + detail::type_dispatch_unary_op{}, + input_type, + std::forward(f), + std::forward(args)...); +} + +} // namespace + +cudf::data_type ast_operator_return_type(ast_operator op, + std::vector const& operand_types) +{ + cudf::data_type result{cudf::type_id::EMPTY}; + switch (operand_types.size()) { + case 1: + unary_operator_dispatcher(op, operand_types[0], detail::return_type_functor{}, result); + break; + case 2: + binary_operator_dispatcher( + op, operand_types[0], operand_types[1], detail::return_type_functor{}, result); + break; + default: CUDF_FAIL("Unsupported operator return type."); break; + } + return result; +} + +cudf::size_type ast_operator_arity(ast_operator op) +{ + cudf::size_type result{}; + ast_operator_dispatcher(op, arity_functor{}, result); + return result; +} + +} // namespace detail + +} // namespace ast + +} // namespace cudf diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 26535bed43b..3800835eaf1 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * Copyright (c) 2018-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -16,22 +16,45 @@ #include "comp.hpp" +#include "gpuinflate.hpp" +#include "io/utilities/getenv_or.hpp" #include "io/utilities/hostdevice_vector.hpp" #include "nvcomp_adapter.hpp" #include #include +#include #include #include #include #include +#include #include // GZIP compression namespace cudf::io::detail { namespace { +auto& h_comp_pool() +{ + static std::size_t pool_size = + getenv_or("LIBCUDF_HOST_COMPRESSION_NUM_THREADS", std::thread::hardware_concurrency()); + static BS::thread_pool pool(pool_size); + return pool; +} + +std::optional to_nvcomp_compression(compression_type compression) +{ + switch (compression) { + case compression_type::SNAPPY: return nvcomp::compression_type::SNAPPY; + case compression_type::ZSTD: return nvcomp::compression_type::ZSTD; + case compression_type::LZ4: return nvcomp::compression_type::LZ4; + case compression_type::ZLIB: return nvcomp::compression_type::DEFLATE; + default: return std::nullopt; + } +} + /** * @brief GZIP host compressor (includes header) */ @@ -98,8 +121,132 @@ std::vector compress_snappy(host_span src, return cudf::detail::make_std_vector_sync(d_dst, stream); } +void device_compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + if (compression == compression_type::NONE) { return; } + + auto const nvcomp_type = to_nvcomp_compression(compression); + auto nvcomp_disabled = nvcomp_type.has_value() ? nvcomp::is_compression_disabled(*nvcomp_type) + : "invalid compression type"; + if (not nvcomp_disabled) { + return nvcomp::batched_compress(*nvcomp_type, inputs, outputs, results, stream); + } + + switch (compression) { + case compression_type::SNAPPY: return gpu_snap(inputs, outputs, results, stream); + default: CUDF_FAIL("Compression error: " + nvcomp_disabled.value()); + } +} + +void host_compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + if (compression == compression_type::NONE) { return; } + + auto const num_chunks = inputs.size(); + auto h_results = cudf::detail::make_host_vector(num_chunks, stream); + auto const h_inputs = cudf::detail::make_host_vector_async(inputs, stream); + auto const h_outputs = cudf::detail::make_host_vector_async(outputs, stream); + stream.synchronize(); + + std::vector> tasks; + auto const num_streams = + std::min({num_chunks, + cudf::detail::global_cuda_stream_pool().get_stream_pool_size(), + h_comp_pool().get_thread_count()}); + auto const streams = cudf::detail::fork_streams(stream, num_streams); + for (size_t i = 0; i < num_chunks; ++i) { + auto const cur_stream = streams[i % streams.size()]; + auto task = [d_in = h_inputs[i], d_out = h_outputs[i], cur_stream, compression]() -> size_t { + auto const h_in = cudf::detail::make_host_vector_sync(d_in, cur_stream); + auto const h_out = compress(compression, h_in, cur_stream); + cudf::detail::cuda_memcpy(d_out.subspan(0, h_out.size()), h_out, cur_stream); + return h_out.size(); + }; + tasks.emplace_back(h_comp_pool().submit_task(std::move(task))); + } + + for (auto i = 0ul; i < num_chunks; ++i) { + h_results[i] = {tasks[i].get(), compression_status::SUCCESS}; + } + cudf::detail::cuda_memcpy_async(results, h_results, stream); +} + +[[nodiscard]] bool host_compression_supported(compression_type compression) +{ + switch (compression) { + case compression_type::GZIP: + case compression_type::NONE: return true; + default: return false; + } +} + +[[nodiscard]] 
bool device_compression_supported(compression_type compression) +{ + auto const nvcomp_type = to_nvcomp_compression(compression); + switch (compression) { + case compression_type::LZ4: + case compression_type::ZLIB: + case compression_type::ZSTD: return not nvcomp::is_compression_disabled(nvcomp_type.value()); + case compression_type::SNAPPY: + case compression_type::NONE: return true; + default: return false; + } +} + +[[nodiscard]] bool use_host_compression( + compression_type compression, + [[maybe_unused]] device_span const> inputs, + [[maybe_unused]] device_span const> outputs) +{ + CUDF_EXPECTS( + not host_compression_supported(compression) or device_compression_supported(compression), + "Unsupported compression type"); + if (not host_compression_supported(compression)) { return false; } + if (not device_compression_supported(compression)) { return true; } + // If both host and device compression are supported, use the host if the env var is set + return getenv_or("LIBCUDF_USE_HOST_COMPRESSION", 0); +} + } // namespace +std::optional compress_max_allowed_chunk_size(compression_type compression) +{ + if (auto nvcomp_type = to_nvcomp_compression(compression); + nvcomp_type.has_value() and not nvcomp::is_compression_disabled(*nvcomp_type)) { + return nvcomp::compress_max_allowed_chunk_size(*nvcomp_type); + } + return std::nullopt; +} + +[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression) +{ + auto nvcomp_type = to_nvcomp_compression(compression); + if (compression == compression_type::NONE or not nvcomp_type.has_value() or + nvcomp::is_compression_disabled(*nvcomp_type)) { + return 1ul; + } + + return nvcomp::required_alignment(*nvcomp_type); +} + +[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size) +{ + if (compression == compression_type::NONE) { return uncompressed_size; } + + if (auto nvcomp_type = to_nvcomp_compression(compression); nvcomp_type.has_value()) { + return nvcomp::compress_max_output_chunk_size(*nvcomp_type, uncompressed_size); + } + CUDF_FAIL("Unsupported compression type"); +} + std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream) @@ -112,4 +259,18 @@ std::vector compress(compression_type compression, } } +void compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + if (use_host_compression(compression, inputs, outputs)) { + return host_compress(compression, inputs, outputs, results, stream); + } else { + return device_compress(compression, inputs, outputs, results, stream); + } +} + } // namespace cudf::io::detail
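`host_compress` above fans one task per chunk out to a shared thread pool, then blocks on the futures in order to fill the per-chunk results. A self-contained sketch of that fan-out/collect shape, with `std::async` standing in for the `BS::thread_pool` and an identity "codec" in place of a real one:

```cpp
#include <cstddef>
#include <future>
#include <vector>

// Toy stand-in for a host compression routine: returns the compressed size.
std::size_t compress_chunk(std::vector<char> const& input, std::vector<char>& output)
{
  output.assign(input.begin(), input.end());  // "identity compression"
  return output.size();
}

int main()
{
  std::vector<std::vector<char>> inputs(8, std::vector<char>(1024, 'x'));
  std::vector<std::vector<char>> outputs(inputs.size());

  // Fan out: one task per chunk, running concurrently on host threads.
  std::vector<std::future<std::size_t>> tasks;
  for (std::size_t i = 0; i < inputs.size(); ++i) {
    tasks.emplace_back(
      std::async(std::launch::async, [&, i] { return compress_chunk(inputs[i], outputs[i]); }));
  }

  // Collect: block on each future in order and record the result sizes.
  std::vector<std::size_t> sizes;
  for (auto& t : tasks) { sizes.push_back(t.get()); }
}
```

The engine selection itself stays a three-way check: codecs only supported on the host go to `host_compress`, device-only ones to `device_compress`, and the `LIBCUDF_USE_HOST_COMPRESSION` environment variable breaks the tie when both paths work.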
*/ -#include "gpuinflate.hpp" +#include "comp.hpp" #include diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index e16f26e1f06..90932a11499 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,5 +57,57 @@ std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream); +/** + * @brief Maximum size of uncompressed chunks that can be compressed. + * + * @param compression Compression type + * @returns maximum chunk size + */ +[[nodiscard]] std::optional compress_max_allowed_chunk_size(compression_type compression); + +/** + * @brief Gets input and output alignment requirements for the given compression type. + * + * @param compression Compression type + * @returns required alignment + */ +[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression); + +/** + * @brief Gets the maximum size any chunk could compress to in the batch. + * + * @param compression Compression type + * @param uncompressed_size Size of the largest uncompressed chunk in the batch + */ +[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size); + +/** + * @brief Compresses device memory buffers. + * + * @param compression Type of compression of the input data + * @param inputs Device memory buffers to compress + * @param outputs Device memory buffers to store the compressed output + * @param results Compression results + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream); + +/** + * @brief Aggregate results of compression into a single statistics object. + * + * @param inputs List of uncompressed input buffers + * @param results List of compression results + * @param stream CUDA stream to use + * @return writer_compression_statistics + */ +[[nodiscard]] writer_compression_statistics collect_compression_statistics( + device_span const> inputs, + device_span results, + rmm::cuda_stream_view stream); + } // namespace io::detail } // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 4b09bd5a84c..0a35b230242 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * Copyright (c) 2018-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -124,17 +124,4 @@ void gpu_snap(device_span const> inputs, device_span results, rmm::cuda_stream_view stream); -/** - * @brief Aggregate results of compression into a single statistics object. 
- * - * @param inputs List of uncompressed input buffers - * @param results List of compression results - * @param stream CUDA stream to use - * @return writer_compression_statistics - */ -[[nodiscard]] writer_compression_statistics collect_compression_statistics( - device_span const> inputs, - device_span results, - rmm::cuda_stream_view stream); - } // namespace cudf::io::detail diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 88423122e16..d63fa9f5c35 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -766,6 +766,7 @@ void parquet_writer_options_base::set_stats_level(statistics_freq sf) { _stats_l void parquet_writer_options_base::set_compression(compression_type compression) { _compression = compression; + if (compression == compression_type::AUTO) { _compression = compression_type::SNAPPY; } } void parquet_writer_options_base::enable_int96_timestamps(bool req) diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index f4e75f78dec..8b30cee6681 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -407,7 +407,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, - CompressionKind compression, + compression_type compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, uint32_t comp_block_align, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 79ecca0ca99..4f296bb5bfc 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ */ #include "io/comp/gpuinflate.hpp" -#include "io/comp/nvcomp_adapter.hpp" #include "io/utilities/block_utils.cuh" #include "io/utilities/time_utils.cuh" #include "orc_gpu.hpp" @@ -45,8 +44,6 @@ namespace io { namespace orc { namespace gpu { -namespace nvcomp = cudf::io::detail::nvcomp; - using cudf::detail::device_2dspan; using cudf::io::detail::compression_result; using cudf::io::detail::compression_status; @@ -1362,7 +1359,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, - CompressionKind compression, + compression_type compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, uint32_t comp_block_align, @@ -1387,47 +1384,7 @@ std::optional CompressOrcDataStreams( max_comp_blk_size, comp_block_align); - if (compression == SNAPPY) { - try { - if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - cudf::io::detail::gpu_snap(comp_in, comp_out, comp_res, stream); - } else { - nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); - } - } catch (...) 
{ - // There was an error in compressing so set an error status for each block - thrust::for_each( - rmm::exec_policy(stream), - comp_res.begin(), - comp_res.end(), - [] __device__(compression_result & stat) { stat.status = compression_status::FAILURE; }); - // Since SNAPPY is the default compression (may not be explicitly requested), fall back to - // writing without compression - CUDF_LOG_WARN("ORC writer: compression failed, writing uncompressed data"); - } - } else if (compression == ZLIB) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::DEFLATE); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress( - nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_res, stream); - } else if (compression == ZSTD) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); - } else if (compression == LZ4) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::LZ4, comp_in, comp_out, comp_res, stream); - } else if (compression != NONE) { - CUDF_FAIL("Unsupported compression type"); - } + cudf::io::detail::compress(compression, comp_in, comp_out, comp_res, stream); dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index ce868b83c04..aa0b509981a 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ * @brief cuDF-IO ORC writer class implementation */ -#include "io/comp/nvcomp_adapter.hpp" #include "io/orc/orc_gpu.hpp" #include "io/statistics/column_statistics.cuh" #include "io/utilities/column_utils.cuh" @@ -71,8 +70,6 @@ namespace cudf::io::orc::detail { -namespace nvcomp = cudf::io::detail::nvcomp; - template [[nodiscard]] constexpr int varint_size(T val) { @@ -92,21 +89,8 @@ struct row_group_index_info { }; namespace { - /** - * @brief Translates ORC compression to nvCOMP compression - */ -auto to_nvcomp_compression_type(CompressionKind compression_kind) -{ - if (compression_kind == SNAPPY) return nvcomp::compression_type::SNAPPY; - if (compression_kind == ZLIB) return nvcomp::compression_type::DEFLATE; - if (compression_kind == ZSTD) return nvcomp::compression_type::ZSTD; - if (compression_kind == LZ4) return nvcomp::compression_type::LZ4; - CUDF_FAIL("Unsupported compression type"); -} - -/** - * @brief Translates cuDF compression to ORC compression + * @brief Translates cuDF compression to ORC compression. */ orc::CompressionKind to_orc_compression(compression_type compression) { @@ -122,19 +106,14 @@ orc::CompressionKind to_orc_compression(compression_type compression) } /** - * @brief Returns the block size for a given compression kind. + * @brief Returns the block size for a given compression format. 
*/ -constexpr size_t compression_block_size(orc::CompressionKind compression) +size_t compression_block_size(compression_type compression) { - if (compression == orc::CompressionKind::NONE) { return 0; } - - auto const ncomp_type = to_nvcomp_compression_type(compression); - auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type) - ? std::nullopt - : nvcomp::compress_max_allowed_chunk_size(ncomp_type); + auto const comp_limit = compress_max_allowed_chunk_size(compression); constexpr size_t max_block_size = 256 * 1024; - return std::min(nvcomp_limit.value_or(max_block_size), max_block_size); + return std::min(comp_limit.value_or(max_block_size), max_block_size); } /** @@ -534,26 +513,6 @@ size_t RLE_stream_size(TypeKind kind, size_t count) } } -auto uncomp_block_alignment(CompressionKind compression_kind) -{ - if (compression_kind == NONE or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) { - return 1ul; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(compression_kind)); -} - -auto comp_block_alignment(CompressionKind compression_kind) -{ - if (compression_kind == NONE or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) { - return 1ul; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(compression_kind)); -} - /** * @brief Builds up per-column streams. * @@ -566,7 +525,7 @@ orc_streams create_streams(host_span columns, file_segmentation const& segmentation, std::map const& decimal_column_sizes, bool enable_dictionary, - CompressionKind compression_kind, + compression_type compression, single_write_mode write_mode) { // 'column 0' row index stream @@ -610,7 +569,7 @@ orc_streams create_streams(host_span columns, auto add_stream = [&](gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { - auto const max_alignment_padding = uncomp_block_alignment(compression_kind) - 1; + auto const max_alignment_padding = compress_required_chunk_alignment(compression) - 1; const auto base = column.index() * gpu::CI_NUM_STREAMS; ids[base + index_type] = streams.size(); streams.push_back(orc::Stream{ @@ -1473,7 +1432,7 @@ encoded_footer_statistics finish_statistic_blobs(Footer const& footer, * @param[in] rg_stats row group level statistics * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in] compression_kind The compression kind + * @param[in] compression The compression format * @param[in] compression_blocksize The block size used for compression * @param[in] out_sink Sink for writing data */ @@ -1487,7 +1446,7 @@ void write_index_stream(int32_t stripe_id, host_span rg_stats, StripeInformation* stripe, orc_streams* streams, - CompressionKind compression_kind, + compression_type compression, size_t compression_blocksize, std::unique_ptr const& out_sink) { @@ -1501,7 +1460,7 @@ void write_index_stream(int32_t stripe_id, row_group_index_info record; if (stream.ids[type] > 0) { record.pos = 0; - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { auto const& ss = strm_desc[stripe_id][stream.ids[type] - (columns.size() + 1)]; record.blk_pos = ss.first_block; record.comp_pos = 0; @@ -1541,7 +1500,7 @@ void write_index_stream(int32_t stripe_id, } } - ProtobufWriter pbw((compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((compression != compression_type::NONE) ? 3 : 0);
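The 3 bytes reserved in the `ProtobufWriter` above are ORC's block header: a 24-bit little-endian value whose low bit flags an "original" (stored-uncompressed) block, which is why the surrounding hunks compute header values as `(length - 3) * 2 + 1`. A small sketch of encoding and decoding that header (standalone helpers written for illustration, not cudf code):

```cpp
#include <cstdint>
#include <cstdio>

// Encode an ORC 3-byte block header: length in the upper 23 bits,
// low bit set when the block is stored uncompressed ("original").
void encode_header(std::uint32_t length, bool original, std::uint8_t out[3])
{
  std::uint32_t const v = (length << 1) | (original ? 1u : 0u);
  out[0] = static_cast<std::uint8_t>(v >> 0);
  out[1] = static_cast<std::uint8_t>(v >> 8);
  out[2] = static_cast<std::uint8_t>(v >> 16);
}

int main()
{
  std::uint8_t hdr[3];
  encode_header(100, true, hdr);  // a 100-byte block kept uncompressed
  std::uint32_t const decoded = hdr[0] | (hdr[1] << 8) | (hdr[2] << 16);
  std::printf("original=%u length=%u\n", decoded & 1, decoded >> 1);  // original=1 length=100
}
```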
// Add row index entries auto const& rowgroups_range = segmentation.stripes[stripe_id]; @@ -1566,7 +1525,7 @@ }); (*streams)[stream_id].length = pbw.size(); - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { uint32_t uncomp_ix_len = (uint32_t)((*streams)[stream_id].length - 3) * 2 + 1; pbw.buffer()[0] = static_cast(uncomp_ix_len >> 0); pbw.buffer()[1] = static_cast(uncomp_ix_len >> 8); @@ -1585,7 +1544,7 @@ * @param[in,out] bounce_buffer Pinned memory bounce buffer for D2H data transfer * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in] compression_kind The compression kind + * @param[in] compression The compression format * @param[in] out_sink Sink for writing data * @param[in] stream CUDA stream used for device memory operations and kernel launches * @return An std::future that should be synchronized to ensure the writing is complete @@ -1596,7 +1555,7 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, host_span bounce_buffer, StripeInformation* stripe, orc_streams* streams, - CompressionKind compression_kind, + compression_type compression, std::unique_ptr const& out_sink, rmm::cuda_stream_view stream) { @@ -1606,8 +1565,9 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, return std::async(std::launch::deferred, [] {}); } - auto const* stream_in = (compression_kind == NONE) ? enc_stream.data_ptrs[strm_desc.stream_type] - : (compressed_data + strm_desc.bfr_offset); + auto const* stream_in = (compression == compression_type::NONE) + ? enc_stream.data_ptrs[strm_desc.stream_type] + : (compressed_data + strm_desc.bfr_offset); auto write_task = [&]() { if (out_sink->is_device_write_preferred(length)) { @@ -1627,15 +1587,15 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, /** * @brief Insert 3-byte uncompressed block headers in a byte vector * - * @param compression_kind The compression kind + * @param compression The compression format * @param compression_blocksize The block size used for compression * @param v The destination byte vector to write, which must include the initial 3-byte header */ -void add_uncompressed_block_headers(CompressionKind compression_kind, +void add_uncompressed_block_headers(compression_type compression, size_t compression_blocksize, std::vector& v) { - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { size_t uncomp_len = v.size() - 3, pos = 0, block_len; while (uncomp_len > compression_blocksize) { block_len = compression_blocksize * 2 + 1; @@ -2021,14 +1981,6 @@ std::map decimal_column_sizes( return column_sizes; } -size_t max_compression_output_size(CompressionKind compression_kind, uint32_t compression_blocksize) -{ - if (compression_kind == NONE) return 0; - - return nvcomp::compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), - compression_blocksize); -} - std::unique_ptr make_table_meta(table_view const& input) { auto table_meta = std::make_unique(input); @@ -2287,7 +2239,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, * @param row_index_stride The row index stride * @param enable_dictionary Whether dictionary is enabled * @param sort_dictionaries Whether to sort the dictionaries - * @param compression_kind The compression kind + * @param compression The compression format * @param compression_blocksize The block size used for compression * @param
@@ -2287,7 +2239,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
  * @param row_index_stride The row index stride
  * @param enable_dictionary Whether dictionary is enabled
  * @param sort_dictionaries Whether to sort the dictionaries
- * @param compression_kind The compression kind
+ * @param compression The compression format
  * @param compression_blocksize The block size used for compression
  * @param stats_freq Column statistics granularity type for parquet/orc writers
  * @param collect_compression_stats Flag to indicate if compression statistics should be collected
@@ -2302,7 +2254,7 @@ auto convert_table_to_orc_data(table_view const& input,
                                size_type row_index_stride,
                                bool enable_dictionary,
                                bool sort_dictionaries,
-                               CompressionKind compression_kind,
+                               compression_type compression,
                                size_t compression_blocksize,
                                statistics_freq stats_freq,
                                bool collect_compression_stats,
@@ -2329,17 +2281,16 @@ auto convert_table_to_orc_data(table_view const& input,
   auto stripe_dicts    = build_dictionaries(orc_table, segmentation, sort_dictionaries, stream);
   auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream);
 
-  auto const uncompressed_block_align = uncomp_block_alignment(compression_kind);
-  auto const compressed_block_align   = comp_block_alignment(compression_kind);
+  auto const block_align = compress_required_chunk_alignment(compression);
 
   auto streams  = create_streams(orc_table.columns,
                                  segmentation,
                                  decimal_column_sizes(dec_chunk_sizes.rg_sizes),
                                  enable_dictionary,
-                                 compression_kind,
+                                 compression,
                                  write_mode);
   auto enc_data = encode_columns(
-    orc_table, std::move(dec_chunk_sizes), segmentation, streams, uncompressed_block_align, stream);
+    orc_table, std::move(dec_chunk_sizes), segmentation, streams, block_align, stream);
 
   stripe_dicts.on_encode_complete(stream);
 
@@ -2371,16 +2322,15 @@ auto convert_table_to_orc_data(table_view const& input,
   size_t compressed_bfr_size   = 0;
   size_t num_compressed_blocks = 0;
 
-  auto const max_compressed_block_size =
-    max_compression_output_size(compression_kind, compression_blocksize);
+  auto const max_compressed_block_size = max_compressed_size(compression, compression_blocksize);
   auto const padded_max_compressed_block_size =
-    util::round_up_unsafe(max_compressed_block_size, compressed_block_align);
+    util::round_up_unsafe(max_compressed_block_size, block_align);
   auto const padded_block_header_size =
-    util::round_up_unsafe(block_header_size, compressed_block_align);
+    util::round_up_unsafe(block_header_size, block_align);
 
   for (auto& ss : strm_descs.host_view().flat_view()) {
     size_t stream_size = ss.stream_size;
-    if (compression_kind != NONE) {
+    if (compression != compression_type::NONE) {
       ss.first_block = num_compressed_blocks;
       ss.bfr_offset  = compressed_bfr_size;
 
@@ -2401,14 +2351,14 @@ auto convert_table_to_orc_data(table_view const& input,
                comp_results.d_begin(),
                comp_results.d_end(),
                compression_result{0, compression_status::FAILURE});
-  if (compression_kind != NONE) {
+  if (compression != compression_type::NONE) {
     strm_descs.host_to_device_async(stream);
     compression_stats = gpu::CompressOrcDataStreams(compressed_data,
                                                     num_compressed_blocks,
-                                                    compression_kind,
+                                                    compression,
                                                     compression_blocksize,
                                                     max_compressed_block_size,
-                                                    compressed_block_align,
+                                                    block_align,
                                                     collect_compression_stats,
                                                     strm_descs,
                                                     enc_data.streams,
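The padding math above guarantees that every block slot in the shared compressed buffer starts on a codec-friendly offset. A toy illustration (the 8-byte alignment is a stand-in; the real value comes from compress_required_chunk_alignment):

#include <cstddef>

// round up x to a power-of-two alignment a
constexpr std::size_t round_up(std::size_t x, std::size_t a) { return (x + a - 1) & ~(a - 1); }

// a 100 KiB max compressed block plus its 3-byte header, padded so the next
// block in the shared buffer starts on an aligned offset
static_assert(round_up(100 * 1024 + 3, 8) == 100 * 1024 + 8);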
@@ -2459,8 +2409,8 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
   : _stream(stream),
     _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
     _row_index_stride{options.get_row_index_stride()},
-    _compression_kind(to_orc_compression(options.get_compression())),
-    _compression_blocksize(compression_block_size(_compression_kind)),
+    _compression{options.get_compression()},
+    _compression_blocksize(compression_block_size(_compression)),
     _compression_statistics(options.get_compression_statistics()),
     _stats_freq(options.get_statistics_freq()),
     _sort_dictionaries{options.get_enable_dictionary_sort()},
@@ -2480,8 +2430,8 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
   : _stream(stream),
     _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
     _row_index_stride{options.get_row_index_stride()},
-    _compression_kind(to_orc_compression(options.get_compression())),
-    _compression_blocksize(compression_block_size(_compression_kind)),
+    _compression{options.get_compression()},
+    _compression_blocksize(compression_block_size(_compression)),
     _compression_statistics(options.get_compression_statistics()),
     _stats_freq(options.get_statistics_freq()),
     _sort_dictionaries{options.get_enable_dictionary_sort()},
@@ -2526,7 +2476,7 @@ void writer::impl::write(table_view const& input)
                                  _row_index_stride,
                                  _enable_dictionary,
                                  _sort_dictionaries,
-                                 _compression_kind,
+                                 _compression,
                                  _compression_blocksize,
                                  _stats_freq,
                                  _compression_statistics != nullptr,
@@ -2613,7 +2563,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data,
                          rg_stats,
                          &stripe,
                          &streams,
-                         _compression_kind,
+                         _compression,
                          _compression_blocksize,
                          _out_sink);
     }
@@ -2627,7 +2577,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data,
                                          bounce_buffer,
                                          &stripe,
                                          &streams,
-                                         _compression_kind,
+                                         _compression,
                                          _out_sink,
                                          _stream));
     }
@@ -2645,10 +2595,10 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data,
                            : 0;
       if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; }
     }
-    ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0);
+    ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0);
     pbw.write(sf);
     stripe.footerLength = pbw.size();
-    if (_compression_kind != NONE) {
+    if (_compression != compression_type::NONE) {
       uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1;
       pbw.buffer()[0]        = static_cast<uint8_t>(uncomp_sf_len >> 0);
       pbw.buffer()[1]        = static_cast<uint8_t>(uncomp_sf_len >> 8);
@@ -2780,21 +2730,21 @@ void writer::impl::close()
 
   // Write statistics metadata
   if (not _orc_meta.stripeStats.empty()) {
-    ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0);
+    ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0);
     pbw.write(_orc_meta);
-    add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer());
+    add_uncompressed_block_headers(_compression, _compression_blocksize, pbw.buffer());
     ps.metadataLength = pbw.size();
     _out_sink->host_write(pbw.data(), pbw.size());
   } else {
     ps.metadataLength = 0;
   }
-  ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0);
+  ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0);
   pbw.write(_footer);
-  add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer());
+  add_uncompressed_block_headers(_compression, _compression_blocksize, pbw.buffer());
 
   // Write postscript metadata
   ps.footerLength         = pbw.size();
-  ps.compression          = _compression_kind;
+  ps.compression          = to_orc_compression(_compression);
   ps.compressionBlockSize = _compression_blocksize;
   ps.version              = {0, 12};  // Hive 0.12
   ps.writerVersion        = cudf_writer_version;
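Note that `to_orc_compression` survives, but only at the postscript boundary (`ps.compression` above); its body is not part of these hunks. A plausible sketch, assuming it mirrors the `to_parquet_compression` helper added in the Parquet half of this diff:

orc::CompressionKind to_orc_compression(compression_type compression)
{
  switch (compression) {
    case compression_type::AUTO:
    case compression_type::SNAPPY: return orc::CompressionKind::SNAPPY;
    case compression_type::ZSTD: return orc::CompressionKind::ZSTD;
    case compression_type::LZ4: return orc::CompressionKind::LZ4;
    case compression_type::NONE: return orc::CompressionKind::NONE;
    default: CUDF_FAIL("Unsupported compression type");
  }
}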
diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp
index cae849ee315..7d23482cb17 100644
--- a/cpp/src/io/orc/writer_impl.hpp
+++ b/cpp/src/io/orc/writer_impl.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -342,7 +342,7 @@ class writer::impl {
   // Writer options.
   stripe_size_limits const _max_stripe_size;
   size_type const _row_index_stride;
-  CompressionKind const _compression_kind;
+  compression_type const _compression;
   size_t const _compression_blocksize;
   std::shared_ptr<writer_compression_statistics> _compression_statistics;  // Optional output
   statistics_freq const _stats_freq;
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 77924ac0f35..1b67b53ae8e 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -23,8 +23,7 @@
 #include "compact_protocol_reader.hpp"
 #include "compact_protocol_writer.hpp"
 #include "interop/decimal_conversion_utilities.cuh"
-#include "io/comp/gpuinflate.hpp"
-#include "io/comp/nvcomp_adapter.hpp"
+#include "io/comp/comp.hpp"
 #include "io/parquet/parquet.hpp"
 #include "io/parquet/parquet_gpu.hpp"
 #include "io/statistics/column_statistics.cuh"
@@ -67,6 +66,20 @@ namespace cudf::io::parquet::detail {
 
 using namespace cudf::io::detail;
 
+Compression to_parquet_compression(compression_type compression)
+{
+  switch (compression) {
+    case compression_type::AUTO:
+    case compression_type::SNAPPY: return Compression::SNAPPY;
+    case compression_type::ZSTD: return Compression::ZSTD;
+    case compression_type::LZ4:
+      // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4
+      return Compression::LZ4_RAW;
+    case compression_type::NONE: return Compression::UNCOMPRESSED;
+    default: CUDF_FAIL("Unsupported compression type");
+  }
+}
+
 struct aggregate_writer_metadata {
   aggregate_writer_metadata(host_span<partition_info const> partitions,
                             host_span<std::map<std::string, std::string> const> kv_md,
@@ -1172,7 +1185,7 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
                      size_t max_page_size_bytes,
                      size_type max_page_size_rows,
                      bool write_v2_headers,
-                     Compression compression_codec,
+                     compression_type compression,
                      rmm::cuda_stream_view stream)
 {
   if (chunks.is_empty()) { return cudf::detail::hostdevice_vector<size_type>{}; }
@@ -1187,7 +1200,7 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
                     num_columns,
                     max_page_size_bytes,
                     max_page_size_rows,
-                    page_alignment(compression_codec),
+                    compress_required_chunk_alignment(compression),
                     write_v2_headers,
                     nullptr,
                     nullptr,
@@ -1212,7 +1225,7 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
                     num_columns,
                     max_page_size_bytes,
                     max_page_size_rows,
-                    page_alignment(compression_codec),
+                    compress_required_chunk_alignment(compression),
                     write_v2_headers,
                     nullptr,
                     nullptr,
@@ -1221,12 +1234,10 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
 
   // Get per-page max compressed size
   cudf::detail::hostdevice_vector<size_type> comp_page_sizes(num_pages, stream);
-  std::transform(page_sizes.begin(),
-                 page_sizes.end(),
-                 comp_page_sizes.begin(),
-                 [compression_codec](auto page_size) {
-                   return max_compression_output_size(compression_codec, page_size);
-                 });
+  std::transform(
+    page_sizes.begin(), page_sizes.end(), comp_page_sizes.begin(), [compression](auto page_size) {
+      return max_compressed_size(compression, page_size);
+    });
   comp_page_sizes.host_to_device_async(stream);
 
   // Use per-page max compressed size to calculate chunk.compressed_size
@@ -1238,7 +1249,7 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
                     num_columns,
                     max_page_size_bytes,
                     max_page_size_rows,
-                    page_alignment(compression_codec),
+                    compress_required_chunk_alignment(compression),
                     write_v2_headers,
                     nullptr,
                     nullptr,
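The hunks above lean on two distinct shared helpers from io/comp/comp.hpp: `compress_required_chunk_alignment` for buffer alignment and `max_compressed_size` for a worst-case output bound; a third, `compress_max_allowed_chunk_size` (used by `max_page_bytes` below), caps how much input a codec accepts per chunk. A usage sketch under the signatures implied by the call sites, with invented numbers in the comments:

// Illustrative only: the returned values are codec- and nvcomp-version-specific.
auto const in_cap = compress_max_allowed_chunk_size(compression_type::ZSTD);
// -> std::optional<size_t>; pages/blocks larger than this must be capped,
//    and an empty optional means "no limit"
auto const out_bound = max_compressed_size(compression_type::ZSTD, 64 * 1024);
// -> worst-case compressed size of a 64 KiB input, used to size staging buffers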
@@ -1247,16 +1258,13 @@ auto init_page_sizes(hostdevice_2dvector<EncColumnChunk>& chunks,
   return comp_page_sizes;
 }
 
-size_t max_page_bytes(Compression compression, size_t max_page_size_bytes)
+size_t max_page_bytes(compression_type compression, size_t max_page_size_bytes)
 {
-  if (compression == Compression::UNCOMPRESSED) { return max_page_size_bytes; }
+  if (compression == compression_type::NONE) { return max_page_size_bytes; }
 
-  auto const ncomp_type   = to_nvcomp_compression_type(compression);
-  auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type)
-                              ? std::nullopt
-                              : nvcomp::compress_max_allowed_chunk_size(ncomp_type);
+  auto const comp_limit = compress_max_allowed_chunk_size(compression);
 
-  auto max_size = std::min(nvcomp_limit.value_or(max_page_size_bytes), max_page_size_bytes);
+  auto max_size = std::min(comp_limit.value_or(max_page_size_bytes), max_page_size_bytes);
   // page size must fit in a 32-bit signed integer
   return std::min<size_t>(max_size, std::numeric_limits<int32_t>::max());
 }
@@ -1265,7 +1273,7 @@ std::pair<std::vector<rmm::device_uvector<size_type>>, std::vector<rmm::device_uvector<size_type>>>
 build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
                          host_span<parquet_column_device_view const> col_desc,
                          device_2dspan<PageFragment const> frags,
-                         Compression compression,
+                         compression_type compression,
                          dictionary_policy dict_policy,
                          size_t max_dict_size,
                          rmm::cuda_stream_view stream)
@@ -1404,7 +1412,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
  * @param num_columns Total number of columns
  * @param num_pages Total number of pages
  * @param num_stats_bfr Number of statistics buffers
- * @param compression Compression format
+ * @param alignment Page alignment
  * @param max_page_size_bytes Maximum uncompressed page size, in bytes
  * @param max_page_size_rows Maximum page size, in rows
  * @param write_v2_headers True if version 2 page headers are to be written
@@ -1419,7 +1427,7 @@ void init_encoder_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
                         uint32_t num_columns,
                         uint32_t num_pages,
                         uint32_t num_stats_bfr,
-                        Compression compression,
+                        size_t alignment,
                         size_t max_page_size_bytes,
                         size_type max_page_size_rows,
                         bool write_v2_headers,
@@ -1435,7 +1443,7 @@ void init_encoder_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
                    num_columns,
                    max_page_size_bytes,
                    max_page_size_rows,
-                   page_alignment(compression),
+                   alignment,
                    write_v2_headers,
                    (num_stats_bfr) ? page_stats_mrg.data() : nullptr,
                    (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
@@ -1478,7 +1486,7 @@ void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
                   statistics_chunk const* chunk_stats,
                   statistics_chunk const* column_stats,
                   std::optional<writer_compression_statistics>& comp_stats,
-                  Compression compression,
+                  compression_type compression,
                   int32_t column_index_truncate_length,
                   bool write_v2_headers,
                   rmm::cuda_stream_view stream)
@@ -1488,7 +1496,7 @@ void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
                       ? device_span<statistics_chunk const>(page_stats, num_pages)
                       : device_span<statistics_chunk const>();
 
-  uint32_t max_comp_pages = (compression != Compression::UNCOMPRESSED) ? num_pages : 0;
+  uint32_t max_comp_pages = (compression != compression_type::NONE) ? num_pages : 0;
 
   rmm::device_uvector<device_span<uint8_t const>> comp_in(max_comp_pages, stream);
   rmm::device_uvector<device_span<uint8_t>> comp_out(max_comp_pages, stream);
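The int32 clamp in `max_page_bytes` above matters for pathological page-size settings. A self-contained sketch of the same logic with a worked case (the function name is ours, not the library's):

#include <algorithm>
#include <cstdint>
#include <limits>
#include <optional>

constexpr size_t max_page_bytes_sketch(std::optional<size_t> comp_limit, size_t max_page_size_bytes)
{
  auto const max_size = std::min(comp_limit.value_or(max_page_size_bytes), max_page_size_bytes);
  // page size must fit in a 32-bit signed integer
  return std::min<size_t>(max_size, std::numeric_limits<int32_t>::max());
}

// A 3 GiB request with no codec limit still clamps to INT32_MAX:
static_assert(max_page_bytes_sketch(std::nullopt, size_t{3} << 30) ==
              std::numeric_limits<int32_t>::max());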
@@ -1499,34 +1507,7 @@ void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
                              compression_result{0, compression_status::FAILURE});
 
   EncodePages(pages, write_v2_headers, comp_in, comp_out, comp_res, stream);
-  switch (compression) {
-    case Compression::SNAPPY:
-      if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) {
-        gpu_snap(comp_in, comp_out, comp_res, stream);
-      } else {
-        nvcomp::batched_compress(
-          nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream);
-      }
-      break;
-    case Compression::ZSTD: {
-      if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD);
-          reason) {
-        CUDF_FAIL("Compression error: " + reason.value());
-      }
-      nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream);
-      break;
-    }
-    case Compression::LZ4_RAW: {
-      if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4);
-          reason) {
-        CUDF_FAIL("Compression error: " + reason.value());
-      }
-      nvcomp::batched_compress(nvcomp::compression_type::LZ4, comp_in, comp_out, comp_res, stream);
-      break;
-    }
-    case Compression::UNCOMPRESSED: break;
-    default: CUDF_FAIL("invalid compression type");
-  }
+  compress(compression, comp_in, comp_out, comp_res, stream);
 
   // TBD: Not clear if the official spec actually allows dynamically turning off compression at the
   // chunk-level
@@ -1744,7 +1725,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
                                    size_type max_page_size_rows,
                                    int32_t column_index_truncate_length,
                                    statistics_freq stats_granularity,
-                                   Compression compression,
+                                   compression_type compression,
                                    bool collect_compression_statistics,
                                    dictionary_policy dict_policy,
                                    size_t max_dictionary_size,
@@ -2146,7 +2127,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
   }
 
   // Clear compressed buffer size if compression has been turned off
-  if (compression == Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; }
+  if (compression == compression_type::NONE) { max_comp_bfr_size = 0; }
 
   // Initialize data pointers
   uint32_t const num_stats_bfr =
@@ -2214,7 +2195,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
                      num_columns,
                      num_pages,
                      num_stats_bfr,
-                     compression,
+                     compress_required_chunk_alignment(compression),
                      max_page_size_bytes,
                      max_page_size_rows,
                      write_v2_headers,
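This deleted switch is the heart of the change: per-codec dispatch moves behind the single `compress()` entry point in io/comp/comp.hpp (implemented in the comp.cu file added in the CMake hunk, which is not shown here). A hedged sketch of its likely shape, preserving the removed logic; `to_nvcomp_compression` is a guessed name:

void compress(compression_type compression,
              device_span<device_span<uint8_t const> const> inputs,
              device_span<device_span<uint8_t> const> outputs,
              device_span<compression_result> results,
              rmm::cuda_stream_view stream)
{
  if (compression == compression_type::NONE) { return; }

  auto const nv_type = to_nvcomp_compression(compression);
  if (auto const reason = nvcomp::is_compression_disabled(nv_type); reason) {
    // SNAPPY is the only codec with a cudf-internal device fallback
    if (compression == compression_type::SNAPPY) {
      gpu_snap(inputs, outputs, results, stream);
      return;
    }
    CUDF_FAIL("Compression error: " + reason.value());
  }
  nvcomp::batched_compress(nv_type, inputs, outputs, results, stream);
}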
@@ -2270,7 +2251,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
       auto const dev_bfr      = ck.is_compressed ? ck.compressed_bfr : ck.uncompressed_bfr;
       auto& column_chunk_meta = row_group.columns[i].meta_data;
 
-      if (ck.is_compressed) { column_chunk_meta.codec = compression; }
+      if (ck.is_compressed) { column_chunk_meta.codec = to_parquet_compression(compression); }
       if (!out_sink[p]->is_device_write_preferred(ck.compressed_size)) { all_device_write = false; }
@@ -2375,7 +2356,7 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
                    single_write_mode mode,
                    rmm::cuda_stream_view stream)
   : _stream(stream),
-    _compression(to_parquet_compression(options.get_compression())),
+    _compression(options.get_compression()),
     _max_row_group_size{options.get_row_group_size_bytes()},
    _max_row_group_rows{options.get_row_group_size_rows()},
     _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())),
@@ -2406,7 +2387,7 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
                    single_write_mode mode,
                    rmm::cuda_stream_view stream)
   : _stream(stream),
-    _compression(to_parquet_compression(options.get_compression())),
+    _compression(options.get_compression()),
     _max_row_group_size{options.get_row_group_size_bytes()},
     _max_row_group_rows{options.get_row_group_size_rows()},
     _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())),
diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp
index 63128faf993..d5a5a534b93 100644
--- a/cpp/src/io/parquet/writer_impl.hpp
+++ b/cpp/src/io/parquet/writer_impl.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -144,7 +144,7 @@ class writer::impl {
   rmm::cuda_stream_view _stream;
 
   // Writer options.
-  Compression const _compression;
+  compression_type const _compression;
   size_t const _max_row_group_size;
   size_type const _max_row_group_rows;
   size_t const _max_page_size_bytes;
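User-visible effect of the constructor change: the writer now stores the public `compression_type` directly and only translates to the Parquet enum when stamping chunk metadata. The public API is unchanged; for example:

#include <cudf/io/parquet.hpp>

// `table` is assumed to be a cudf::table_view in scope
auto opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"out.parquet"}, table)
              .compression(cudf::io::compression_type::ZSTD)
              .build();
cudf::io::write_parquet(opts);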
diff --git a/cpp/src/io/parquet/writer_impl_helpers.cpp b/cpp/src/io/parquet/writer_impl_helpers.cpp
index f15ea1f3c37..ede788c97c2 100644
--- a/cpp/src/io/parquet/writer_impl_helpers.cpp
+++ b/cpp/src/io/parquet/writer_impl_helpers.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,8 +21,6 @@
 
 #include "writer_impl_helpers.hpp"
 
-#include "io/comp/nvcomp_adapter.hpp"
-
 #include
 #include
 #include
@@ -32,48 +30,6 @@ namespace cudf::io::parquet::detail {
 
 using namespace cudf::io::detail;
 
-Compression to_parquet_compression(compression_type compression)
-{
-  switch (compression) {
-    case compression_type::AUTO:
-    case compression_type::SNAPPY: return Compression::SNAPPY;
-    case compression_type::ZSTD: return Compression::ZSTD;
-    case compression_type::LZ4:
-      // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4
-      return Compression::LZ4_RAW;
-    case compression_type::NONE: return Compression::UNCOMPRESSED;
-    default: CUDF_FAIL("Unsupported compression type");
-  }
-}
-
-nvcomp::compression_type to_nvcomp_compression_type(Compression codec)
-{
-  switch (codec) {
-    case Compression::SNAPPY: return nvcomp::compression_type::SNAPPY;
-    case Compression::ZSTD: return nvcomp::compression_type::ZSTD;
-    // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4
-    case Compression::LZ4_RAW: return nvcomp::compression_type::LZ4;
-    default: CUDF_FAIL("Unsupported compression type");
-  }
-}
-
-uint32_t page_alignment(Compression codec)
-{
-  if (codec == Compression::UNCOMPRESSED or
-      nvcomp::is_compression_disabled(to_nvcomp_compression_type(codec))) {
-    return 1u;
-  }
-
-  return nvcomp::required_alignment(to_nvcomp_compression_type(codec));
-}
-
-size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize)
-{
-  if (codec == Compression::UNCOMPRESSED) return 0;
-
-  return compress_max_output_chunk_size(to_nvcomp_compression_type(codec), compression_blocksize);
-}
-
 void fill_table_meta(table_input_metadata& table_meta)
 {
   // Fill unnamed columns' names in table_meta
diff --git a/cpp/src/io/parquet/writer_impl_helpers.hpp b/cpp/src/io/parquet/writer_impl_helpers.hpp
index 14a9a0ed5b7..b5c73c348fe 100644
--- a/cpp/src/io/parquet/writer_impl_helpers.hpp
+++ b/cpp/src/io/parquet/writer_impl_helpers.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,48 +20,12 @@
  */
 #pragma once
 
-#include "parquet_common.hpp"
 #include
 #include
-#include
 
 namespace cudf::io::parquet::detail {
 
-/**
- * @brief Function that translates GDF compression to parquet compression.
- *
- * @param compression The compression type
- * @return The supported Parquet compression
- */
-Compression to_parquet_compression(compression_type compression);
-
-/**
- * @brief Function that translates the given compression codec to nvcomp compression type.
- *
- * @param codec Compression codec
- * @return Translated nvcomp compression type
- */
-cudf::io::detail::nvcomp::compression_type to_nvcomp_compression_type(Compression codec);
-
-/**
- * @brief Function that computes input alignment requirements for the given compression type.
- *
- * @param codec Compression codec
- * @return Required alignment
- */
-uint32_t page_alignment(Compression codec);
-
-/**
- * @brief Gets the maximum compressed chunk size for the largest chunk uncompressed chunk in the
- * batch.
- *
- * @param codec Compression codec
- * @param compression_blocksize Size of the largest uncompressed chunk in the batch
- * @return Maximum compressed chunk size
- */
-size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize);
-
 /**
  * @brief Fill the table metadata with default column names.
  *
diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp
index 2209a30149d..708c2045a74 100644
--- a/cpp/tests/io/orc_test.cpp
+++ b/cpp/tests/io/orc_test.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -2068,6 +2068,7 @@ TEST_P(OrcCompressionTest, Basic)
 INSTANTIATE_TEST_CASE_P(OrcCompressionTest,
                         OrcCompressionTest,
                         ::testing::Values(cudf::io::compression_type::NONE,
+                                          cudf::io::compression_type::AUTO,
                                           cudf::io::compression_type::SNAPPY,
                                           cudf::io::compression_type::LZ4,
                                           cudf::io::compression_type::ZSTD));
diff --git a/cpp/tests/io/parquet_misc_test.cpp b/cpp/tests/io/parquet_misc_test.cpp
index d66f685cd9c..419ac909ac6 100644
--- a/cpp/tests/io/parquet_misc_test.cpp
+++ b/cpp/tests/io/parquet_misc_test.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -268,6 +268,7 @@ TEST_P(ParquetCompressionTest, Basic)
 INSTANTIATE_TEST_CASE_P(ParquetCompressionTest,
                         ParquetCompressionTest,
                         ::testing::Values(cudf::io::compression_type::NONE,
+                                          cudf::io::compression_type::AUTO,
                                           cudf::io::compression_type::SNAPPY,
                                           cudf::io::compression_type::LZ4,
                                           cudf::io::compression_type::ZSTD));
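The new `AUTO` test value exercises the default mapping (`AUTO` falls through to SNAPPY in `to_parquet_compression`, and presumably likewise on the ORC side). For example:

#include <cudf/io/orc.hpp>

// `table` is assumed to be a cudf::table_view in scope
auto opts = cudf::io::orc_writer_options::builder(cudf::io::sink_info{"out.orc"}, table)
              .compression(cudf::io::compression_type::AUTO)  // now covered by OrcCompressionTest
              .build();
cudf::io::write_orc(opts);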