Compute and use the initial string offset when building nested large string cols with chunked parquet reader #17702
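(Summary reconstructed from the PR title and the diff below.) When the chunked Parquet reader materializes a nested large (64-bit offset) string column, the pages decoded in a given pass may not start at byte 0 of their column chunk. This PR threads a new `initial_str_offsets` out-parameter through the string decode kernels so that thread 0 of each page atomically folds its `str_offset` into a per-chunk minimum, and extends `cudf::detail::sizes_to_offsets` with an `initial_offset` argument so the offsets column can be built starting from that base instead of zero.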

Merged

Changes shown below are from 8 of the PR's 35 commits.

Commits:
1ce570c
Compute and use str_offset for large nested string cols.
mhaseeb123 Jan 9, 2025
e73432d
Clean up, add docstrings
mhaseeb123 Jan 10, 2025
a146cd6
Fix copyright year
mhaseeb123 Jan 10, 2025
9f4ede3
Fix comment
mhaseeb123 Jan 10, 2025
edaff09
Revert comment
mhaseeb123 Jan 10, 2025
2a279de
Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols
mhaseeb123 Jan 10, 2025
0d2317a
Minor optimization. Sync stream only for `str_offsets` vector
mhaseeb123 Jan 10, 2025
9622620
Remove leftover cout
mhaseeb123 Jan 14, 2025
38652c0
Remove code duplication with utility function
mhaseeb123 Jan 14, 2025
a57ccab
fix copyright header
mhaseeb123 Jan 14, 2025
3cffe1c
Remove explicit inline and simplify branch
mhaseeb123 Jan 14, 2025
29c1754
Refactor offset computing to avoid ambiguous use of util function.
mhaseeb123 Jan 14, 2025
28835c3
Change initial_offset type to int64 and subtract from last_elem
mhaseeb123 Jan 15, 2025
46ba4ab
Reuse code with a util function
mhaseeb123 Jan 15, 2025
ebea0cd
Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols
mhaseeb123 Jan 15, 2025
73ced83
Minor optimization. Make const ptr to const page_state
mhaseeb123 Jan 15, 2025
b0acb4c
Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols
mhaseeb123 Jan 15, 2025
52cd41a
Revert subtraction of initial offset
mhaseeb123 Jan 15, 2025
dde3285
Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols
mhaseeb123 Jan 15, 2025
694ec53
Merge conflicts
mhaseeb123 Jan 15, 2025
5012bc7
Revert changes
mhaseeb123 Jan 15, 2025
094a02b
Merge conflicts with branch25.02
mhaseeb123 Jan 15, 2025
fbe52d6
Trivial refactoring. Separate offset functions for small and large st…
mhaseeb123 Jan 16, 2025
399d8b0
Revert `unsetenv` in merge_tests
mhaseeb123 Jan 16, 2025
b7564d8
Revert copyrights year
mhaseeb123 Jan 16, 2025
c9ed1fc
Fix copyrights year
mhaseeb123 Jan 16, 2025
b5ff95f
Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols
mhaseeb123 Jan 17, 2025
16b9843
Address review comments
mhaseeb123 Jan 22, 2025
e838e72
Merge branch 'fix/str_offset-nested-large-str-cols' of https://github…
mhaseeb123 Jan 22, 2025
1a34807
Remove unnecessary stream sync for `h_initial_str_offsets`
mhaseeb123 Jan 22, 2025
7990f7c
Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols
mhaseeb123 Jan 22, 2025
d8231ca
Address review comments
mhaseeb123 Jan 24, 2025
c668fc6
Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols
mhaseeb123 Jan 24, 2025
218782d
Fill host vector with std instead of thrust
mhaseeb123 Jan 24, 2025
3cfbe24
Fix the sanity check for offsets
mhaseeb123 Jan 24, 2025
10 changes: 7 additions & 3 deletions cpp/include/cudf/detail/sizes_to_offsets_iterator.cuh
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -255,12 +255,14 @@ static sizes_to_offsets_iterator<ScanIterator, LastType> make_sizes_to_offsets_i
* @param begin Input iterator for scan
* @param end End of the input iterator
* @param result Output iterator for scan result
* @param initial_offset Initial offset to add to scan
* @return The last element of the scan
*/
template <typename SizesIterator, typename OffsetsIterator>
auto sizes_to_offsets(SizesIterator begin,
SizesIterator end,
OffsetsIterator result,
uint64_t initial_offset,
rmm::cuda_stream_view stream)
{
using SizeType = typename thrust::iterator_traits<SizesIterator>::value_type;
@@ -273,7 +275,8 @@ auto sizes_to_offsets(SizesIterator begin,
make_sizes_to_offsets_iterator(result, result + std::distance(begin, end), last_element.data());
// This function uses the type of the initialization parameter as the accumulator type
// when computing the individual scan output elements.
thrust::exclusive_scan(rmm::exec_policy(stream), begin, end, output_itr, LastType{0});
thrust::exclusive_scan(
rmm::exec_policy(stream), begin, end, output_itr, static_cast<LastType>(initial_offset));
return last_element.value(stream);
}
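For context, a minimal standalone sketch (not part of this PR) of how the new `initial_offset` shifts the exclusive scan that builds offsets:

// Standalone sketch: thrust::exclusive_scan seeds the first output element
// with `init`, so a non-zero initial offset shifts the whole offsets array.
#include <thrust/device_vector.h>
#include <thrust/scan.h>

#include <cstdint>
#include <vector>

int main()
{
  std::vector<int64_t> h_sizes{3, 5, 2};  // string lengths in bytes
  thrust::device_vector<int64_t> sizes(h_sizes.begin(), h_sizes.end());
  thrust::device_vector<int64_t> offsets(sizes.size());

  int64_t const initial_offset = 100;  // bytes already consumed by earlier pages
  thrust::exclusive_scan(sizes.begin(), sizes.end(), offsets.begin(), initial_offset);
  // offsets is now {100, 103, 108}; with an init of 0 it would be {0, 3, 8}
  return 0;
}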

@@ -319,7 +322,8 @@ std::pair<std::unique_ptr<column>, size_type> make_offsets_child_column(
});
auto input_itr = cudf::detail::make_counting_transform_iterator(0, map_fn);
// Use the sizes-to-offsets iterator to compute the total number of elements
auto const total_elements = sizes_to_offsets(input_itr, input_itr + count + 1, d_offsets, stream);
auto const total_elements =
sizes_to_offsets(input_itr, input_itr + count + 1, d_offsets, 0, stream);
CUDF_EXPECTS(
total_elements <= static_cast<decltype(total_elements)>(std::numeric_limits<size_type>::max()),
"Size of output exceeds the column size limit",
7 changes: 4 additions & 3 deletions cpp/include/cudf/strings/detail/strings_children.cuh
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -140,7 +140,7 @@ std::pair<std::unique_ptr<column>, int64_t> make_offsets_child_column(
auto input_itr = cudf::detail::make_counting_transform_iterator(0, map_fn);
// Use the sizes-to-offsets iterator to compute the total number of elements
auto const total_bytes =
cudf::detail::sizes_to_offsets(input_itr, input_itr + strings_count + 1, d_offsets, stream);
cudf::detail::sizes_to_offsets(input_itr, input_itr + strings_count + 1, d_offsets, 0, stream);

auto const threshold = cudf::strings::get_offset64_threshold();
CUDF_EXPECTS(cudf::strings::is_large_strings_enabled() || (total_bytes < threshold),
@@ -151,7 +151,8 @@
offsets_column = make_numeric_column(
data_type{type_id::INT64}, strings_count + 1, mask_state::UNALLOCATED, stream, mr);
auto d_offsets64 = offsets_column->mutable_view().template data<int64_t>();
cudf::detail::sizes_to_offsets(input_itr, input_itr + strings_count + 1, d_offsets64, stream);
cudf::detail::sizes_to_offsets(
input_itr, input_itr + strings_count + 1, d_offsets64, 0, stream);
}

return std::pair(std::move(offsets_column), total_bytes);
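Both call sites in this file pass `0` for the new argument, so behavior here is unchanged; only the chunked Parquet reader supplies a non-zero base. For reference, a simplified sketch (hypothetical helper, not cudf's actual code) of the 32-bit/64-bit offsets decision visible in the hunk above:

// Sketch of the offsets-width decision: stay with INT32 offsets below the
// large-strings threshold; past it, require large-string support and use INT64.
#include <cstdint>
#include <stdexcept>

enum class offset_kind { int32, int64 };

offset_kind choose_offset_kind(int64_t total_bytes,
                               int64_t threshold,
                               bool large_strings_enabled)
{
  if (total_bytes < threshold) { return offset_kind::int32; }
  if (!large_strings_enabled) {
    throw std::overflow_error("Size of output exceeds the column size limit");
  }
  return offset_kind::int64;
}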
76 changes: 53 additions & 23 deletions cpp/src/io/parquet/page_delta_decode.cu
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -435,6 +435,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
size_t* initial_str_offsets,
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
@@ -581,15 +582,28 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)

// Now turn the array of lengths into offsets, but skip if this is a large string column. In the
// latter case, offsets will be computed during string column creation.
if (not s->col.is_large_string_col) {
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

// If we read > 0 values from this page.
if (value_count > 0) {
// Compute offsets if this is not a large strings col
if (not s->col.is_large_string_col) {
auto const offptr =
reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
}
// Update the str_offset for offsets to be computed during output column construction
else {
if (!t) {
cuda::atomic_ref<size_t, cuda::std::thread_scope_device> ref{
initial_str_offsets[pages[page_idx].chunk_idx]};
ref.fetch_min(s->page.str_offset, cuda::std::memory_order_relaxed);
}
}
}

if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
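The new branch records, once per page (thread 0 only), the smallest `str_offset` seen for the owning column chunk. A minimal standalone sketch of that idiom, with hypothetical kernel and buffer names:

// One block per "page"; thread 0 folds the page's starting string offset into
// a per-chunk minimum. Relaxed ordering suffices: the slots are only ever
// combined with fetch_min and read back after a stream synchronization.
#include <cuda/atomic>

#include <cstddef>

__global__ void record_initial_offsets(std::size_t const* page_str_offsets,
                                       int const* page_chunk_idx,
                                       std::size_t* initial_str_offsets)
{
  auto const page = blockIdx.x;
  if (threadIdx.x == 0) {
    cuda::atomic_ref<std::size_t, cuda::std::thread_scope_device> ref{
      initial_str_offsets[page_chunk_idx[page]]};
    ref.fetch_min(page_str_offsets[page], cuda::std::memory_order_relaxed);
  }
}
// The host seeds initial_str_offsets with std::numeric_limits<size_t>::max()
// so the first fetch_min always wins.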
@@ -603,6 +617,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
size_t* initial_str_offsets,
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
@@ -743,15 +758,28 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)

// Now turn the array of lengths into offsets, but skip if this is a large string column. In the
// latter case, offsets will be computed during string column creation.
if (not s->col.is_large_string_col) {
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

// If we read > 0 values from this page.
if (value_count > 0) {
// Compute offsets if this is not a large strings col
if (not s->col.is_large_string_col) {
auto const offptr =
reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
}
// Update the str_offset for offsets to be computed during output column construction
else {
if (!t) {
cuda::atomic_ref<size_t, cuda::std::thread_scope_device> ref{
initial_str_offsets[pages[page_idx].chunk_idx]};
ref.fetch_min(s->page.str_offset, cuda::std::memory_order_relaxed);
}
}
}

// finally, copy the string data into place
@@ -797,6 +825,7 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
@@ -807,10 +836,10 @@

if (level_type_size == 1) {
gpuDecodeDeltaByteArray<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
} else {
gpuDecodeDeltaByteArray<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
}
}

@@ -822,6 +851,7 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
@@ -832,10 +862,10 @@

if (level_type_size == 1) {
gpuDecodeDeltaLengthByteArray<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
} else {
gpuDecodeDeltaLengthByteArray<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
}
}
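These launchers only forward the pointer; allocating and consuming `initial_str_offsets` happens in the reader, outside this diff. A hedged sketch of the expected host-side setup, with hypothetical names:

// Hypothetical host-side setup: one slot per column chunk, seeded with the
// max sentinel so any page's fetch_min replaces it.
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/fill.h>

#include <cstddef>
#include <limits>

rmm::device_uvector<std::size_t> make_initial_str_offsets(
  std::size_t num_chunks, rmm::cuda_stream_view stream)
{
  rmm::device_uvector<std::size_t> offsets(num_chunks, stream);
  thrust::fill(rmm::exec_policy(stream),
               offsets.begin(),
               offsets.end(),
               std::numeric_limits<std::size_t>::max());
  return offsets;
}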

43 changes: 31 additions & 12 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/strings/detail/gather.cuh>

#include <cuda/atomic>
#include <thrust/logical.h>
#include <thrust/transform_scan.h>

@@ -945,6 +946,8 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputePageStringSi
* @param chunks List of column chunks
* @param min_row Row index to start reading at
* @param num_rows Maximum number of rows to read
* @param initial_str_offsets A vector to store the initial str_offset for large nested
* string cols
* @tparam level_t Type used to store decoded repetition and definition levels
*/
template <typename level_t>
@@ -953,6 +956,7 @@
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
size_t* initial_str_offsets,
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
@@ -1127,15 +1131,28 @@

// Now turn the array of lengths into offsets, but skip if this is a large string column. In the
// latter case, offsets will be computed during string column creation.
if (not s->col.is_large_string_col) {
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

// If we read > 0 values from this page.
if (value_count > 0) {
// Compute offsets if this is not a large strings col
if (not s->col.is_large_string_col) {
auto const offptr =
reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
}
// Update the str_offset for offsets to be computed during output column construction
else {
if (!t) {
cuda::atomic_ref<size_t, cuda::std::thread_scope_device> ref{
initial_str_offsets[pages[page_idx].chunk_idx]};
ref.fetch_min(s->page.str_offset, cuda::std::memory_order_relaxed);
}
}
}

if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
@@ -1253,20 +1270,22 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");

dim3 dim_block(decode_block_size, 1);
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page
cudf::detail::device_scalar<size_t> str_offset{std::numeric_limits<size_t>::max(), stream};

if (level_type_size == 1) {
gpuDecodeStringPageData<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
} else {
gpuDecodeStringPageData<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
}
}

9 changes: 8 additions & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -828,6 +828,8 @@ void WriteFinalOffsets(host_span<size_type const> offsets,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[out] initial_str_offsets A vector to store the initial str_offset for large nested
* string cols
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
@@ -836,6 +838,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

@@ -880,6 +883,7 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

@@ -894,6 +898,8 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[out] initial_str_offsets A vector to store the initial str_offset for large nested
* string cols
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
Expand All @@ -902,6 +908,7 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

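Taken together, a caller seeds the per-chunk slots, launches the decode kernels, and copies the surviving minima back to host; each minimum later becomes the `initial_offset` for its chunk's large string column. A hedged call-site fragment (names outside this diff, such as `make_initial_str_offsets`, are hypothetical; a slot still holding the sentinel means no large-string page touched that chunk):

// Hypothetical call-site fragment tying the pieces together.
auto initial_str_offsets = make_initial_str_offsets(chunks.size(), stream);

DecodeStringPageData(pages,
                     chunks,
                     num_rows,
                     min_row,
                     level_type_size,
                     initial_str_offsets.data(),
                     error_code.data(),
                     stream);

// Copy the per-chunk minima back; these seed sizes_to_offsets when the
// output string columns are constructed.
auto const h_initial_str_offsets =
  cudf::detail::make_host_vector_sync(initial_str_offsets, stream);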