Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-25.02' into numba/depre…
Browse files Browse the repository at this point in the history
…cated_config
  • Loading branch information
mroeschke committed Jan 10, 2025
2 parents cfd71ef + 559cda2 commit 1e51ac4
Show file tree
Hide file tree
Showing 29 changed files with 329 additions and 362 deletions.
71 changes: 61 additions & 10 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,19 +30,33 @@
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_pool.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/distance.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/scatter.h>

#include <BS_thread_pool.hpp>
#include <BS_thread_pool_utils.hpp>

#include <numeric>

namespace cudf::io::json::detail {

namespace {

namespace pools {

/**
 * @brief Accessor for the host thread pool shared by the code in this file.
 *
 * The pool is a function-local static: it is constructed on the first call, with
 * `std::thread::hardware_concurrency()` as the requested thread count, and the same
 * instance is handed back on every later call.
 *
 * @return Reference to the shared `BS::thread_pool` instance
 */
BS::thread_pool& tpool()
{
  static BS::thread_pool shared_pool(std::thread::hardware_concurrency());
  return shared_pool;
}

}  // namespace pools

class compressed_host_buffer_source final : public datasource {
public:
explicit compressed_host_buffer_source(std::unique_ptr<datasource> const& src,
Expand All @@ -51,8 +65,8 @@ class compressed_host_buffer_source final : public datasource {
{
auto ch_buffer = host_span<uint8_t const>(reinterpret_cast<uint8_t const*>(_dbuf_ptr->data()),
_dbuf_ptr->size());
if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
comptype == compression_type::SNAPPY) {
if (_comptype == compression_type::GZIP || _comptype == compression_type::ZIP ||
_comptype == compression_type::SNAPPY) {
_decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size(_comptype, ch_buffer);
} else {
_decompressed_buffer = cudf::io::detail::decompress(_comptype, ch_buffer);
Expand Down Expand Up @@ -96,7 +110,22 @@ class compressed_host_buffer_source final : public datasource {
return std::make_unique<non_owning_buffer>(_decompressed_buffer.data() + offset, count);
}

[[nodiscard]] bool supports_device_read() const override { return false; }
/**
 * @brief Asynchronously copies a range of this source's data to `dst`.
 *
 * A task is submitted to the shared host thread pool (`pools::tpool()`). The task reads the
 * requested range through `host_read`, issues a host-to-device `cudaMemcpyAsync` on `stream`,
 * and synchronizes `stream` before it finishes.
 *
 * @param offset Offset of the first byte to read; forwarded to `host_read`
 * @param size Number of bytes requested; forwarded to `host_read`
 * @param dst Destination pointer of the host-to-device copy
 * @param stream CUDA stream on which the copy is issued and which the task synchronizes
 * @return Future holding the size of the buffer returned by `host_read`, which is also the
 * number of bytes passed to the copy
 */
std::future<size_t> device_read_async(size_t offset,
                                      size_t size,
                                      uint8_t* dst,
                                      rmm::cuda_stream_view stream) override
{
  // NOTE(review): the task captures `this`, so this source presumably has to stay alive until
  // the returned future has completed -- confirm against callers.
  return pools::tpool().submit_task([this, offset, size, dst, stream] {
    auto const host_data = host_read(offset, size);
    auto const num_bytes = host_data->size();
    CUDF_CUDA_TRY(cudaMemcpyAsync(
      dst, host_data->data(), num_bytes, cudaMemcpyHostToDevice, stream.value()));
    // Wait for the stream here: `host_data` is destroyed as soon as the task returns.
    stream.synchronize();
    return num_bytes;
  });
}

/**
 * @brief Reports whether this source offers the device-read interface.
 *
 * @return Always `true`
 */
[[nodiscard]] bool supports_device_read() const override
{
  return true;
}

/**
 * @brief Size reported by this source.
 *
 * @return The stored `_decompressed_ch_buffer_size` value
 */
[[nodiscard]] size_t size() const override
{
  return _decompressed_ch_buffer_size;
}

Expand Down Expand Up @@ -431,6 +460,8 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
// line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
// delimiter.
auto constexpr num_delimiter_chars = 1;
std::vector<std::future<size_t>> thread_tasks;
auto stream_pool = cudf::detail::fork_streams(stream, pools::tpool().get_thread_count());

auto delimiter_map = cudf::detail::make_empty_host_vector<std::size_t>(sources.size(), stream);
std::vector<std::size_t> prefsum_source_sizes(sources.size());
Expand All @@ -447,13 +478,17 @@ device_span<char> ingest_raw_input(device_span<char> buffer,

auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
for (std::size_t i = start_source, cur_stream = 0;
i < sources.size() && bytes_read < total_bytes_to_read;
i++) {
if (sources[i]->is_empty()) continue;
auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read +
(num_delimiter_chars * delimiter_map.size());
if (sources[i]->is_device_read_preferred(data_size)) {
bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
if (sources[i]->supports_device_read()) {
thread_tasks.emplace_back(sources[i]->device_read_async(
range_offset, data_size, destination, stream_pool[cur_stream++ % stream_pool.size()]));
bytes_read += data_size;
} else {
h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
auto const& h_buffer = h_buffers.back();
Expand Down Expand Up @@ -481,6 +516,15 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
buffer.data());
}
stream.synchronize();

if (thread_tasks.size()) {
auto const bytes_read = std::accumulate(
thread_tasks.begin(), thread_tasks.end(), std::size_t{0}, [](std::size_t sum, auto& task) {
return sum + task.get();
});
CUDF_EXPECTS(bytes_read == total_bytes_to_read, "something's fishy");
}

return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars));
}

Expand All @@ -505,10 +549,17 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
return read_json_impl(sources, reader_opts, stream, mr);

std::vector<std::unique_ptr<datasource>> compressed_sources;
for (size_t i = 0; i < sources.size(); i++) {
compressed_sources.emplace_back(
std::make_unique<compressed_host_buffer_source>(sources[i], reader_opts.get_compression()));
std::vector<std::future<std::unique_ptr<compressed_host_buffer_source>>> thread_tasks;
auto& thread_pool = pools::tpool();
for (auto& src : sources) {
thread_tasks.emplace_back(thread_pool.submit_task([&reader_opts, &src] {
return std::make_unique<compressed_host_buffer_source>(src, reader_opts.get_compression());
}));
}
std::transform(thread_tasks.begin(),
thread_tasks.end(),
std::back_inserter(compressed_sources),
[](auto& task) { return task.get(); });
// in read_json_impl, we need the compressed source size to actually be the
// uncompressed source size for correct batching
return read_json_impl(compressed_sources, reader_opts, stream, mr);
Expand Down
48 changes: 16 additions & 32 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -97,38 +97,24 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
_stream);
}

// Compute column string sizes (using page string offsets) for this subpass
// Compute column string sizes (using page string offsets) for this output table chunk
col_string_sizes = calculate_page_string_offsets();

// ensure cumulative column string sizes have been initialized
if (pass.cumulative_col_string_sizes.empty()) {
pass.cumulative_col_string_sizes.resize(_input_columns.size(), 0);
}

// Add to the cumulative column string sizes of this pass
std::transform(pass.cumulative_col_string_sizes.begin(),
pass.cumulative_col_string_sizes.end(),
col_string_sizes.begin(),
pass.cumulative_col_string_sizes.begin(),
std::plus<>{});

// Check for overflow in cumulative column string sizes of this pass so that the page string
// offsets of overflowing (large) string columns are treated as 64-bit.
auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
auto const has_large_strings = std::any_of(pass.cumulative_col_string_sizes.cbegin(),
pass.cumulative_col_string_sizes.cend(),
auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
col_string_sizes.cend(),
[=](std::size_t sz) { return sz > threshold; });
if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
}

// Mark any chunks for which the cumulative column string size has exceeded the
// large strings threshold
if (has_large_strings) {
for (auto& chunk : pass.chunks) {
auto const idx = chunk.src_col_index;
if (pass.cumulative_col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
}
// Mark/unmark column-chunk descriptors depending on the string sizes of corresponding output
// column chunks and the large strings threshold.
for (auto& chunk : pass.chunks) {
auto const idx = chunk.src_col_index;
chunk.is_large_string_col = (col_string_sizes[idx] > threshold);
}
}

Expand Down Expand Up @@ -210,11 +196,9 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
// only do string buffer for leaf
if (idx == max_depth - 1 and out_buf.string_size() == 0 and
col_string_sizes[pass.chunks[c].src_col_index] > 0) {
out_buf.create_string_data(
col_string_sizes[pass.chunks[c].src_col_index],
pass.cumulative_col_string_sizes[pass.chunks[c].src_col_index] >
static_cast<size_t>(strings::detail::get_offset64_threshold()),
_stream);
out_buf.create_string_data(col_string_sizes[pass.chunks[c].src_col_index],
pass.chunks[c].is_large_string_col,
_stream);
}
if (has_strings) { str_data[idx] = out_buf.string_data(); }
out_buf.user_data |=
Expand Down Expand Up @@ -416,11 +400,11 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
final_offsets.emplace_back(offset);
out_buf.user_data |= PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED;
} else if (out_buf.type.id() == type_id::STRING) {
// need to cap off the string offsets column
auto const sz = static_cast<size_type>(col_string_sizes[idx]);
if (sz <= strings::detail::get_offset64_threshold()) {
// only if it is not a large strings column
if (col_string_sizes[idx] <=
static_cast<size_t>(strings::detail::get_offset64_threshold())) {
out_buffers.emplace_back(static_cast<size_type*>(out_buf.data()) + out_buf.size);
final_offsets.emplace_back(sz);
final_offsets.emplace_back(static_cast<size_type>(col_string_sizes[idx]));
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions cpp/src/io/parquet/reader_impl_chunking.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -130,9 +130,6 @@ struct pass_intermediate_data {
rmm::device_buffer decomp_dict_data{0, cudf::get_default_stream()};
rmm::device_uvector<string_index_pair> str_dict_index{0, cudf::get_default_stream()};

// cumulative strings column sizes.
std::vector<size_t> cumulative_col_string_sizes{};

int level_type_size{0};

// skip_rows / num_rows for this pass.
Expand Down
4 changes: 2 additions & 2 deletions python/cudf/cudf/_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
# Copyright (c) 2022-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand All @@ -12,7 +12,7 @@
# the License.
# =============================================================================

set(cython_sources column.pyx scalar.pyx strings_udf.pyx types.pyx)
set(cython_sources column.pyx scalar.pyx strings_udf.pyx)
set(linked_libraries cudf::cudf)

rapids_cython_create_modules(
Expand Down
4 changes: 3 additions & 1 deletion python/cudf/cudf/_lib/column.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
# Copyright (c) 2020-2025, NVIDIA CORPORATION.

from typing import Literal

Expand All @@ -13,6 +13,8 @@ from pylibcudf.libcudf.column.column_view cimport (
from pylibcudf.libcudf.types cimport size_type
from rmm.librmm.device_buffer cimport device_buffer

cdef dtype_from_lists_column_view(column_view cv)
cdef dtype_from_column_view(column_view cv)

cdef class Column:
cdef public:
Expand Down
Loading

0 comments on commit 1e51ac4

Please sign in to comment.