diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp
index a1b7db5e08a..2b2a660bed7 100644
--- a/cpp/include/cudf/aggregation.hpp
+++ b/cpp/include/cudf/aggregation.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -601,7 +601,7 @@ std::unique_ptr<Base> make_udf_aggregation(udf_type type,
data_type output_type);
// Forward declaration of `host_udf_base` for the factory function of `HOST_UDF` aggregation.
-struct host_udf_base;
+class host_udf_base;
/**
* @brief Factory to create a HOST_UDF aggregation.
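*
* For illustration, a minimal sketch of how user code might attach a UDF through this
* factory; `my_udf` is a hypothetical type deriving from one of the host_udf interfaces
* introduced in host_udf.hpp below:
* @code{.cpp}
* // Hypothetical UDF type; it could equally derive from cudf::reduce_host_udf or
* // cudf::segmented_reduce_host_udf, which this change adds alongside groupby_host_udf.
* struct my_udf : cudf::groupby_host_udf { /* is_equal, clone, operator(), ... */ };
*
* // Wrap an instance in a HOST_UDF aggregation usable by the groupby APIs.
* auto agg = cudf::make_host_udf_aggregation<cudf::groupby_aggregation>(
*   std::make_unique<my_udf>());
* @endcode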
diff --git a/cpp/include/cudf/aggregation/host_udf.hpp b/cpp/include/cudf/aggregation/host_udf.hpp
index bbce76dc5f3..451d75137e4 100644
--- a/cpp/include/cudf/aggregation/host_udf.hpp
+++ b/cpp/include/cudf/aggregation/host_udf.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,18 +17,16 @@
#pragma once
#include <cudf/aggregation.hpp>
+#include <cudf/column/column_view.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>
-#include <cudf/utilities/traits.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/resource_ref.hpp>
+#include <functional>
#include <memory>
-#include <unordered_map>
-#include <unordered_set>
-#include <variant>
/**
* @file host_udf.hpp
@@ -43,49 +41,141 @@ namespace CUDF_EXPORT cudf {
*/
/**
- * @brief The interface for host-based UDF implementation.
+ * @brief The fundamental interface for host-based UDF implementations.
*
- * An implementation of host-based UDF needs to be derived from this base class, defining
- * its own version of the required functions. In particular:
- * - The derived class is required to implement `get_empty_output`, `operator()`, `is_equal`,
- * and `clone` functions.
- * - If necessary, the derived class can also override `do_hash` to compute hashing for its
- * instance, and `get_required_data` to selectively access to the input data as well as
- * intermediate data provided by libcudf.
+ * This class declares the functions `do_hash`, `is_equal`, and `clone` that the libcudf
+ * aggregation framework relies on to manage UDF instances. `is_equal` and `clone` must be
+ * defined in the user's UDF implementation, while `do_hash` has a default implementation.
+ */
+class host_udf_base {
+ // Declare the constructor private to prevent users from deriving directly from this class.
+ private:
+ host_udf_base() = default; ///< Default constructor
+
+ // Only the interface structs below are allowed to derive from this class.
+ friend struct reduce_host_udf;
+ friend struct segmented_reduce_host_udf;
+ friend struct groupby_host_udf;
+
+ public:
+ virtual ~host_udf_base() = default; ///< Default destructor
+
+ /**
+ * @brief Computes hash value of the instance.
+ *
+ * Overriding this function is optional; it is needed when the derived class has data members
+ * that distinguish one instance from another.
+ *
+ * @return The hash value of the instance
+ */
+ [[nodiscard]] virtual std::size_t do_hash() const
+ {
+ return std::hash<int>{}(static_cast<int>(aggregation::Kind::HOST_UDF));
+ }
+
+ /**
+ * @brief Compares two instances of the derived class for equality.
+ * @param other The other instance to compare with
+ * @return True if the two instances are equal
+ */
+ [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0;
+
+ /**
+ * @brief Clones the instance.
+ *
+ * Instances of the derived class should be lightweight so that cloning remains efficient.
+ *
+ * @return A new instance cloned from this one
+ */
+ [[nodiscard]] virtual std::unique_ptr<host_udf_base> clone() const = 0;
+};
+
+/**
+ * @brief The interface for host-based UDF implementation for reduction contexts.
+ *
+ * An implementation of a host-based UDF for reduction must derive from this class. In addition
+ * to implementing the virtual functions declared in the base class `host_udf_base`, such a
+ * derived class must also define the `operator()` function to perform the reduction.
*
- * Example of such implementation:
+ * Example:
* @code{.cpp}
- * struct my_udf_aggregation : cudf::host_udf_base {
+ * struct my_udf_aggregation : cudf::reduce_host_udf {
* my_udf_aggregation() = default;
*
- * // This UDF aggregation needs `GROUPED_VALUES` and `GROUP_OFFSETS`,
- * // and the result from groupby `MAX` aggregation.
- * [[nodiscard]] data_attribute_set_t get_required_data() const override
+ * [[nodiscard]] std::unique_ptr<scalar> operator()(
+ * column_view const& input,
+ * data_type output_dtype,
+ * std::optional<std::reference_wrapper<scalar const>> init,
+ * rmm::cuda_stream_view stream,
+ * rmm::device_async_resource_ref mr) const override
* {
- * return {groupby_data_attribute::GROUPED_VALUES,
- * groupby_data_attribute::GROUP_OFFSETS,
- * cudf::make_max_aggregation<cudf::groupby_aggregation>()};
+ * // Perform reduction computation using the input data and return the reduction result.
+ * // This is where the actual reduction logic is implemented.
* }
*
- * [[nodiscard]] output_t get_empty_output(
- * [[maybe_unused]] std::optional<data_type> output_dtype,
- * [[maybe_unused]] rmm::cuda_stream_view stream,
- * [[maybe_unused]] rmm::device_async_resource_ref mr) const override
+ * [[nodiscard]] bool is_equal(host_udf_base const& other) const override
* {
- * // This UDF aggregation always returns a column of type INT32.
- * return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32});
+ * // Check if the other object is also an instance of this class.
+ * // If there are internal state variables, they may need to be checked for equality as well.
+ * return dynamic_cast<my_udf_aggregation const*>(&other) != nullptr;
* }
*
- * [[nodiscard]] output_t operator()(input_map_t const& input,
- * rmm::cuda_stream_view stream,
- * rmm::device_async_resource_ref mr) const override
+ * [[nodiscard]] std::unique_ptr<host_udf_base> clone() const override
* {
- * // Perform UDF computation using the input data and return the result.
+ * return std::make_unique<my_udf_aggregation>();
+ * }
+ * };
+ * @endcode
+ */
+struct reduce_host_udf : host_udf_base {
+ /**
+ * @brief Perform reduction operations.
+ *
+ * @param input The input column for reduction
+ * @param output_dtype The data type for the final output scalar
+ * @param init The initial value of the reduction
+ * @param stream The CUDA stream to use for any kernel launches
+ * @param mr Device memory resource to use for any allocations
+ * @return The output result of the aggregation
+ */
+ [[nodiscard]] virtual std::unique_ptr<scalar> operator()(
+ column_view const& input,
+ data_type output_dtype,
+ std::optional<std::reference_wrapper<scalar const>> init,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr) const = 0;
+};
+
+/**
+ * @brief The interface for host-based UDF implementation for segmented reduction contexts.
+ *
+ * An implementation of a host-based UDF for segmented reduction must derive from this class.
+ * In addition to implementing the virtual functions declared in the base class `host_udf_base`,
+ * such a derived class must also define the `operator()` function to perform the segmented
+ * reduction.
+ *
+ * Example:
+ * @code{.cpp}
+ * struct my_udf_aggregation : cudf::segmented_reduce_host_udf {
+ * my_udf_aggregation() = default;
+ *
+ * [[nodiscard]] std::unique_ptr<column> operator()(
+ * column_view const& input,
+ * device_span<size_type const> offsets,
+ * data_type output_dtype,
+ * null_policy null_handling,
+ * std::optional<std::reference_wrapper<scalar const>> init,
+ * rmm::cuda_stream_view stream,
+ * rmm::device_async_resource_ref mr) const override
+ * {
+ * // Perform computation using the input data and return the result.
+ * // This is where the actual segmented reduction logic is implemented.
* }
*
* [[nodiscard]] bool is_equal(host_udf_base const& other) const override
* {
* // Check if the other object is also an instance of this class.
+ * // If there are internal state variables, they may need to be checked for equality as well.
* return dynamic_cast<my_udf_aggregation const*>(&other) != nullptr;
* }
*
@@ -96,198 +186,232 @@ namespace CUDF_EXPORT cudf {
* };
* @endcode
*/
-struct host_udf_base {
- host_udf_base() = default;
- virtual ~host_udf_base() = default;
-
+struct segmented_reduce_host_udf : host_udf_base {
/**
- * @brief Define the possible data needed for groupby aggregations.
+ * @brief Perform segmented reduction operations.
*
- * Note that only sort-based groupby aggregations are supported.
+ * @param input The input column for reduction
+ * @param offsets A list of offsets defining the segments for reduction
+ * @param output_dtype The data type for the final output column
+ * @param null_handling If `INCLUDE` then the reduction result is valid only if all elements in
+ * the segment are valid, and if `EXCLUDE` then the reduction result is valid if any
+ * element in the segment is valid
+ * @param init The initial value of the reduction
+ * @param stream The CUDA stream to use for any kernel launches
+ * @param mr Device memory resource to use for any allocations
+ * @return The output result of the aggregation
*/
- enum class groupby_data_attribute : int32_t {
- INPUT_VALUES, ///< The input values column.
- GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the
- ///< values within each group maintain their original order.
- SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and
- ///< sorted within each group.
- NUM_GROUPS, ///< The number of groups (i.e., number of distinct keys).
- GROUP_OFFSETS, ///< The offsets separating groups.
- GROUP_LABELS ///< Group labels (which is also the same as group indices).
- };
+ [[nodiscard]] virtual std::unique_ptr<column> operator()(
+ column_view const& input,
+ device_span<size_type const> offsets,
+ data_type output_dtype,
+ null_policy null_handling,
+ std::optional<std::reference_wrapper<scalar const>> init,
+ rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr) const = 0;
+};
+// Forward declaration.
+namespace groupby::detail {
+struct aggregate_result_functor;
+}
+
+/**
+ * @brief The interface for host-based UDF implementation for groupby aggregation context.
+ *
+ * An implementation of a host-based UDF for groupby must derive from this class. In addition
+ * to implementing the virtual functions declared in the base class `host_udf_base`, such a
+ * derived class must also define `get_empty_output()`, which returns the result when the
+ * input is empty, and `operator()`, which performs its groupby operations.
+ *
+ * During execution, the derived class can access internal data provided by the libcudf groupby
+ * framework through a set of `get*` accessors, as well as call other built-in groupby
+ * aggregations through the `compute_aggregation` function.
+ *
+ * @note The derived class can only perform sort-based groupby aggregations. Hash-based groupby
+ * aggregations require a more complex data structure and are not yet supported.
+ *
+ * Example:
+ * @code{.cpp}
+ * struct my_udf_aggregation : cudf::groupby_host_udf {
+ * my_udf_aggregation() = default;
+ *
+ * [[nodiscard]] std::unique_ptr<column> get_empty_output(
+ * rmm::cuda_stream_view stream,
+ * rmm::device_async_resource_ref mr) const override
+ * {
+ * // Return a column corresponding to the result when the input values column is empty.
+ * }
+ *
+ * [[nodiscard]] std::unique_ptr<column> operator()(
+ * rmm::cuda_stream_view stream,
+ * rmm::device_async_resource_ref mr) const override
+ * {
+ * // Perform UDF computation using the input data and return the result.
+ * }
+ *
+ * [[nodiscard]] bool is_equal(host_udf_base const& other) const override
+ * {
+ * // Check if the other object is also an instance of this class.
+ * // If there are internal state variables, they may need to be checked for equality as well.
+ * return dynamic_cast<my_udf_aggregation const*>(&other) != nullptr;
+ * }
+ *
+ * [[nodiscard]] std::unique_ptr<host_udf_base> clone() const override
+ * {
+ * return std::make_unique<my_udf_aggregation>();
+ * }
+ * };
+ * @endcode
+ */
+struct groupby_host_udf : host_udf_base {
/**
- * @brief Describe possible data that may be needed in the derived class for its operations.
+ * @brief Get the output when the input values column is empty.
*
- * Such data can be either intermediate data such as sorted values or group labels etc, or the
- * results of other aggregations.
+ * This is called in libcudf when the input values column is empty. In such situations libcudf
+ * tries to generate the output directly without unnecessarily evaluating the intermediate data.
*
- * Each derived host-based UDF class may need a different set of data. It is inefficient to
- * evaluate and pass down all these possible data at once from libcudf. A solution for that is,
- * the derived class can define a subset of data that it needs and libcudf will evaluate
- * and pass down only data requested from that set.
+ * @param stream The CUDA stream to use for any kernel launches
+ * @param mr Device memory resource to use for any allocations
+ * @return The output result of the aggregation when the input values column is empty
*/
- struct data_attribute {
- /**
- * @brief Hold all possible data types for the input of the aggregation in the derived class.
- */
- using value_type = std::variant<groupby_data_attribute, std::unique_ptr<aggregation>>;
- value_type value; ///< The actual data attribute, wrapped by this struct
- ///< as a wrapper is needed to define `hash` and `equal_to` functors.
-
- data_attribute() = default; ///< Default constructor
- data_attribute(data_attribute&&) = default; ///< Move constructor
-
- /**
- * @brief Construct a new data attribute from an aggregation attribute.
- * @param value_ An aggregation attribute
- */
- template <typename T, CUDF_ENABLE_IF(std::is_same_v<T, groupby_data_attribute>)>
- data_attribute(T value_) : value{value_}
- {
- }
-
- /**
- * @brief Construct a new data attribute from another aggregation request.
- * @param value_ An aggregation request
- */
- template <typename T,
- CUDF_ENABLE_IF(std::is_same_v<T, aggregation> ||
- std::is_same_v<T, groupby_aggregation>)>
- data_attribute(std::unique_ptr<T> value_) : value{std::move(value_)}
- {
- CUDF_EXPECTS(std::get<std::unique_ptr<aggregation>>(value) != nullptr,
- "Invalid aggregation request.");
- if constexpr (std::is_same_v<T, aggregation>) {
- CUDF_EXPECTS(
- dynamic_cast<groupby_aggregation*>(std::get<std::unique_ptr<aggregation>>(value).get()) != nullptr,
- "Requesting results from other aggregations is only supported in groupby "
- "aggregations.");
- }
- }
-
- /**
- * @brief Copy constructor.
- * @param other The other data attribute to copy from
- */
- data_attribute(data_attribute const& other);
-
- /**
- * @brief Hash functor for `data_attribute`.
- */
- struct hash {
- /**
- * @brief Compute the hash value of a data attribute.
- * @param attr The data attribute to hash
- * @return The hash value of the data attribute
- */
- std::size_t operator()(data_attribute const& attr) const;
- }; // struct hash
-
- /**
- * @brief Equality comparison functor for `data_attribute`.
- */
- struct equal_to {
- /**
- * @brief Check if two data attributes are equal.
- * @param lhs The left-hand side data attribute
- * @param rhs The right-hand side data attribute
- * @return True if the two data attributes are equal
- */
- bool operator()(data_attribute const& lhs, data_attribute const& rhs) const;
- }; // struct equal_to
- }; // struct data_attribute
+ [[nodiscard]] virtual std::unique_ptr<column> get_empty_output(
+ rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const = 0;
/**
- * @brief Set of attributes for the input data that is needed for computing the aggregation.
+ * @brief Perform the main groupby computation for the host-based UDF.
+ *
+ * @param stream The CUDA stream to use for any kernel launches
+ * @param mr Device memory resource to use for any allocations
+ * @return The output result of the aggregation
*/
- using data_attribute_set_t =
- std::unordered_set<data_attribute, data_attribute::hash, data_attribute::equal_to>;
+ [[nodiscard]] virtual std::unique_ptr<column> operator()(
+ rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const = 0;
+
+ private:
+ // Allow the struct `aggregate_result_functor` to set its private callback variables.
+ friend struct groupby::detail::aggregate_result_functor;
/**
- * @brief Return a set of attributes for the data that is needed for computing the aggregation.
- *
- * The derived class should return the attributes corresponding to only the data that it needs to
- * avoid unnecessary computation performed in libcudf. If this function is not overridden, an
- * empty set is returned. That means all the data attributes (except results from other
- * aggregations in groupby) will be needed.
- *
- * @return A set of `data_attribute`
+ * @brief Callback to access the input values column.
+ */
+ std::function<column_view(void)> callback_input_values;
+
+ /**
+ * @brief Callback to access the input values grouped according to the input keys for which the
+ * values within each group maintain their original order.
+ */
+ std::function<column_view(void)> callback_grouped_values;
+
+ /**
+ * @brief Callback to access the input values grouped according to the input keys and sorted
+ * within each group.
+ */
+ std::function<column_view(void)> callback_sorted_grouped_values;
+
+ /**
+ * @brief Callback to access the number of groups (i.e., number of distinct keys).
*/
- [[nodiscard]] virtual data_attribute_set_t get_required_data() const { return {}; }
+ std::function<size_type(void)> callback_num_groups;
/**
- * @brief Hold all possible types of the data that is passed to the derived class for executing
- * the aggregation.
+ * @brief Callback to access the offsets separating groups.
*/
- using input_data_t = std::variant<column_view, size_type, device_span<size_type const>>;
+ std::function<device_span<size_type const>(void)> callback_group_offsets;
/**
- * @brief Input to the aggregation, mapping from each data attribute to its actual data.
+ * @brief Callback to access the group labels (which is also the same as group indices).
*/
- using input_map_t = std::
- unordered_map<data_attribute, input_data_t, data_attribute::hash, data_attribute::equal_to>;
+ std::function<device_span<size_type const>(void)> callback_group_labels;
/**
- * @brief Output type of the aggregation.
+ * @brief Callback to access the result from other groupby aggregations.
+ */
+ std::function<column_view(std::unique_ptr<aggregation>)> callback_compute_aggregation;
+
+ protected:
+ /**
+ * @brief Access the input values column.
*
- * Currently only a single type is supported as the output of the aggregation, but it will hold
- * more types in the future when reduction is supported.
+ * @return The input values column.
*/
- using output_t = std::variant<std::unique_ptr<column>>;
+ [[nodiscard]] column_view get_input_values() const
+ {
+ CUDF_EXPECTS(callback_input_values, "Uninitialized callback_input_values.");
+ return callback_input_values();
+ }
/**
- * @brief Get the output when the input values column is empty.
+ * @brief Access the input values grouped according to the input keys for which the values
+ * within each group maintain their original order.
*
- * This is called in libcudf when the input values column is empty. In such situations libcudf
- * tries to generate the output directly without unnecessarily evaluating the intermediate data.
+ * @return The grouped values column.
+ */
+ [[nodiscard]] column_view get_grouped_values() const
+ {
+ CUDF_EXPECTS(callback_grouped_values, "Uninitialized callback_grouped_values.");
+ return callback_grouped_values();
+ }
+
+ /**
+ * @brief Access the input values grouped according to the input keys and sorted within each
+ * group.
*
- * @param output_dtype The expected output data type
- * @param stream The CUDA stream to use for any kernel launches
- * @param mr Device memory resource to use for any allocations
- * @return The output result of the aggregation when input values is empty
+ * @return The sorted grouped values column.
*/
- [[nodiscard]] virtual output_t get_empty_output(std::optional<data_type> output_dtype,
- rmm::cuda_stream_view stream,
- rmm::device_async_resource_ref mr) const = 0;
+ [[nodiscard]] column_view get_sorted_grouped_values() const
+ {
+ CUDF_EXPECTS(callback_sorted_grouped_values, "Uninitialized callback_sorted_grouped_values.");
+ return callback_sorted_grouped_values();
+ }
/**
- * @brief Perform the main computation for the host-based UDF.
+ * @brief Access the number of groups (i.e., number of distinct keys).
*
- * @param input The input data needed for performing all computation
- * @param stream The CUDA stream to use for any kernel launches
- * @param mr Device memory resource to use for any allocations
- * @return The output result of the aggregation
+ * @return The number of groups.
*/
- [[nodiscard]] virtual output_t operator()(input_map_t const& input,
- rmm::cuda_stream_view stream,
- rmm::device_async_resource_ref mr) const = 0;
+ [[nodiscard]] size_type get_num_groups() const
+ {
+ CUDF_EXPECTS(callback_num_groups, "Uninitialized callback_num_groups.");
+ return callback_num_groups();
+ }
/**
- * @brief Computes hash value of the class's instance.
- * @return The hash value of the instance
+ * @brief Access the offsets separating groups.
+ *
+ * @return The array of group offsets.
*/
- [[nodiscard]] virtual std::size_t do_hash() const
+ [[nodiscard]] device_span<size_type const> get_group_offsets() const
{
- return std::hash<int>{}(static_cast<int>(aggregation::Kind::HOST_UDF));
+ CUDF_EXPECTS(callback_group_offsets, "Uninitialized callback_group_offsets.");
+ return callback_group_offsets();
}
/**
- * @brief Compares two instances of the derived class for equality.
- * @param other The other derived class's instance to compare with
- * @return True if the two instances are equal
+ * @brief Access the group labels (which is also the same as group indices).
+ *
+ * @return The array of group labels.
*/
- [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0;
+ [[nodiscard]] device_span<size_type const> get_group_labels() const
+ {
+ CUDF_EXPECTS(callback_group_labels, "Uninitialized callback_group_labels.");
+ return callback_group_labels();
+ }
/**
- * @brief Clones the instance.
+ * @brief Compute a built-in groupby aggregation and access its result.
*
- * A class derived from `host_udf_base` should not store too much data such that its instances
- * remain lightweight for efficient cloning.
+ * This allows the derived class to run any other built-in groupby aggregation on the same input
+ * values column and access its output from within `operator()`.
*
- * @return A new instance cloned from this
+ * @param other_agg An arbitrary built-in groupby aggregation
+ * @return A `column_view` object corresponding to the output result of the given aggregation
*/
- [[nodiscard]] virtual std::unique_ptr clone() const = 0;
+ [[nodiscard]] column_view compute_aggregation(std::unique_ptr<aggregation> other_agg) const
+ {
+ CUDF_EXPECTS(callback_compute_aggregation, "Uninitialized callback for computing aggregation.");
+ return callback_compute_aggregation(std::move(other_agg));
+ }
};
/** @} */ // end of group
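
To make the callback-based accessors above concrete, here is a hedged sketch of an
`operator()` body in a `groupby_host_udf` subclass; `my_transform` is a hypothetical
device-side helper standing in for real per-group logic:

@code{.cpp}
std::unique_ptr<cudf::column> operator()(
  rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override
{
  // Each accessor invokes its callback lazily, so data the UDF never asks for
  // (e.g. sorted grouped values here) is never computed by libcudf.
  auto const values  = get_grouped_values();
  auto const offsets = get_group_offsets();
  // Reuse a built-in aggregation on the same input values; its result is cached
  // by the groupby framework.
  auto const sums =
    compute_aggregation(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
  return my_transform(values, offsets, sums, stream, mr);  // hypothetical helper
}
@endcode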
diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp
index d873e93bd20..5574ed6ea6e 100644
--- a/cpp/include/cudf/detail/aggregation/aggregation.hpp
+++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -967,7 +967,9 @@ class udf_aggregation final : public rolling_aggregation {
/**
* @brief Derived class for specifying host-based UDF aggregation.
*/
-class host_udf_aggregation final : public groupby_aggregation {
+class host_udf_aggregation final : public groupby_aggregation,
+ public reduce_aggregation,
+ public segmented_reduce_aggregation {
public:
std::unique_ptr udf_ptr;
diff --git a/cpp/include/cudf/detail/utilities/integer_utils.hpp b/cpp/include/cudf/detail/utilities/integer_utils.hpp
index 2e3d71815c0..44a86f1c84f 100644
--- a/cpp/include/cudf/detail/utilities/integer_utils.hpp
+++ b/cpp/include/cudf/detail/utilities/integer_utils.hpp
@@ -1,7 +1,7 @@
/*
* Copyright 2019 BlazingDB, Inc.
* Copyright 2019 Eyal Rozenberg
- * Copyright (c) 2020-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,8 @@
*/
#include
+#include
+#include
#include
#include
@@ -44,13 +46,17 @@ namespace util {
* `modulus` is positive. The safety is in regard to rollover.
*/
template <typename S>
-constexpr S round_up_safe(S number_to_round, S modulus)
+CUDF_HOST_DEVICE constexpr S round_up_safe(S number_to_round, S modulus)
{
auto remainder = number_to_round % modulus;
if (remainder == 0) { return number_to_round; }
auto rounded_up = number_to_round - remainder + modulus;
if (rounded_up < number_to_round) {
- throw std::invalid_argument("Attempt to round up beyond the type's maximum value");
+#ifndef __CUDA_ARCH__
+ CUDF_FAIL("Attempt to round up beyond the type's maximum value", cudf::data_type_error);
+#else
+ CUDF_UNREACHABLE("Attempt to round up beyond the type's maximum value");
+#endif
}
return rounded_up;
}
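
A short usage sketch (assuming only the definitions above): the function now behaves
identically in host and device code except for the failure path.

@code{.cpp}
// Round 1000 up to the next multiple of 256.
auto const padded = cudf::util::round_up_safe(1000, 256);  // 1024
// On rollover, host code throws cudf::data_type_error via CUDF_FAIL, while device
// code calls CUDF_UNREACHABLE, since exceptions cannot be thrown on the device.
@endcode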
diff --git a/cpp/include/cudf/utilities/span.hpp b/cpp/include/cudf/utilities/span.hpp
index e7b76946248..b5044a58934 100644
--- a/cpp/include/cudf/utilities/span.hpp
+++ b/cpp/include/cudf/utilities/span.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -197,11 +197,16 @@ struct host_span : public cudf::detail::span_base<T, Extent, host_span<T, Extent>>
 return host_span{this->data() + offset, count, _is_device_accessible};
}
@@ -434,8 +439,8 @@ struct device_span : public cudf::detail::span_base<T, Extent, device_span<T, Extent>>
 return device_span{this->data() + offset, count};
}
@@ -475,28 +480,28 @@ class base_2dspan {
*
* @return A pointer to the first element of the span
*/
- [[nodiscard]] constexpr auto data() const noexcept { return _flat.data(); }
+ [[nodiscard]] CUDF_HOST_DEVICE constexpr auto data() const noexcept { return _flat.data(); }
/**
* @brief Returns the size in the span as pair.
*
* @return pair representing rows and columns size of the span
*/
- [[nodiscard]] constexpr auto size() const noexcept { return _size; }
+ [[nodiscard]] CUDF_HOST_DEVICE constexpr auto size() const noexcept { return _size; }
/**
* @brief Returns the number of elements in the span.
*
* @return Number of elements in the span
*/
- [[nodiscard]] constexpr auto count() const noexcept { return _flat.size(); }
+ [[nodiscard]] CUDF_HOST_DEVICE constexpr auto count() const noexcept { return _flat.size(); }
/**
* @brief Checks if the span is empty.
*
* @return True if the span is empty, false otherwise
*/
- [[nodiscard]] constexpr bool is_empty() const noexcept { return count() == 0; }
+ [[nodiscard]] CUDF_HOST_DEVICE constexpr bool is_empty() const noexcept { return count() == 0; }
/**
* @brief Returns a reference to the row-th element of the sequence.
@@ -507,7 +512,7 @@ class base_2dspan {
* @param row the index of the element to access
* @return A reference to the row-th element of the sequence, i.e., `data()[row]`
*/
- constexpr RowType operator[](size_t row) const
+ CUDF_HOST_DEVICE constexpr RowType operator[](size_t row) const
{
return _flat.subspan(row * _size.second, _size.second);
}
@@ -517,7 +522,10 @@ class base_2dspan {
*
* @return A flattened span of the 2D span
*/
- [[nodiscard]] constexpr RowType flat_view() const { return _flat; }
+ [[nodiscard]] CUDF_HOST_DEVICE constexpr RowType flat_view() const
+ {
+ return _flat;
+ }
/**
* @brief Construct a 2D span from another 2D span of convertible type
diff --git a/cpp/include/nvtext/detail/generate_ngrams.hpp b/cpp/include/nvtext/detail/generate_ngrams.hpp
deleted file mode 100644
index ae48fed4e79..00000000000
--- a/cpp/include/nvtext/detail/generate_ngrams.hpp
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2023-2024, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include
-
-#include
-
-#include
-
-namespace CUDF_EXPORT nvtext {
-namespace detail {
-
-/**
- * @copydoc hash_character_ngrams(cudf::strings_column_view const&,
- * cudf::size_type, rmm::device_async_resource_ref)
- *
- * @param stream CUDA stream used for allocating/copying device memory and launching kernels
- */
-std::unique_ptr<cudf::column> hash_character_ngrams(cudf::strings_column_view const& strings,
- cudf::size_type ngrams,
- rmm::cuda_stream_view stream,
- rmm::device_async_resource_ref mr);
-
-} // namespace detail
-} // namespace CUDF_EXPORT nvtext
diff --git a/cpp/include/nvtext/generate_ngrams.hpp b/cpp/include/nvtext/generate_ngrams.hpp
index 54282b8ef3c..b2ba1798a8f 100644
--- a/cpp/include/nvtext/generate_ngrams.hpp
+++ b/cpp/include/nvtext/generate_ngrams.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -117,6 +117,7 @@ std::unique_ptr<cudf::column> generate_character_ngrams(
*
* @param input Strings column to produce ngrams from
* @param ngrams The ngram number to generate. Default is 5.
+ * @param seed The seed value to use with the hash algorithm. Default is 0.
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory.
* @return A lists column of hash values
@@ -124,6 +125,7 @@ std::unique_ptr<cudf::column> generate_character_ngrams(
std::unique_ptr<cudf::column> hash_character_ngrams(
cudf::strings_column_view const& input,
cudf::size_type ngrams = 5,
+ uint32_t seed = 0,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
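
A brief usage sketch of the extended API (the input strings are illustrative):

@code{.cpp}
auto const input = cudf::test::strings_column_wrapper({"abcdefg", "stuvwxyz"});
auto const view  = cudf::strings_column_view(input);
// Same ngram width, different seeds: the resulting hash lists differ.
auto const h0 = nvtext::hash_character_ngrams(view, 5, 0);
auto const h1 = nvtext::hash_character_ngrams(view, 5, 987654);
@endcode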
diff --git a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu
index 4c90cd0eef5..6234148e9fa 100644
--- a/cpp/src/groupby/groupby.cu
+++ b/cpp/src/groupby/groupby.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -145,8 +145,11 @@ struct empty_column_constructor {
}
if constexpr (k == aggregation::Kind::HOST_UDF) {
- auto const& udf_ptr = dynamic_cast<cudf::detail::host_udf_aggregation const&>(agg).udf_ptr;
- return std::get<std::unique_ptr<column>>(udf_ptr->get_empty_output(std::nullopt, stream, mr));
+ auto const& udf_base_ptr =
+ dynamic_cast<cudf::detail::host_udf_aggregation const&>(agg).udf_ptr;
+ auto const udf_ptr = dynamic_cast<groupby_host_udf*>(udf_base_ptr.get());
+ CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HOST_UDF instance for groupby aggregation.");
+ return udf_ptr->get_empty_output(stream, mr);
}
return make_empty_column(target_type(values.type(), k));
diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp
index 6480070e85a..fb3f7559d64 100644
--- a/cpp/src/groupby/sort/aggregate.cpp
+++ b/cpp/src/groupby/sort/aggregate.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -795,58 +795,41 @@ void aggregate_result_functor::operator()<aggregation::HOST_UDF>(aggregation con
{
if (cache.has_result(values, agg)) { return; }
- auto const& udf_ptr = dynamic_cast<cudf::detail::host_udf_aggregation const&>(agg).udf_ptr;
- auto const data_attrs = [&]() -> host_udf_base::data_attribute_set_t {
- if (auto tmp = udf_ptr->get_required_data(); !tmp.empty()) { return tmp; }
- // Empty attribute set means everything.
- return {host_udf_base::groupby_data_attribute::INPUT_VALUES,
- host_udf_base::groupby_data_attribute::GROUPED_VALUES,
- host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES,
- host_udf_base::groupby_data_attribute::NUM_GROUPS,
- host_udf_base::groupby_data_attribute::GROUP_OFFSETS,
- host_udf_base::groupby_data_attribute::GROUP_LABELS};
- }();
+ auto const& udf_base_ptr = dynamic_cast<cudf::detail::host_udf_aggregation const&>(agg).udf_ptr;
+ auto const udf_ptr = dynamic_cast<groupby_host_udf*>(udf_base_ptr.get());
+ CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HOST_UDF instance for groupby aggregation.");
- // Do not cache udf_input, as the actual input data may change from run to run.
- host_udf_base::input_map_t udf_input;
- for (auto const& attr : data_attrs) {
- CUDF_EXPECTS(std::holds_alternative<host_udf_base::groupby_data_attribute>(attr.value) ||
- std::holds_alternative<std::unique_ptr<aggregation>>(attr.value),
- "Invalid input data attribute for HOST_UDF groupby aggregation.");
- if (std::holds_alternative<host_udf_base::groupby_data_attribute>(attr.value)) {
- switch (std::get<host_udf_base::groupby_data_attribute>(attr.value)) {
- case host_udf_base::groupby_data_attribute::INPUT_VALUES:
- udf_input.emplace(attr, values);
- break;
- case host_udf_base::groupby_data_attribute::GROUPED_VALUES:
- udf_input.emplace(attr, get_grouped_values());
- break;
- case host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES:
- udf_input.emplace(attr, get_sorted_values());
- break;
- case host_udf_base::groupby_data_attribute::NUM_GROUPS:
- udf_input.emplace(attr, helper.num_groups(stream));
- break;
- case host_udf_base::groupby_data_attribute::GROUP_OFFSETS:
- udf_input.emplace(attr, helper.group_offsets(stream));
- break;
- case host_udf_base::groupby_data_attribute::GROUP_LABELS:
- udf_input.emplace(attr, helper.group_labels(stream));
- break;
- default: CUDF_UNREACHABLE("Invalid input data attribute for HOST_UDF groupby aggregation.");
- }
- } else { // data is result from another aggregation
- auto other_agg = std::get<std::unique_ptr<aggregation>>(attr.value)->clone();
+ if (!udf_ptr->callback_input_values) {
+ udf_ptr->callback_input_values = [&]() -> column_view { return values; };
+ }
+ if (!udf_ptr->callback_grouped_values) {
+ udf_ptr->callback_grouped_values = [&]() -> column_view { return get_grouped_values(); };
+ }
+ if (!udf_ptr->callback_sorted_grouped_values) {
+ udf_ptr->callback_sorted_grouped_values = [&]() -> column_view { return get_sorted_values(); };
+ }
+ if (!udf_ptr->callback_num_groups) {
+ udf_ptr->callback_num_groups = [&]() -> size_type { return helper.num_groups(stream); };
+ }
+ if (!udf_ptr->callback_group_offsets) {
+ udf_ptr->callback_group_offsets = [&]() -> device_span<size_type const> {
+ return helper.group_offsets(stream);
+ };
+ }
+ if (!udf_ptr->callback_group_labels) {
+ udf_ptr->callback_group_labels = [&]() -> device_span<size_type const> {
+ return helper.group_labels(stream);
+ };
+ }
+ if (!udf_ptr->callback_compute_aggregation) {
+ udf_ptr->callback_compute_aggregation =
+ [&](std::unique_ptr<aggregation> other_agg) -> column_view {
cudf::detail::aggregation_dispatcher(other_agg->kind, *this, *other_agg);
- auto result = cache.get_result(values, *other_agg);
- udf_input.emplace(std::move(other_agg), std::move(result));
- }
+ return cache.get_result(values, *other_agg);
+ };
}
- auto output = (*udf_ptr)(udf_input, stream, mr);
- CUDF_EXPECTS(std::holds_alternative<std::unique_ptr<column>>(output),
- "Invalid output type from HOST_UDF groupby aggregation.");
- cache.add_result(values, agg, std::get<std::unique_ptr<column>>(std::move(output)));
+ cache.add_result(values, agg, (*udf_ptr)(stream, mr));
}
} // namespace detail
diff --git a/cpp/src/groupby/sort/host_udf_aggregation.cpp b/cpp/src/groupby/sort/host_udf_aggregation.cpp
index 0da47e17f48..6f1fe80c4bd 100644
--- a/cpp/src/groupby/sort/host_udf_aggregation.cpp
+++ b/cpp/src/groupby/sort/host_udf_aggregation.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,51 +16,9 @@
#include
#include
-#include
namespace cudf {
-host_udf_base::data_attribute::data_attribute(data_attribute const& other)
- : value{std::visit(cudf::detail::visitor_overload{[](auto const& val) { return value_type{val}; },
- [](std::unique_ptr<aggregation> const& val) {
- return value_type{val->clone()};
- }},
- other.value)}
-{
-}
-
-std::size_t host_udf_base::data_attribute::hash::operator()(data_attribute const& attr) const
-{
- auto const hash_value =
- std::visit(cudf::detail::visitor_overload{
- [](auto const& val) { return std::hash<int>{}(static_cast<int>(val)); },
- [](std::unique_ptr<aggregation> const& val) { return val->do_hash(); }},
- attr.value);
- return std::hash<std::size_t>{}(attr.value.index()) ^ hash_value;
-}
-
-bool host_udf_base::data_attribute::equal_to::operator()(data_attribute const& lhs,
- data_attribute const& rhs) const
-{
- auto const& lhs_val = lhs.value;
- auto const& rhs_val = rhs.value;
- if (lhs_val.index() != rhs_val.index()) { return false; }
- return std::visit(
- cudf::detail::visitor_overload{
- [](auto const& lhs_val, auto const& rhs_val) {
- if constexpr (std::is_same_v<decltype(lhs_val), decltype(rhs_val)>) {
- return lhs_val == rhs_val;
- } else {
- return false;
- }
- },
- [](std::unique_ptr<aggregation> const& lhs_val, std::unique_ptr<aggregation> const& rhs_val) {
- return lhs_val->is_equal(*rhs_val);
- }},
- lhs_val,
- rhs_val);
-}
-
namespace detail {
host_udf_aggregation::host_udf_aggregation(std::unique_ptr<host_udf_base> udf_ptr_)
@@ -99,5 +57,9 @@ template CUDF_EXPORT std::unique_ptr<aggregation> make_host_udf_aggregation<aggregation>(std::unique_ptr<host_udf_base>);
template CUDF_EXPORT std::unique_ptr<groupby_aggregation>
make_host_udf_aggregation<groupby_aggregation>(std::unique_ptr<host_udf_base>);
+template CUDF_EXPORT std::unique_ptr<reduce_aggregation>
+ make_host_udf_aggregation<reduce_aggregation>(std::unique_ptr<host_udf_base>);
+template CUDF_EXPORT std::unique_ptr<segmented_reduce_aggregation>
+ make_host_udf_aggregation<segmented_reduce_aggregation>(std::unique_ptr<host_udf_base>);
} // namespace cudf
diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 82d8152ca1c..113342e9cbf 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
#include
#include
+#include <cudf/detail/utilities/stream_pool.hpp>
#include
#include
@@ -37,12 +38,25 @@
#include
#include
+#include <BS_thread_pool.hpp>
+#include <BS_thread_pool_utils.hpp>
+
#include
namespace cudf::io::json::detail {
namespace {
+namespace pools {
+
+BS::thread_pool& tpool()
+{
+ static BS::thread_pool _tpool(std::thread::hardware_concurrency());
+ return _tpool;
+}
+
+} // namespace pools
+
class compressed_host_buffer_source final : public datasource {
public:
explicit compressed_host_buffer_source(std::unique_ptr const& src,
@@ -51,8 +65,8 @@ class compressed_host_buffer_source final : public datasource {
{
auto ch_buffer = host_span<uint8_t const>(reinterpret_cast<uint8_t const*>(_dbuf_ptr->data()),
_dbuf_ptr->size());
- if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
- comptype == compression_type::SNAPPY) {
+ if (_comptype == compression_type::GZIP || _comptype == compression_type::ZIP ||
+ _comptype == compression_type::SNAPPY) {
_decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size(_comptype, ch_buffer);
} else {
_decompressed_buffer = cudf::io::detail::decompress(_comptype, ch_buffer);
@@ -96,7 +110,22 @@ class compressed_host_buffer_source final : public datasource {
return std::make_unique<non_owning_buffer>(_decompressed_buffer.data() + offset, count);
}
- [[nodiscard]] bool supports_device_read() const override { return false; }
+ std::future<size_t> device_read_async(size_t offset,
+ size_t size,
+ uint8_t* dst,
+ rmm::cuda_stream_view stream) override
+ {
+ auto& thread_pool = pools::tpool();
+ return thread_pool.submit_task([this, offset, size, dst, stream] {
+ auto hbuf = host_read(offset, size);
+ CUDF_CUDA_TRY(
+ cudaMemcpyAsync(dst, hbuf->data(), hbuf->size(), cudaMemcpyHostToDevice, stream.value()));
+ stream.synchronize();
+ return hbuf->size();
+ });
+ }
+
+ [[nodiscard]] bool supports_device_read() const override { return true; }
[[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; }
@@ -431,6 +460,8 @@ device_span ingest_raw_input(device_span buffer,
// line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
// delimiter.
auto constexpr num_delimiter_chars = 1;
+ std::vector<std::future<size_t>> thread_tasks;
+ auto stream_pool = cudf::detail::fork_streams(stream, pools::tpool().get_thread_count());
auto delimiter_map = cudf::detail::make_empty_host_vector<std::size_t>(sources.size(), stream);
std::vector<std::size_t> prefsum_source_sizes(sources.size());
@@ -447,13 +478,17 @@ device_span ingest_raw_input(device_span buffer,
auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
- for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
+ for (std::size_t i = start_source, cur_stream = 0;
+ i < sources.size() && bytes_read < total_bytes_to_read;
+ i++) {
if (sources[i]->is_empty()) continue;
auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read +
(num_delimiter_chars * delimiter_map.size());
- if (sources[i]->is_device_read_preferred(data_size)) {
- bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
+ if (sources[i]->supports_device_read()) {
+ thread_tasks.emplace_back(sources[i]->device_read_async(
+ range_offset, data_size, destination, stream_pool[cur_stream++ % stream_pool.size()]));
+ bytes_read += data_size;
} else {
h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
auto const& h_buffer = h_buffers.back();
@@ -481,6 +516,15 @@ device_span ingest_raw_input(device_span buffer,
buffer.data());
}
stream.synchronize();
+
+ if (thread_tasks.size()) {
+ auto const bytes_read = std::accumulate(
+ thread_tasks.begin(), thread_tasks.end(), std::size_t{0}, [](std::size_t sum, auto& task) {
+ return sum + task.get();
+ });
+ CUDF_EXPECTS(bytes_read == total_bytes_to_read, "something's fishy");
+ }
+
return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars));
}
@@ -505,10 +549,17 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
return read_json_impl(sources, reader_opts, stream, mr);
std::vector<std::unique_ptr<datasource>> compressed_sources;
- for (size_t i = 0; i < sources.size(); i++) {
- compressed_sources.emplace_back(
- std::make_unique(sources[i], reader_opts.get_compression()));
+ std::vector<std::future<std::unique_ptr<datasource>>> thread_tasks;
+ auto& thread_pool = pools::tpool();
+ for (auto& src : sources) {
+ thread_tasks.emplace_back(thread_pool.submit_task([&reader_opts, &src] {
+ return std::make_unique(src, reader_opts.get_compression());
+ }));
}
+ std::transform(thread_tasks.begin(),
+ thread_tasks.end(),
+ std::back_inserter(compressed_sources),
+ [](auto& task) { return task.get(); });
// in read_json_impl, we need the compressed source size to actually be the
// uncompressed source size for correct batching
return read_json_impl(compressed_sources, reader_opts, stream, mr);
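
The same submit-and-join pattern appears twice above: one task per source is queued on
the shared pool, each task reports its result, and the caller blocks once at the end. A
condensed sketch of the idiom, with a hypothetical copy_one_source standing in for the
per-source work:

@code{.cpp}
std::vector<std::future<std::size_t>> tasks;
for (auto& src : sources) {
  // BS::thread_pool::submit_task returns a std::future of the lambda's result.
  tasks.emplace_back(pools::tpool().submit_task([&src] {
    return copy_one_source(src);  // hypothetical; returns bytes copied
  }));
}
// Join all tasks and total the bytes read (std::accumulate from <numeric>).
auto const total = std::accumulate(
  tasks.begin(), tasks.end(), std::size_t{0},
  [](std::size_t sum, auto& task) { return sum + task.get(); });
@endcode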
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index bed1d5500d2..dff1f3f0c0e 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -97,38 +97,24 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
_stream);
}
- // Compute column string sizes (using page string offsets) for this subpass
+ // Compute column string sizes (using page string offsets) for this output table chunk
col_string_sizes = calculate_page_string_offsets();
- // ensure cumulative column string sizes have been initialized
- if (pass.cumulative_col_string_sizes.empty()) {
- pass.cumulative_col_string_sizes.resize(_input_columns.size(), 0);
- }
-
- // Add to the cumulative column string sizes of this pass
- std::transform(pass.cumulative_col_string_sizes.begin(),
- pass.cumulative_col_string_sizes.end(),
- col_string_sizes.begin(),
- pass.cumulative_col_string_sizes.begin(),
- std::plus<>{});
-
// Check for overflow in cumulative column string sizes of this pass so that the page string
// offsets of overflowing (large) string columns are treated as 64-bit.
auto const threshold = static_cast<std::size_t>(strings::detail::get_offset64_threshold());
- auto const has_large_strings = std::any_of(pass.cumulative_col_string_sizes.cbegin(),
- pass.cumulative_col_string_sizes.cend(),
+ auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
+ col_string_sizes.cend(),
[=](std::size_t sz) { return sz > threshold; });
if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
}
- // Mark any chunks for which the cumulative column string size has exceeded the
- // large strings threshold
- if (has_large_strings) {
- for (auto& chunk : pass.chunks) {
- auto const idx = chunk.src_col_index;
- if (pass.cumulative_col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
- }
+ // Mark/unmark column-chunk descriptors depending on the string sizes of corresponding output
+ // column chunks and the large strings threshold.
+ for (auto& chunk : pass.chunks) {
+ auto const idx = chunk.src_col_index;
+ chunk.is_large_string_col = (col_string_sizes[idx] > threshold);
}
}
@@ -210,11 +196,9 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
// only do string buffer for leaf
if (idx == max_depth - 1 and out_buf.string_size() == 0 and
col_string_sizes[pass.chunks[c].src_col_index] > 0) {
- out_buf.create_string_data(
- col_string_sizes[pass.chunks[c].src_col_index],
- pass.cumulative_col_string_sizes[pass.chunks[c].src_col_index] >
- static_cast<std::size_t>(strings::detail::get_offset64_threshold()),
- _stream);
+ out_buf.create_string_data(col_string_sizes[pass.chunks[c].src_col_index],
+ pass.chunks[c].is_large_string_col,
+ _stream);
}
if (has_strings) { str_data[idx] = out_buf.string_data(); }
out_buf.user_data |=
@@ -450,11 +434,11 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
final_offsets.emplace_back(offset);
out_buf.user_data |= PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED;
} else if (out_buf.type.id() == type_id::STRING) {
- // need to cap off the string offsets column
- auto const sz = static_cast<int64_t>(col_string_sizes[idx]);
- if (sz <= strings::detail::get_offset64_threshold()) {
+ // only if it is not a large strings column
+ if (col_string_sizes[idx] <=
+ static_cast<std::size_t>(strings::detail::get_offset64_threshold())) {
out_buffers.emplace_back(static_cast(out_buf.data()) + out_buf.size);
- final_offsets.emplace_back(sz);
+ final_offsets.emplace_back(static_cast<size_type>(col_string_sizes[idx]));
}
}
}
diff --git a/cpp/src/io/parquet/reader_impl_chunking.hpp b/cpp/src/io/parquet/reader_impl_chunking.hpp
index ca46f198bb8..4a773fbced1 100644
--- a/cpp/src/io/parquet/reader_impl_chunking.hpp
+++ b/cpp/src/io/parquet/reader_impl_chunking.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -130,9 +130,6 @@ struct pass_intermediate_data {
rmm::device_buffer decomp_dict_data{0, cudf::get_default_stream()};
rmm::device_uvector<string_index_pair> str_dict_index{0, cudf::get_default_stream()};
- // cumulative strings column sizes.
- std::vector<std::size_t> cumulative_col_string_sizes{};
-
int level_type_size{0};
// skip_rows / num_rows for this pass.
diff --git a/cpp/src/io/utilities/parsing_utils.cuh b/cpp/src/io/utilities/parsing_utils.cuh
index 75e45a68842..9833dab282e 100644
--- a/cpp/src/io/utilities/parsing_utils.cuh
+++ b/cpp/src/io/utilities/parsing_utils.cuh
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -171,7 +171,10 @@ constexpr uint8_t decode_digit(char c, bool* valid_flag)
}
// Converts character to lowercase.
-constexpr char to_lower(char const c) { return c >= 'A' && c <= 'Z' ? c + ('a' - 'A') : c; }
+CUDF_HOST_DEVICE constexpr char to_lower(char const c)
+{
+ return c >= 'A' && c <= 'Z' ? c + ('a' - 'A') : c;
+}
/**
* @brief Checks if string is infinity, case insensitive with/without sign
@@ -515,13 +518,13 @@ struct ConvertFunctor {
template <typename T,
CUDF_ENABLE_IF(std::is_integral_v<T> and !std::is_same_v<T, bool> and
!cudf::is_fixed_point<T>())>
- __host__ __device__ __forceinline__ bool operator()(char const* begin,
- char const* end,
- void* out_buffer,
- size_t row,
- data_type const output_type,
- parse_options_view const& opts,
- bool as_hex = false)
+ __device__ __forceinline__ bool operator()(char const* begin,
+ char const* end,
+ void* out_buffer,
+ size_t row,
+ data_type const output_type,
+ parse_options_view const& opts,
+ bool as_hex = false)
{
auto const value = [as_hex, &opts, begin, end]() -> cuda::std::optional<T> {
// Check for user-specified true/false values
@@ -564,13 +567,13 @@ struct ConvertFunctor {
* @brief Dispatch for boolean type types.
*/
template <typename T, CUDF_ENABLE_IF(std::is_same_v<T, bool>)>
- __host__ __device__ __forceinline__ bool operator()(char const* begin,
- char const* end,
- void* out_buffer,
- size_t row,
- data_type const output_type,
- parse_options_view const& opts,
- bool as_hex)
+ __device__ __forceinline__ bool operator()(char const* begin,
+ char const* end,
+ void* out_buffer,
+ size_t row,
+ data_type const output_type,
+ parse_options_view const& opts,
+ bool as_hex)
{
auto const value = [&opts, begin, end]() -> cuda::std::optional<T> {
// Check for user-specified true/false values
@@ -593,13 +596,13 @@ struct ConvertFunctor {
* is not valid. In such case, the validity mask is set to zero too.
*/
template <typename T, CUDF_ENABLE_IF(std::is_floating_point_v<T>)>
- __host__ __device__ __forceinline__ bool operator()(char const* begin,
- char const* end,
- void* out_buffer,
- size_t row,
- data_type const output_type,
- parse_options_view const& opts,
- bool as_hex)
+ __device__ __forceinline__ bool operator()(char const* begin,
+ char const* end,
+ void* out_buffer,
+ size_t row,
+ data_type const output_type,
+ parse_options_view const& opts,
+ bool as_hex)
{
auto const value = [&opts, begin, end]() -> cuda::std::optional<T> {
// Check for user-specified true/false values
diff --git a/cpp/src/io/utilities/trie.cuh b/cpp/src/io/utilities/trie.cuh
index c0efc5b6f20..dbdc4a34277 100644
--- a/cpp/src/io/utilities/trie.cuh
+++ b/cpp/src/io/utilities/trie.cuh
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2018-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -74,16 +74,14 @@ CUDF_EXPORT trie create_serialized_trie(std::vector<std::string> const& keys,
/*
* @brief Searches for a string in a serialized trie.
*
- * Can be executed on host or device, as long as the data is available
- *
* @param trie Pointer to the array of nodes that make up the trie
* @param key Pointer to the start of the string to find
* @param key_len Length of the string to find
*
* @return Boolean value; true if string is found, false otherwise
*/
-CUDF_HOST_DEVICE inline bool serialized_trie_contains(device_span<serial_trie_node const> trie,
- device_span<char const> key)
+__device__ inline bool serialized_trie_contains(device_span<serial_trie_node const> trie,
+ device_span<char const> key)
{
if (trie.empty()) { return false; }
if (key.empty()) { return trie.front().is_leaf; }
diff --git a/cpp/src/reductions/reductions.cpp b/cpp/src/reductions/reductions.cpp
index 75ebc078930..928625a7e8f 100644
--- a/cpp/src/reductions/reductions.cpp
+++ b/cpp/src/reductions/reductions.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,6 +14,7 @@
* limitations under the License.
*/
+#include <cudf/aggregation/host_udf.hpp>
#include
#include
#include
@@ -144,6 +145,13 @@ struct reduce_dispatch_functor {
auto td_agg = static_cast<cudf::detail::merge_tdigest_aggregation const&>(agg);
return tdigest::detail::reduce_merge_tdigest(col, td_agg.max_centroids, stream, mr);
}
+ case aggregation::HOST_UDF: {
+ auto const& udf_base_ptr =
+ dynamic_cast<cudf::detail::host_udf_aggregation const&>(agg).udf_ptr;
+ auto const udf_ptr = dynamic_cast<reduce_host_udf const*>(udf_base_ptr.get());
+ CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HOST_UDF instance for reduction.");
+ return (*udf_ptr)(col, output_dtype, init, stream, mr);
+ } // case aggregation::HOST_UDF
default: CUDF_FAIL("Unsupported reduction operator");
}
}
@@ -161,9 +169,11 @@ std::unique_ptr reduce(column_view const& col,
cudf::data_type_error);
if (init.has_value() && !(agg.kind == aggregation::SUM || agg.kind == aggregation::PRODUCT ||
agg.kind == aggregation::MIN || agg.kind == aggregation::MAX ||
- agg.kind == aggregation::ANY || agg.kind == aggregation::ALL)) {
+ agg.kind == aggregation::ANY || agg.kind == aggregation::ALL ||
+ agg.kind == aggregation::HOST_UDF)) {
CUDF_FAIL(
- "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, and ALL aggregation types");
+ "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, ALL, and HOST_UDF "
+ "aggregation types");
}
// Returns default scalar if input column is empty or all null
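
With the host_udf.hpp changes above, a reduction call site looks like this sketch;
my_reduce_udf is a hypothetical reduce_host_udf implementation and input_column is any
column_view the UDF understands:

@code{.cpp}
auto const agg = cudf::make_host_udf_aggregation<cudf::reduce_aggregation>(
  std::make_unique<my_reduce_udf>());
// An initial value is now accepted for HOST_UDF, as for SUM/PRODUCT/MIN/MAX/ANY/ALL.
auto const init   = cudf::make_fixed_width_scalar<double>(3.0);
auto const result = cudf::reduce(
  input_column, *agg, cudf::data_type{cudf::type_id::FLOAT64}, *init);
@endcode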
diff --git a/cpp/src/reductions/segmented/reductions.cpp b/cpp/src/reductions/segmented/reductions.cpp
index 1c3a2b0c0f3..5835bfcf0a1 100644
--- a/cpp/src/reductions/segmented/reductions.cpp
+++ b/cpp/src/reductions/segmented/reductions.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+#include <cudf/aggregation/host_udf.hpp>
#include
#include
#include
@@ -98,6 +100,13 @@ struct segmented_reduce_dispatch_functor {
}
case segmented_reduce_aggregation::NUNIQUE:
return segmented_nunique(col, offsets, null_handling, stream, mr);
+ case aggregation::HOST_UDF: {
+ auto const& udf_base_ptr =
+ dynamic_cast<cudf::detail::host_udf_aggregation const&>(agg).udf_ptr;
+ auto const udf_ptr = dynamic_cast<segmented_reduce_host_udf const*>(udf_base_ptr.get());
+ CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HOST_UDF instance for segmented reduction.");
+ return (*udf_ptr)(col, offsets, output_dtype, null_handling, init, stream, mr);
+ } // case aggregation::HOST_UDF
default: CUDF_FAIL("Unsupported aggregation type.");
}
}
@@ -117,9 +126,11 @@ std::unique_ptr segmented_reduce(column_view const& segmented_values,
cudf::data_type_error);
if (init.has_value() && !(agg.kind == aggregation::SUM || agg.kind == aggregation::PRODUCT ||
agg.kind == aggregation::MIN || agg.kind == aggregation::MAX ||
- agg.kind == aggregation::ANY || agg.kind == aggregation::ALL)) {
+ agg.kind == aggregation::ANY || agg.kind == aggregation::ALL ||
+ agg.kind == aggregation::HOST_UDF)) {
CUDF_FAIL(
- "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, and ALL aggregation types");
+ "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, ALL, and HOST_UDF "
+ "aggregation types");
}
if (segmented_values.is_empty() && offsets.empty()) {
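
The segmented counterpart, as a sketch; my_segmented_udf is a hypothetical
segmented_reduce_host_udf implementation, values is the segmented input column, and
offsets is a device_span of size_type offsets delimiting the segments:

@code{.cpp}
auto const agg = cudf::make_host_udf_aggregation<cudf::segmented_reduce_aggregation>(
  std::make_unique<my_segmented_udf>());
auto const result = cudf::segmented_reduce(values,
                                           offsets,
                                           *agg,
                                           cudf::data_type{cudf::type_id::FLOAT64},
                                           cudf::null_policy::INCLUDE);
@endcode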
diff --git a/cpp/src/text/generate_ngrams.cu b/cpp/src/text/generate_ngrams.cu
index 997b0278fe2..33d52ccd570 100644
--- a/cpp/src/text/generate_ngrams.cu
+++ b/cpp/src/text/generate_ngrams.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,7 +32,7 @@
#include
#include
-#include
+#include
#include
#include
@@ -315,6 +315,7 @@ namespace {
*/
CUDF_KERNEL void character_ngram_hash_kernel(cudf::column_device_view const d_strings,
cudf::size_type ngrams,
+ uint32_t seed,
cudf::size_type const* d_ngram_offsets,
cudf::hash_value_type* d_results)
{
@@ -332,7 +333,7 @@ CUDF_KERNEL void character_ngram_hash_kernel(cudf::column_device_view const d_st
__shared__ cudf::hash_value_type hvs[block_size]; // temp store for hash values
auto const ngram_offset = d_ngram_offsets[str_idx];
- auto const hasher = cudf::hashing::detail::MurmurHash3_x86_32{0};
+ auto const hasher = cudf::hashing::detail::MurmurHash3_x86_32{seed};
auto const end = d_str.data() + d_str.size_bytes();
auto const warp_count = (d_str.size_bytes() / cudf::detail::warp_size) + 1;
@@ -368,6 +369,7 @@ CUDF_KERNEL void character_ngram_hash_kernel(cudf::column_device_view const d_st
std::unique_ptr<cudf::column> hash_character_ngrams(cudf::strings_column_view const& input,
cudf::size_type ngrams,
+ uint32_t seed,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
@@ -400,7 +402,7 @@ std::unique_ptr<cudf::column> hash_character_ngrams(cudf::strings_column_view co
auto d_hashes = hashes->mutable_view().data<cudf::hash_value_type>();
character_ngram_hash_kernel<<<grid.num_blocks, block_size, 0, stream.value()>>>(
- *d_strings, ngrams, d_offsets, d_hashes);
+ *d_strings, ngrams, seed, d_offsets, d_hashes);
return make_lists_column(
input.size(), std::move(offsets), std::move(hashes), 0, rmm::device_buffer{}, stream, mr);
@@ -419,11 +421,12 @@ std::unique_ptr generate_character_ngrams(cudf::strings_column_vie
std::unique_ptr<cudf::column> hash_character_ngrams(cudf::strings_column_view const& strings,
cudf::size_type ngrams,
+ uint32_t seed,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
- return detail::hash_character_ngrams(strings, ngrams, stream, mr);
+ return detail::hash_character_ngrams(strings, ngrams, seed, stream, mr);
}
} // namespace nvtext
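
A short sketch of the effect of the new `seed` parameter, assuming `input` is a hypothetical cudf::strings_column_view: the seed feeds MurmurHash3_x86_32, so different seeds yield independent hash families over the same ngrams (useful, e.g., for MinHash-style signatures):

  #include <nvtext/generate_ngrams.hpp>

  auto hashes_a = nvtext::hash_character_ngrams(input, 5, 0);   // seed 0 reproduces prior behavior
  auto hashes_b = nvtext::hash_character_ngrams(input, 5, 42);  // caller-chosen seed
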
diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt
index 344979e1288..35877ac34b9 100644
--- a/cpp/tests/CMakeLists.txt
+++ b/cpp/tests/CMakeLists.txt
@@ -220,11 +220,12 @@ ConfigureTest(
REDUCTIONS_TEST
reductions/collect_ops_tests.cpp
reductions/ewm_tests.cpp
+ reductions/host_udf_example_tests.cu
+ reductions/list_rank_test.cpp
reductions/rank_tests.cpp
reductions/reduction_tests.cpp
reductions/scan_tests.cpp
reductions/segmented_reduction_tests.cpp
- reductions/list_rank_test.cpp
reductions/tdigest_tests.cu
GPUS 1
PERCENT 70
diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu
index a454bd692fc..e1ded37d8a7 100644
--- a/cpp/tests/groupby/host_udf_example_tests.cu
+++ b/cpp/tests/groupby/host_udf_example_tests.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,9 +21,7 @@
#include
#include
#include
-#include
#include
-#include
#include
#include
@@ -34,6 +32,9 @@
#include
#include
+using doubles_col = cudf::test::fixed_width_column_wrapper<double>;
+using int32s_col = cudf::test::fixed_width_column_wrapper<int32_t>;
+
namespace {
/**
* @brief A host-based UDF implementation for groupby.
@@ -41,42 +42,21 @@ namespace {
* For each group of values, the aggregation computes
* `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`.
*/
-struct host_udf_groupby_example : cudf::host_udf_base {
+struct host_udf_groupby_example : cudf::groupby_host_udf {
host_udf_groupby_example() = default;
- [[nodiscard]] data_attribute_set_t get_required_data() const override
- {
- // We need grouped values, group offsets, group labels, and also results from groups'
- // MAX and SUM aggregations.
- return {groupby_data_attribute::GROUPED_VALUES,
- groupby_data_attribute::GROUP_OFFSETS,
- groupby_data_attribute::GROUP_LABELS,
- cudf::make_max_aggregation<cudf::groupby_aggregation>(),
- cudf::make_sum_aggregation<cudf::groupby_aggregation>()};
- }
-
- [[nodiscard]] output_t get_empty_output(
- [[maybe_unused]] std::optional<cudf::data_type> output_dtype,
- [[maybe_unused]] rmm::cuda_stream_view stream,
- [[maybe_unused]] rmm::device_async_resource_ref mr) const override
+ [[nodiscard]] std::unique_ptr<cudf::column> get_empty_output(
+ rmm::cuda_stream_view, rmm::device_async_resource_ref) const override
{
return cudf::make_empty_column(
cudf::data_type{cudf::type_to_id<double>()});
}
- [[nodiscard]] output_t operator()(input_map_t const& input,
- rmm::cuda_stream_view stream,
- rmm::device_async_resource_ref mr) const override
+ [[nodiscard]] std::unique_ptr<cudf::column> operator()(
+ rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override
{
- auto const& values =
- std::get<cudf::column_view>(input.at(groupby_data_attribute::GROUPED_VALUES));
- return cudf::type_dispatcher(values.type(), groupby_fn{this}, input, stream, mr);
- }
-
- [[nodiscard]] std::size_t do_hash() const override
- {
- // Just return the same hash for all instances of this class.
- return std::size_t{12345};
+ auto const values = get_grouped_values();
+ return cudf::type_dispatcher(values.type(), groupby_fn{*this}, stream, mr);
}
[[nodiscard]] bool is_equal(host_udf_base const& other) const override
@@ -92,37 +72,33 @@ struct host_udf_groupby_example : cudf::host_udf_base {
struct groupby_fn {
// Store pointer to the parent class so we can call its functions.
- host_udf_groupby_example const* parent;
+ host_udf_groupby_example const& parent;
- // For simplicity, this example only accepts double input and always produces double output.
+ // For simplicity, this example only accepts a single input/output type: double.
using InputType = double;
using OutputType = double;
template <typename T, typename... Args, CUDF_ENABLE_IF(!std::is_same_v<T, InputType>)>
- output_t operator()(Args...) const
+ std::unique_ptr<cudf::column> operator()(Args...) const
{
CUDF_FAIL("Unsupported input type.");
}
template <typename T, CUDF_ENABLE_IF(std::is_same_v<T, InputType>)>
- output_t operator()(input_map_t const& input,
- rmm::cuda_stream_view stream,
- rmm::device_async_resource_ref mr) const
+ std::unique_ptr<cudf::column> operator()(rmm::cuda_stream_view stream,
+ rmm::device_async_resource_ref mr) const
{
- auto const& values =
- std::get<cudf::column_view>(input.at(groupby_data_attribute::GROUPED_VALUES));
- if (values.size() == 0) { return parent->get_empty_output(std::nullopt, stream, mr); }
+ auto const values = parent.get_grouped_values();
+ if (values.size() == 0) { return parent.get_empty_output(stream, mr); }
- auto const offsets = std::get<cudf::device_span<cudf::size_type const>>(
- input.at(groupby_data_attribute::GROUP_OFFSETS));
+ auto const offsets = parent.get_group_offsets();
CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets.");
auto const num_groups = static_cast<int>(offsets.size()) - 1;
- auto const group_indices = std::get<cudf::device_span<cudf::size_type const>>(
- input.at(groupby_data_attribute::GROUP_LABELS));
- auto const group_max = std::get<cudf::column_view>(
- input.at(cudf::make_max_aggregation<cudf::groupby_aggregation>()));
- auto const group_sum = std::get<cudf::column_view>(
- input.at(cudf::make_sum_aggregation<cudf::groupby_aggregation>()));
+ auto const group_indices = parent.get_group_labels();
+ auto const group_max =
+ parent.compute_aggregation(cudf::make_max_aggregation<cudf::groupby_aggregation>());
+ auto const group_sum =
+ parent.compute_aggregation(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
auto const values_dv_ptr = cudf::column_device_view::create(values, stream);
auto const output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id<OutputType>()},
@@ -191,9 +167,6 @@ struct host_udf_groupby_example : cudf::host_udf_base {
} // namespace
-using doubles_col = cudf::test::fixed_width_column_wrapper<double>;
-using int32s_col = cudf::test::fixed_width_column_wrapper<int32_t>;
-
struct HostUDFGroupbyExampleTest : cudf::test::BaseFixture {};
TEST_F(HostUDFGroupbyExampleTest, SimpleInput)
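
For orientation, a hedged sketch of how a groupby host UDF such as `host_udf_groupby_example` is wired into an aggregation request, mirroring the fixture pattern in these tests (`keys` and `vals` are hypothetical columns):

  std::vector<cudf::groupby::aggregation_request> requests;
  requests.emplace_back();
  requests[0].values = vals;
  requests[0].aggregations.push_back(cudf::make_host_udf_aggregation<cudf::groupby_aggregation>(
    std::make_unique<host_udf_groupby_example>()));
  cudf::groupby::groupby gb_obj(cudf::table_view({keys}));
  auto const result = gb_obj.aggregate(requests);  // result.second[0].results[0] holds the UDF output
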
diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp
index 1a0f68c0c6c..17da28cdefc 100644
--- a/cpp/tests/groupby/host_udf_tests.cpp
+++ b/cpp/tests/groupby/host_udf_tests.cpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,178 +26,121 @@
#include
namespace {
+
/**
- * @brief A host-based UDF implementation used for unit tests.
+ * @brief Generate a random aggregation object from {min, max, sum, product}.
*/
-struct host_udf_test_base : cudf::host_udf_base {
+std::unique_ptr<cudf::groupby_aggregation> get_random_agg()
+{
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<int> distr(1, 4);
+ switch (distr(gen)) {
+ case 1: return cudf::make_min_aggregation<cudf::groupby_aggregation>();
+ case 2: return cudf::make_max_aggregation<cudf::groupby_aggregation>();
+ case 3: return cudf::make_sum_aggregation<cudf::groupby_aggregation>();
+ case 4: return cudf::make_product_aggregation<cudf::groupby_aggregation>();
+ default: CUDF_UNREACHABLE("This should not be reached.");
+ }
+ return nullptr;
+}
+
+/**
+ * @brief A host-based UDF implementation used for unit tests for groupby aggregation.
+ */
+struct host_udf_groupby_test : cudf::groupby_host_udf {
int test_location_line; // the location where testing is called
bool* test_run; // to check if the test is accidentally skipped
- data_attribute_set_t input_attrs;
+ bool test_other_agg; // test calling other aggregation
- host_udf_test_base(int test_location_line_, bool* test_run_, data_attribute_set_t input_attrs_)
- : test_location_line{test_location_line_},
- test_run{test_run_},
- input_attrs(std::move(input_attrs_))
+ host_udf_groupby_test(int test_location_line_, bool* test_run_, bool test_other_agg_)
+ : test_location_line{test_location_line_}, test_run{test_run_}, test_other_agg{test_other_agg_}
{
}
- [[nodiscard]] data_attribute_set_t get_required_data() const override { return input_attrs; }
-
- // This is the main testing function, which checks for the correctness of input data.
- // The rests are just to satisfy the interface.
- [[nodiscard]] output_t operator()(input_map_t const& input,
- rmm::cuda_stream_view stream,
- rmm::device_async_resource_ref mr) const override
+ [[nodiscard]] std::size_t do_hash() const override { return 0; }
+ [[nodiscard]] bool is_equal(host_udf_base const& other) const override
{
- SCOPED_TRACE("Test instance created at line: " + std::to_string(test_location_line));
-
- test_data_attributes(input, stream, mr);
-
- *test_run = true; // test is run successfully
- return get_empty_output(std::nullopt, stream, mr);
+ // Just check if the other object is also instance of this class.
+ return dynamic_cast(&other) != nullptr;
+ }
+ [[nodiscard]] std::unique_ptr<host_udf_base> clone() const override
+ {
+ return std::make_unique<host_udf_groupby_test>(test_location_line, test_run, test_other_agg);
}
- [[nodiscard]] output_t get_empty_output(
- [[maybe_unused]] std::optional<cudf::data_type> output_dtype,
+ [[nodiscard]] std::unique_ptr<cudf::column> get_empty_output(
[[maybe_unused]] rmm::cuda_stream_view stream,
[[maybe_unused]] rmm::device_async_resource_ref mr) const override
{
- // Unused function - dummy output.
+ // Dummy output.
return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32});
}
- [[nodiscard]] std::size_t do_hash() const override { return 0; }
- [[nodiscard]] bool is_equal(host_udf_base const& other) const override { return true; }
+ [[nodiscard]] std::unique_ptr<cudf::column> operator()(
+ rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override
+ {
+ SCOPED_TRACE("Test instance created at line: " + std::to_string(test_location_line));
- // The main test function, which must be implemented for each kind of aggregations
- // (groupby/reduction/segmented_reduction).
- virtual void test_data_attributes(input_map_t const& input,
- rmm::cuda_stream_view stream,
- rmm::device_async_resource_ref mr) const = 0;
-};
+ // Perform type checks on the groupby data: we must ensure that the data corresponding to
+ // each `groupby_data` enum value has the correct type.
-/**
- * @brief A host-based UDF implementation used for unit tests for groupby aggregation.
- */
-struct host_udf_groupby_test : host_udf_test_base {
- host_udf_groupby_test(int test_location_line_,
- bool* test_run_,
- data_attribute_set_t input_attrs_ = {})
- : host_udf_test_base(test_location_line_, test_run_, std::move(input_attrs_))
- {
- }
+ {
+ auto const inp_data = get_input_values();
+ EXPECT_TRUE((std::is_same_v<cudf::column_view, std::decay_t<decltype(inp_data)>>));
+ }
- [[nodiscard]] std::unique_ptr