Skip to content

Commit

Permalink
Merge branch 'branch-25.02' into decimal32-decimal64
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwendt authored Jan 22, 2025
2 parents 570e416 + 2a21f6c commit a23fc13
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 29 deletions.
93 changes: 65 additions & 28 deletions cpp/src/io/parquet/bloom_filter_reader.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/aligned_resource_adaptor.hpp>

#include <cuco/bloom_filter_policies.cuh>
#include <cuco/bloom_filter_ref.cuh>
Expand Down Expand Up @@ -66,9 +67,12 @@ struct bloom_filter_caster {
ast::literal const* const literal,
rmm::cuda_stream_view stream) const
{
using key_type = T;
using policy_type = cuco::arrow_filter_policy<key_type, cudf::hashing::detail::XXHash_64>;
using word_type = typename policy_type::word_type;
using key_type = T;
using policy_type = cuco::arrow_filter_policy<key_type, cudf::hashing::detail::XXHash_64>;
using bloom_filter_type = cuco::
bloom_filter_ref<key_type, cuco::extent<std::size_t>, cuco::thread_scope_thread, policy_type>;
using filter_block_type = typename bloom_filter_type::filter_block_type;
using word_type = typename policy_type::word_type;

// Check if the literal has the same type as the predicate column
CUDF_EXPECTS(
Expand Down Expand Up @@ -104,16 +108,13 @@ struct bloom_filter_caster {
auto const num_filter_blocks = filter_size / bytes_per_block;

// Create a bloom filter view.
cuco::bloom_filter_ref<key_type,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>
filter{reinterpret_cast<word_type*>(filter_span[filter_idx].data()),
num_filter_blocks,
{}, // Thread scope as the same literal is being searched across different bitsets
// per thread
{}}; // Arrow policy with cudf::hashing::detail::XXHash_64 seeded with 0 for Arrow
// compatibility
bloom_filter_type filter{
reinterpret_cast<filter_block_type*>(filter_span[filter_idx].data()),
num_filter_blocks,
{}, // Thread scope as the same literal is being searched across different bitsets per
// thread
{}}; // Arrow policy with cudf::hashing::detail::XXHash_64 seeded with 0 for Arrow
// compatibility

// If int96_timestamp type, convert literal to string_view and query bloom
// filter
Expand Down Expand Up @@ -381,15 +382,27 @@ class bloom_filter_expression_converter : public equality_literals_collector {
* @param bloom_filter_sizes Bloom filter sizes for all chunks
* @param chunk_source_map Association between each column chunk and its source
* @param stream CUDA stream used for device memory operations and kernel launches
* @param aligned_mr Aligned device memory resource to allocate bloom filter buffers
*/
void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources,
size_t num_chunks,
cudf::host_span<rmm::device_buffer> bloom_filter_data,
cudf::host_span<std::optional<int64_t>> bloom_filter_offsets,
cudf::host_span<std::optional<int32_t>> bloom_filter_sizes,
std::vector<size_type> const& chunk_source_map,
rmm::cuda_stream_view stream)
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref aligned_mr)
{
// Using `cuco::arrow_filter_policy` with a temporary `cuda::std::byte` key type to extract bloom
// filter properties
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>;
auto constexpr filter_block_alignment =
alignof(cuco::bloom_filter_ref<cuda::std::byte,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>::filter_block_type);
auto constexpr words_per_block = policy_type::words_per_block;

// Read tasks for bloom filter data
std::vector<std::future<size_t>> read_tasks;

Expand Down Expand Up @@ -422,12 +435,6 @@ void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources
CompactProtocolReader cp{buffer->data(), buffer->size()};
cp.read(&header);

// Get the hardcoded words_per_block value from `cuco::arrow_filter_policy` using a temporary
// `std::byte` key type.
auto constexpr words_per_block =
cuco::arrow_filter_policy<cuda::std::byte,
cudf::hashing::detail::XXHash_64>::words_per_block;

// Check if the bloom filter header is valid.
auto const is_header_valid =
(header.num_bytes % words_per_block) == 0 and
Expand All @@ -448,15 +455,25 @@ void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources

// Check if we already read in the filter bitset in the initial read.
if (initial_read_size >= bloom_filter_header_size + bitset_size) {
bloom_filter_data[chunk] =
rmm::device_buffer{buffer->data() + bloom_filter_header_size, bitset_size, stream};
bloom_filter_data[chunk] = rmm::device_buffer{
buffer->data() + bloom_filter_header_size, bitset_size, stream, aligned_mr};
// The allocated bloom filter buffer must be aligned
CUDF_EXPECTS(reinterpret_cast<std::uintptr_t>(bloom_filter_data[chunk].data()) %
filter_block_alignment ==
0,
"Encountered misaligned bloom filter block");
}
// Read the bitset from datasource.
else {
auto const bitset_offset = bloom_filter_offset + bloom_filter_header_size;
// Directly read to device if preferred
if (source->is_device_read_preferred(bitset_size)) {
bloom_filter_data[chunk] = rmm::device_buffer{bitset_size, stream};
bloom_filter_data[chunk] = rmm::device_buffer{bitset_size, stream, aligned_mr};
// The allocated bloom filter buffer must be aligned
CUDF_EXPECTS(reinterpret_cast<std::uintptr_t>(bloom_filter_data[chunk].data()) %
filter_block_alignment ==
0,
"Encountered misaligned bloom filter block");
auto future_read_size =
source->device_read_async(bitset_offset,
bitset_size,
Expand All @@ -465,8 +482,14 @@ void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources

read_tasks.emplace_back(std::move(future_read_size));
} else {
buffer = source->host_read(bitset_offset, bitset_size);
bloom_filter_data[chunk] = rmm::device_buffer{buffer->data(), buffer->size(), stream};
buffer = source->host_read(bitset_offset, bitset_size);
bloom_filter_data[chunk] =
rmm::device_buffer{buffer->data(), buffer->size(), stream, aligned_mr};
// The allocated bloom filter buffer must be aligned
CUDF_EXPECTS(reinterpret_cast<std::uintptr_t>(bloom_filter_data[chunk].data()) %
filter_block_alignment ==
0,
"Encountered misaligned bloom filter block");
}
}
});
Expand All @@ -484,7 +507,8 @@ std::vector<rmm::device_buffer> aggregate_reader_metadata::read_bloom_filters(
host_span<std::vector<size_type> const> row_group_indices,
host_span<int const> column_schemas,
size_type total_row_groups,
rmm::cuda_stream_view stream) const
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref aligned_mr) const
{
// Descriptors for all the chunks that make up the selected columns
auto const num_input_columns = column_schemas.size();
Expand Down Expand Up @@ -543,7 +567,8 @@ std::vector<rmm::device_buffer> aggregate_reader_metadata::read_bloom_filters(
bloom_filter_offsets,
bloom_filter_sizes,
chunk_source_map,
stream);
stream,
aligned_mr);

// Return bloom filter data
return bloom_filter_data;
Expand Down Expand Up @@ -612,10 +637,22 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
// Return early if no column with equality predicate(s)
if (equality_col_schemas.empty()) { return std::nullopt; }

// Required alignment:
// https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>;
auto constexpr alignment = alignof(cuco::bloom_filter_ref<cuda::std::byte,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>::filter_block_type);

// Aligned resource adaptor to allocate bloom filter buffers with
auto aligned_mr =
rmm::mr::aligned_resource_adaptor(cudf::get_current_device_resource(), alignment);

// Read a vector of bloom filter bitset device buffers for all columns with equality
// predicate(s) across all row groups
auto bloom_filter_data = read_bloom_filters(
sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream);
sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream, aligned_mr);

// No bloom filter buffers, return the original row group indices
if (bloom_filter_data.empty()) { return std::nullopt; }
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class aggregate_reader_metadata {
* @param[out] bloom_filter_data List of bloom filter data device buffers
* @param column_schemas Schema indices of columns whose bloom filters will be read
* @param stream CUDA stream used for device memory operations and kernel launches
* @param aligned_mr Aligned device memory resource to allocate bloom filter buffers
*
* @return A flattened list of bloom filter bitset device buffers for each predicate column across
* row group
Expand All @@ -213,7 +214,8 @@ class aggregate_reader_metadata {
host_span<std::vector<size_type> const> row_group_indices,
host_span<int const> column_schemas,
size_type num_row_groups,
rmm::cuda_stream_view stream) const;
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref aligned_mr) const;

/**
* @brief Collects Parquet types for the columns with the specified schema indices
Expand Down

0 comments on commit a23fc13

Please sign in to comment.