From bbf4f7824c23c0c482f52bafdf1ece1213da2f65 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 13 Jan 2025 11:44:54 -0800 Subject: [PATCH 1/4] Host compression (#17656) Add compression APIs to make the nvCOMP use transparent. Remove direct dependency on nvCOMP in the ORC and Parquet writers. Add multi-threaded host-side compression; currently off by default, can only be enabled via `LIBCUDF_USE_HOST_COMPRESSION` environment variable. Currently the host compression adds D2H + H2D transfers. Avoiding the H2D transfer requires large changes to the writers. Also moved handling of the AUTO compression type to the options classes, which should own such defaults (translate AUTO to SNAPPY in this case). Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Yunsong Wang (https://github.com/PointKernel) - Shruti Shivakumar (https://github.com/shrshi) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/17656 --- cpp/CMakeLists.txt | 2 +- cpp/include/cudf/io/orc.hpp | 22 ++- cpp/src/io/comp/comp.cpp | 163 ++++++++++++++++++++- cpp/src/io/comp/{statistics.cu => comp.cu} | 4 +- cpp/src/io/comp/comp.hpp | 54 ++++++- cpp/src/io/comp/gpuinflate.hpp | 15 +- cpp/src/io/functions.cpp | 3 +- cpp/src/io/orc/orc_gpu.hpp | 4 +- cpp/src/io/orc/stripe_enc.cu | 49 +------ cpp/src/io/orc/writer_impl.cu | 144 ++++++------------ cpp/src/io/orc/writer_impl.hpp | 4 +- cpp/src/io/parquet/writer_impl.cu | 99 +++++-------- cpp/src/io/parquet/writer_impl.hpp | 4 +- cpp/src/io/parquet/writer_impl_helpers.cpp | 46 +----- cpp/src/io/parquet/writer_impl_helpers.hpp | 38 +---- cpp/tests/io/orc_test.cpp | 3 +- cpp/tests/io/parquet_misc_test.cpp | 3 +- 17 files changed, 338 insertions(+), 319 deletions(-) rename cpp/src/io/comp/{statistics.cu => comp.cu} (96%) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9dabe4e8800..252cc7897d8 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -477,13 +477,13 @@ add_library( src/io/avro/reader_impl.cu src/io/comp/brotli_dict.cpp src/io/comp/comp.cpp + src/io/comp/comp.cu src/io/comp/cpu_unbz2.cpp src/io/comp/debrotli.cu src/io/comp/gpuinflate.cu src/io/comp/nvcomp_adapter.cpp src/io/comp/nvcomp_adapter.cu src/io/comp/snap.cu - src/io/comp/statistics.cu src/io/comp/uncomp.cpp src/io/comp/unsnap.cu src/io/csv/csv_gpu.cu diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 163fa20806d..82f7761da2e 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -578,7 +578,7 @@ class orc_writer_options { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use - compression_type _compression = compression_type::AUTO; + compression_type _compression = compression_type::SNAPPY; // Specify frequency of statistics collection statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) @@ -733,7 +733,11 @@ class orc_writer_options { * * @param comp Compression type */ - void set_compression(compression_type comp) { _compression = comp; } + void set_compression(compression_type comp) + { + _compression = comp; + if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; } + } /** * @brief Choose granularity of statistics collection. @@ -865,7 +869,7 @@ class orc_writer_options_builder { */ orc_writer_options_builder& compression(compression_type comp) { - options._compression = comp; + options.set_compression(comp); return *this; } @@ -1026,7 +1030,7 @@ class chunked_orc_writer_options { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use - compression_type _compression = compression_type::AUTO; + compression_type _compression = compression_type::SNAPPY; // Specify granularity of statistics collection statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) @@ -1157,7 +1161,11 @@ class chunked_orc_writer_options { * * @param comp The compression type to use */ - void set_compression(compression_type comp) { _compression = comp; } + void set_compression(compression_type comp) + { + _compression = comp; + if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; } + } /** * @brief Choose granularity of statistics collection @@ -1279,7 +1287,7 @@ class chunked_orc_writer_options_builder { */ chunked_orc_writer_options_builder& compression(compression_type comp) { - options._compression = comp; + options.set_compression(comp); return *this; } diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 26535bed43b..3800835eaf1 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * Copyright (c) 2018-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,22 +16,45 @@ #include "comp.hpp" +#include "gpuinflate.hpp" +#include "io/utilities/getenv_or.hpp" #include "io/utilities/hostdevice_vector.hpp" #include "nvcomp_adapter.hpp" #include #include +#include #include #include #include #include +#include #include // GZIP compression namespace cudf::io::detail { namespace { +auto& h_comp_pool() +{ + static std::size_t pool_size = + getenv_or("LIBCUDF_HOST_COMPRESSION_NUM_THREADS", std::thread::hardware_concurrency()); + static BS::thread_pool pool(pool_size); + return pool; +} + +std::optional to_nvcomp_compression(compression_type compression) +{ + switch (compression) { + case compression_type::SNAPPY: return nvcomp::compression_type::SNAPPY; + case compression_type::ZSTD: return nvcomp::compression_type::ZSTD; + case compression_type::LZ4: return nvcomp::compression_type::LZ4; + case compression_type::ZLIB: return nvcomp::compression_type::DEFLATE; + default: return std::nullopt; + } +} + /** * @brief GZIP host compressor (includes header) */ @@ -98,8 +121,132 @@ std::vector compress_snappy(host_span src, return cudf::detail::make_std_vector_sync(d_dst, stream); } +void device_compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + if (compression == compression_type::NONE) { return; } + + auto const nvcomp_type = to_nvcomp_compression(compression); + auto nvcomp_disabled = nvcomp_type.has_value() ? nvcomp::is_compression_disabled(*nvcomp_type) + : "invalid compression type"; + if (not nvcomp_disabled) { + return nvcomp::batched_compress(*nvcomp_type, inputs, outputs, results, stream); + } + + switch (compression) { + case compression_type::SNAPPY: return gpu_snap(inputs, outputs, results, stream); + default: CUDF_FAIL("Compression error: " + nvcomp_disabled.value()); + } +} + +void host_compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + if (compression == compression_type::NONE) { return; } + + auto const num_chunks = inputs.size(); + auto h_results = cudf::detail::make_host_vector(num_chunks, stream); + auto const h_inputs = cudf::detail::make_host_vector_async(inputs, stream); + auto const h_outputs = cudf::detail::make_host_vector_async(outputs, stream); + stream.synchronize(); + + std::vector> tasks; + auto const num_streams = + std::min({num_chunks, + cudf::detail::global_cuda_stream_pool().get_stream_pool_size(), + h_comp_pool().get_thread_count()}); + auto const streams = cudf::detail::fork_streams(stream, num_streams); + for (size_t i = 0; i < num_chunks; ++i) { + auto const cur_stream = streams[i % streams.size()]; + auto task = [d_in = h_inputs[i], d_out = h_outputs[i], cur_stream, compression]() -> size_t { + auto const h_in = cudf::detail::make_host_vector_sync(d_in, cur_stream); + auto const h_out = compress(compression, h_in, cur_stream); + cudf::detail::cuda_memcpy(d_out.subspan(0, h_out.size()), h_out, cur_stream); + return h_out.size(); + }; + tasks.emplace_back(h_comp_pool().submit_task(std::move(task))); + } + + for (auto i = 0ul; i < num_chunks; ++i) { + h_results[i] = {tasks[i].get(), compression_status::SUCCESS}; + } + cudf::detail::cuda_memcpy_async(results, h_results, stream); +} + +[[nodiscard]] bool host_compression_supported(compression_type compression) +{ + switch (compression) { + case compression_type::GZIP: + case compression_type::NONE: return true; + default: return false; + } +} + +[[nodiscard]] bool device_compression_supported(compression_type compression) +{ + auto const nvcomp_type = to_nvcomp_compression(compression); + switch (compression) { + case compression_type::LZ4: + case compression_type::ZLIB: + case compression_type::ZSTD: return not nvcomp::is_compression_disabled(nvcomp_type.value()); + case compression_type::SNAPPY: + case compression_type::NONE: return true; + default: return false; + } +} + +[[nodiscard]] bool use_host_compression( + compression_type compression, + [[maybe_unused]] device_span const> inputs, + [[maybe_unused]] device_span const> outputs) +{ + CUDF_EXPECTS( + not host_compression_supported(compression) or device_compression_supported(compression), + "Unsupported compression type"); + if (not host_compression_supported(compression)) { return false; } + if (not device_compression_supported(compression)) { return true; } + // If both host and device compression are supported, use the host if the env var is set + return getenv_or("LIBCUDF_USE_HOST_COMPRESSION", 0); +} + } // namespace +std::optional compress_max_allowed_chunk_size(compression_type compression) +{ + if (auto nvcomp_type = to_nvcomp_compression(compression); + nvcomp_type.has_value() and not nvcomp::is_compression_disabled(*nvcomp_type)) { + return nvcomp::compress_max_allowed_chunk_size(*nvcomp_type); + } + return std::nullopt; +} + +[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression) +{ + auto nvcomp_type = to_nvcomp_compression(compression); + if (compression == compression_type::NONE or not nvcomp_type.has_value() or + nvcomp::is_compression_disabled(*nvcomp_type)) { + return 1ul; + } + + return nvcomp::required_alignment(*nvcomp_type); +} + +[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size) +{ + if (compression == compression_type::NONE) { return uncompressed_size; } + + if (auto nvcomp_type = to_nvcomp_compression(compression); nvcomp_type.has_value()) { + return nvcomp::compress_max_output_chunk_size(*nvcomp_type, uncompressed_size); + } + CUDF_FAIL("Unsupported compression type"); +} + std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream) @@ -112,4 +259,18 @@ std::vector compress(compression_type compression, } } +void compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + if (use_host_compression(compression, inputs, outputs)) { + return host_compress(compression, inputs, outputs, results, stream); + } else { + return device_compress(compression, inputs, outputs, results, stream); + } +} + } // namespace cudf::io::detail diff --git a/cpp/src/io/comp/statistics.cu b/cpp/src/io/comp/comp.cu similarity index 96% rename from cpp/src/io/comp/statistics.cu rename to cpp/src/io/comp/comp.cu index caee9145d2c..af0f73869a2 100644 --- a/cpp/src/io/comp/statistics.cu +++ b/cpp/src/io/comp/comp.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "gpuinflate.hpp" +#include "comp.hpp" #include diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index e16f26e1f06..90932a11499 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,5 +57,57 @@ std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream); +/** + * @brief Maximum size of uncompressed chunks that can be compressed. + * + * @param compression Compression type + * @returns maximum chunk size + */ +[[nodiscard]] std::optional compress_max_allowed_chunk_size(compression_type compression); + +/** + * @brief Gets input and output alignment requirements for the given compression type. + * + * @param compression Compression type + * @returns required alignment + */ +[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression); + +/** + * @brief Gets the maximum size any chunk could compress to in the batch. + * + * @param compression Compression type + * @param uncompressed_size Size of the largest uncompressed chunk in the batch + */ +[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size); + +/** + * @brief Compresses device memory buffers. + * + * @param compression Type of compression of the input data + * @param inputs Device memory buffers to compress + * @param outputs Device memory buffers to store the compressed output + * @param results Compression results + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream); + +/** + * @brief Aggregate results of compression into a single statistics object. + * + * @param inputs List of uncompressed input buffers + * @param results List of compression results + * @param stream CUDA stream to use + * @return writer_compression_statistics + */ +[[nodiscard]] writer_compression_statistics collect_compression_statistics( + device_span const> inputs, + device_span results, + rmm::cuda_stream_view stream); + } // namespace io::detail } // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 4b09bd5a84c..0a35b230242 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * Copyright (c) 2018-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -124,17 +124,4 @@ void gpu_snap(device_span const> inputs, device_span results, rmm::cuda_stream_view stream); -/** - * @brief Aggregate results of compression into a single statistics object. - * - * @param inputs List of uncompressed input buffers - * @param results List of compression results - * @param stream CUDA stream to use - * @return writer_compression_statistics - */ -[[nodiscard]] writer_compression_statistics collect_compression_statistics( - device_span const> inputs, - device_span results, - rmm::cuda_stream_view stream); - } // namespace cudf::io::detail diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 88423122e16..d63fa9f5c35 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -766,6 +766,7 @@ void parquet_writer_options_base::set_stats_level(statistics_freq sf) { _stats_l void parquet_writer_options_base::set_compression(compression_type compression) { _compression = compression; + if (compression == compression_type::AUTO) { _compression = compression_type::SNAPPY; } } void parquet_writer_options_base::enable_int96_timestamps(bool req) diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index f4e75f78dec..8b30cee6681 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -407,7 +407,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, - CompressionKind compression, + compression_type compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, uint32_t comp_block_align, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 79ecca0ca99..4f296bb5bfc 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ */ #include "io/comp/gpuinflate.hpp" -#include "io/comp/nvcomp_adapter.hpp" #include "io/utilities/block_utils.cuh" #include "io/utilities/time_utils.cuh" #include "orc_gpu.hpp" @@ -45,8 +44,6 @@ namespace io { namespace orc { namespace gpu { -namespace nvcomp = cudf::io::detail::nvcomp; - using cudf::detail::device_2dspan; using cudf::io::detail::compression_result; using cudf::io::detail::compression_status; @@ -1362,7 +1359,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, - CompressionKind compression, + compression_type compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, uint32_t comp_block_align, @@ -1387,47 +1384,7 @@ std::optional CompressOrcDataStreams( max_comp_blk_size, comp_block_align); - if (compression == SNAPPY) { - try { - if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - cudf::io::detail::gpu_snap(comp_in, comp_out, comp_res, stream); - } else { - nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); - } - } catch (...) { - // There was an error in compressing so set an error status for each block - thrust::for_each( - rmm::exec_policy(stream), - comp_res.begin(), - comp_res.end(), - [] __device__(compression_result & stat) { stat.status = compression_status::FAILURE; }); - // Since SNAPPY is the default compression (may not be explicitly requested), fall back to - // writing without compression - CUDF_LOG_WARN("ORC writer: compression failed, writing uncompressed data"); - } - } else if (compression == ZLIB) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::DEFLATE); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress( - nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_res, stream); - } else if (compression == ZSTD) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); - } else if (compression == LZ4) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::LZ4, comp_in, comp_out, comp_res, stream); - } else if (compression != NONE) { - CUDF_FAIL("Unsupported compression type"); - } + cudf::io::detail::compress(compression, comp_in, comp_out, comp_res, stream); dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index ce868b83c04..aa0b509981a 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ * @brief cuDF-IO ORC writer class implementation */ -#include "io/comp/nvcomp_adapter.hpp" #include "io/orc/orc_gpu.hpp" #include "io/statistics/column_statistics.cuh" #include "io/utilities/column_utils.cuh" @@ -71,8 +70,6 @@ namespace cudf::io::orc::detail { -namespace nvcomp = cudf::io::detail::nvcomp; - template [[nodiscard]] constexpr int varint_size(T val) { @@ -92,21 +89,8 @@ struct row_group_index_info { }; namespace { - /** - * @brief Translates ORC compression to nvCOMP compression - */ -auto to_nvcomp_compression_type(CompressionKind compression_kind) -{ - if (compression_kind == SNAPPY) return nvcomp::compression_type::SNAPPY; - if (compression_kind == ZLIB) return nvcomp::compression_type::DEFLATE; - if (compression_kind == ZSTD) return nvcomp::compression_type::ZSTD; - if (compression_kind == LZ4) return nvcomp::compression_type::LZ4; - CUDF_FAIL("Unsupported compression type"); -} - -/** - * @brief Translates cuDF compression to ORC compression + * @brief Translates cuDF compression to ORC compression. */ orc::CompressionKind to_orc_compression(compression_type compression) { @@ -122,19 +106,14 @@ orc::CompressionKind to_orc_compression(compression_type compression) } /** - * @brief Returns the block size for a given compression kind. + * @brief Returns the block size for a given compression format. */ -constexpr size_t compression_block_size(orc::CompressionKind compression) +size_t compression_block_size(compression_type compression) { - if (compression == orc::CompressionKind::NONE) { return 0; } - - auto const ncomp_type = to_nvcomp_compression_type(compression); - auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type) - ? std::nullopt - : nvcomp::compress_max_allowed_chunk_size(ncomp_type); + auto const comp_limit = compress_max_allowed_chunk_size(compression); constexpr size_t max_block_size = 256 * 1024; - return std::min(nvcomp_limit.value_or(max_block_size), max_block_size); + return std::min(comp_limit.value_or(max_block_size), max_block_size); } /** @@ -534,26 +513,6 @@ size_t RLE_stream_size(TypeKind kind, size_t count) } } -auto uncomp_block_alignment(CompressionKind compression_kind) -{ - if (compression_kind == NONE or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) { - return 1ul; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(compression_kind)); -} - -auto comp_block_alignment(CompressionKind compression_kind) -{ - if (compression_kind == NONE or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) { - return 1ul; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(compression_kind)); -} - /** * @brief Builds up per-column streams. * @@ -566,7 +525,7 @@ orc_streams create_streams(host_span columns, file_segmentation const& segmentation, std::map const& decimal_column_sizes, bool enable_dictionary, - CompressionKind compression_kind, + compression_type compression, single_write_mode write_mode) { // 'column 0' row index stream @@ -610,7 +569,7 @@ orc_streams create_streams(host_span columns, auto add_stream = [&](gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { - auto const max_alignment_padding = uncomp_block_alignment(compression_kind) - 1; + auto const max_alignment_padding = compress_required_chunk_alignment(compression) - 1; const auto base = column.index() * gpu::CI_NUM_STREAMS; ids[base + index_type] = streams.size(); streams.push_back(orc::Stream{ @@ -1473,7 +1432,7 @@ encoded_footer_statistics finish_statistic_blobs(Footer const& footer, * @param[in] rg_stats row group level statistics * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in] compression_kind The compression kind + * @param[in] compression The compression format * @param[in] compression_blocksize The block size used for compression * @param[in] out_sink Sink for writing data */ @@ -1487,7 +1446,7 @@ void write_index_stream(int32_t stripe_id, host_span rg_stats, StripeInformation* stripe, orc_streams* streams, - CompressionKind compression_kind, + compression_type compression, size_t compression_blocksize, std::unique_ptr const& out_sink) { @@ -1501,7 +1460,7 @@ void write_index_stream(int32_t stripe_id, row_group_index_info record; if (stream.ids[type] > 0) { record.pos = 0; - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { auto const& ss = strm_desc[stripe_id][stream.ids[type] - (columns.size() + 1)]; record.blk_pos = ss.first_block; record.comp_pos = 0; @@ -1541,7 +1500,7 @@ void write_index_stream(int32_t stripe_id, } } - ProtobufWriter pbw((compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((compression != compression_type::NONE) ? 3 : 0); // Add row index entries auto const& rowgroups_range = segmentation.stripes[stripe_id]; @@ -1566,7 +1525,7 @@ void write_index_stream(int32_t stripe_id, }); (*streams)[stream_id].length = pbw.size(); - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { uint32_t uncomp_ix_len = (uint32_t)((*streams)[stream_id].length - 3) * 2 + 1; pbw.buffer()[0] = static_cast(uncomp_ix_len >> 0); pbw.buffer()[1] = static_cast(uncomp_ix_len >> 8); @@ -1585,7 +1544,7 @@ void write_index_stream(int32_t stripe_id, * @param[in,out] bounce_buffer Pinned memory bounce buffer for D2H data transfer * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in] compression_kind The compression kind + * @param[in] compression The compression format * @param[in] out_sink Sink for writing data * @param[in] stream CUDA stream used for device memory operations and kernel launches * @return An std::future that should be synchronized to ensure the writing is complete @@ -1596,7 +1555,7 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, host_span bounce_buffer, StripeInformation* stripe, orc_streams* streams, - CompressionKind compression_kind, + compression_type compression, std::unique_ptr const& out_sink, rmm::cuda_stream_view stream) { @@ -1606,8 +1565,9 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, return std::async(std::launch::deferred, [] {}); } - auto const* stream_in = (compression_kind == NONE) ? enc_stream.data_ptrs[strm_desc.stream_type] - : (compressed_data + strm_desc.bfr_offset); + auto const* stream_in = (compression == compression_type::NONE) + ? enc_stream.data_ptrs[strm_desc.stream_type] + : (compressed_data + strm_desc.bfr_offset); auto write_task = [&]() { if (out_sink->is_device_write_preferred(length)) { @@ -1627,15 +1587,15 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, /** * @brief Insert 3-byte uncompressed block headers in a byte vector * - * @param compression_kind The compression kind + * @param compression The compression kind * @param compression_blocksize The block size used for compression * @param v The destitation byte vector to write, which must include initial 3-byte header */ -void add_uncompressed_block_headers(CompressionKind compression_kind, +void add_uncompressed_block_headers(compression_type compression, size_t compression_blocksize, std::vector& v) { - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { size_t uncomp_len = v.size() - 3, pos = 0, block_len; while (uncomp_len > compression_blocksize) { block_len = compression_blocksize * 2 + 1; @@ -2021,14 +1981,6 @@ std::map decimal_column_sizes( return column_sizes; } -size_t max_compression_output_size(CompressionKind compression_kind, uint32_t compression_blocksize) -{ - if (compression_kind == NONE) return 0; - - return nvcomp::compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), - compression_blocksize); -} - std::unique_ptr make_table_meta(table_view const& input) { auto table_meta = std::make_unique(input); @@ -2287,7 +2239,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, * @param row_index_stride The row index stride * @param enable_dictionary Whether dictionary is enabled * @param sort_dictionaries Whether to sort the dictionaries - * @param compression_kind The compression kind + * @param compression The compression format * @param compression_blocksize The block size used for compression * @param stats_freq Column statistics granularity type for parquet/orc writers * @param collect_compression_stats Flag to indicate if compression statistics should be collected @@ -2302,7 +2254,7 @@ auto convert_table_to_orc_data(table_view const& input, size_type row_index_stride, bool enable_dictionary, bool sort_dictionaries, - CompressionKind compression_kind, + compression_type compression, size_t compression_blocksize, statistics_freq stats_freq, bool collect_compression_stats, @@ -2329,17 +2281,16 @@ auto convert_table_to_orc_data(table_view const& input, auto stripe_dicts = build_dictionaries(orc_table, segmentation, sort_dictionaries, stream); auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); - auto const uncompressed_block_align = uncomp_block_alignment(compression_kind); - auto const compressed_block_align = comp_block_alignment(compression_kind); + auto const block_align = compress_required_chunk_alignment(compression); auto streams = create_streams(orc_table.columns, segmentation, decimal_column_sizes(dec_chunk_sizes.rg_sizes), enable_dictionary, - compression_kind, + compression, write_mode); auto enc_data = encode_columns( - orc_table, std::move(dec_chunk_sizes), segmentation, streams, uncompressed_block_align, stream); + orc_table, std::move(dec_chunk_sizes), segmentation, streams, block_align, stream); stripe_dicts.on_encode_complete(stream); @@ -2371,16 +2322,15 @@ auto convert_table_to_orc_data(table_view const& input, size_t compressed_bfr_size = 0; size_t num_compressed_blocks = 0; - auto const max_compressed_block_size = - max_compression_output_size(compression_kind, compression_blocksize); + auto const max_compressed_block_size = max_compressed_size(compression, compression_blocksize); auto const padded_max_compressed_block_size = - util::round_up_unsafe(max_compressed_block_size, compressed_block_align); + util::round_up_unsafe(max_compressed_block_size, block_align); auto const padded_block_header_size = - util::round_up_unsafe(block_header_size, compressed_block_align); + util::round_up_unsafe(block_header_size, block_align); for (auto& ss : strm_descs.host_view().flat_view()) { size_t stream_size = ss.stream_size; - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { ss.first_block = num_compressed_blocks; ss.bfr_offset = compressed_bfr_size; @@ -2401,14 +2351,14 @@ auto convert_table_to_orc_data(table_view const& input, comp_results.d_begin(), comp_results.d_end(), compression_result{0, compression_status::FAILURE}); - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { strm_descs.host_to_device_async(stream); compression_stats = gpu::CompressOrcDataStreams(compressed_data, num_compressed_blocks, - compression_kind, + compression, compression_blocksize, max_compressed_block_size, - compressed_block_align, + block_align, collect_compression_stats, strm_descs, enc_data.streams, @@ -2459,8 +2409,8 @@ writer::impl::impl(std::unique_ptr sink, : _stream(stream), _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, _row_index_stride{options.get_row_index_stride()}, - _compression_kind(to_orc_compression(options.get_compression())), - _compression_blocksize(compression_block_size(_compression_kind)), + _compression{options.get_compression()}, + _compression_blocksize(compression_block_size(_compression)), _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), _sort_dictionaries{options.get_enable_dictionary_sort()}, @@ -2480,8 +2430,8 @@ writer::impl::impl(std::unique_ptr sink, : _stream(stream), _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, _row_index_stride{options.get_row_index_stride()}, - _compression_kind(to_orc_compression(options.get_compression())), - _compression_blocksize(compression_block_size(_compression_kind)), + _compression{options.get_compression()}, + _compression_blocksize(compression_block_size(_compression)), _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), _sort_dictionaries{options.get_enable_dictionary_sort()}, @@ -2526,7 +2476,7 @@ void writer::impl::write(table_view const& input) _row_index_stride, _enable_dictionary, _sort_dictionaries, - _compression_kind, + _compression, _compression_blocksize, _stats_freq, _compression_statistics != nullptr, @@ -2613,7 +2563,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, rg_stats, &stripe, &streams, - _compression_kind, + _compression, _compression_blocksize, _out_sink); } @@ -2627,7 +2577,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, bounce_buffer, &stripe, &streams, - _compression_kind, + _compression, _out_sink, _stream)); } @@ -2645,10 +2595,10 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, : 0; if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } } - ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0); pbw.write(sf); stripe.footerLength = pbw.size(); - if (_compression_kind != NONE) { + if (_compression != compression_type::NONE) { uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; pbw.buffer()[0] = static_cast(uncomp_sf_len >> 0); pbw.buffer()[1] = static_cast(uncomp_sf_len >> 8); @@ -2780,21 +2730,21 @@ void writer::impl::close() // Write statistics metadata if (not _orc_meta.stripeStats.empty()) { - ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0); pbw.write(_orc_meta); - add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer()); + add_uncompressed_block_headers(_compression, _compression_blocksize, pbw.buffer()); ps.metadataLength = pbw.size(); _out_sink->host_write(pbw.data(), pbw.size()); } else { ps.metadataLength = 0; } - ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0); pbw.write(_footer); - add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer()); + add_uncompressed_block_headers(_compression, _compression_blocksize, pbw.buffer()); // Write postscript metadata ps.footerLength = pbw.size(); - ps.compression = _compression_kind; + ps.compression = to_orc_compression(_compression); ps.compressionBlockSize = _compression_blocksize; ps.version = {0, 12}; // Hive 0.12 ps.writerVersion = cudf_writer_version; diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index cae849ee315..7d23482cb17 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -342,7 +342,7 @@ class writer::impl { // Writer options. stripe_size_limits const _max_stripe_size; size_type const _row_index_stride; - CompressionKind const _compression_kind; + compression_type const _compression; size_t const _compression_blocksize; std::shared_ptr _compression_statistics; // Optional output statistics_freq const _stats_freq; diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 77924ac0f35..1b67b53ae8e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -23,8 +23,7 @@ #include "compact_protocol_reader.hpp" #include "compact_protocol_writer.hpp" #include "interop/decimal_conversion_utilities.cuh" -#include "io/comp/gpuinflate.hpp" -#include "io/comp/nvcomp_adapter.hpp" +#include "io/comp/comp.hpp" #include "io/parquet/parquet.hpp" #include "io/parquet/parquet_gpu.hpp" #include "io/statistics/column_statistics.cuh" @@ -67,6 +66,20 @@ namespace cudf::io::parquet::detail { using namespace cudf::io::detail; +Compression to_parquet_compression(compression_type compression) +{ + switch (compression) { + case compression_type::AUTO: + case compression_type::SNAPPY: return Compression::SNAPPY; + case compression_type::ZSTD: return Compression::ZSTD; + case compression_type::LZ4: + // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 + return Compression::LZ4_RAW; + case compression_type::NONE: return Compression::UNCOMPRESSED; + default: CUDF_FAIL("Unsupported compression type"); + } +} + struct aggregate_writer_metadata { aggregate_writer_metadata(host_span partitions, host_span const> kv_md, @@ -1172,7 +1185,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, size_t max_page_size_bytes, size_type max_page_size_rows, bool write_v2_headers, - Compression compression_codec, + compression_type compression, rmm::cuda_stream_view stream) { if (chunks.is_empty()) { return cudf::detail::hostdevice_vector{}; } @@ -1187,7 +1200,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression_codec), + compress_required_chunk_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1212,7 +1225,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression_codec), + compress_required_chunk_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1221,12 +1234,10 @@ auto init_page_sizes(hostdevice_2dvector& chunks, // Get per-page max compressed size cudf::detail::hostdevice_vector comp_page_sizes(num_pages, stream); - std::transform(page_sizes.begin(), - page_sizes.end(), - comp_page_sizes.begin(), - [compression_codec](auto page_size) { - return max_compression_output_size(compression_codec, page_size); - }); + std::transform( + page_sizes.begin(), page_sizes.end(), comp_page_sizes.begin(), [compression](auto page_size) { + return max_compressed_size(compression, page_size); + }); comp_page_sizes.host_to_device_async(stream); // Use per-page max compressed size to calculate chunk.compressed_size @@ -1238,7 +1249,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression_codec), + compress_required_chunk_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1247,16 +1258,13 @@ auto init_page_sizes(hostdevice_2dvector& chunks, return comp_page_sizes; } -size_t max_page_bytes(Compression compression, size_t max_page_size_bytes) +size_t max_page_bytes(compression_type compression, size_t max_page_size_bytes) { - if (compression == Compression::UNCOMPRESSED) { return max_page_size_bytes; } + if (compression == compression_type::NONE) { return max_page_size_bytes; } - auto const ncomp_type = to_nvcomp_compression_type(compression); - auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type) - ? std::nullopt - : nvcomp::compress_max_allowed_chunk_size(ncomp_type); + auto const comp_limit = compress_max_allowed_chunk_size(compression); - auto max_size = std::min(nvcomp_limit.value_or(max_page_size_bytes), max_page_size_bytes); + auto max_size = std::min(comp_limit.value_or(max_page_size_bytes), max_page_size_bytes); // page size must fit in a 32-bit signed integer return std::min(max_size, std::numeric_limits::max()); } @@ -1265,7 +1273,7 @@ std::pair>, std::vector& chunks, host_span col_desc, device_2dspan frags, - Compression compression, + compression_type compression, dictionary_policy dict_policy, size_t max_dict_size, rmm::cuda_stream_view stream) @@ -1404,7 +1412,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, * @param num_columns Total number of columns * @param num_pages Total number of pages * @param num_stats_bfr Number of statistics buffers - * @param compression Compression format + * @param alignment Page alignment * @param max_page_size_bytes Maximum uncompressed page size, in bytes * @param max_page_size_rows Maximum page size, in rows * @param write_v2_headers True if version 2 page headers are to be written @@ -1419,7 +1427,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, uint32_t num_columns, uint32_t num_pages, uint32_t num_stats_bfr, - Compression compression, + size_t alignment, size_t max_page_size_bytes, size_type max_page_size_rows, bool write_v2_headers, @@ -1435,7 +1443,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression), + alignment, write_v2_headers, (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr, @@ -1478,7 +1486,7 @@ void encode_pages(hostdevice_2dvector& chunks, statistics_chunk const* chunk_stats, statistics_chunk const* column_stats, std::optional& comp_stats, - Compression compression, + compression_type compression, int32_t column_index_truncate_length, bool write_v2_headers, rmm::cuda_stream_view stream) @@ -1488,7 +1496,7 @@ void encode_pages(hostdevice_2dvector& chunks, ? device_span(page_stats, num_pages) : device_span(); - uint32_t max_comp_pages = (compression != Compression::UNCOMPRESSED) ? num_pages : 0; + uint32_t max_comp_pages = (compression != compression_type::NONE) ? num_pages : 0; rmm::device_uvector> comp_in(max_comp_pages, stream); rmm::device_uvector> comp_out(max_comp_pages, stream); @@ -1499,34 +1507,7 @@ void encode_pages(hostdevice_2dvector& chunks, compression_result{0, compression_status::FAILURE}); EncodePages(pages, write_v2_headers, comp_in, comp_out, comp_res, stream); - switch (compression) { - case Compression::SNAPPY: - if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - gpu_snap(comp_in, comp_out, comp_res, stream); - } else { - nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); - } - break; - case Compression::ZSTD: { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); - break; - } - case Compression::LZ4_RAW: { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::LZ4, comp_in, comp_out, comp_res, stream); - break; - } - case Compression::UNCOMPRESSED: break; - default: CUDF_FAIL("invalid compression type"); - } + compress(compression, comp_in, comp_out, comp_res, stream); // TBD: Not clear if the official spec actually allows dynamically turning off compression at the // chunk-level @@ -1744,7 +1725,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, size_type max_page_size_rows, int32_t column_index_truncate_length, statistics_freq stats_granularity, - Compression compression, + compression_type compression, bool collect_compression_statistics, dictionary_policy dict_policy, size_t max_dictionary_size, @@ -2146,7 +2127,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } // Clear compressed buffer size if compression has been turned off - if (compression == Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; } + if (compression == compression_type::NONE) { max_comp_bfr_size = 0; } // Initialize data pointers uint32_t const num_stats_bfr = @@ -2214,7 +2195,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, num_columns, num_pages, num_stats_bfr, - compression, + compress_required_chunk_alignment(compression), max_page_size_bytes, max_page_size_rows, write_v2_headers, @@ -2270,7 +2251,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, auto const dev_bfr = ck.is_compressed ? ck.compressed_bfr : ck.uncompressed_bfr; auto& column_chunk_meta = row_group.columns[i].meta_data; - if (ck.is_compressed) { column_chunk_meta.codec = compression; } + if (ck.is_compressed) { column_chunk_meta.codec = to_parquet_compression(compression); } if (!out_sink[p]->is_device_write_preferred(ck.compressed_size)) { all_device_write = false; } @@ -2375,7 +2356,7 @@ writer::impl::impl(std::vector> sinks, single_write_mode mode, rmm::cuda_stream_view stream) : _stream(stream), - _compression(to_parquet_compression(options.get_compression())), + _compression(options.get_compression()), _max_row_group_size{options.get_row_group_size_bytes()}, _max_row_group_rows{options.get_row_group_size_rows()}, _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), @@ -2406,7 +2387,7 @@ writer::impl::impl(std::vector> sinks, single_write_mode mode, rmm::cuda_stream_view stream) : _stream(stream), - _compression(to_parquet_compression(options.get_compression())), + _compression(options.get_compression()), _max_row_group_size{options.get_row_group_size_bytes()}, _max_row_group_rows{options.get_row_group_size_rows()}, _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 63128faf993..d5a5a534b93 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -144,7 +144,7 @@ class writer::impl { rmm::cuda_stream_view _stream; // Writer options. - Compression const _compression; + compression_type const _compression; size_t const _max_row_group_size; size_type const _max_row_group_rows; size_t const _max_page_size_bytes; diff --git a/cpp/src/io/parquet/writer_impl_helpers.cpp b/cpp/src/io/parquet/writer_impl_helpers.cpp index f15ea1f3c37..ede788c97c2 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.cpp +++ b/cpp/src/io/parquet/writer_impl_helpers.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,6 @@ #include "writer_impl_helpers.hpp" -#include "io/comp/nvcomp_adapter.hpp" - #include #include #include @@ -32,48 +30,6 @@ namespace cudf::io::parquet::detail { using namespace cudf::io::detail; -Compression to_parquet_compression(compression_type compression) -{ - switch (compression) { - case compression_type::AUTO: - case compression_type::SNAPPY: return Compression::SNAPPY; - case compression_type::ZSTD: return Compression::ZSTD; - case compression_type::LZ4: - // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 - return Compression::LZ4_RAW; - case compression_type::NONE: return Compression::UNCOMPRESSED; - default: CUDF_FAIL("Unsupported compression type"); - } -} - -nvcomp::compression_type to_nvcomp_compression_type(Compression codec) -{ - switch (codec) { - case Compression::SNAPPY: return nvcomp::compression_type::SNAPPY; - case Compression::ZSTD: return nvcomp::compression_type::ZSTD; - // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 - case Compression::LZ4_RAW: return nvcomp::compression_type::LZ4; - default: CUDF_FAIL("Unsupported compression type"); - } -} - -uint32_t page_alignment(Compression codec) -{ - if (codec == Compression::UNCOMPRESSED or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(codec))) { - return 1u; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(codec)); -} - -size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize) -{ - if (codec == Compression::UNCOMPRESSED) return 0; - - return compress_max_output_chunk_size(to_nvcomp_compression_type(codec), compression_blocksize); -} - void fill_table_meta(table_input_metadata& table_meta) { // Fill unnamed columns' names in table_meta diff --git a/cpp/src/io/parquet/writer_impl_helpers.hpp b/cpp/src/io/parquet/writer_impl_helpers.hpp index 14a9a0ed5b7..b5c73c348fe 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.hpp +++ b/cpp/src/io/parquet/writer_impl_helpers.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,48 +20,12 @@ */ #pragma once -#include "parquet_common.hpp" #include #include -#include namespace cudf::io::parquet::detail { -/** - * @brief Function that translates GDF compression to parquet compression. - * - * @param compression The compression type - * @return The supported Parquet compression - */ -Compression to_parquet_compression(compression_type compression); - -/** - * @brief Function that translates the given compression codec to nvcomp compression type. - * - * @param codec Compression codec - * @return Translated nvcomp compression type - */ -cudf::io::detail::nvcomp::compression_type to_nvcomp_compression_type(Compression codec); - -/** - * @brief Function that computes input alignment requirements for the given compression type. - * - * @param codec Compression codec - * @return Required alignment - */ -uint32_t page_alignment(Compression codec); - -/** - * @brief Gets the maximum compressed chunk size for the largest chunk uncompressed chunk in the - * batch. - * - * @param codec Compression codec - * @param compression_blocksize Size of the largest uncompressed chunk in the batch - * @return Maximum compressed chunk size - */ -size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize); - /** * @brief Fill the table metadata with default column names. * diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 2209a30149d..708c2045a74 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -2068,6 +2068,7 @@ TEST_P(OrcCompressionTest, Basic) INSTANTIATE_TEST_CASE_P(OrcCompressionTest, OrcCompressionTest, ::testing::Values(cudf::io::compression_type::NONE, + cudf::io::compression_type::AUTO, cudf::io::compression_type::SNAPPY, cudf::io::compression_type::LZ4, cudf::io::compression_type::ZSTD)); diff --git a/cpp/tests/io/parquet_misc_test.cpp b/cpp/tests/io/parquet_misc_test.cpp index d66f685cd9c..419ac909ac6 100644 --- a/cpp/tests/io/parquet_misc_test.cpp +++ b/cpp/tests/io/parquet_misc_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -268,6 +268,7 @@ TEST_P(ParquetCompressionTest, Basic) INSTANTIATE_TEST_CASE_P(ParquetCompressionTest, ParquetCompressionTest, ::testing::Values(cudf::io::compression_type::NONE, + cudf::io::compression_type::AUTO, cudf::io::compression_type::SNAPPY, cudf::io::compression_type::LZ4, cudf::io::compression_type::ZSTD)); From 478ec50edf302a338db043039abad6a2560144ea Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Mon, 13 Jan 2025 15:19:44 -0600 Subject: [PATCH 2/4] Precompute AST arity (#17234) This PR precomputes AST arity on the host, to reduce the complexity in device-side arity lookup. Authors: - Bradley Dice (https://github.com/bdice) - Basit Ayantunde (https://github.com/lamarrr) Approvers: - Basit Ayantunde (https://github.com/lamarrr) - Kyle Edwards (https://github.com/KyleFromNVIDIA) URL: https://github.com/rapidsai/cudf/pull/17234 --- cpp/CMakeLists.txt | 1 + .../cudf/ast/detail/expression_evaluator.cuh | 4 +- .../cudf/ast/detail/expression_parser.hpp | 50 ++- cpp/include/cudf/ast/detail/operators.hpp | 418 +++--------------- cpp/src/ast/expression_parser.cpp | 3 +- cpp/src/ast/operators.cpp | 293 ++++++++++++ 6 files changed, 391 insertions(+), 378 deletions(-) create mode 100644 cpp/src/ast/operators.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 252cc7897d8..4d83cbd907c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -336,6 +336,7 @@ add_library( src/aggregation/result_cache.cpp src/ast/expression_parser.cpp src/ast/expressions.cpp + src/ast/operators.cpp src/binaryop/binaryop.cpp src/binaryop/compiled/ATan2.cu src/binaryop/compiled/Add.cu diff --git a/cpp/include/cudf/ast/detail/expression_evaluator.cuh b/cpp/include/cudf/ast/detail/expression_evaluator.cuh index 9d8762555d7..001b604814c 100644 --- a/cpp/include/cudf/ast/detail/expression_evaluator.cuh +++ b/cpp/include/cudf/ast/detail/expression_evaluator.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -452,7 +452,7 @@ struct expression_evaluator { ++operator_index) { // Execute operator auto const op = plan.operators[operator_index]; - auto const arity = ast_operator_arity(op); + auto const arity = plan.operator_arities[operator_index]; if (arity == 1) { // Unary operator auto const& input = diff --git a/cpp/include/cudf/ast/detail/expression_parser.hpp b/cpp/include/cudf/ast/detail/expression_parser.hpp index b5973d0ace9..d2e8c1cd41f 100644 --- a/cpp/include/cudf/ast/detail/expression_parser.hpp +++ b/cpp/include/cudf/ast/detail/expression_parser.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -88,6 +89,7 @@ struct expression_device_view { device_span data_references; device_span literals; device_span operators; + device_span operator_arities; device_span operator_source_indices; cudf::size_type num_intermediates; }; @@ -229,39 +231,55 @@ class expression_parser { * @param[in] v The `std::vector` containing components (operators, literals, etc). * @param[in,out] sizes The `std::vector` containing the size of each data buffer. * @param[in,out] data_pointers The `std::vector` containing pointers to each data buffer. + * @param[in,out] alignment The maximum alignment needed for all the extracted size and pointers */ template void extract_size_and_pointer(std::vector const& v, std::vector& sizes, - std::vector& data_pointers) + std::vector& data_pointers, + cudf::size_type& alignment) { + // sub-type alignment will only work provided the alignment is lesser or equal to + // alignof(max_align_t) which is the maximum alignment provided by rmm's device buffers + static_assert(alignof(T) <= alignof(max_align_t)); auto const data_size = sizeof(T) * v.size(); sizes.push_back(data_size); data_pointers.push_back(v.data()); + alignment = std::max(alignment, static_cast(alignof(T))); } void move_to_device(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { std::vector sizes; std::vector data_pointers; + // use a minimum of 4-byte alignment + cudf::size_type buffer_alignment = 4; - extract_size_and_pointer(_data_references, sizes, data_pointers); - extract_size_and_pointer(_literals, sizes, data_pointers); - extract_size_and_pointer(_operators, sizes, data_pointers); - extract_size_and_pointer(_operator_source_indices, sizes, data_pointers); + extract_size_and_pointer(_data_references, sizes, data_pointers, buffer_alignment); + extract_size_and_pointer(_literals, sizes, data_pointers, buffer_alignment); + extract_size_and_pointer(_operators, sizes, data_pointers, buffer_alignment); + extract_size_and_pointer(_operator_arities, sizes, data_pointers, buffer_alignment); + extract_size_and_pointer(_operator_source_indices, sizes, data_pointers, buffer_alignment); // Create device buffer - auto const buffer_size = std::accumulate(sizes.cbegin(), sizes.cend(), 0); - auto buffer_offsets = std::vector(sizes.size()); - thrust::exclusive_scan(sizes.cbegin(), sizes.cend(), buffer_offsets.begin(), 0); + auto buffer_offsets = std::vector(sizes.size()); + thrust::exclusive_scan(sizes.cbegin(), + sizes.cend(), + buffer_offsets.begin(), + cudf::size_type{0}, + [buffer_alignment](auto a, auto b) { + // align each component of the AST program + return cudf::util::round_up_safe(a + b, buffer_alignment); + }); + + auto const buffer_size = buffer_offsets.empty() ? 0 : (buffer_offsets.back() + sizes.back()); + auto host_data_buffer = std::vector(buffer_size); - auto h_data_buffer = std::vector(buffer_size); for (unsigned int i = 0; i < data_pointers.size(); ++i) { - std::memcpy(h_data_buffer.data() + buffer_offsets[i], data_pointers[i], sizes[i]); + std::memcpy(host_data_buffer.data() + buffer_offsets[i], data_pointers[i], sizes[i]); } - _device_data_buffer = rmm::device_buffer(h_data_buffer.data(), buffer_size, stream, mr); - + _device_data_buffer = rmm::device_buffer(host_data_buffer.data(), buffer_size, stream, mr); stream.synchronize(); // Create device pointers to components of plan @@ -277,8 +295,11 @@ class expression_parser { device_expression_data.operators = device_span( reinterpret_cast(device_data_buffer_ptr + buffer_offsets[2]), _operators.size()); - device_expression_data.operator_source_indices = device_span( + device_expression_data.operator_arities = device_span( reinterpret_cast(device_data_buffer_ptr + buffer_offsets[3]), + _operators.size()); + device_expression_data.operator_source_indices = device_span( + reinterpret_cast(device_data_buffer_ptr + buffer_offsets[4]), _operator_source_indices.size()); device_expression_data.num_intermediates = _intermediate_counter.get_max_used(); shmem_per_thread = static_cast( @@ -322,6 +343,7 @@ class expression_parser { bool _has_nulls; std::vector _data_references; std::vector _operators; + std::vector _operator_arities; std::vector _operator_source_indices; std::vector _literals; }; diff --git a/cpp/include/cudf/ast/detail/operators.hpp b/cpp/include/cudf/ast/detail/operators.hpp index 46507700e21..db04e1fe989 100644 --- a/cpp/include/cudf/ast/detail/operators.hpp +++ b/cpp/include/cudf/ast/detail/operators.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,159 +69,111 @@ constexpr bool is_valid_unary_op = cuda::std::is_invocable_v; * @param args Forwarded arguments to `operator()` of `f`. */ template -CUDF_HOST_DEVICE inline constexpr void ast_operator_dispatcher(ast_operator op, F&& f, Ts&&... args) +CUDF_HOST_DEVICE inline constexpr decltype(auto) ast_operator_dispatcher(ast_operator op, + F&& f, + Ts&&... args) { switch (op) { case ast_operator::ADD: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::SUB: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::MUL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::DIV: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::TRUE_DIV: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::FLOOR_DIV: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::MOD: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::PYMOD: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::POW: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NULL_EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NOT_EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LESS: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::GREATER: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LESS_EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::GREATER_EQUAL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::BITWISE_AND: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::BITWISE_OR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::BITWISE_XOR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LOGICAL_AND: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NULL_LOGICAL_AND: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LOGICAL_OR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NULL_LOGICAL_OR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::IDENTITY: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::IS_NULL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::SIN: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::COS: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::TAN: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCSIN: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCCOS: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCTAN: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::SINH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::COSH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::TANH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCSINH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCCOSH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ARCTANH: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::EXP: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::LOG: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::SQRT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CBRT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CEIL: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::FLOOR: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::ABS: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::RINT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::BIT_INVERT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::NOT: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CAST_TO_INT64: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CAST_TO_UINT64: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); case ast_operator::CAST_TO_FLOAT64: - f.template operator()(std::forward(args)...); - break; + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Invalid operator."); @@ -955,231 +907,6 @@ struct operator_functor { } }; -/** - * @brief Functor used to single-type-dispatch binary operators. - * - * This functor's `operator()` is templated to validate calls to its operators based on the input - * type, as determined by the `is_valid_binary_op` trait. This function assumes that both inputs are - * the same type, and dispatches based on the type of the left input. - * - * @tparam OperatorFunctor Binary operator functor. - */ -template -struct single_dispatch_binary_operator_types { - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(F&& f, Ts&&... args) - { - f.template operator()(std::forward(args)...); - } - - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(F&& f, Ts&&... args) - { -#ifndef __CUDA_ARCH__ - CUDF_FAIL("Invalid binary operation."); -#else - CUDF_UNREACHABLE("Invalid binary operation."); -#endif - } -}; - -/** - * @brief Functor performing a type dispatch for a binary operator. - * - * This functor performs single dispatch, which assumes lhs_type == rhs_type. This may not be true - * for all binary operators but holds for all currently implemented operators. - */ -struct type_dispatch_binary_op { - /** - * @brief Performs type dispatch for a binary operator. - * - * @tparam op AST operator. - * @tparam F Type of forwarded functor. - * @tparam Ts Parameter pack of forwarded arguments. - * @param lhs_type Type of left input data. - * @param rhs_type Type of right input data. - * @param f Forwarded functor to be called. - * @param args Forwarded arguments to `operator()` of `f`. - */ - template - CUDF_HOST_DEVICE inline void operator()(cudf::data_type lhs_type, - cudf::data_type rhs_type, - F&& f, - Ts&&... args) - { - // Single dispatch (assume lhs_type == rhs_type) - type_dispatcher( - lhs_type, - // Always dispatch to the non-null operator for the purpose of type determination. - detail::single_dispatch_binary_operator_types>{}, - std::forward(f), - std::forward(args)...); - } -}; - -/** - * @brief Dispatches a runtime binary operator to a templated type dispatcher. - * - * @tparam F Type of forwarded functor. - * @tparam Ts Parameter pack of forwarded arguments. - * @param lhs_type Type of left input data. - * @param rhs_type Type of right input data. - * @param f Forwarded functor to be called. - * @param args Forwarded arguments to `operator()` of `f`. - */ -template -CUDF_HOST_DEVICE inline constexpr void binary_operator_dispatcher( - ast_operator op, cudf::data_type lhs_type, cudf::data_type rhs_type, F&& f, Ts&&... args) -{ - ast_operator_dispatcher(op, - detail::type_dispatch_binary_op{}, - lhs_type, - rhs_type, - std::forward(f), - std::forward(args)...); -} - -/** - * @brief Functor used to type-dispatch unary operators. - * - * This functor's `operator()` is templated to validate calls to its operators based on the input - * type, as determined by the `is_valid_unary_op` trait. - * - * @tparam OperatorFunctor Unary operator functor. - */ -template -struct dispatch_unary_operator_types { - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(F&& f, Ts&&... args) - { - f.template operator()(std::forward(args)...); - } - - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(F&& f, Ts&&... args) - { -#ifndef __CUDA_ARCH__ - CUDF_FAIL("Invalid unary operation."); -#else - CUDF_UNREACHABLE("Invalid unary operation."); -#endif - } -}; - -/** - * @brief Functor performing a type dispatch for a unary operator. - */ -struct type_dispatch_unary_op { - template - CUDF_HOST_DEVICE inline void operator()(cudf::data_type input_type, F&& f, Ts&&... args) - { - type_dispatcher( - input_type, - // Always dispatch to the non-null operator for the purpose of type determination. - detail::dispatch_unary_operator_types>{}, - std::forward(f), - std::forward(args)...); - } -}; - -/** - * @brief Dispatches a runtime unary operator to a templated type dispatcher. - * - * @tparam F Type of forwarded functor. - * @tparam Ts Parameter pack of forwarded arguments. - * @param input_type Type of input data. - * @param f Forwarded functor to be called. - * @param args Forwarded arguments to `operator()` of `f`. - */ -template -CUDF_HOST_DEVICE inline constexpr void unary_operator_dispatcher(ast_operator op, - cudf::data_type input_type, - F&& f, - Ts&&... args) -{ - ast_operator_dispatcher(op, - detail::type_dispatch_unary_op{}, - input_type, - std::forward(f), - std::forward(args)...); -} - -/** - * @brief Functor to determine the return type of an operator from its input types. - */ -struct return_type_functor { - /** - * @brief Callable for binary operators to determine return type. - * - * @tparam OperatorFunctor Operator functor to perform. - * @tparam LHS Left input type. - * @tparam RHS Right input type. - * @param result Reference whose value is assigned to the result data type. - */ - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(cudf::data_type& result) - { - using Out = cuda::std::invoke_result_t; - result = cudf::data_type(cudf::type_to_id()); - } - - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(cudf::data_type& result) - { -#ifndef __CUDA_ARCH__ - CUDF_FAIL("Invalid binary operation. Return type cannot be determined."); -#else - CUDF_UNREACHABLE("Invalid binary operation. Return type cannot be determined."); -#endif - } - - /** - * @brief Callable for unary operators to determine return type. - * - * @tparam OperatorFunctor Operator functor to perform. - * @tparam T Input type. - * @param result Pointer whose value is assigned to the result data type. - */ - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(cudf::data_type& result) - { - using Out = cuda::std::invoke_result_t; - result = cudf::data_type(cudf::type_to_id()); - } - - template >* = nullptr> - CUDF_HOST_DEVICE inline void operator()(cudf::data_type& result) - { -#ifndef __CUDA_ARCH__ - CUDF_FAIL("Invalid unary operation. Return type cannot be determined."); -#else - CUDF_UNREACHABLE("Invalid unary operation. Return type cannot be determined."); -#endif - } -}; - /** * @brief Gets the return type of an AST operator. * @@ -1187,34 +914,8 @@ struct return_type_functor { * @param operand_types Vector of input types to the operator. * @return cudf::data_type Return type of the operator. */ -inline cudf::data_type ast_operator_return_type(ast_operator op, - std::vector const& operand_types) -{ - auto result = cudf::data_type(cudf::type_id::EMPTY); - switch (operand_types.size()) { - case 1: - unary_operator_dispatcher(op, operand_types[0], detail::return_type_functor{}, result); - break; - case 2: - binary_operator_dispatcher( - op, operand_types[0], operand_types[1], detail::return_type_functor{}, result); - break; - default: CUDF_FAIL("Unsupported operator return type."); break; - } - return result; -} - -/** - * @brief Functor to determine the arity (number of operands) of an operator. - */ -struct arity_functor { - template - CUDF_HOST_DEVICE inline void operator()(cudf::size_type& result) - { - // Arity is not dependent on null handling, so just use the false implementation here. - result = operator_functor::arity; - } -}; +cudf::data_type ast_operator_return_type(ast_operator op, + std::vector const& operand_types); /** * @brief Gets the arity (number of operands) of an AST operator. @@ -1222,12 +923,7 @@ struct arity_functor { * @param op Operator used to determine arity. * @return Arity of the operator. */ -CUDF_HOST_DEVICE inline cudf::size_type ast_operator_arity(ast_operator op) -{ - auto result = cudf::size_type(0); - ast_operator_dispatcher(op, detail::arity_functor{}, result); - return result; -} +cudf::size_type ast_operator_arity(ast_operator op); } // namespace detail diff --git a/cpp/src/ast/expression_parser.cpp b/cpp/src/ast/expression_parser.cpp index d0e4c59ca54..b2cc134d9fa 100644 --- a/cpp/src/ast/expression_parser.cpp +++ b/cpp/src/ast/expression_parser.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -161,6 +161,7 @@ cudf::size_type expression_parser::visit(operation const& expr) auto const op = expr.get_operator(); auto const data_type = cudf::ast::detail::ast_operator_return_type(op, operand_types); _operators.push_back(op); + _operator_arities.push_back(cudf::ast::detail::ast_operator_arity(op)); // Push data reference auto const output = [&]() { if (expression_index == 0) { diff --git a/cpp/src/ast/operators.cpp b/cpp/src/ast/operators.cpp new file mode 100644 index 00000000000..b60a69a42d9 --- /dev/null +++ b/cpp/src/ast/operators.cpp @@ -0,0 +1,293 @@ +/* + * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +#include +#include + +#include + +namespace cudf { +namespace ast { +namespace detail { +namespace { + +struct arity_functor { + template + void operator()(cudf::size_type& result) + { + // Arity is not dependent on null handling, so just use the false implementation here. + result = operator_functor::arity; + } +}; + +/** + * @brief Functor to determine the return type of an operator from its input types. + */ +struct return_type_functor { + /** + * @brief Callable for binary operators to determine return type. + * + * @tparam OperatorFunctor Operator functor to perform. + * @tparam LHS Left input type. + * @tparam RHS Right input type. + * @param result Pointer whose value is assigned to the result data type. + */ + template >* = nullptr> + void operator()(cudf::data_type& result) + { + using Out = cuda::std::invoke_result_t; + result = cudf::data_type{cudf::type_to_id()}; + } + + template >* = nullptr> + void operator()(cudf::data_type& result) + { +#ifndef __CUDA_ARCH__ + CUDF_FAIL("Invalid binary operation. Return type cannot be determined."); +#else + CUDF_UNREACHABLE("Invalid binary operation. Return type cannot be determined."); +#endif + result = cudf::data_type{cudf::type_id::EMPTY}; + } + + /** + * @brief Callable for unary operators to determine return type. + * + * @tparam OperatorFunctor Operator functor to perform. + * @tparam T Input type. + * @param result Pointer whose value is assigned to the result data type. + */ + template >* = nullptr> + void operator()(cudf::data_type& result) + { + using Out = cuda::std::invoke_result_t; + result = cudf::data_type{cudf::type_to_id()}; + } + + template >* = nullptr> + void operator()(cudf::data_type& result) + { +#ifndef __CUDA_ARCH__ + CUDF_FAIL("Invalid unary operation. Return type cannot be determined."); +#else + CUDF_UNREACHABLE("Invalid unary operation. Return type cannot be determined."); +#endif + result = cudf::data_type{cudf::type_id::EMPTY}; + } +}; + +/** + * @brief Functor used to single-type-dispatch binary operators. + * + * This functor's `operator()` is templated to validate calls to its operators based on the input + * type, as determined by the `is_valid_binary_op` trait. This function assumes that both inputs are + * the same type, and dispatches based on the type of the left input. + * + * @tparam OperatorFunctor Binary operator functor. + */ +template +struct single_dispatch_binary_operator_types { + template >* = nullptr> + inline void operator()(F&& f, Ts&&... args) + { + f.template operator()(std::forward(args)...); + } + + template >* = nullptr> + inline void operator()(F&& f, Ts&&... args) + { +#ifndef __CUDA_ARCH__ + CUDF_FAIL("Invalid binary operation."); +#else + CUDF_UNREACHABLE("Invalid binary operation."); +#endif + } +}; + +/** + * @brief Functor performing a type dispatch for a binary operator. + * + * This functor performs single dispatch, which assumes lhs_type == rhs_type. This may not be true + * for all binary operators but holds for all currently implemented operators. + */ +struct type_dispatch_binary_op { + /** + * @brief Performs type dispatch for a binary operator. + * + * @tparam op AST operator. + * @tparam F Type of forwarded functor. + * @tparam Ts Parameter pack of forwarded arguments. + * @param lhs_type Type of left input data. + * @param rhs_type Type of right input data. + * @param f Forwarded functor to be called. + * @param args Forwarded arguments to `operator()` of `f`. + */ + template + inline void operator()(cudf::data_type lhs_type, cudf::data_type rhs_type, F&& f, Ts&&... args) + { + // Single dispatch (assume lhs_type == rhs_type) + type_dispatcher( + lhs_type, + // Always dispatch to the non-null operator for the purpose of type determination. + detail::single_dispatch_binary_operator_types>{}, + std::forward(f), + std::forward(args)...); + } +}; + +/** + * @brief Dispatches a runtime binary operator to a templated type dispatcher. + * + * @tparam F Type of forwarded functor. + * @tparam Ts Parameter pack of forwarded arguments. + * @param lhs_type Type of left input data. + * @param rhs_type Type of right input data. + * @param f Forwarded functor to be called. + * @param args Forwarded arguments to `operator()` of `f`. + */ +template +inline constexpr void binary_operator_dispatcher( + ast_operator op, cudf::data_type lhs_type, cudf::data_type rhs_type, F&& f, Ts&&... args) +{ + ast_operator_dispatcher(op, + detail::type_dispatch_binary_op{}, + lhs_type, + rhs_type, + std::forward(f), + std::forward(args)...); +} + +/** + * @brief Functor used to type-dispatch unary operators. + * + * This functor's `operator()` is templated to validate calls to its operators based on the input + * type, as determined by the `is_valid_unary_op` trait. + * + * @tparam OperatorFunctor Unary operator functor. + */ +template +struct dispatch_unary_operator_types { + template >* = nullptr> + inline void operator()(F&& f, Ts&&... args) + { + f.template operator()(std::forward(args)...); + } + + template >* = nullptr> + inline void operator()(F&& f, Ts&&... args) + { +#ifndef __CUDA_ARCH__ + CUDF_FAIL("Invalid unary operation."); +#else + CUDF_UNREACHABLE("Invalid unary operation."); +#endif + } +}; + +/** + * @brief Functor performing a type dispatch for a unary operator. + */ +struct type_dispatch_unary_op { + template + inline void operator()(cudf::data_type input_type, F&& f, Ts&&... args) + { + type_dispatcher( + input_type, + // Always dispatch to the non-null operator for the purpose of type determination. + detail::dispatch_unary_operator_types>{}, + std::forward(f), + std::forward(args)...); + } +}; + +/** + * @brief Dispatches a runtime unary operator to a templated type dispatcher. + * + * @tparam F Type of forwarded functor. + * @tparam Ts Parameter pack of forwarded arguments. + * @param input_type Type of input data. + * @param f Forwarded functor to be called. + * @param args Forwarded arguments to `operator()` of `f`. + */ +template +inline constexpr void unary_operator_dispatcher(ast_operator op, + cudf::data_type input_type, + F&& f, + Ts&&... args) +{ + ast_operator_dispatcher(op, + detail::type_dispatch_unary_op{}, + input_type, + std::forward(f), + std::forward(args)...); +} + +} // namespace + +cudf::data_type ast_operator_return_type(ast_operator op, + std::vector const& operand_types) +{ + cudf::data_type result{cudf::type_id::EMPTY}; + switch (operand_types.size()) { + case 1: + unary_operator_dispatcher(op, operand_types[0], detail::return_type_functor{}, result); + break; + case 2: + binary_operator_dispatcher( + op, operand_types[0], operand_types[1], detail::return_type_functor{}, result); + break; + default: CUDF_FAIL("Unsupported operator return type."); break; + } + return result; +} + +cudf::size_type ast_operator_arity(ast_operator op) +{ + cudf::size_type result{}; + ast_operator_dispatcher(op, arity_functor{}, result); + return result; +} + +} // namespace detail + +} // namespace ast + +} // namespace cudf From f84cd4316eaa61e231b5fd096608ca09d5e3c08c Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Mon, 13 Jan 2025 22:26:43 -0500 Subject: [PATCH 3/4] [BUG] xfail Polars excel test (#17731) One the Polars tests fails when `fastexcel>=0.12.1`. I opened https://github.com/pola-rs/polars/issues/20698 to track that failing test. This PR xfail that test for now. xref #17677 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/17731 --- python/cudf_polars/cudf_polars/testing/plugin.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/python/cudf_polars/cudf_polars/testing/plugin.py b/python/cudf_polars/cudf_polars/testing/plugin.py index c16df320ceb..e453a8b89b9 100644 --- a/python/cudf_polars/cudf_polars/testing/plugin.py +++ b/python/cudf_polars/cudf_polars/testing/plugin.py @@ -8,7 +8,9 @@ from functools import partialmethod from typing import TYPE_CHECKING +import fastexcel import pytest +from packaging import version import polars @@ -44,7 +46,7 @@ def pytest_configure(config: pytest.Config) -> None: ) -EXPECTED_FAILURES: Mapping[str, str] = { +EXPECTED_FAILURES: Mapping[str, str | tuple[str, bool]] = { "tests/unit/io/test_csv.py::test_compressed_csv": "Need to determine if file is compressed", "tests/unit/io/test_csv.py::test_read_csv_only_loads_selected_columns": "Memory usage won't be correct due to GPU", "tests/unit/io/test_delta.py::test_scan_delta_version": "Need to expose hive partitioning", @@ -192,6 +194,10 @@ def pytest_configure(config: pytest.Config) -> None: # Maybe flaky, order-dependent? "tests/unit/test_projections.py::test_schema_full_outer_join_projection_pd_13287": "Order-specific result check, query is correct but in different order", "tests/unit/test_queries.py::test_group_by_agg_equals_zero_3535": "libcudf sums all nulls to null, not zero", + "tests/unit/io/test_spreadsheet.py::test_write_excel_bytes[calamine]": ( + "Fails when fastexcel version >= 0.12.1. tracking issue: https://github.com/pola-rs/polars/issues/20698", + version.parse(fastexcel.__version__) >= version.parse("0.12.1"), + ), } @@ -219,4 +225,12 @@ def pytest_collection_modifyitems( if item.nodeid in TESTS_TO_SKIP: item.add_marker(pytest.mark.skip(reason=TESTS_TO_SKIP[item.nodeid])) elif item.nodeid in EXPECTED_FAILURES: + if isinstance(EXPECTED_FAILURES[item.nodeid], tuple): + # the second entry in the tuple is the condition to xfail on + item.add_marker( + pytest.mark.xfail( + condition=EXPECTED_FAILURES[item.nodeid][1], + reason=EXPECTED_FAILURES[item.nodeid][0], + ), + ) item.add_marker(pytest.mark.xfail(reason=EXPECTED_FAILURES[item.nodeid])) From 253fb2f10e921519502e562672d29029e844c2cf Mon Sep 17 00:00:00 2001 From: Nghia Truong <7416935+ttnghia@users.noreply.github.com> Date: Mon, 13 Jan 2025 22:41:47 -0800 Subject: [PATCH 4/4] Require to implement `AutoCloseable` for the classes derived from `HostUDFWrapper` (#17727) This adds the requirement to implement `AutoCloseable` to the classes derived from `HostUDFWrapper`, forcing them to delete the native UDF instance upon class destruction. Doing so will fix the memory leak issue when the native UDF instance never being destroyed. Authors: - Nghia Truong (https://github.com/ttnghia) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) URL: https://github.com/rapidsai/cudf/pull/17727 --- java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java b/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java index 0b6ecf2e140..124f2c99188 100644 --- a/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java +++ b/java/src/main/java/ai/rapids/cudf/HostUDFWrapper.java @@ -24,8 +24,10 @@ *

* A new host UDF aggregation implementation must extend this class and override the * {@code hashCode} and {@code equals} methods for such purposes. + * In addition, since this class implements {@code AutoCloseable}, the {@code close} method must + * also be overridden to automatically delete the native UDF instance upon class destruction. */ -public abstract class HostUDFWrapper { +public abstract class HostUDFWrapper implements AutoCloseable { public final long udfNativeHandle; public HostUDFWrapper(long udfNativeHandle) {