Skip to content

Commit

Permalink
Merge branch 'branch-25.02' into bug-write_orc-multiblock-sf
Browse files Browse the repository at this point in the history
  • Loading branch information
vuule authored Jan 10, 2025
2 parents 729023a + dc2a75c commit 8626bf8
Show file tree
Hide file tree
Showing 52 changed files with 499 additions and 452 deletions.
4 changes: 2 additions & 2 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ dependencies:
- nbsphinx
- ninja
- notebook
- numba-cuda>=0.0.13,<0.0.18
- numba-cuda>=0.2.0,<0.3.0
- numpy>=1.23,<3.0a0
- numpydoc
- nvcc_linux-64=11.8
Expand All @@ -66,7 +66,7 @@ dependencies:
- pandas
- pandas>=2.0,<2.2.4dev0
- pandoc
- polars>=1.11,<1.15
- polars>=1.11,<1.18
- pre-commit
- ptxcompiler
- pyarrow>=14.0.0,<19.0.0a0
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/all_cuda-125_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ dependencies:
- nbsphinx
- ninja
- notebook
- numba-cuda>=0.0.13,<0.0.18
- numba-cuda>=0.2.0,<0.3.0
- numpy>=1.23,<3.0a0
- numpydoc
- nvcomp==4.1.0.6
Expand All @@ -64,7 +64,7 @@ dependencies:
- pandas
- pandas>=2.0,<2.2.4dev0
- pandoc
- polars>=1.11,<1.15
- polars>=1.11,<1.18
- pre-commit
- pyarrow>=14.0.0,<19.0.0a0
- pydata-sphinx-theme!=0.14.2
Expand Down
4 changes: 2 additions & 2 deletions conda/recipes/cudf-polars/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.

{% set version = environ['RAPIDS_PACKAGE_VERSION'].lstrip('v') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
Expand Down Expand Up @@ -43,7 +43,7 @@ requirements:
run:
- python
- pylibcudf ={{ version }}
- polars >=1.11,<1.15
- polars >=1.11,<1.18
- {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }}

test:
Expand Down
4 changes: 2 additions & 2 deletions conda/recipes/cudf/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2024, NVIDIA CORPORATION.
# Copyright (c) 2018-2025, NVIDIA CORPORATION.

{% set version = environ['RAPIDS_PACKAGE_VERSION'].lstrip('v') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
Expand Down Expand Up @@ -80,7 +80,7 @@ requirements:
- typing_extensions >=4.0.0
- pandas >=2.0,<2.2.4dev0
- cupy >=12.0.0
- numba-cuda >=0.0.13,<0.0.18
- numba-cuda >=0.2.0,<0.3.0
- numpy >=1.23,<3.0a0
- pyarrow>=14.0.0,<18.0.0a0
- libcudf ={{ version }}
Expand Down
12 changes: 9 additions & 3 deletions cpp/include/cudf/detail/utilities/integer_utils.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Copyright 2019 BlazingDB, Inc.
* Copyright 2019 Eyal Rozenberg <[email protected]>
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@
*/

#include <cudf/fixed_point/temporary.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>

#include <cmath>
#include <cstdlib>
Expand All @@ -44,13 +46,17 @@ namespace util {
* `modulus` is positive. The safety is in regard to rollover.
*/
template <typename S>
constexpr S round_up_safe(S number_to_round, S modulus)
CUDF_HOST_DEVICE constexpr S round_up_safe(S number_to_round, S modulus)
{
auto remainder = number_to_round % modulus;
if (remainder == 0) { return number_to_round; }
auto rounded_up = number_to_round - remainder + modulus;
if (rounded_up < number_to_round) {
throw std::invalid_argument("Attempt to round up beyond the type's maximum value");
#ifndef __CUDA_ARCH__
CUDF_FAIL("Attempt to round up beyond the type's maximum value", cudf::data_type_error);
#else
CUDF_UNREACHABLE("Attempt to round up beyond the type's maximum value");
#endif
}
return rounded_up;
}
Expand Down
40 changes: 24 additions & 16 deletions cpp/include/cudf/utilities/span.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -197,11 +197,16 @@ struct host_span : public cudf::detail::span_base<T, Extent, host_span<T, Extent

constexpr host_span() noexcept : base() {} // required to compile on centos

/// Constructor from pointer and size
/// @param data Pointer to the first element in the span
/// @param size The number of elements in the span
/// @param is_device_accessible Whether the data is device accessible (e.g. pinned memory)
constexpr host_span(T* data, std::size_t size, bool is_device_accessible)
/**
* @brief Constructor from pointer and size
*
* @note This needs to be host-device , as it's used by a host-device function in base_2dspan
*
* @param data Pointer to the first element in the span
* @param size The number of elements in the span
* @param is_device_accessible Whether the data is device accessible (e.g. pinned memory)
*/
CUDF_HOST_DEVICE constexpr host_span(T* data, std::size_t size, bool is_device_accessible)
: base(data, size), _is_device_accessible{is_device_accessible}
{
}
Expand Down Expand Up @@ -311,8 +316,8 @@ struct host_span : public cudf::detail::span_base<T, Extent, host_span<T, Extent
* @param count The number of elements in the subspan
* @return A subspan of the sequence, of requested count and offset
*/
[[nodiscard]] constexpr host_span subspan(typename base::size_type offset,
typename base::size_type count) const noexcept
[[nodiscard]] CUDF_HOST_DEVICE constexpr host_span subspan(
typename base::size_type offset, typename base::size_type count) const noexcept
{
return host_span{this->data() + offset, count, _is_device_accessible};
}
Expand Down Expand Up @@ -434,8 +439,8 @@ struct device_span : public cudf::detail::span_base<T, Extent, device_span<T, Ex
* @param count The number of elements in the subspan
* @return A subspan of the sequence, of requested count and offset
*/
[[nodiscard]] constexpr device_span subspan(typename base::size_type offset,
typename base::size_type count) const noexcept
[[nodiscard]] CUDF_HOST_DEVICE constexpr device_span subspan(
typename base::size_type offset, typename base::size_type count) const noexcept
{
return device_span{this->data() + offset, count};
}
Expand Down Expand Up @@ -475,28 +480,28 @@ class base_2dspan {
*
* @return A pointer to the first element of the span
*/
[[nodiscard]] constexpr auto data() const noexcept { return _flat.data(); }
[[nodiscard]] CUDF_HOST_DEVICE constexpr auto data() const noexcept { return _flat.data(); }

/**
* @brief Returns the size in the span as pair.
*
* @return pair representing rows and columns size of the span
*/
[[nodiscard]] constexpr auto size() const noexcept { return _size; }
[[nodiscard]] CUDF_HOST_DEVICE constexpr auto size() const noexcept { return _size; }

/**
* @brief Returns the number of elements in the span.
*
* @return Number of elements in the span
*/
[[nodiscard]] constexpr auto count() const noexcept { return _flat.size(); }
[[nodiscard]] CUDF_HOST_DEVICE constexpr auto count() const noexcept { return _flat.size(); }

/**
* @brief Checks if the span is empty.
*
* @return True if the span is empty, false otherwise
*/
[[nodiscard]] constexpr bool is_empty() const noexcept { return count() == 0; }
[[nodiscard]] CUDF_HOST_DEVICE constexpr bool is_empty() const noexcept { return count() == 0; }

/**
* @brief Returns a reference to the row-th element of the sequence.
Expand All @@ -507,7 +512,7 @@ class base_2dspan {
* @param row the index of the element to access
* @return A reference to the row-th element of the sequence, i.e., `data()[row]`
*/
constexpr RowType<T, dynamic_extent> operator[](size_t row) const
CUDF_HOST_DEVICE constexpr RowType<T, dynamic_extent> operator[](size_t row) const
{
return _flat.subspan(row * _size.second, _size.second);
}
Expand All @@ -517,7 +522,10 @@ class base_2dspan {
*
* @return A flattened span of the 2D span
*/
[[nodiscard]] constexpr RowType<T, dynamic_extent> flat_view() const { return _flat; }
[[nodiscard]] CUDF_HOST_DEVICE constexpr RowType<T, dynamic_extent> flat_view() const
{
return _flat;
}

/**
* @brief Construct a 2D span from another 2D span of convertible type
Expand Down
71 changes: 61 additions & 10 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,19 +30,33 @@
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_pool.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/distance.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/scatter.h>

#include <BS_thread_pool.hpp>
#include <BS_thread_pool_utils.hpp>

#include <numeric>

namespace cudf::io::json::detail {

namespace {

namespace pools {

BS::thread_pool& tpool()
{
static BS::thread_pool _tpool(std::thread::hardware_concurrency());
return _tpool;
}

} // namespace pools

class compressed_host_buffer_source final : public datasource {
public:
explicit compressed_host_buffer_source(std::unique_ptr<datasource> const& src,
Expand All @@ -51,8 +65,8 @@ class compressed_host_buffer_source final : public datasource {
{
auto ch_buffer = host_span<uint8_t const>(reinterpret_cast<uint8_t const*>(_dbuf_ptr->data()),
_dbuf_ptr->size());
if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
comptype == compression_type::SNAPPY) {
if (_comptype == compression_type::GZIP || _comptype == compression_type::ZIP ||
_comptype == compression_type::SNAPPY) {
_decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size(_comptype, ch_buffer);
} else {
_decompressed_buffer = cudf::io::detail::decompress(_comptype, ch_buffer);
Expand Down Expand Up @@ -96,7 +110,22 @@ class compressed_host_buffer_source final : public datasource {
return std::make_unique<non_owning_buffer>(_decompressed_buffer.data() + offset, count);
}

[[nodiscard]] bool supports_device_read() const override { return false; }
std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
auto& thread_pool = pools::tpool();
return thread_pool.submit_task([this, offset, size, dst, stream] {
auto hbuf = host_read(offset, size);
CUDF_CUDA_TRY(
cudaMemcpyAsync(dst, hbuf->data(), hbuf->size(), cudaMemcpyHostToDevice, stream.value()));
stream.synchronize();
return hbuf->size();
});
}

[[nodiscard]] bool supports_device_read() const override { return true; }

[[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; }

Expand Down Expand Up @@ -431,6 +460,8 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
// line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
// delimiter.
auto constexpr num_delimiter_chars = 1;
std::vector<std::future<size_t>> thread_tasks;
auto stream_pool = cudf::detail::fork_streams(stream, pools::tpool().get_thread_count());

auto delimiter_map = cudf::detail::make_empty_host_vector<std::size_t>(sources.size(), stream);
std::vector<std::size_t> prefsum_source_sizes(sources.size());
Expand All @@ -447,13 +478,17 @@ device_span<char> ingest_raw_input(device_span<char> buffer,

auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
for (std::size_t i = start_source, cur_stream = 0;
i < sources.size() && bytes_read < total_bytes_to_read;
i++) {
if (sources[i]->is_empty()) continue;
auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read +
(num_delimiter_chars * delimiter_map.size());
if (sources[i]->is_device_read_preferred(data_size)) {
bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
if (sources[i]->supports_device_read()) {
thread_tasks.emplace_back(sources[i]->device_read_async(
range_offset, data_size, destination, stream_pool[cur_stream++ % stream_pool.size()]));
bytes_read += data_size;
} else {
h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
auto const& h_buffer = h_buffers.back();
Expand Down Expand Up @@ -481,6 +516,15 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
buffer.data());
}
stream.synchronize();

if (thread_tasks.size()) {
auto const bytes_read = std::accumulate(
thread_tasks.begin(), thread_tasks.end(), std::size_t{0}, [](std::size_t sum, auto& task) {
return sum + task.get();
});
CUDF_EXPECTS(bytes_read == total_bytes_to_read, "something's fishy");
}

return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars));
}

Expand All @@ -505,10 +549,17 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
return read_json_impl(sources, reader_opts, stream, mr);

std::vector<std::unique_ptr<datasource>> compressed_sources;
for (size_t i = 0; i < sources.size(); i++) {
compressed_sources.emplace_back(
std::make_unique<compressed_host_buffer_source>(sources[i], reader_opts.get_compression()));
std::vector<std::future<std::unique_ptr<compressed_host_buffer_source>>> thread_tasks;
auto& thread_pool = pools::tpool();
for (auto& src : sources) {
thread_tasks.emplace_back(thread_pool.submit_task([&reader_opts, &src] {
return std::make_unique<compressed_host_buffer_source>(src, reader_opts.get_compression());
}));
}
std::transform(thread_tasks.begin(),
thread_tasks.end(),
std::back_inserter(compressed_sources),
[](auto& task) { return task.get(); });
// in read_json_impl, we need the compressed source size to actually be the
// uncompressed source size for correct batching
return read_json_impl(compressed_sources, reader_opts, stream, mr);
Expand Down
5 changes: 1 addition & 4 deletions cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -961,9 +961,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8)
return;
}

// if we have no work to do (eg, in a skip_rows/num_rows case) in this page.
if (s->num_rows == 0) { return; }

using value_decoder_type = std::conditional_t<
split_decode_t,
decode_fixed_width_split_values_func<decode_block_size_t, has_lists_t, state_buf_t>,
Expand Down
Loading

0 comments on commit 8626bf8

Please sign in to comment.