diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9dabe4e8800..252cc7897d8 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -477,13 +477,13 @@ add_library( src/io/avro/reader_impl.cu src/io/comp/brotli_dict.cpp src/io/comp/comp.cpp + src/io/comp/comp.cu src/io/comp/cpu_unbz2.cpp src/io/comp/debrotli.cu src/io/comp/gpuinflate.cu src/io/comp/nvcomp_adapter.cpp src/io/comp/nvcomp_adapter.cu src/io/comp/snap.cu - src/io/comp/statistics.cu src/io/comp/uncomp.cpp src/io/comp/unsnap.cu src/io/csv/csv_gpu.cu diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 163fa20806d..82f7761da2e 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -578,7 +578,7 @@ class orc_writer_options { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use - compression_type _compression = compression_type::AUTO; + compression_type _compression = compression_type::SNAPPY; // Specify frequency of statistics collection statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) @@ -733,7 +733,11 @@ class orc_writer_options { * * @param comp Compression type */ - void set_compression(compression_type comp) { _compression = comp; } + void set_compression(compression_type comp) + { + _compression = comp; + if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; } + } /** * @brief Choose granularity of statistics collection. @@ -865,7 +869,7 @@ class orc_writer_options_builder { */ orc_writer_options_builder& compression(compression_type comp) { - options._compression = comp; + options.set_compression(comp); return *this; } @@ -1026,7 +1030,7 @@ class chunked_orc_writer_options { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use - compression_type _compression = compression_type::AUTO; + compression_type _compression = compression_type::SNAPPY; // Specify granularity of statistics collection statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) @@ -1157,7 +1161,11 @@ class chunked_orc_writer_options { * * @param comp The compression type to use */ - void set_compression(compression_type comp) { _compression = comp; } + void set_compression(compression_type comp) + { + _compression = comp; + if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; } + } /** * @brief Choose granularity of statistics collection @@ -1279,7 +1287,7 @@ class chunked_orc_writer_options_builder { */ chunked_orc_writer_options_builder& compression(compression_type comp) { - options._compression = comp; + options.set_compression(comp); return *this; } diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 26535bed43b..3800835eaf1 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * Copyright (c) 2018-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,22 +16,45 @@ #include "comp.hpp" +#include "gpuinflate.hpp" +#include "io/utilities/getenv_or.hpp" #include "io/utilities/hostdevice_vector.hpp" #include "nvcomp_adapter.hpp" #include #include +#include #include #include #include #include +#include #include // GZIP compression namespace cudf::io::detail { namespace { +auto& h_comp_pool() +{ + static std::size_t pool_size = + getenv_or("LIBCUDF_HOST_COMPRESSION_NUM_THREADS", std::thread::hardware_concurrency()); + static BS::thread_pool pool(pool_size); + return pool; +} + +std::optional to_nvcomp_compression(compression_type compression) +{ + switch (compression) { + case compression_type::SNAPPY: return nvcomp::compression_type::SNAPPY; + case compression_type::ZSTD: return nvcomp::compression_type::ZSTD; + case compression_type::LZ4: return nvcomp::compression_type::LZ4; + case compression_type::ZLIB: return nvcomp::compression_type::DEFLATE; + default: return std::nullopt; + } +} + /** * @brief GZIP host compressor (includes header) */ @@ -98,8 +121,132 @@ std::vector compress_snappy(host_span src, return cudf::detail::make_std_vector_sync(d_dst, stream); } +void device_compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + if (compression == compression_type::NONE) { return; } + + auto const nvcomp_type = to_nvcomp_compression(compression); + auto nvcomp_disabled = nvcomp_type.has_value() ? nvcomp::is_compression_disabled(*nvcomp_type) + : "invalid compression type"; + if (not nvcomp_disabled) { + return nvcomp::batched_compress(*nvcomp_type, inputs, outputs, results, stream); + } + + switch (compression) { + case compression_type::SNAPPY: return gpu_snap(inputs, outputs, results, stream); + default: CUDF_FAIL("Compression error: " + nvcomp_disabled.value()); + } +} + +void host_compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + if (compression == compression_type::NONE) { return; } + + auto const num_chunks = inputs.size(); + auto h_results = cudf::detail::make_host_vector(num_chunks, stream); + auto const h_inputs = cudf::detail::make_host_vector_async(inputs, stream); + auto const h_outputs = cudf::detail::make_host_vector_async(outputs, stream); + stream.synchronize(); + + std::vector> tasks; + auto const num_streams = + std::min({num_chunks, + cudf::detail::global_cuda_stream_pool().get_stream_pool_size(), + h_comp_pool().get_thread_count()}); + auto const streams = cudf::detail::fork_streams(stream, num_streams); + for (size_t i = 0; i < num_chunks; ++i) { + auto const cur_stream = streams[i % streams.size()]; + auto task = [d_in = h_inputs[i], d_out = h_outputs[i], cur_stream, compression]() -> size_t { + auto const h_in = cudf::detail::make_host_vector_sync(d_in, cur_stream); + auto const h_out = compress(compression, h_in, cur_stream); + cudf::detail::cuda_memcpy(d_out.subspan(0, h_out.size()), h_out, cur_stream); + return h_out.size(); + }; + tasks.emplace_back(h_comp_pool().submit_task(std::move(task))); + } + + for (auto i = 0ul; i < num_chunks; ++i) { + h_results[i] = {tasks[i].get(), compression_status::SUCCESS}; + } + cudf::detail::cuda_memcpy_async(results, h_results, stream); +} + +[[nodiscard]] bool host_compression_supported(compression_type compression) +{ + switch (compression) { + case compression_type::GZIP: + case compression_type::NONE: return true; + default: return false; + } +} + +[[nodiscard]] bool device_compression_supported(compression_type compression) +{ + auto const nvcomp_type = to_nvcomp_compression(compression); + switch (compression) { + case compression_type::LZ4: + case compression_type::ZLIB: + case compression_type::ZSTD: return not nvcomp::is_compression_disabled(nvcomp_type.value()); + case compression_type::SNAPPY: + case compression_type::NONE: return true; + default: return false; + } +} + +[[nodiscard]] bool use_host_compression( + compression_type compression, + [[maybe_unused]] device_span const> inputs, + [[maybe_unused]] device_span const> outputs) +{ + CUDF_EXPECTS( + not host_compression_supported(compression) or device_compression_supported(compression), + "Unsupported compression type"); + if (not host_compression_supported(compression)) { return false; } + if (not device_compression_supported(compression)) { return true; } + // If both host and device compression are supported, use the host if the env var is set + return getenv_or("LIBCUDF_USE_HOST_COMPRESSION", 0); +} + } // namespace +std::optional compress_max_allowed_chunk_size(compression_type compression) +{ + if (auto nvcomp_type = to_nvcomp_compression(compression); + nvcomp_type.has_value() and not nvcomp::is_compression_disabled(*nvcomp_type)) { + return nvcomp::compress_max_allowed_chunk_size(*nvcomp_type); + } + return std::nullopt; +} + +[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression) +{ + auto nvcomp_type = to_nvcomp_compression(compression); + if (compression == compression_type::NONE or not nvcomp_type.has_value() or + nvcomp::is_compression_disabled(*nvcomp_type)) { + return 1ul; + } + + return nvcomp::required_alignment(*nvcomp_type); +} + +[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size) +{ + if (compression == compression_type::NONE) { return uncompressed_size; } + + if (auto nvcomp_type = to_nvcomp_compression(compression); nvcomp_type.has_value()) { + return nvcomp::compress_max_output_chunk_size(*nvcomp_type, uncompressed_size); + } + CUDF_FAIL("Unsupported compression type"); +} + std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream) @@ -112,4 +259,18 @@ std::vector compress(compression_type compression, } } +void compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + if (use_host_compression(compression, inputs, outputs)) { + return host_compress(compression, inputs, outputs, results, stream); + } else { + return device_compress(compression, inputs, outputs, results, stream); + } +} + } // namespace cudf::io::detail diff --git a/cpp/src/io/comp/statistics.cu b/cpp/src/io/comp/comp.cu similarity index 96% rename from cpp/src/io/comp/statistics.cu rename to cpp/src/io/comp/comp.cu index caee9145d2c..af0f73869a2 100644 --- a/cpp/src/io/comp/statistics.cu +++ b/cpp/src/io/comp/comp.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "gpuinflate.hpp" +#include "comp.hpp" #include diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index e16f26e1f06..90932a11499 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,5 +57,57 @@ std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream); +/** + * @brief Maximum size of uncompressed chunks that can be compressed. + * + * @param compression Compression type + * @returns maximum chunk size + */ +[[nodiscard]] std::optional compress_max_allowed_chunk_size(compression_type compression); + +/** + * @brief Gets input and output alignment requirements for the given compression type. + * + * @param compression Compression type + * @returns required alignment + */ +[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression); + +/** + * @brief Gets the maximum size any chunk could compress to in the batch. + * + * @param compression Compression type + * @param uncompressed_size Size of the largest uncompressed chunk in the batch + */ +[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size); + +/** + * @brief Compresses device memory buffers. + * + * @param compression Type of compression of the input data + * @param inputs Device memory buffers to compress + * @param outputs Device memory buffers to store the compressed output + * @param results Compression results + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void compress(compression_type compression, + device_span const> inputs, + device_span const> outputs, + device_span results, + rmm::cuda_stream_view stream); + +/** + * @brief Aggregate results of compression into a single statistics object. + * + * @param inputs List of uncompressed input buffers + * @param results List of compression results + * @param stream CUDA stream to use + * @return writer_compression_statistics + */ +[[nodiscard]] writer_compression_statistics collect_compression_statistics( + device_span const> inputs, + device_span results, + rmm::cuda_stream_view stream); + } // namespace io::detail } // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 4b09bd5a84c..0a35b230242 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * Copyright (c) 2018-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -124,17 +124,4 @@ void gpu_snap(device_span const> inputs, device_span results, rmm::cuda_stream_view stream); -/** - * @brief Aggregate results of compression into a single statistics object. - * - * @param inputs List of uncompressed input buffers - * @param results List of compression results - * @param stream CUDA stream to use - * @return writer_compression_statistics - */ -[[nodiscard]] writer_compression_statistics collect_compression_statistics( - device_span const> inputs, - device_span results, - rmm::cuda_stream_view stream); - } // namespace cudf::io::detail diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 88423122e16..d63fa9f5c35 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -766,6 +766,7 @@ void parquet_writer_options_base::set_stats_level(statistics_freq sf) { _stats_l void parquet_writer_options_base::set_compression(compression_type compression) { _compression = compression; + if (compression == compression_type::AUTO) { _compression = compression_type::SNAPPY; } } void parquet_writer_options_base::enable_int96_timestamps(bool req) diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index f4e75f78dec..8b30cee6681 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -407,7 +407,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, - CompressionKind compression, + compression_type compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, uint32_t comp_block_align, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 79ecca0ca99..4f296bb5bfc 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ */ #include "io/comp/gpuinflate.hpp" -#include "io/comp/nvcomp_adapter.hpp" #include "io/utilities/block_utils.cuh" #include "io/utilities/time_utils.cuh" #include "orc_gpu.hpp" @@ -45,8 +44,6 @@ namespace io { namespace orc { namespace gpu { -namespace nvcomp = cudf::io::detail::nvcomp; - using cudf::detail::device_2dspan; using cudf::io::detail::compression_result; using cudf::io::detail::compression_status; @@ -1362,7 +1359,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, std::optional CompressOrcDataStreams( device_span compressed_data, uint32_t num_compressed_blocks, - CompressionKind compression, + compression_type compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, uint32_t comp_block_align, @@ -1387,47 +1384,7 @@ std::optional CompressOrcDataStreams( max_comp_blk_size, comp_block_align); - if (compression == SNAPPY) { - try { - if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - cudf::io::detail::gpu_snap(comp_in, comp_out, comp_res, stream); - } else { - nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); - } - } catch (...) { - // There was an error in compressing so set an error status for each block - thrust::for_each( - rmm::exec_policy(stream), - comp_res.begin(), - comp_res.end(), - [] __device__(compression_result & stat) { stat.status = compression_status::FAILURE; }); - // Since SNAPPY is the default compression (may not be explicitly requested), fall back to - // writing without compression - CUDF_LOG_WARN("ORC writer: compression failed, writing uncompressed data"); - } - } else if (compression == ZLIB) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::DEFLATE); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress( - nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_res, stream); - } else if (compression == ZSTD) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); - } else if (compression == LZ4) { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::LZ4, comp_in, comp_out, comp_res, stream); - } else if (compression != NONE) { - CUDF_FAIL("Unsupported compression type"); - } + cudf::io::detail::compress(compression, comp_in, comp_out, comp_res, stream); dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index ce868b83c04..aa0b509981a 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ * @brief cuDF-IO ORC writer class implementation */ -#include "io/comp/nvcomp_adapter.hpp" #include "io/orc/orc_gpu.hpp" #include "io/statistics/column_statistics.cuh" #include "io/utilities/column_utils.cuh" @@ -71,8 +70,6 @@ namespace cudf::io::orc::detail { -namespace nvcomp = cudf::io::detail::nvcomp; - template [[nodiscard]] constexpr int varint_size(T val) { @@ -92,21 +89,8 @@ struct row_group_index_info { }; namespace { - /** - * @brief Translates ORC compression to nvCOMP compression - */ -auto to_nvcomp_compression_type(CompressionKind compression_kind) -{ - if (compression_kind == SNAPPY) return nvcomp::compression_type::SNAPPY; - if (compression_kind == ZLIB) return nvcomp::compression_type::DEFLATE; - if (compression_kind == ZSTD) return nvcomp::compression_type::ZSTD; - if (compression_kind == LZ4) return nvcomp::compression_type::LZ4; - CUDF_FAIL("Unsupported compression type"); -} - -/** - * @brief Translates cuDF compression to ORC compression + * @brief Translates cuDF compression to ORC compression. */ orc::CompressionKind to_orc_compression(compression_type compression) { @@ -122,19 +106,14 @@ orc::CompressionKind to_orc_compression(compression_type compression) } /** - * @brief Returns the block size for a given compression kind. + * @brief Returns the block size for a given compression format. */ -constexpr size_t compression_block_size(orc::CompressionKind compression) +size_t compression_block_size(compression_type compression) { - if (compression == orc::CompressionKind::NONE) { return 0; } - - auto const ncomp_type = to_nvcomp_compression_type(compression); - auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type) - ? std::nullopt - : nvcomp::compress_max_allowed_chunk_size(ncomp_type); + auto const comp_limit = compress_max_allowed_chunk_size(compression); constexpr size_t max_block_size = 256 * 1024; - return std::min(nvcomp_limit.value_or(max_block_size), max_block_size); + return std::min(comp_limit.value_or(max_block_size), max_block_size); } /** @@ -534,26 +513,6 @@ size_t RLE_stream_size(TypeKind kind, size_t count) } } -auto uncomp_block_alignment(CompressionKind compression_kind) -{ - if (compression_kind == NONE or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) { - return 1ul; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(compression_kind)); -} - -auto comp_block_alignment(CompressionKind compression_kind) -{ - if (compression_kind == NONE or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) { - return 1ul; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(compression_kind)); -} - /** * @brief Builds up per-column streams. * @@ -566,7 +525,7 @@ orc_streams create_streams(host_span columns, file_segmentation const& segmentation, std::map const& decimal_column_sizes, bool enable_dictionary, - CompressionKind compression_kind, + compression_type compression, single_write_mode write_mode) { // 'column 0' row index stream @@ -610,7 +569,7 @@ orc_streams create_streams(host_span columns, auto add_stream = [&](gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { - auto const max_alignment_padding = uncomp_block_alignment(compression_kind) - 1; + auto const max_alignment_padding = compress_required_chunk_alignment(compression) - 1; const auto base = column.index() * gpu::CI_NUM_STREAMS; ids[base + index_type] = streams.size(); streams.push_back(orc::Stream{ @@ -1473,7 +1432,7 @@ encoded_footer_statistics finish_statistic_blobs(Footer const& footer, * @param[in] rg_stats row group level statistics * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in] compression_kind The compression kind + * @param[in] compression The compression format * @param[in] compression_blocksize The block size used for compression * @param[in] out_sink Sink for writing data */ @@ -1487,7 +1446,7 @@ void write_index_stream(int32_t stripe_id, host_span rg_stats, StripeInformation* stripe, orc_streams* streams, - CompressionKind compression_kind, + compression_type compression, size_t compression_blocksize, std::unique_ptr const& out_sink) { @@ -1501,7 +1460,7 @@ void write_index_stream(int32_t stripe_id, row_group_index_info record; if (stream.ids[type] > 0) { record.pos = 0; - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { auto const& ss = strm_desc[stripe_id][stream.ids[type] - (columns.size() + 1)]; record.blk_pos = ss.first_block; record.comp_pos = 0; @@ -1541,7 +1500,7 @@ void write_index_stream(int32_t stripe_id, } } - ProtobufWriter pbw((compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((compression != compression_type::NONE) ? 3 : 0); // Add row index entries auto const& rowgroups_range = segmentation.stripes[stripe_id]; @@ -1566,7 +1525,7 @@ void write_index_stream(int32_t stripe_id, }); (*streams)[stream_id].length = pbw.size(); - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { uint32_t uncomp_ix_len = (uint32_t)((*streams)[stream_id].length - 3) * 2 + 1; pbw.buffer()[0] = static_cast(uncomp_ix_len >> 0); pbw.buffer()[1] = static_cast(uncomp_ix_len >> 8); @@ -1585,7 +1544,7 @@ void write_index_stream(int32_t stripe_id, * @param[in,out] bounce_buffer Pinned memory bounce buffer for D2H data transfer * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in] compression_kind The compression kind + * @param[in] compression The compression format * @param[in] out_sink Sink for writing data * @param[in] stream CUDA stream used for device memory operations and kernel launches * @return An std::future that should be synchronized to ensure the writing is complete @@ -1596,7 +1555,7 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, host_span bounce_buffer, StripeInformation* stripe, orc_streams* streams, - CompressionKind compression_kind, + compression_type compression, std::unique_ptr const& out_sink, rmm::cuda_stream_view stream) { @@ -1606,8 +1565,9 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, return std::async(std::launch::deferred, [] {}); } - auto const* stream_in = (compression_kind == NONE) ? enc_stream.data_ptrs[strm_desc.stream_type] - : (compressed_data + strm_desc.bfr_offset); + auto const* stream_in = (compression == compression_type::NONE) + ? enc_stream.data_ptrs[strm_desc.stream_type] + : (compressed_data + strm_desc.bfr_offset); auto write_task = [&]() { if (out_sink->is_device_write_preferred(length)) { @@ -1627,15 +1587,15 @@ std::future write_data_stream(gpu::StripeStream const& strm_desc, /** * @brief Insert 3-byte uncompressed block headers in a byte vector * - * @param compression_kind The compression kind + * @param compression The compression kind * @param compression_blocksize The block size used for compression * @param v The destitation byte vector to write, which must include initial 3-byte header */ -void add_uncompressed_block_headers(CompressionKind compression_kind, +void add_uncompressed_block_headers(compression_type compression, size_t compression_blocksize, std::vector& v) { - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { size_t uncomp_len = v.size() - 3, pos = 0, block_len; while (uncomp_len > compression_blocksize) { block_len = compression_blocksize * 2 + 1; @@ -2021,14 +1981,6 @@ std::map decimal_column_sizes( return column_sizes; } -size_t max_compression_output_size(CompressionKind compression_kind, uint32_t compression_blocksize) -{ - if (compression_kind == NONE) return 0; - - return nvcomp::compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), - compression_blocksize); -} - std::unique_ptr make_table_meta(table_view const& input) { auto table_meta = std::make_unique(input); @@ -2287,7 +2239,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, * @param row_index_stride The row index stride * @param enable_dictionary Whether dictionary is enabled * @param sort_dictionaries Whether to sort the dictionaries - * @param compression_kind The compression kind + * @param compression The compression format * @param compression_blocksize The block size used for compression * @param stats_freq Column statistics granularity type for parquet/orc writers * @param collect_compression_stats Flag to indicate if compression statistics should be collected @@ -2302,7 +2254,7 @@ auto convert_table_to_orc_data(table_view const& input, size_type row_index_stride, bool enable_dictionary, bool sort_dictionaries, - CompressionKind compression_kind, + compression_type compression, size_t compression_blocksize, statistics_freq stats_freq, bool collect_compression_stats, @@ -2329,17 +2281,16 @@ auto convert_table_to_orc_data(table_view const& input, auto stripe_dicts = build_dictionaries(orc_table, segmentation, sort_dictionaries, stream); auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); - auto const uncompressed_block_align = uncomp_block_alignment(compression_kind); - auto const compressed_block_align = comp_block_alignment(compression_kind); + auto const block_align = compress_required_chunk_alignment(compression); auto streams = create_streams(orc_table.columns, segmentation, decimal_column_sizes(dec_chunk_sizes.rg_sizes), enable_dictionary, - compression_kind, + compression, write_mode); auto enc_data = encode_columns( - orc_table, std::move(dec_chunk_sizes), segmentation, streams, uncompressed_block_align, stream); + orc_table, std::move(dec_chunk_sizes), segmentation, streams, block_align, stream); stripe_dicts.on_encode_complete(stream); @@ -2371,16 +2322,15 @@ auto convert_table_to_orc_data(table_view const& input, size_t compressed_bfr_size = 0; size_t num_compressed_blocks = 0; - auto const max_compressed_block_size = - max_compression_output_size(compression_kind, compression_blocksize); + auto const max_compressed_block_size = max_compressed_size(compression, compression_blocksize); auto const padded_max_compressed_block_size = - util::round_up_unsafe(max_compressed_block_size, compressed_block_align); + util::round_up_unsafe(max_compressed_block_size, block_align); auto const padded_block_header_size = - util::round_up_unsafe(block_header_size, compressed_block_align); + util::round_up_unsafe(block_header_size, block_align); for (auto& ss : strm_descs.host_view().flat_view()) { size_t stream_size = ss.stream_size; - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { ss.first_block = num_compressed_blocks; ss.bfr_offset = compressed_bfr_size; @@ -2401,14 +2351,14 @@ auto convert_table_to_orc_data(table_view const& input, comp_results.d_begin(), comp_results.d_end(), compression_result{0, compression_status::FAILURE}); - if (compression_kind != NONE) { + if (compression != compression_type::NONE) { strm_descs.host_to_device_async(stream); compression_stats = gpu::CompressOrcDataStreams(compressed_data, num_compressed_blocks, - compression_kind, + compression, compression_blocksize, max_compressed_block_size, - compressed_block_align, + block_align, collect_compression_stats, strm_descs, enc_data.streams, @@ -2459,8 +2409,8 @@ writer::impl::impl(std::unique_ptr sink, : _stream(stream), _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, _row_index_stride{options.get_row_index_stride()}, - _compression_kind(to_orc_compression(options.get_compression())), - _compression_blocksize(compression_block_size(_compression_kind)), + _compression{options.get_compression()}, + _compression_blocksize(compression_block_size(_compression)), _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), _sort_dictionaries{options.get_enable_dictionary_sort()}, @@ -2480,8 +2430,8 @@ writer::impl::impl(std::unique_ptr sink, : _stream(stream), _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, _row_index_stride{options.get_row_index_stride()}, - _compression_kind(to_orc_compression(options.get_compression())), - _compression_blocksize(compression_block_size(_compression_kind)), + _compression{options.get_compression()}, + _compression_blocksize(compression_block_size(_compression)), _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), _sort_dictionaries{options.get_enable_dictionary_sort()}, @@ -2526,7 +2476,7 @@ void writer::impl::write(table_view const& input) _row_index_stride, _enable_dictionary, _sort_dictionaries, - _compression_kind, + _compression, _compression_blocksize, _stats_freq, _compression_statistics != nullptr, @@ -2613,7 +2563,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, rg_stats, &stripe, &streams, - _compression_kind, + _compression, _compression_blocksize, _out_sink); } @@ -2627,7 +2577,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, bounce_buffer, &stripe, &streams, - _compression_kind, + _compression, _out_sink, _stream)); } @@ -2645,10 +2595,10 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, : 0; if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } } - ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0); pbw.write(sf); stripe.footerLength = pbw.size(); - if (_compression_kind != NONE) { + if (_compression != compression_type::NONE) { uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; pbw.buffer()[0] = static_cast(uncomp_sf_len >> 0); pbw.buffer()[1] = static_cast(uncomp_sf_len >> 8); @@ -2780,21 +2730,21 @@ void writer::impl::close() // Write statistics metadata if (not _orc_meta.stripeStats.empty()) { - ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0); pbw.write(_orc_meta); - add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer()); + add_uncompressed_block_headers(_compression, _compression_blocksize, pbw.buffer()); ps.metadataLength = pbw.size(); _out_sink->host_write(pbw.data(), pbw.size()); } else { ps.metadataLength = 0; } - ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression != compression_type::NONE) ? 3 : 0); pbw.write(_footer); - add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer()); + add_uncompressed_block_headers(_compression, _compression_blocksize, pbw.buffer()); // Write postscript metadata ps.footerLength = pbw.size(); - ps.compression = _compression_kind; + ps.compression = to_orc_compression(_compression); ps.compressionBlockSize = _compression_blocksize; ps.version = {0, 12}; // Hive 0.12 ps.writerVersion = cudf_writer_version; diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index cae849ee315..7d23482cb17 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -342,7 +342,7 @@ class writer::impl { // Writer options. stripe_size_limits const _max_stripe_size; size_type const _row_index_stride; - CompressionKind const _compression_kind; + compression_type const _compression; size_t const _compression_blocksize; std::shared_ptr _compression_statistics; // Optional output statistics_freq const _stats_freq; diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 77924ac0f35..1b67b53ae8e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -23,8 +23,7 @@ #include "compact_protocol_reader.hpp" #include "compact_protocol_writer.hpp" #include "interop/decimal_conversion_utilities.cuh" -#include "io/comp/gpuinflate.hpp" -#include "io/comp/nvcomp_adapter.hpp" +#include "io/comp/comp.hpp" #include "io/parquet/parquet.hpp" #include "io/parquet/parquet_gpu.hpp" #include "io/statistics/column_statistics.cuh" @@ -67,6 +66,20 @@ namespace cudf::io::parquet::detail { using namespace cudf::io::detail; +Compression to_parquet_compression(compression_type compression) +{ + switch (compression) { + case compression_type::AUTO: + case compression_type::SNAPPY: return Compression::SNAPPY; + case compression_type::ZSTD: return Compression::ZSTD; + case compression_type::LZ4: + // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 + return Compression::LZ4_RAW; + case compression_type::NONE: return Compression::UNCOMPRESSED; + default: CUDF_FAIL("Unsupported compression type"); + } +} + struct aggregate_writer_metadata { aggregate_writer_metadata(host_span partitions, host_span const> kv_md, @@ -1172,7 +1185,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, size_t max_page_size_bytes, size_type max_page_size_rows, bool write_v2_headers, - Compression compression_codec, + compression_type compression, rmm::cuda_stream_view stream) { if (chunks.is_empty()) { return cudf::detail::hostdevice_vector{}; } @@ -1187,7 +1200,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression_codec), + compress_required_chunk_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1212,7 +1225,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression_codec), + compress_required_chunk_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1221,12 +1234,10 @@ auto init_page_sizes(hostdevice_2dvector& chunks, // Get per-page max compressed size cudf::detail::hostdevice_vector comp_page_sizes(num_pages, stream); - std::transform(page_sizes.begin(), - page_sizes.end(), - comp_page_sizes.begin(), - [compression_codec](auto page_size) { - return max_compression_output_size(compression_codec, page_size); - }); + std::transform( + page_sizes.begin(), page_sizes.end(), comp_page_sizes.begin(), [compression](auto page_size) { + return max_compressed_size(compression, page_size); + }); comp_page_sizes.host_to_device_async(stream); // Use per-page max compressed size to calculate chunk.compressed_size @@ -1238,7 +1249,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression_codec), + compress_required_chunk_alignment(compression), write_v2_headers, nullptr, nullptr, @@ -1247,16 +1258,13 @@ auto init_page_sizes(hostdevice_2dvector& chunks, return comp_page_sizes; } -size_t max_page_bytes(Compression compression, size_t max_page_size_bytes) +size_t max_page_bytes(compression_type compression, size_t max_page_size_bytes) { - if (compression == Compression::UNCOMPRESSED) { return max_page_size_bytes; } + if (compression == compression_type::NONE) { return max_page_size_bytes; } - auto const ncomp_type = to_nvcomp_compression_type(compression); - auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type) - ? std::nullopt - : nvcomp::compress_max_allowed_chunk_size(ncomp_type); + auto const comp_limit = compress_max_allowed_chunk_size(compression); - auto max_size = std::min(nvcomp_limit.value_or(max_page_size_bytes), max_page_size_bytes); + auto max_size = std::min(comp_limit.value_or(max_page_size_bytes), max_page_size_bytes); // page size must fit in a 32-bit signed integer return std::min(max_size, std::numeric_limits::max()); } @@ -1265,7 +1273,7 @@ std::pair>, std::vector& chunks, host_span col_desc, device_2dspan frags, - Compression compression, + compression_type compression, dictionary_policy dict_policy, size_t max_dict_size, rmm::cuda_stream_view stream) @@ -1404,7 +1412,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, * @param num_columns Total number of columns * @param num_pages Total number of pages * @param num_stats_bfr Number of statistics buffers - * @param compression Compression format + * @param alignment Page alignment * @param max_page_size_bytes Maximum uncompressed page size, in bytes * @param max_page_size_rows Maximum page size, in rows * @param write_v2_headers True if version 2 page headers are to be written @@ -1419,7 +1427,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, uint32_t num_columns, uint32_t num_pages, uint32_t num_stats_bfr, - Compression compression, + size_t alignment, size_t max_page_size_bytes, size_type max_page_size_rows, bool write_v2_headers, @@ -1435,7 +1443,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, - page_alignment(compression), + alignment, write_v2_headers, (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr, @@ -1478,7 +1486,7 @@ void encode_pages(hostdevice_2dvector& chunks, statistics_chunk const* chunk_stats, statistics_chunk const* column_stats, std::optional& comp_stats, - Compression compression, + compression_type compression, int32_t column_index_truncate_length, bool write_v2_headers, rmm::cuda_stream_view stream) @@ -1488,7 +1496,7 @@ void encode_pages(hostdevice_2dvector& chunks, ? device_span(page_stats, num_pages) : device_span(); - uint32_t max_comp_pages = (compression != Compression::UNCOMPRESSED) ? num_pages : 0; + uint32_t max_comp_pages = (compression != compression_type::NONE) ? num_pages : 0; rmm::device_uvector> comp_in(max_comp_pages, stream); rmm::device_uvector> comp_out(max_comp_pages, stream); @@ -1499,34 +1507,7 @@ void encode_pages(hostdevice_2dvector& chunks, compression_result{0, compression_status::FAILURE}); EncodePages(pages, write_v2_headers, comp_in, comp_out, comp_res, stream); - switch (compression) { - case Compression::SNAPPY: - if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - gpu_snap(comp_in, comp_out, comp_res, stream); - } else { - nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); - } - break; - case Compression::ZSTD: { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); - break; - } - case Compression::LZ4_RAW: { - if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4); - reason) { - CUDF_FAIL("Compression error: " + reason.value()); - } - nvcomp::batched_compress(nvcomp::compression_type::LZ4, comp_in, comp_out, comp_res, stream); - break; - } - case Compression::UNCOMPRESSED: break; - default: CUDF_FAIL("invalid compression type"); - } + compress(compression, comp_in, comp_out, comp_res, stream); // TBD: Not clear if the official spec actually allows dynamically turning off compression at the // chunk-level @@ -1744,7 +1725,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, size_type max_page_size_rows, int32_t column_index_truncate_length, statistics_freq stats_granularity, - Compression compression, + compression_type compression, bool collect_compression_statistics, dictionary_policy dict_policy, size_t max_dictionary_size, @@ -2146,7 +2127,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } // Clear compressed buffer size if compression has been turned off - if (compression == Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; } + if (compression == compression_type::NONE) { max_comp_bfr_size = 0; } // Initialize data pointers uint32_t const num_stats_bfr = @@ -2214,7 +2195,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, num_columns, num_pages, num_stats_bfr, - compression, + compress_required_chunk_alignment(compression), max_page_size_bytes, max_page_size_rows, write_v2_headers, @@ -2270,7 +2251,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, auto const dev_bfr = ck.is_compressed ? ck.compressed_bfr : ck.uncompressed_bfr; auto& column_chunk_meta = row_group.columns[i].meta_data; - if (ck.is_compressed) { column_chunk_meta.codec = compression; } + if (ck.is_compressed) { column_chunk_meta.codec = to_parquet_compression(compression); } if (!out_sink[p]->is_device_write_preferred(ck.compressed_size)) { all_device_write = false; } @@ -2375,7 +2356,7 @@ writer::impl::impl(std::vector> sinks, single_write_mode mode, rmm::cuda_stream_view stream) : _stream(stream), - _compression(to_parquet_compression(options.get_compression())), + _compression(options.get_compression()), _max_row_group_size{options.get_row_group_size_bytes()}, _max_row_group_rows{options.get_row_group_size_rows()}, _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), @@ -2406,7 +2387,7 @@ writer::impl::impl(std::vector> sinks, single_write_mode mode, rmm::cuda_stream_view stream) : _stream(stream), - _compression(to_parquet_compression(options.get_compression())), + _compression(options.get_compression()), _max_row_group_size{options.get_row_group_size_bytes()}, _max_row_group_rows{options.get_row_group_size_rows()}, _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 63128faf993..d5a5a534b93 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -144,7 +144,7 @@ class writer::impl { rmm::cuda_stream_view _stream; // Writer options. - Compression const _compression; + compression_type const _compression; size_t const _max_row_group_size; size_type const _max_row_group_rows; size_t const _max_page_size_bytes; diff --git a/cpp/src/io/parquet/writer_impl_helpers.cpp b/cpp/src/io/parquet/writer_impl_helpers.cpp index f15ea1f3c37..ede788c97c2 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.cpp +++ b/cpp/src/io/parquet/writer_impl_helpers.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,6 @@ #include "writer_impl_helpers.hpp" -#include "io/comp/nvcomp_adapter.hpp" - #include #include #include @@ -32,48 +30,6 @@ namespace cudf::io::parquet::detail { using namespace cudf::io::detail; -Compression to_parquet_compression(compression_type compression) -{ - switch (compression) { - case compression_type::AUTO: - case compression_type::SNAPPY: return Compression::SNAPPY; - case compression_type::ZSTD: return Compression::ZSTD; - case compression_type::LZ4: - // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 - return Compression::LZ4_RAW; - case compression_type::NONE: return Compression::UNCOMPRESSED; - default: CUDF_FAIL("Unsupported compression type"); - } -} - -nvcomp::compression_type to_nvcomp_compression_type(Compression codec) -{ - switch (codec) { - case Compression::SNAPPY: return nvcomp::compression_type::SNAPPY; - case Compression::ZSTD: return nvcomp::compression_type::ZSTD; - // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 - case Compression::LZ4_RAW: return nvcomp::compression_type::LZ4; - default: CUDF_FAIL("Unsupported compression type"); - } -} - -uint32_t page_alignment(Compression codec) -{ - if (codec == Compression::UNCOMPRESSED or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(codec))) { - return 1u; - } - - return nvcomp::required_alignment(to_nvcomp_compression_type(codec)); -} - -size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize) -{ - if (codec == Compression::UNCOMPRESSED) return 0; - - return compress_max_output_chunk_size(to_nvcomp_compression_type(codec), compression_blocksize); -} - void fill_table_meta(table_input_metadata& table_meta) { // Fill unnamed columns' names in table_meta diff --git a/cpp/src/io/parquet/writer_impl_helpers.hpp b/cpp/src/io/parquet/writer_impl_helpers.hpp index 14a9a0ed5b7..b5c73c348fe 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.hpp +++ b/cpp/src/io/parquet/writer_impl_helpers.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,48 +20,12 @@ */ #pragma once -#include "parquet_common.hpp" #include #include -#include namespace cudf::io::parquet::detail { -/** - * @brief Function that translates GDF compression to parquet compression. - * - * @param compression The compression type - * @return The supported Parquet compression - */ -Compression to_parquet_compression(compression_type compression); - -/** - * @brief Function that translates the given compression codec to nvcomp compression type. - * - * @param codec Compression codec - * @return Translated nvcomp compression type - */ -cudf::io::detail::nvcomp::compression_type to_nvcomp_compression_type(Compression codec); - -/** - * @brief Function that computes input alignment requirements for the given compression type. - * - * @param codec Compression codec - * @return Required alignment - */ -uint32_t page_alignment(Compression codec); - -/** - * @brief Gets the maximum compressed chunk size for the largest chunk uncompressed chunk in the - * batch. - * - * @param codec Compression codec - * @param compression_blocksize Size of the largest uncompressed chunk in the batch - * @return Maximum compressed chunk size - */ -size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize); - /** * @brief Fill the table metadata with default column names. * diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 2209a30149d..708c2045a74 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -2068,6 +2068,7 @@ TEST_P(OrcCompressionTest, Basic) INSTANTIATE_TEST_CASE_P(OrcCompressionTest, OrcCompressionTest, ::testing::Values(cudf::io::compression_type::NONE, + cudf::io::compression_type::AUTO, cudf::io::compression_type::SNAPPY, cudf::io::compression_type::LZ4, cudf::io::compression_type::ZSTD)); diff --git a/cpp/tests/io/parquet_misc_test.cpp b/cpp/tests/io/parquet_misc_test.cpp index d66f685cd9c..419ac909ac6 100644 --- a/cpp/tests/io/parquet_misc_test.cpp +++ b/cpp/tests/io/parquet_misc_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -268,6 +268,7 @@ TEST_P(ParquetCompressionTest, Basic) INSTANTIATE_TEST_CASE_P(ParquetCompressionTest, ParquetCompressionTest, ::testing::Values(cudf::io::compression_type::NONE, + cudf::io::compression_type::AUTO, cudf::io::compression_type::SNAPPY, cudf::io::compression_type::LZ4, cudf::io::compression_type::ZSTD));