Skip to content

Commit

Permalink
Host compression (#17656)
Browse files Browse the repository at this point in the history
Add compression APIs that make the use of nvCOMP transparent to callers.
Remove direct dependency on nvCOMP in the ORC and Parquet writers.
Add multi-threaded host-side compression; it is currently off by default and can only be enabled via the `LIBCUDF_USE_HOST_COMPRESSION` environment variable.

Currently the host compression adds D2H + H2D transfers. Avoiding the H2D transfer requires large changes to the writers.

Also moved handling of the AUTO compression type to the options classes, which should own such defaults (translate AUTO to SNAPPY in this case).

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Shruti Shivakumar (https://github.com/shrshi)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #17656
  • Loading branch information
vuule authored Jan 13, 2025
1 parent 4ec389b commit bbf4f78
Show file tree
Hide file tree
Showing 17 changed files with 338 additions and 319 deletions.
2 changes: 1 addition & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,13 @@ add_library(
src/io/avro/reader_impl.cu
src/io/comp/brotli_dict.cpp
src/io/comp/comp.cpp
src/io/comp/comp.cu
src/io/comp/cpu_unbz2.cpp
src/io/comp/debrotli.cu
src/io/comp/gpuinflate.cu
src/io/comp/nvcomp_adapter.cpp
src/io/comp/nvcomp_adapter.cu
src/io/comp/snap.cu
src/io/comp/statistics.cu
src/io/comp/uncomp.cpp
src/io/comp/unsnap.cu
src/io/csv/csv_gpu.cu
Expand Down
22 changes: 15 additions & 7 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -578,7 +578,7 @@ class orc_writer_options {
// Specify the sink to use for writer output
sink_info _sink;
// Specify the compression format to use
compression_type _compression = compression_type::AUTO;
compression_type _compression = compression_type::SNAPPY;
// Specify frequency of statistics collection
statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP;
// Maximum size of each stripe (unless smaller than a single row group)
Expand Down Expand Up @@ -733,7 +733,11 @@ class orc_writer_options {
*
* @param comp Compression type
*/
void set_compression(compression_type comp)
{
  _compression = comp;
  // AUTO is resolved to the default codec (SNAPPY) here, so the writer never sees AUTO
  if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; }
}

/**
* @brief Choose granularity of statistics collection.
Expand Down Expand Up @@ -865,7 +869,7 @@ class orc_writer_options_builder {
*/
orc_writer_options_builder& compression(compression_type comp)
{
  // Route through the setter so AUTO is normalized to the default codec (SNAPPY)
  options.set_compression(comp);
  return *this;
}

Expand Down Expand Up @@ -1026,7 +1030,7 @@ class chunked_orc_writer_options {
// Specify the sink to use for writer output
sink_info _sink;
// Specify the compression format to use
compression_type _compression = compression_type::AUTO;
compression_type _compression = compression_type::SNAPPY;
// Specify granularity of statistics collection
statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP;
// Maximum size of each stripe (unless smaller than a single row group)
Expand Down Expand Up @@ -1157,7 +1161,11 @@ class chunked_orc_writer_options {
*
* @param comp The compression type to use
*/
void set_compression(compression_type comp)
{
  _compression = comp;
  // AUTO is resolved to the default codec (SNAPPY) here, so the writer never sees AUTO
  if (comp == compression_type::AUTO) { _compression = compression_type::SNAPPY; }
}

/**
* @brief Choose granularity of statistics collection
Expand Down Expand Up @@ -1279,7 +1287,7 @@ class chunked_orc_writer_options_builder {
*/
chunked_orc_writer_options_builder& compression(compression_type comp)
{
  // Route through the setter so AUTO is normalized to the default codec (SNAPPY)
  options.set_compression(comp);
  return *this;
}

Expand Down
163 changes: 162 additions & 1 deletion cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,22 +16,45 @@

#include "comp.hpp"

#include "gpuinflate.hpp"
#include "io/utilities/getenv_or.hpp"
#include "io/utilities/hostdevice_vector.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <BS_thread_pool.hpp>
#include <zlib.h> // GZIP compression

namespace cudf::io::detail {

namespace {

auto& h_comp_pool()
{
static std::size_t pool_size =
getenv_or("LIBCUDF_HOST_COMPRESSION_NUM_THREADS", std::thread::hardware_concurrency());
static BS::thread_pool pool(pool_size);
return pool;
}

// Maps a cudf compression type to the equivalent nvCOMP type; nullopt when nvCOMP
// has no matching codec.
std::optional<nvcomp::compression_type> to_nvcomp_compression(compression_type compression)
{
  if (compression == compression_type::SNAPPY) { return nvcomp::compression_type::SNAPPY; }
  if (compression == compression_type::ZSTD) { return nvcomp::compression_type::ZSTD; }
  if (compression == compression_type::LZ4) { return nvcomp::compression_type::LZ4; }
  if (compression == compression_type::ZLIB) { return nvcomp::compression_type::DEFLATE; }
  return std::nullopt;
}

/**
* @brief GZIP host compressor (includes header)
*/
Expand Down Expand Up @@ -98,8 +121,132 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
return cudf::detail::make_std_vector_sync<uint8_t>(d_dst, stream);
}

// Compresses device buffers on the device, preferring nvCOMP and falling back to the
// built-in kernels where one exists (SNAPPY only).
void device_compress(compression_type compression,
                     device_span<device_span<uint8_t const> const> inputs,
                     device_span<device_span<uint8_t> const> outputs,
                     device_span<compression_result> results,
                     rmm::cuda_stream_view stream)
{
  if (compression == compression_type::NONE) { return; }

  auto const nvcomp_type = to_nvcomp_compression(compression);
  auto const disabled_reason = nvcomp_type.has_value()
                                 ? nvcomp::is_compression_disabled(*nvcomp_type)
                                 : "invalid compression type";
  // An empty reason means nvCOMP can handle this compression type
  if (not disabled_reason) {
    return nvcomp::batched_compress(*nvcomp_type, inputs, outputs, results, stream);
  }

  // nvCOMP is not usable; fall back to the in-house device implementation
  if (compression == compression_type::SNAPPY) {
    return gpu_snap(inputs, outputs, results, stream);
  }
  CUDF_FAIL("Compression error: " + disabled_reason.value());
}

/**
 * @brief Compresses device buffers on the host using a thread pool.
 *
 * Each chunk is copied to the host, compressed with the single-buffer host `compress`
 * overload, and the result is copied back into the corresponding output buffer.
 * Per-chunk work is distributed over forked streams so the transfers can overlap.
 *
 * @param compression Type of compression to perform; NONE is a no-op
 * @param inputs Device buffers to compress
 * @param outputs Device buffers that receive the compressed data
 * @param results Per-chunk status/size, filled on the host and copied to the device
 * @param stream CUDA stream used for device memory operations
 */
void host_compress(compression_type compression,
                   device_span<device_span<uint8_t const> const> inputs,
                   device_span<device_span<uint8_t> const> outputs,
                   device_span<compression_result> results,
                   rmm::cuda_stream_view stream)
{
  if (compression == compression_type::NONE) { return; }

  auto const num_chunks = inputs.size();
  auto h_results = cudf::detail::make_host_vector<compression_result>(num_chunks, stream);
  // Copy the span descriptors (pointers + sizes) to the host; the buffer data stays on the device
  auto const h_inputs = cudf::detail::make_host_vector_async(inputs, stream);
  auto const h_outputs = cudf::detail::make_host_vector_async(outputs, stream);
  // Wait for the descriptor copies above before the tasks start reading them
  stream.synchronize();

  std::vector<std::future<size_t>> tasks;
  // Don't fork more streams than there are chunks, pooled streams, or worker threads
  auto const num_streams =
    std::min<std::size_t>({num_chunks,
                           cudf::detail::global_cuda_stream_pool().get_stream_pool_size(),
                           h_comp_pool().get_thread_count()});
  auto const streams = cudf::detail::fork_streams(stream, num_streams);
  for (size_t i = 0; i < num_chunks; ++i) {
    // Round-robin chunks over the forked streams
    auto const cur_stream = streams[i % streams.size()];
    // Each task performs: D2H copy -> host compression -> H2D copy of the compressed bytes;
    // it returns the compressed size for the results array
    auto task = [d_in = h_inputs[i], d_out = h_outputs[i], cur_stream, compression]() -> size_t {
      auto const h_in = cudf::detail::make_host_vector_sync(d_in, cur_stream);
      auto const h_out = compress(compression, h_in, cur_stream);
      cudf::detail::cuda_memcpy<uint8_t>(d_out.subspan(0, h_out.size()), h_out, cur_stream);
      return h_out.size();
    };
    tasks.emplace_back(h_comp_pool().submit_task(std::move(task)));
  }

  // Blocks until every task completes; a task failure propagates as an exception here
  for (auto i = 0ul; i < num_chunks; ++i) {
    h_results[i] = {tasks[i].get(), compression_status::SUCCESS};
  }
  cudf::detail::cuda_memcpy_async<compression_result>(results, h_results, stream);
}

// Returns true when the codec has a host (CPU) implementation; NONE trivially qualifies.
[[nodiscard]] bool host_compression_supported(compression_type compression)
{
  return compression == compression_type::GZIP or compression == compression_type::NONE;
}

// Returns true when the codec can be compressed on the device.
[[nodiscard]] bool device_compression_supported(compression_type compression)
{
  // SNAPPY always has the built-in kernel fallback; NONE is a no-op
  if (compression == compression_type::SNAPPY or compression == compression_type::NONE) {
    return true;
  }
  // These codecs are only available through nvCOMP
  if (compression == compression_type::LZ4 or compression == compression_type::ZLIB or
      compression == compression_type::ZSTD) {
    auto const nvcomp_type = to_nvcomp_compression(compression);
    return not nvcomp::is_compression_disabled(nvcomp_type.value());
  }
  return false;
}

/**
 * @brief Decides whether to run compression on the host or on the device.
 *
 * Host-only codecs take the host path, device-only codecs the device path; when both
 * are available the LIBCUDF_USE_HOST_COMPRESSION environment variable opts into the
 * host path (off by default).
 *
 * @throws cudf::logic_error when neither implementation supports the codec
 */
[[nodiscard]] bool use_host_compression(
  compression_type compression,
  [[maybe_unused]] device_span<device_span<uint8_t const> const> inputs,
  [[maybe_unused]] device_span<device_span<uint8_t> const> outputs)
{
  // At least one of the two implementations must support the codec. The previous
  // `not host_supported or device_supported` form rejected host-only codecs (e.g. GZIP)
  // and made the host-only branch below unreachable.
  CUDF_EXPECTS(
    host_compression_supported(compression) or device_compression_supported(compression),
    "Unsupported compression type");
  if (not host_compression_supported(compression)) { return false; }
  if (not device_compression_supported(compression)) { return true; }
  // If both host and device compression are supported, use the host if the env var is set
  return getenv_or("LIBCUDF_USE_HOST_COMPRESSION", 0);
}

} // namespace

// Maximum uncompressed chunk size the codec can handle; nullopt when nvCOMP does not
// provide (or is disabled for) this codec.
std::optional<size_t> compress_max_allowed_chunk_size(compression_type compression)
{
  auto const nvcomp_type = to_nvcomp_compression(compression);
  if (not nvcomp_type.has_value() or nvcomp::is_compression_disabled(*nvcomp_type)) {
    return std::nullopt;
  }
  return nvcomp::compress_max_allowed_chunk_size(*nvcomp_type);
}

// Input/output alignment required by the codec; 1 (no requirement) unless nvCOMP
// will perform the compression.
[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression)
{
  auto const nvcomp_type  = to_nvcomp_compression(compression);
  auto const uses_nvcomp = compression != compression_type::NONE and nvcomp_type.has_value() and
                           not nvcomp::is_compression_disabled(*nvcomp_type);
  return uses_nvcomp ? nvcomp::required_alignment(*nvcomp_type) : 1ul;
}

// Upper bound on the compressed size of a chunk of `uncompressed_size` bytes.
[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size)
{
  if (compression == compression_type::NONE) { return uncompressed_size; }

  auto const nvcomp_type = to_nvcomp_compression(compression);
  CUDF_EXPECTS(nvcomp_type.has_value(), "Unsupported compression type");
  return nvcomp::compress_max_output_chunk_size(*nvcomp_type, uncompressed_size);
}

std::vector<std::uint8_t> compress(compression_type compression,
host_span<uint8_t const> src,
rmm::cuda_stream_view stream)
Expand All @@ -112,4 +259,18 @@ std::vector<std::uint8_t> compress(compression_type compression,
}
}

// Compresses a batch of device buffers, dispatching to the host or device
// implementation (see use_host_compression).
void compress(compression_type compression,
              device_span<device_span<uint8_t const> const> inputs,
              device_span<device_span<uint8_t> const> outputs,
              device_span<compression_result> results,
              rmm::cuda_stream_view stream)
{
  CUDF_FUNC_RANGE();
  if (not use_host_compression(compression, inputs, outputs)) {
    device_compress(compression, inputs, outputs, results, stream);
    return;
  }
  host_compress(compression, inputs, outputs, results, stream);
}

} // namespace cudf::io::detail
4 changes: 2 additions & 2 deletions cpp/src/io/comp/statistics.cu → cpp/src/io/comp/comp.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "gpuinflate.hpp"
#include "comp.hpp"

#include <rmm/exec_policy.hpp>

Expand Down
54 changes: 53 additions & 1 deletion cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,5 +57,57 @@ std::vector<uint8_t> compress(compression_type compression,
host_span<uint8_t const> src,
rmm::cuda_stream_view stream);

/**
* @brief Maximum size of uncompressed chunks that can be compressed.
*
* @param compression Compression type
* @returns maximum chunk size
*/
[[nodiscard]] std::optional<size_t> compress_max_allowed_chunk_size(compression_type compression);

/**
* @brief Gets input and output alignment requirements for the given compression type.
*
* @param compression Compression type
* @returns required alignment
*/
[[nodiscard]] size_t compress_required_chunk_alignment(compression_type compression);

/**
* @brief Gets the maximum size any chunk could compress to in the batch.
*
* @param compression Compression type
* @param uncompressed_size Size of the largest uncompressed chunk in the batch
*/
[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size);

/**
* @brief Compresses device memory buffers.
*
* @param compression Type of compression of the input data
* @param inputs Device memory buffers to compress
* @param outputs Device memory buffers to store the compressed output
* @param results Compression results
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void compress(compression_type compression,
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<compression_result> results,
rmm::cuda_stream_view stream);

/**
* @brief Aggregate results of compression into a single statistics object.
*
* @param inputs List of uncompressed input buffers
* @param results List of compression results
* @param stream CUDA stream to use
* @return writer_compression_statistics
*/
[[nodiscard]] writer_compression_statistics collect_compression_statistics(
device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result const> results,
rmm::cuda_stream_view stream);

} // namespace io::detail
} // namespace CUDF_EXPORT cudf
15 changes: 1 addition & 14 deletions cpp/src/io/comp/gpuinflate.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -124,17 +124,4 @@ void gpu_snap(device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result> results,
rmm::cuda_stream_view stream);

/**
* @brief Aggregate results of compression into a single statistics object.
*
* @param inputs List of uncompressed input buffers
* @param results List of compression results
* @param stream CUDA stream to use
* @return writer_compression_statistics
*/
[[nodiscard]] writer_compression_statistics collect_compression_statistics(
device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result const> results,
rmm::cuda_stream_view stream);

} // namespace cudf::io::detail
Loading

0 comments on commit bbf4f78

Please sign in to comment.