Skip to content

Commit

Permalink
Merge branch 'branch-25.02' into streams-copying
Browse files Browse the repository at this point in the history
  • Loading branch information
shrshi authored Jan 14, 2025
2 parents 3a91b73 + 41215e2 commit 5e8b17c
Show file tree
Hide file tree
Showing 17 changed files with 1,100 additions and 70 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ add_library(
src/datetime/timezone.cpp
src/io/orc/writer_impl.cu
src/io/parquet/arrow_schema_writer.cpp
src/io/parquet/bloom_filter_reader.cu
src/io/parquet/compact_protocol_reader.cpp
src/io/parquet/compact_protocol_writer.cpp
src/io/parquet/decode_preprocess.cu
Expand Down
683 changes: 683 additions & 0 deletions cpp/src/io/parquet/bloom_filter_reader.cu

Large diffs are not rendered by default.

35 changes: 33 additions & 2 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -658,14 +658,43 @@ void CompactProtocolReader::read(ColumnChunk* c)
function_builder(this, op);
}

void CompactProtocolReader::read(BloomFilterAlgorithm* alg)
{
auto op = std::make_tuple(parquet_field_union_enumerator(1, alg->algorithm));
function_builder(this, op);
}

void CompactProtocolReader::read(BloomFilterHash* hash)
{
auto op = std::make_tuple(parquet_field_union_enumerator(1, hash->hash));
function_builder(this, op);
}

void CompactProtocolReader::read(BloomFilterCompression* comp)
{
auto op = std::make_tuple(parquet_field_union_enumerator(1, comp->compression));
function_builder(this, op);
}

void CompactProtocolReader::read(BloomFilterHeader* bf)
{
auto op = std::make_tuple(parquet_field_int32(1, bf->num_bytes),
parquet_field_struct(2, bf->algorithm),
parquet_field_struct(3, bf->hash),
parquet_field_struct(4, bf->compression));
function_builder(this, op);
}

void CompactProtocolReader::read(ColumnChunkMetaData* c)
{
using optional_size_statistics =
parquet_field_optional<SizeStatistics, parquet_field_struct<SizeStatistics>>;
using optional_list_enc_stats =
parquet_field_optional<std::vector<PageEncodingStats>,
parquet_field_struct_list<PageEncodingStats>>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, c->type),
using optional_i64 = parquet_field_optional<int64_t, parquet_field_int64>;
using optional_i32 = parquet_field_optional<int32_t, parquet_field_int32>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, c->type),
parquet_field_enum_list(2, c->encodings),
parquet_field_string_list(3, c->path_in_schema),
parquet_field_enum<Compression>(4, c->codec),
Expand All @@ -677,6 +706,8 @@ void CompactProtocolReader::read(ColumnChunkMetaData* c)
parquet_field_int64(11, c->dictionary_page_offset),
parquet_field_struct(12, c->statistics),
optional_list_enc_stats(13, c->encoding_stats),
optional_i64(14, c->bloom_filter_offset),
optional_i32(15, c->bloom_filter_length),
optional_size_statistics(16, c->size_statistics));
function_builder(this, op);
}
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,6 +108,10 @@ class CompactProtocolReader {
void read(IntType* t);
void read(RowGroup* r);
void read(ColumnChunk* c);
void read(BloomFilterAlgorithm* bf);
void read(BloomFilterHash* bf);
void read(BloomFilterCompression* bf);
void read(BloomFilterHeader* bf);
void read(ColumnChunkMetaData* c);
void read(PageHeader* p);
void read(DataPageHeader* d);
Expand Down
52 changes: 51 additions & 1 deletion cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -382,12 +382,62 @@ struct ColumnChunkMetaData {
// Set of all encodings used for pages in this column chunk. This information can be used to
// determine if all data pages are dictionary encoded for example.
std::optional<std::vector<PageEncodingStats>> encoding_stats;
// Byte offset from beginning of file to Bloom filter data.
std::optional<int64_t> bloom_filter_offset;
// Size of Bloom filter data including the serialized header, in bytes. Added in 2.10 so readers
// may not read this field from old files and it can be obtained after the BloomFilterHeader has
// been deserialized. Writers should write this field so readers can read the bloom filter in a
// single I/O.
std::optional<int32_t> bloom_filter_length;
// Optional statistics to help estimate total memory when converted to in-memory representations.
// The histograms contained in these statistics can also be useful in some cases for more
// fine-grained nullability/list length filter pushdown.
std::optional<SizeStatistics> size_statistics;
};

/**
* @brief The algorithm used in bloom filter
*/
struct BloomFilterAlgorithm {
// Block-based Bloom filter.
enum class Algorithm { UNDEFINED, SPLIT_BLOCK };
Algorithm algorithm{Algorithm::SPLIT_BLOCK};
};

/**
* @brief The hash function used in Bloom filter
*/
struct BloomFilterHash {
// xxHash_64
enum class Hash { UNDEFINED, XXHASH };
Hash hash{Hash::XXHASH};
};

/**
* @brief The compression used in the bloom filter
*/
struct BloomFilterCompression {
enum class Compression { UNDEFINED, UNCOMPRESSED };
Compression compression{Compression::UNCOMPRESSED};
};

/**
* @brief Bloom filter header struct
*
* The bloom filter data of a column chunk stores this header at the beginning
* following by the filter bitset.
*/
struct BloomFilterHeader {
// The size of bitset in bytes
int32_t num_bytes;
// The algorithm for setting bits
BloomFilterAlgorithm algorithm;
// The hash function used for bloom filter
BloomFilterHash hash;
// The compression used in the bloom filter
BloomFilterCompression compression;
};

/**
* @brief Thrift-derived struct describing a chunk of data for a particular
* column
Expand Down
136 changes: 88 additions & 48 deletions cpp/src/io/parquet/predicate_pushdown.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
#include <thrust/iterator/counting_iterator.h>

#include <algorithm>
#include <limits>
#include <numeric>
#include <optional>
#include <unordered_set>
Expand Down Expand Up @@ -388,6 +389,7 @@ class stats_expression_converter : public ast::detail::expression_transformer {
} // namespace

std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
Expand All @@ -396,7 +398,6 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
{
auto mr = cudf::get_current_device_resource_ref();
// Create row group indices.
std::vector<std::vector<size_type>> filtered_row_group_indices;
std::vector<std::vector<size_type>> all_row_group_indices;
host_span<std::vector<size_type> const> input_row_group_indices;
if (row_group_indices.empty()) {
Expand All @@ -412,18 +413,22 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
} else {
input_row_group_indices = row_group_indices;
}
auto const total_row_groups = std::accumulate(input_row_group_indices.begin(),
input_row_group_indices.end(),
0,
[](size_type sum, auto const& per_file_row_groups) {
return sum + per_file_row_groups.size();
});
auto const total_row_groups = std::accumulate(
input_row_group_indices.begin(),
input_row_group_indices.end(),
size_t{0},
[](size_t sum, auto const& per_file_row_groups) { return sum + per_file_row_groups.size(); });

// Check if we have less than 2B total row groups.
CUDF_EXPECTS(total_row_groups <= std::numeric_limits<cudf::size_type>::max(),
"Total number of row groups exceed the size_type's limit");

// Converts Column chunk statistics to a table
// where min(col[i]) = columns[i*2], max(col[i])=columns[i*2+1]
// For each column, it contains #sources * #column_chunks_per_src rows.
std::vector<std::unique_ptr<column>> columns;
stats_caster const stats_col{total_row_groups, per_file_metadata, input_row_group_indices};
stats_caster const stats_col{
static_cast<size_type>(total_row_groups), per_file_metadata, input_row_group_indices};
for (size_t col_idx = 0; col_idx < output_dtypes.size(); col_idx++) {
auto const schema_idx = output_column_schemas[col_idx];
auto const& dtype = output_dtypes[col_idx];
Expand Down Expand Up @@ -452,44 +457,23 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
CUDF_EXPECTS(predicate.type().id() == cudf::type_id::BOOL8,
"Filter expression must return a boolean column");

auto const host_bitmask = [&] {
auto const num_bitmasks = num_bitmask_words(predicate.size());
if (predicate.nullable()) {
return cudf::detail::make_host_vector_sync(
device_span<bitmask_type const>(predicate.null_mask(), num_bitmasks), stream);
} else {
auto bitmask = cudf::detail::make_host_vector<bitmask_type>(num_bitmasks, stream);
std::fill(bitmask.begin(), bitmask.end(), ~bitmask_type{0});
return bitmask;
}
}();
// Filter stats table with StatsAST expression and collect filtered row group indices
auto const filtered_row_group_indices = collect_filtered_row_group_indices(
stats_table, stats_expr.get_stats_expr(), input_row_group_indices, stream);

auto validity_it = cudf::detail::make_counting_transform_iterator(
0, [bitmask = host_bitmask.data()](auto bit_index) { return bit_is_set(bitmask, bit_index); });
// Span of row groups to apply bloom filtering on.
auto const bloom_filter_input_row_groups =
filtered_row_group_indices.has_value()
? host_span<std::vector<size_type> const>(filtered_row_group_indices.value())
: input_row_group_indices;

auto const is_row_group_required = cudf::detail::make_host_vector_sync(
device_span<uint8_t const>(predicate.data<uint8_t>(), predicate.size()), stream);
// Apply bloom filtering on the bloom filter input row groups
auto const bloom_filtered_row_groups = apply_bloom_filters(
sources, bloom_filter_input_row_groups, output_dtypes, output_column_schemas, filter, stream);

// Return only filtered row groups based on predicate
// if all are required or all are nulls, return.
if (std::all_of(is_row_group_required.cbegin(),
is_row_group_required.cend(),
[](auto i) { return bool(i); }) or
predicate.null_count() == predicate.size()) {
return std::nullopt;
}
size_type is_required_idx = 0;
for (auto const& input_row_group_index : input_row_group_indices) {
std::vector<size_type> filtered_row_groups;
for (auto const rg_idx : input_row_group_index) {
if ((!validity_it[is_required_idx]) || is_row_group_required[is_required_idx]) {
filtered_row_groups.push_back(rg_idx);
}
++is_required_idx;
}
filtered_row_group_indices.push_back(std::move(filtered_row_groups));
}
return {std::move(filtered_row_group_indices)};
// Return bloom filtered row group indices iff collected
return bloom_filtered_row_groups.has_value() ? bloom_filtered_row_groups
: filtered_row_group_indices;
}

// convert column named expression to column index reference expression
Expand All @@ -510,14 +494,14 @@ named_to_reference_converter::named_to_reference_converter(
std::reference_wrapper<ast::expression const> named_to_reference_converter::visit(
ast::literal const& expr)
{
_stats_expr = std::reference_wrapper<ast::expression const>(expr);
_converted_expr = std::reference_wrapper<ast::expression const>(expr);
return expr;
}

std::reference_wrapper<ast::expression const> named_to_reference_converter::visit(
ast::column_reference const& expr)
{
_stats_expr = std::reference_wrapper<ast::expression const>(expr);
_converted_expr = std::reference_wrapper<ast::expression const>(expr);
return expr;
}

Expand All @@ -531,7 +515,7 @@ std::reference_wrapper<ast::expression const> named_to_reference_converter::visi
}
auto col_index = col_index_it->second;
_col_ref.emplace_back(col_index);
_stats_expr = std::reference_wrapper<ast::expression const>(_col_ref.back());
_converted_expr = std::reference_wrapper<ast::expression const>(_col_ref.back());
return std::reference_wrapper<ast::expression const>(_col_ref.back());
}

Expand All @@ -546,7 +530,7 @@ std::reference_wrapper<ast::expression const> named_to_reference_converter::visi
} else if (cudf::ast::detail::ast_operator_arity(op) == 1) {
_operators.emplace_back(op, new_operands.front());
}
_stats_expr = std::reference_wrapper<ast::expression const>(_operators.back());
_converted_expr = std::reference_wrapper<ast::expression const>(_operators.back());
return std::reference_wrapper<ast::expression const>(_operators.back());
}

Expand Down Expand Up @@ -640,4 +624,60 @@ class names_from_expression : public ast::detail::expression_transformer {
return names_from_expression(expr, skip_names).to_vector();
}

std::optional<std::vector<std::vector<size_type>>> collect_filtered_row_group_indices(
cudf::table_view table,
std::reference_wrapper<ast::expression const> ast_expr,
host_span<std::vector<size_type> const> input_row_group_indices,
rmm::cuda_stream_view stream)
{
// Filter the input table using AST expression
auto predicate_col = cudf::detail::compute_column(
table, ast_expr.get(), stream, cudf::get_current_device_resource_ref());
auto predicate = predicate_col->view();
CUDF_EXPECTS(predicate.type().id() == cudf::type_id::BOOL8,
"Filter expression must return a boolean column");

auto const host_bitmask = [&] {
auto const num_bitmasks = num_bitmask_words(predicate.size());
if (predicate.nullable()) {
return cudf::detail::make_host_vector_sync(
device_span<bitmask_type const>(predicate.null_mask(), num_bitmasks), stream);
} else {
auto bitmask = cudf::detail::make_host_vector<bitmask_type>(num_bitmasks, stream);
std::fill(bitmask.begin(), bitmask.end(), ~bitmask_type{0});
return bitmask;
}
}();

auto validity_it = cudf::detail::make_counting_transform_iterator(
0, [bitmask = host_bitmask.data()](auto bit_index) { return bit_is_set(bitmask, bit_index); });

// Return only filtered row groups based on predicate
auto const is_row_group_required = cudf::detail::make_host_vector_sync(
device_span<uint8_t const>(predicate.data<uint8_t>(), predicate.size()), stream);

// Return if all are required, or all are nulls.
if (predicate.null_count() == predicate.size() or std::all_of(is_row_group_required.cbegin(),
is_row_group_required.cend(),
[](auto i) { return bool(i); })) {
return std::nullopt;
}

// Collect indices of the filtered row groups
size_type is_required_idx = 0;
std::vector<std::vector<size_type>> filtered_row_group_indices;
for (auto const& input_row_group_index : input_row_group_indices) {
std::vector<size_type> filtered_row_groups;
for (auto const rg_idx : input_row_group_index) {
if ((!validity_it[is_required_idx]) || is_row_group_required[is_required_idx]) {
filtered_row_groups.push_back(rg_idx);
}
++is_required_idx;
}
filtered_row_group_indices.push_back(std::move(filtered_row_groups));
}

return {filtered_row_group_indices};
}

} // namespace cudf::io::parquet::detail
5 changes: 3 additions & 2 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1030,6 +1030,7 @@ std::vector<std::string> aggregate_reader_metadata::get_pandas_index_names() con

std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>>
aggregate_reader_metadata::select_row_groups(
host_span<std::unique_ptr<datasource> const> sources,
host_span<std::vector<size_type> const> row_group_indices,
int64_t skip_rows_opt,
std::optional<size_type> const& num_rows_opt,
Expand All @@ -1042,7 +1043,7 @@ aggregate_reader_metadata::select_row_groups(
// if filter is not empty, then gather row groups to read after predicate pushdown
if (filter.has_value()) {
filtered_row_group_indices = filter_row_groups(
row_group_indices, output_dtypes, output_column_schemas, filter.value(), stream);
sources, row_group_indices, output_dtypes, output_column_schemas, filter.value(), stream);
if (filtered_row_group_indices.has_value()) {
row_group_indices =
host_span<std::vector<size_type> const>(filtered_row_group_indices.value());
Expand Down
Loading

0 comments on commit 5e8b17c

Please sign in to comment.