From 4ec389b4c515e4b3b85d6fd28b1471b1f1de830d Mon Sep 17 00:00:00 2001 From: Nghia Truong <7416935+ttnghia@users.noreply.github.com> Date: Mon, 13 Jan 2025 09:27:13 -0800 Subject: [PATCH] Implement `HOST_UDF` aggregation for reduction and segmented reduction (#17645) Following https://github.com/rapidsai/cudf/pull/17592, this enables `HOST_UDF` aggregation in reduction and segmented reduction, allowing to execute a host-side user-defined function (UDF) through libcudf aggregation framework. Closes https://github.com/rapidsai/cudf/issues/16633. Authors: - Nghia Truong (https://github.com/ttnghia) - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) - Kyle Edwards (https://github.com/KyleFromNVIDIA) - Yunsong Wang (https://github.com/PointKernel) - https://github.com/nvdbaranec URL: https://github.com/rapidsai/cudf/pull/17645 --- cpp/include/cudf/aggregation.hpp | 4 +- cpp/include/cudf/aggregation/host_udf.hpp | 478 +++++++++++------- .../cudf/detail/aggregation/aggregation.hpp | 6 +- cpp/src/groupby/groupby.cu | 9 +- cpp/src/groupby/sort/aggregate.cpp | 81 ++- cpp/src/groupby/sort/host_udf_aggregation.cpp | 48 +- cpp/src/reductions/reductions.cpp | 16 +- cpp/src/reductions/segmented/reductions.cpp | 17 +- cpp/tests/CMakeLists.txt | 3 +- cpp/tests/groupby/host_udf_example_tests.cu | 75 +-- cpp/tests/groupby/host_udf_tests.cpp | 245 ++++----- .../reductions/host_udf_example_tests.cu | 422 ++++++++++++++++ .../main/java/ai/rapids/cudf/Aggregation.java | 2 +- .../ai/rapids/cudf/GroupByAggregation.java | 2 +- .../ai/rapids/cudf/ReductionAggregation.java | 17 +- .../cudf/SegmentedReductionAggregation.java | 11 +- 16 files changed, 941 insertions(+), 495 deletions(-) create mode 100644 cpp/tests/reductions/host_udf_example_tests.cu diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index a1b7db5e08a..2b2a660bed7 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -601,7 +601,7 @@ std::unique_ptr make_udf_aggregation(udf_type type, data_type output_type); // Forward declaration of `host_udf_base` for the factory function of `HOST_UDF` aggregation. -struct host_udf_base; +class host_udf_base; /** * @brief Factory to create a HOST_UDF aggregation. diff --git a/cpp/include/cudf/aggregation/host_udf.hpp b/cpp/include/cudf/aggregation/host_udf.hpp index bbce76dc5f3..451d75137e4 100644 --- a/cpp/include/cudf/aggregation/host_udf.hpp +++ b/cpp/include/cudf/aggregation/host_udf.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,18 +17,16 @@ #pragma once #include +#include #include #include #include -#include #include #include +#include #include -#include -#include -#include /** * @file host_udf.hpp @@ -43,49 +41,141 @@ namespace CUDF_EXPORT cudf { */ /** - * @brief The interface for host-based UDF implementation. + * @brief The fundamental interface for host-based UDF implementation. * - * An implementation of host-based UDF needs to be derived from this base class, defining - * its own version of the required functions. In particular: - * - The derived class is required to implement `get_empty_output`, `operator()`, `is_equal`, - * and `clone` functions. - * - If necessary, the derived class can also override `do_hash` to compute hashing for its - * instance, and `get_required_data` to selectively access to the input data as well as - * intermediate data provided by libcudf. + * This class declares the functions `do_hash`, `is_equal`, and `clone` that must be defined in + * the users' UDF implementation. These functions are required for libcudf aggregation framework + * to perform its operations. + */ +class host_udf_base { + // Declare constructor private to prevent the users from deriving from this class. + private: + host_udf_base() = default; ///< Default constructor + + // Only allow deriving from the structs below. + friend struct reduce_host_udf; + friend struct segmented_reduce_host_udf; + friend struct groupby_host_udf; + + public: + virtual ~host_udf_base() = default; ///< Default destructor + + /** + * @brief Computes hash value of the instance. + * + * Overriding this function is optional when the derived class has data members such that + * each instance needs to be differentiated from each other. + * + * @return The hash value of the instance + */ + [[nodiscard]] virtual std::size_t do_hash() const + { + return std::hash{}(static_cast(aggregation::Kind::HOST_UDF)); + } + + /** + * @brief Compares two instances of the derived class for equality. + * @param other The other instance to compare with + * @return True if the two instances are equal + */ + [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0; + + /** + * @brief Clones the instance. + * + * The instances of the derived class should be lightweight for efficient cloning. + * + * @return A new instance cloned from this one + */ + [[nodiscard]] virtual std::unique_ptr clone() const = 0; +}; + +/** + * @brief The interface for host-based UDF implementation for reduction contexts. + * + * An implementation of host-based UDF for reduction needs to be derived from this class. + * In addition to implementing the virtual functions declared in the base class `host_udf_base`, + * such derived classes must also define the `operator()` function to perform reduction + * operations. * - * Example of such implementation: + * Example: * @code{.cpp} - * struct my_udf_aggregation : cudf::host_udf_base { + * struct my_udf_aggregation : cudf::reduce_host_udf { * my_udf_aggregation() = default; * - * // This UDF aggregation needs `GROUPED_VALUES` and `GROUP_OFFSETS`, - * // and the result from groupby `MAX` aggregation. - * [[nodiscard]] data_attribute_set_t get_required_data() const override + * [[nodiscard]] std::unique_ptr operator()( + * column_view const& input, + * data_type output_dtype, + * std::optional> init, + * rmm::cuda_stream_view stream, + * rmm::device_async_resource_ref mr) const override * { - * return {groupby_data_attribute::GROUPED_VALUES, - * groupby_data_attribute::GROUP_OFFSETS, - * cudf::make_max_aggregation()}; + * // Perform reduction computation using the input data and return the reduction result. + * // This is where the actual reduction logic is implemented. * } * - * [[nodiscard]] output_t get_empty_output( - * [[maybe_unused]] std::optional output_dtype, - * [[maybe_unused]] rmm::cuda_stream_view stream, - * [[maybe_unused]] rmm::device_async_resource_ref mr) const override + * [[nodiscard]] bool is_equal(host_udf_base const& other) const override * { - * // This UDF aggregation always returns a column of type INT32. - * return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); + * // Check if the other object is also instance of this class. + * // If there are internal state variables, they may need to be checked for equality as well. + * return dynamic_cast(&other) != nullptr; * } * - * [[nodiscard]] output_t operator()(input_map_t const& input, - * rmm::cuda_stream_view stream, - * rmm::device_async_resource_ref mr) const override + * [[nodiscard]] std::unique_ptr clone() const override * { - * // Perform UDF computation using the input data and return the result. + * return std::make_unique(); + * } + * }; + * @endcode + */ +struct reduce_host_udf : host_udf_base { + /** + * @brief Perform reduction operations. + * + * @param input The input column for reduction + * @param output_dtype The data type for the final output scalar + * @param init The initial value of the reduction + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation + */ + [[nodiscard]] virtual std::unique_ptr operator()( + column_view const& input, + data_type output_dtype, + std::optional> init, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; +}; + +/** + * @brief The interface for host-based UDF implementation for segmented reduction context. + * + * An implementation of host-based UDF for segmented reduction needs to be derived from this class. + * In addition to implementing the virtual functions declared in the base class `host_udf_base`, + * such derived class must also define the `operator()` function to perform segmented reduction. + * + * Example: + * @code{.cpp} + * struct my_udf_aggregation : cudf::segmented_reduce_host_udf { + * my_udf_aggregation() = default; + * + * [[nodiscard]] std::unique_ptr operator()( + * column_view const& input, + * device_span offsets, + * data_type output_dtype, + * null_policy null_handling, + * std::optional> init, + * rmm::cuda_stream_view stream, + * rmm::device_async_resource_ref mr) const override + * { + * // Perform computation using the input data and return the result. + * // This is where the actual segmented reduction logic is implemented. * } * * [[nodiscard]] bool is_equal(host_udf_base const& other) const override * { * // Check if the other object is also instance of this class. + * // If there are internal state variables, they may need to be checked for equality as well. * return dynamic_cast(&other) != nullptr; * } * @@ -96,198 +186,232 @@ namespace CUDF_EXPORT cudf { * }; * @endcode */ -struct host_udf_base { - host_udf_base() = default; - virtual ~host_udf_base() = default; - +struct segmented_reduce_host_udf : host_udf_base { /** - * @brief Define the possible data needed for groupby aggregations. + * @brief Perform segmented reduction operations. * - * Note that only sort-based groupby aggregations are supported. + * @param input The input column for reduction + * @param offsets A list of offsets defining the segments for reduction + * @param output_dtype The data type for the final output column + * @param null_handling If `INCLUDE` then the reduction result is valid only if all elements in + * the segment are valid, and if `EXCLUDE` then the reduction result is valid if any + * element in the segment is valid + * @param init The initial value of the reduction + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation */ - enum class groupby_data_attribute : int32_t { - INPUT_VALUES, ///< The input values column. - GROUPED_VALUES, ///< The input values grouped according to the input `keys` for which the - ///< values within each group maintain their original order. - SORTED_GROUPED_VALUES, ///< The input values grouped according to the input `keys` and - ///< sorted within each group. - NUM_GROUPS, ///< The number of groups (i.e., number of distinct keys). - GROUP_OFFSETS, ///< The offsets separating groups. - GROUP_LABELS ///< Group labels (which is also the same as group indices). - }; + [[nodiscard]] virtual std::unique_ptr operator()( + column_view const& input, + device_span offsets, + data_type output_dtype, + null_policy null_handling, + std::optional> init, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const = 0; +}; +// Forward declaration. +namespace groupby ::detail { +struct aggregate_result_functor; +} + +/** + * @brief The interface for host-based UDF implementation for groupby aggregation context. + * + * An implementation of host-based UDF for groupby needs to be derived from this class. + * In addition to implementing the virtual functions declared in the base class `host_udf_base`, + * such a derived class must also define the functions `get_empty_output()` to return result when + * the input is empty, and ``operator()`` to perform its groupby operations. + * + * During execution, the derived class can access internal data provided by the libcudf groupby + * framework through a set of ``get*`` accessors, as well as calling other built-in groupby + * aggregations through the ``compute_aggregation`` function. + * + * @note The derived class can only perform sort-based groupby aggregations. Hash-based groupby + * aggregations require more complex data structure and is not yet supported. + * + * Example: + * @code{.cpp} + * struct my_udf_aggregation : cudf::groupby_host_udf { + * my_udf_aggregation() = default; + * + * [[nodiscard]] std::unique_ptr get_empty_output( + * rmm::cuda_stream_view stream, + * rmm::device_async_resource_ref mr) const override + * { + * // Return a column corresponding to the result when the input values column is empty. + * } + * + * [[nodiscard]] std::unique_ptr operator()( + * rmm::cuda_stream_view stream, + * rmm::device_async_resource_ref mr) const override + * { + * // Perform UDF computation using the input data and return the result. + * } + * + * [[nodiscard]] bool is_equal(host_udf_base const& other) const override + * { + * // Check if the other object is also instance of this class. + * // If there are internal state variables, they may need to be checked for equality as well. + * return dynamic_cast(&other) != nullptr; + * } + * + * [[nodiscard]] std::unique_ptr clone() const override + * { + * return std::make_unique(); + * } + * }; + * @endcode + */ +struct groupby_host_udf : host_udf_base { /** - * @brief Describe possible data that may be needed in the derived class for its operations. + * @brief Get the output when the input values column is empty. * - * Such data can be either intermediate data such as sorted values or group labels etc, or the - * results of other aggregations. + * This is called in libcudf when the input values column is empty. In such situations libcudf + * tries to generate the output directly without unnecessarily evaluating the intermediate data. * - * Each derived host-based UDF class may need a different set of data. It is inefficient to - * evaluate and pass down all these possible data at once from libcudf. A solution for that is, - * the derived class can define a subset of data that it needs and libcudf will evaluate - * and pass down only data requested from that set. + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation when the input values column is empty */ - struct data_attribute { - /** - * @brief Hold all possible data types for the input of the aggregation in the derived class. - */ - using value_type = std::variant>; - value_type value; ///< The actual data attribute, wrapped by this struct - ///< as a wrapper is needed to define `hash` and `equal_to` functors. - - data_attribute() = default; ///< Default constructor - data_attribute(data_attribute&&) = default; ///< Move constructor - - /** - * @brief Construct a new data attribute from an aggregation attribute. - * @param value_ An aggregation attribute - */ - template )> - data_attribute(T value_) : value{value_} - { - } - - /** - * @brief Construct a new data attribute from another aggregation request. - * @param value_ An aggregation request - */ - template || - std::is_same_v)> - data_attribute(std::unique_ptr value_) : value{std::move(value_)} - { - CUDF_EXPECTS(std::get>(value) != nullptr, - "Invalid aggregation request."); - if constexpr (std::is_same_v) { - CUDF_EXPECTS( - dynamic_cast(std::get>(value).get()) != nullptr, - "Requesting results from other aggregations is only supported in groupby " - "aggregations."); - } - } - - /** - * @brief Copy constructor. - * @param other The other data attribute to copy from - */ - data_attribute(data_attribute const& other); - - /** - * @brief Hash functor for `data_attribute`. - */ - struct hash { - /** - * @brief Compute the hash value of a data attribute. - * @param attr The data attribute to hash - * @return The hash value of the data attribute - */ - std::size_t operator()(data_attribute const& attr) const; - }; // struct hash - - /** - * @brief Equality comparison functor for `data_attribute`. - */ - struct equal_to { - /** - * @brief Check if two data attributes are equal. - * @param lhs The left-hand side data attribute - * @param rhs The right-hand side data attribute - * @return True if the two data attributes are equal - */ - bool operator()(data_attribute const& lhs, data_attribute const& rhs) const; - }; // struct equal_to - }; // struct data_attribute + [[nodiscard]] virtual std::unique_ptr get_empty_output( + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const = 0; /** - * @brief Set of attributes for the input data that is needed for computing the aggregation. + * @brief Perform the main groupby computation for the host-based UDF. + * + * @param stream The CUDA stream to use for any kernel launches + * @param mr Device memory resource to use for any allocations + * @return The output result of the aggregation */ - using data_attribute_set_t = - std::unordered_set; + [[nodiscard]] virtual std::unique_ptr operator()( + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const = 0; + + private: + // Allow the struct `aggregate_result_functor` to set its private callback variables. + friend struct groupby::detail::aggregate_result_functor; /** - * @brief Return a set of attributes for the data that is needed for computing the aggregation. - * - * The derived class should return the attributes corresponding to only the data that it needs to - * avoid unnecessary computation performed in libcudf. If this function is not overridden, an - * empty set is returned. That means all the data attributes (except results from other - * aggregations in groupby) will be needed. - * - * @return A set of `data_attribute` + * @brief Callback to access the input values column. + */ + std::function callback_input_values; + + /** + * @brief Callback to access the input values grouped according to the input keys for which the + * values within each group maintain their original order. + */ + std::function callback_grouped_values; + + /** + * @brief Callback to access the input values grouped according to the input keys and sorted + * within each group. + */ + std::function callback_sorted_grouped_values; + + /** + * @brief Callback to access the number of groups (i.e., number of distinct keys). */ - [[nodiscard]] virtual data_attribute_set_t get_required_data() const { return {}; } + std::function callback_num_groups; /** - * @brief Hold all possible types of the data that is passed to the derived class for executing - * the aggregation. + * @brief Callback to access the offsets separating groups. */ - using input_data_t = std::variant>; + std::function(void)> callback_group_offsets; /** - * @brief Input to the aggregation, mapping from each data attribute to its actual data. + * @brief Callback to access the group labels (which is also the same as group indices). */ - using input_map_t = std:: - unordered_map; + std::function(void)> callback_group_labels; /** - * @brief Output type of the aggregation. + * @brief Callback to access the result from other groupby aggregations. + */ + std::function)> callback_compute_aggregation; + + protected: + /** + * @brief Access the input values column. * - * Currently only a single type is supported as the output of the aggregation, but it will hold - * more type in the future when reduction is supported. + * @return The input values column. */ - using output_t = std::variant>; + [[nodiscard]] column_view get_input_values() const + { + CUDF_EXPECTS(callback_input_values, "Uninitialized callback_input_values."); + return callback_input_values(); + } /** - * @brief Get the output when the input values column is empty. + * @brief Access the input values grouped according to the input keys for which the values + * within each group maintain their original order. * - * This is called in libcudf when the input values column is empty. In such situations libcudf - * tries to generate the output directly without unnecessarily evaluating the intermediate data. + * @return The grouped values column. + */ + [[nodiscard]] column_view get_grouped_values() const + { + CUDF_EXPECTS(callback_grouped_values, "Uninitialized callback_grouped_values."); + return callback_grouped_values(); + } + + /** + * @brief Access the input values grouped according to the input keys and sorted within each + * group. * - * @param output_dtype The expected output data type - * @param stream The CUDA stream to use for any kernel launches - * @param mr Device memory resource to use for any allocations - * @return The output result of the aggregation when input values is empty + * @return The sorted grouped values column. */ - [[nodiscard]] virtual output_t get_empty_output(std::optional output_dtype, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const = 0; + [[nodiscard]] column_view get_sorted_grouped_values() const + { + CUDF_EXPECTS(callback_sorted_grouped_values, "Uninitialized callback_sorted_grouped_values."); + return callback_sorted_grouped_values(); + } /** - * @brief Perform the main computation for the host-based UDF. + * @brief Access the number of groups (i.e., number of distinct keys). * - * @param input The input data needed for performing all computation - * @param stream The CUDA stream to use for any kernel launches - * @param mr Device memory resource to use for any allocations - * @return The output result of the aggregation + * @return The number of groups. */ - [[nodiscard]] virtual output_t operator()(input_map_t const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const = 0; + [[nodiscard]] size_type get_num_groups() const + { + CUDF_EXPECTS(callback_num_groups, "Uninitialized callback_num_groups."); + return callback_num_groups(); + } /** - * @brief Computes hash value of the class's instance. - * @return The hash value of the instance + * @brief Access the offsets separating groups. + * + * @return The array of group offsets. */ - [[nodiscard]] virtual std::size_t do_hash() const + [[nodiscard]] device_span get_group_offsets() const { - return std::hash{}(static_cast(aggregation::Kind::HOST_UDF)); + CUDF_EXPECTS(callback_group_offsets, "Uninitialized callback_group_offsets."); + return callback_group_offsets(); } /** - * @brief Compares two instances of the derived class for equality. - * @param other The other derived class's instance to compare with - * @return True if the two instances are equal + * @brief Access the group labels (which is also the same as group indices). + * + * @return The array of group labels. */ - [[nodiscard]] virtual bool is_equal(host_udf_base const& other) const = 0; + [[nodiscard]] device_span get_group_labels() const + { + CUDF_EXPECTS(callback_group_labels, "Uninitialized callback_group_labels."); + return callback_group_labels(); + } /** - * @brief Clones the instance. + * @brief Compute a built-in groupby aggregation and access its result. * - * A class derived from `host_udf_base` should not store too much data such that its instances - * remain lightweight for efficient cloning. + * This allows the derived class to call any other built-in groupby aggregations on the same input + * values column and access the output for its operations. * - * @return A new instance cloned from this + * @param other_agg An arbitrary built-in groupby aggregation + * @return A `column_view` object corresponding to the output result of the given aggregation */ - [[nodiscard]] virtual std::unique_ptr clone() const = 0; + [[nodiscard]] column_view compute_aggregation(std::unique_ptr other_agg) const + { + CUDF_EXPECTS(callback_compute_aggregation, "Uninitialized callback for computing aggregation."); + return callback_compute_aggregation(std::move(other_agg)); + } }; /** @} */ // end of group diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index d873e93bd20..5574ed6ea6e 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -967,7 +967,9 @@ class udf_aggregation final : public rolling_aggregation { /** * @brief Derived class for specifying host-based UDF aggregation. */ -class host_udf_aggregation final : public groupby_aggregation { +class host_udf_aggregation final : public groupby_aggregation, + public reduce_aggregation, + public segmented_reduce_aggregation { public: std::unique_ptr udf_ptr; diff --git a/cpp/src/groupby/groupby.cu b/cpp/src/groupby/groupby.cu index 4c90cd0eef5..6234148e9fa 100644 --- a/cpp/src/groupby/groupby.cu +++ b/cpp/src/groupby/groupby.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -145,8 +145,11 @@ struct empty_column_constructor { } if constexpr (k == aggregation::Kind::HOST_UDF) { - auto const& udf_ptr = dynamic_cast(agg).udf_ptr; - return std::get>(udf_ptr->get_empty_output(std::nullopt, stream, mr)); + auto const& udf_base_ptr = + dynamic_cast(agg).udf_ptr; + auto const udf_ptr = dynamic_cast(udf_base_ptr.get()); + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HOST_UDF instance for groupby aggregation."); + return udf_ptr->get_empty_output(stream, mr); } return make_empty_column(target_type(values.type(), k)); diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 6480070e85a..fb3f7559d64 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -795,58 +795,41 @@ void aggregate_result_functor::operator()(aggregation con { if (cache.has_result(values, agg)) { return; } - auto const& udf_ptr = dynamic_cast(agg).udf_ptr; - auto const data_attrs = [&]() -> host_udf_base::data_attribute_set_t { - if (auto tmp = udf_ptr->get_required_data(); !tmp.empty()) { return tmp; } - // Empty attribute set means everything. - return {host_udf_base::groupby_data_attribute::INPUT_VALUES, - host_udf_base::groupby_data_attribute::GROUPED_VALUES, - host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, - host_udf_base::groupby_data_attribute::NUM_GROUPS, - host_udf_base::groupby_data_attribute::GROUP_OFFSETS, - host_udf_base::groupby_data_attribute::GROUP_LABELS}; - }(); + auto const& udf_base_ptr = dynamic_cast(agg).udf_ptr; + auto const udf_ptr = dynamic_cast(udf_base_ptr.get()); + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HOST_UDF instance for groupby aggregation."); - // Do not cache udf_input, as the actual input data may change from run to run. - host_udf_base::input_map_t udf_input; - for (auto const& attr : data_attrs) { - CUDF_EXPECTS(std::holds_alternative(attr.value) || - std::holds_alternative>(attr.value), - "Invalid input data attribute for HOST_UDF groupby aggregation."); - if (std::holds_alternative(attr.value)) { - switch (std::get(attr.value)) { - case host_udf_base::groupby_data_attribute::INPUT_VALUES: - udf_input.emplace(attr, values); - break; - case host_udf_base::groupby_data_attribute::GROUPED_VALUES: - udf_input.emplace(attr, get_grouped_values()); - break; - case host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES: - udf_input.emplace(attr, get_sorted_values()); - break; - case host_udf_base::groupby_data_attribute::NUM_GROUPS: - udf_input.emplace(attr, helper.num_groups(stream)); - break; - case host_udf_base::groupby_data_attribute::GROUP_OFFSETS: - udf_input.emplace(attr, helper.group_offsets(stream)); - break; - case host_udf_base::groupby_data_attribute::GROUP_LABELS: - udf_input.emplace(attr, helper.group_labels(stream)); - break; - default: CUDF_UNREACHABLE("Invalid input data attribute for HOST_UDF groupby aggregation."); - } - } else { // data is result from another aggregation - auto other_agg = std::get>(attr.value)->clone(); + if (!udf_ptr->callback_input_values) { + udf_ptr->callback_input_values = [&]() -> column_view { return values; }; + } + if (!udf_ptr->callback_grouped_values) { + udf_ptr->callback_grouped_values = [&]() -> column_view { return get_grouped_values(); }; + } + if (!udf_ptr->callback_sorted_grouped_values) { + udf_ptr->callback_sorted_grouped_values = [&]() -> column_view { return get_sorted_values(); }; + } + if (!udf_ptr->callback_num_groups) { + udf_ptr->callback_num_groups = [&]() -> size_type { return helper.num_groups(stream); }; + } + if (!udf_ptr->callback_group_offsets) { + udf_ptr->callback_group_offsets = [&]() -> device_span { + return helper.group_offsets(stream); + }; + } + if (!udf_ptr->callback_group_labels) { + udf_ptr->callback_group_labels = [&]() -> device_span { + return helper.group_labels(stream); + }; + } + if (!udf_ptr->callback_compute_aggregation) { + udf_ptr->callback_compute_aggregation = + [&](std::unique_ptr other_agg) -> column_view { cudf::detail::aggregation_dispatcher(other_agg->kind, *this, *other_agg); - auto result = cache.get_result(values, *other_agg); - udf_input.emplace(std::move(other_agg), std::move(result)); - } + return cache.get_result(values, *other_agg); + }; } - auto output = (*udf_ptr)(udf_input, stream, mr); - CUDF_EXPECTS(std::holds_alternative>(output), - "Invalid output type from HOST_UDF groupby aggregation."); - cache.add_result(values, agg, std::get>(std::move(output))); + cache.add_result(values, agg, (*udf_ptr)(stream, mr)); } } // namespace detail diff --git a/cpp/src/groupby/sort/host_udf_aggregation.cpp b/cpp/src/groupby/sort/host_udf_aggregation.cpp index 0da47e17f48..6f1fe80c4bd 100644 --- a/cpp/src/groupby/sort/host_udf_aggregation.cpp +++ b/cpp/src/groupby/sort/host_udf_aggregation.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,51 +16,9 @@ #include #include -#include namespace cudf { -host_udf_base::data_attribute::data_attribute(data_attribute const& other) - : value{std::visit(cudf::detail::visitor_overload{[](auto const& val) { return value_type{val}; }, - [](std::unique_ptr const& val) { - return value_type{val->clone()}; - }}, - other.value)} -{ -} - -std::size_t host_udf_base::data_attribute::hash::operator()(data_attribute const& attr) const -{ - auto const hash_value = - std::visit(cudf::detail::visitor_overload{ - [](auto const& val) { return std::hash{}(static_cast(val)); }, - [](std::unique_ptr const& val) { return val->do_hash(); }}, - attr.value); - return std::hash{}(attr.value.index()) ^ hash_value; -} - -bool host_udf_base::data_attribute::equal_to::operator()(data_attribute const& lhs, - data_attribute const& rhs) const -{ - auto const& lhs_val = lhs.value; - auto const& rhs_val = rhs.value; - if (lhs_val.index() != rhs_val.index()) { return false; } - return std::visit( - cudf::detail::visitor_overload{ - [](auto const& lhs_val, auto const& rhs_val) { - if constexpr (std::is_same_v) { - return lhs_val == rhs_val; - } else { - return false; - } - }, - [](std::unique_ptr const& lhs_val, std::unique_ptr const& rhs_val) { - return lhs_val->is_equal(*rhs_val); - }}, - lhs_val, - rhs_val); -} - namespace detail { host_udf_aggregation::host_udf_aggregation(std::unique_ptr udf_ptr_) @@ -99,5 +57,9 @@ template CUDF_EXPORT std::unique_ptr make_host_udf_aggregation); template CUDF_EXPORT std::unique_ptr make_host_udf_aggregation(std::unique_ptr); +template CUDF_EXPORT std::unique_ptr + make_host_udf_aggregation(std::unique_ptr); +template CUDF_EXPORT std::unique_ptr + make_host_udf_aggregation(std::unique_ptr); } // namespace cudf diff --git a/cpp/src/reductions/reductions.cpp b/cpp/src/reductions/reductions.cpp index 75ebc078930..928625a7e8f 100644 --- a/cpp/src/reductions/reductions.cpp +++ b/cpp/src/reductions/reductions.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ * limitations under the License. */ +#include #include #include #include @@ -144,6 +145,13 @@ struct reduce_dispatch_functor { auto td_agg = static_cast(agg); return tdigest::detail::reduce_merge_tdigest(col, td_agg.max_centroids, stream, mr); } + case aggregation::HOST_UDF: { + auto const& udf_base_ptr = + dynamic_cast(agg).udf_ptr; + auto const udf_ptr = dynamic_cast(udf_base_ptr.get()); + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HOST_UDF instance for reduction."); + return (*udf_ptr)(col, output_dtype, init, stream, mr); + } // case aggregation::HOST_UDF default: CUDF_FAIL("Unsupported reduction operator"); } } @@ -161,9 +169,11 @@ std::unique_ptr reduce(column_view const& col, cudf::data_type_error); if (init.has_value() && !(agg.kind == aggregation::SUM || agg.kind == aggregation::PRODUCT || agg.kind == aggregation::MIN || agg.kind == aggregation::MAX || - agg.kind == aggregation::ANY || agg.kind == aggregation::ALL)) { + agg.kind == aggregation::ANY || agg.kind == aggregation::ALL || + agg.kind == aggregation::HOST_UDF)) { CUDF_FAIL( - "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, and ALL aggregation types"); + "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, ALL, and HOST_UDF " + "aggregation types"); } // Returns default scalar if input column is empty or all null diff --git a/cpp/src/reductions/segmented/reductions.cpp b/cpp/src/reductions/segmented/reductions.cpp index 1c3a2b0c0f3..5835bfcf0a1 100644 --- a/cpp/src/reductions/segmented/reductions.cpp +++ b/cpp/src/reductions/segmented/reductions.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. + * Copyright (c) 2022-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +#include #include #include #include @@ -98,6 +100,13 @@ struct segmented_reduce_dispatch_functor { } case segmented_reduce_aggregation::NUNIQUE: return segmented_nunique(col, offsets, null_handling, stream, mr); + case aggregation::HOST_UDF: { + auto const& udf_base_ptr = + dynamic_cast(agg).udf_ptr; + auto const udf_ptr = dynamic_cast(udf_base_ptr.get()); + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid HOST_UDF instance for segmented reduction."); + return (*udf_ptr)(col, offsets, output_dtype, null_handling, init, stream, mr); + } // case aggregation::HOST_UDF default: CUDF_FAIL("Unsupported aggregation type."); } } @@ -117,9 +126,11 @@ std::unique_ptr segmented_reduce(column_view const& segmented_values, cudf::data_type_error); if (init.has_value() && !(agg.kind == aggregation::SUM || agg.kind == aggregation::PRODUCT || agg.kind == aggregation::MIN || agg.kind == aggregation::MAX || - agg.kind == aggregation::ANY || agg.kind == aggregation::ALL)) { + agg.kind == aggregation::ANY || agg.kind == aggregation::ALL || + agg.kind == aggregation::HOST_UDF)) { CUDF_FAIL( - "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, and ALL aggregation types"); + "Initial value is only supported for SUM, PRODUCT, MIN, MAX, ANY, ALL, and HOST_UDF " + "aggregation types"); } if (segmented_values.is_empty() && offsets.empty()) { diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 344979e1288..35877ac34b9 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -220,11 +220,12 @@ ConfigureTest( REDUCTIONS_TEST reductions/collect_ops_tests.cpp reductions/ewm_tests.cpp + reductions/host_udf_example_tests.cu + reductions/list_rank_test.cpp reductions/rank_tests.cpp reductions/reduction_tests.cpp reductions/scan_tests.cpp reductions/segmented_reduction_tests.cpp - reductions/list_rank_test.cpp reductions/tdigest_tests.cu GPUS 1 PERCENT 70 diff --git a/cpp/tests/groupby/host_udf_example_tests.cu b/cpp/tests/groupby/host_udf_example_tests.cu index a454bd692fc..e1ded37d8a7 100644 --- a/cpp/tests/groupby/host_udf_example_tests.cu +++ b/cpp/tests/groupby/host_udf_example_tests.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +21,7 @@ #include #include #include -#include #include -#include #include #include @@ -34,6 +32,9 @@ #include #include +using doubles_col = cudf::test::fixed_width_column_wrapper; +using int32s_col = cudf::test::fixed_width_column_wrapper; + namespace { /** * @brief A host-based UDF implementation for groupby. @@ -41,42 +42,21 @@ namespace { * For each group of values, the aggregation computes * `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`. */ -struct host_udf_groupby_example : cudf::host_udf_base { +struct host_udf_groupby_example : cudf::groupby_host_udf { host_udf_groupby_example() = default; - [[nodiscard]] data_attribute_set_t get_required_data() const override - { - // We need grouped values, group offsets, group labels, and also results from groups' - // MAX and SUM aggregations. - return {groupby_data_attribute::GROUPED_VALUES, - groupby_data_attribute::GROUP_OFFSETS, - groupby_data_attribute::GROUP_LABELS, - cudf::make_max_aggregation(), - cudf::make_sum_aggregation()}; - } - - [[nodiscard]] output_t get_empty_output( - [[maybe_unused]] std::optional output_dtype, - [[maybe_unused]] rmm::cuda_stream_view stream, - [[maybe_unused]] rmm::device_async_resource_ref mr) const override + [[nodiscard]] std::unique_ptr get_empty_output( + rmm::cuda_stream_view, rmm::device_async_resource_ref) const override { return cudf::make_empty_column( cudf::data_type{cudf::type_to_id()}); } - [[nodiscard]] output_t operator()(input_map_t const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const override + [[nodiscard]] std::unique_ptr operator()( + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override { - auto const& values = - std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); - return cudf::type_dispatcher(values.type(), groupby_fn{this}, input, stream, mr); - } - - [[nodiscard]] std::size_t do_hash() const override - { - // Just return the same hash for all instances of this class. - return std::size_t{12345}; + auto const values = get_grouped_values(); + return cudf::type_dispatcher(values.type(), groupby_fn{*this}, stream, mr); } [[nodiscard]] bool is_equal(host_udf_base const& other) const override @@ -92,37 +72,33 @@ struct host_udf_groupby_example : cudf::host_udf_base { struct groupby_fn { // Store pointer to the parent class so we can call its functions. - host_udf_groupby_example const* parent; + host_udf_groupby_example const& parent; - // For simplicity, this example only accepts double input and always produces double output. + // For simplicity, this example only accepts a single type input and output. using InputType = double; using OutputType = double; template )> - output_t operator()(Args...) const + std::unique_ptr operator()(Args...) const { CUDF_FAIL("Unsupported input type."); } template )> - output_t operator()(input_map_t const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const + std::unique_ptr operator()(rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const { - auto const& values = - std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); - if (values.size() == 0) { return parent->get_empty_output(std::nullopt, stream, mr); } + auto const values = parent.get_grouped_values(); + if (values.size() == 0) { return parent.get_empty_output(stream, mr); } - auto const offsets = std::get>( - input.at(groupby_data_attribute::GROUP_OFFSETS)); + auto const offsets = parent.get_group_offsets(); CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets."); auto const num_groups = static_cast(offsets.size()) - 1; - auto const group_indices = std::get>( - input.at(groupby_data_attribute::GROUP_LABELS)); - auto const group_max = std::get( - input.at(cudf::make_max_aggregation())); - auto const group_sum = std::get( - input.at(cudf::make_sum_aggregation())); + auto const group_indices = parent.get_group_labels(); + auto const group_max = + parent.compute_aggregation(cudf::make_max_aggregation()); + auto const group_sum = + parent.compute_aggregation(cudf::make_sum_aggregation()); auto const values_dv_ptr = cudf::column_device_view::create(values, stream); auto const output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, @@ -191,9 +167,6 @@ struct host_udf_groupby_example : cudf::host_udf_base { } // namespace -using doubles_col = cudf::test::fixed_width_column_wrapper; -using int32s_col = cudf::test::fixed_width_column_wrapper; - struct HostUDFGroupbyExampleTest : cudf::test::BaseFixture {}; TEST_F(HostUDFGroupbyExampleTest, SimpleInput) diff --git a/cpp/tests/groupby/host_udf_tests.cpp b/cpp/tests/groupby/host_udf_tests.cpp index 1a0f68c0c6c..17da28cdefc 100644 --- a/cpp/tests/groupby/host_udf_tests.cpp +++ b/cpp/tests/groupby/host_udf_tests.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,178 +26,121 @@ #include namespace { + /** - * @brief A host-based UDF implementation used for unit tests. + * @brief Generate a random aggregation object from {min, max, sum, product}. */ -struct host_udf_test_base : cudf::host_udf_base { +std::unique_ptr get_random_agg() +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution distr(1, 4); + switch (distr(gen)) { + case 1: return cudf::make_min_aggregation(); + case 2: return cudf::make_max_aggregation(); + case 3: return cudf::make_sum_aggregation(); + case 4: return cudf::make_product_aggregation(); + default: CUDF_UNREACHABLE("This should not be reached."); + } + return nullptr; +} + +/** + * @brief A host-based UDF implementation used for unit tests for groupby aggregation. + */ +struct host_udf_groupby_test : cudf::groupby_host_udf { int test_location_line; // the location where testing is called bool* test_run; // to check if the test is accidentally skipped - data_attribute_set_t input_attrs; + bool test_other_agg; // test calling other aggregation - host_udf_test_base(int test_location_line_, bool* test_run_, data_attribute_set_t input_attrs_) - : test_location_line{test_location_line_}, - test_run{test_run_}, - input_attrs(std::move(input_attrs_)) + host_udf_groupby_test(int test_location_line_, bool* test_run_, bool test_other_agg_) + : test_location_line{test_location_line_}, test_run{test_run_}, test_other_agg{test_other_agg_} { } - [[nodiscard]] data_attribute_set_t get_required_data() const override { return input_attrs; } - - // This is the main testing function, which checks for the correctness of input data. - // The rests are just to satisfy the interface. - [[nodiscard]] output_t operator()(input_map_t const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const override + [[nodiscard]] std::size_t do_hash() const override { return 0; } + [[nodiscard]] bool is_equal(host_udf_base const& other) const override { - SCOPED_TRACE("Test instance created at line: " + std::to_string(test_location_line)); - - test_data_attributes(input, stream, mr); - - *test_run = true; // test is run successfully - return get_empty_output(std::nullopt, stream, mr); + // Just check if the other object is also instance of this class. + return dynamic_cast(&other) != nullptr; + } + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(test_location_line, test_run, test_other_agg); } - [[nodiscard]] output_t get_empty_output( - [[maybe_unused]] std::optional output_dtype, + [[nodiscard]] std::unique_ptr get_empty_output( [[maybe_unused]] rmm::cuda_stream_view stream, [[maybe_unused]] rmm::device_async_resource_ref mr) const override { - // Unused function - dummy output. + // Dummy output. return cudf::make_empty_column(cudf::data_type{cudf::type_id::INT32}); } - [[nodiscard]] std::size_t do_hash() const override { return 0; } - [[nodiscard]] bool is_equal(host_udf_base const& other) const override { return true; } + [[nodiscard]] std::unique_ptr operator()( + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const override + { + SCOPED_TRACE("Test instance created at line: " + std::to_string(test_location_line)); - // The main test function, which must be implemented for each kind of aggregations - // (groupby/reduction/segmented_reduction). - virtual void test_data_attributes(input_map_t const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const = 0; -}; + // Perform tests on types for the groupby data: we must ensure the data corresponding to each + // `groupby_data` enum having the correct type. -/** - * @brief A host-based UDF implementation used for unit tests for groupby aggregation. - */ -struct host_udf_groupby_test : host_udf_test_base { - host_udf_groupby_test(int test_location_line_, - bool* test_run_, - data_attribute_set_t input_attrs_ = {}) - : host_udf_test_base(test_location_line_, test_run_, std::move(input_attrs_)) - { - } + { + auto const inp_data = get_input_values(); + EXPECT_TRUE((std::is_same_v>)); + } - [[nodiscard]] std::unique_ptr clone() const override - { - return std::make_unique(test_location_line, test_run, input_attrs); - } + { + auto const inp_data = get_grouped_values(); + EXPECT_TRUE((std::is_same_v>)); + } - void test_data_attributes(input_map_t const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const override - { - data_attribute_set_t check_attrs = input_attrs; - if (check_attrs.empty()) { - check_attrs = data_attribute_set_t{groupby_data_attribute::INPUT_VALUES, - groupby_data_attribute::GROUPED_VALUES, - groupby_data_attribute::SORTED_GROUPED_VALUES, - groupby_data_attribute::NUM_GROUPS, - groupby_data_attribute::GROUP_OFFSETS, - groupby_data_attribute::GROUP_LABELS}; + { + auto const inp_data = get_sorted_grouped_values(); + EXPECT_TRUE((std::is_same_v>)); } - EXPECT_EQ(input.size(), check_attrs.size()); - for (auto const& attr : check_attrs) { - EXPECT_TRUE(input.count(attr) > 0); - EXPECT_TRUE(std::holds_alternative(attr.value) || - std::holds_alternative>(attr.value)); - if (std::holds_alternative(attr.value)) { - switch (std::get(attr.value)) { - case groupby_data_attribute::INPUT_VALUES: - EXPECT_TRUE(std::holds_alternative(input.at(attr))); - break; - case groupby_data_attribute::GROUPED_VALUES: - EXPECT_TRUE(std::holds_alternative(input.at(attr))); - break; - case groupby_data_attribute::SORTED_GROUPED_VALUES: - EXPECT_TRUE(std::holds_alternative(input.at(attr))); - break; - case groupby_data_attribute::NUM_GROUPS: - EXPECT_TRUE(std::holds_alternative(input.at(attr))); - break; - case groupby_data_attribute::GROUP_OFFSETS: - EXPECT_TRUE( - std::holds_alternative>(input.at(attr))); - break; - case groupby_data_attribute::GROUP_LABELS: - EXPECT_TRUE( - std::holds_alternative>(input.at(attr))); - break; - default:; - } - } else { // std::holds_alternative>(attr.value) - EXPECT_TRUE(std::holds_alternative(input.at(attr))); - } + + { + auto const inp_data = get_num_groups(); + EXPECT_TRUE((std::is_same_v>)); } - } -}; -/** - * @brief Get a random subset of input data attributes. - */ -cudf::host_udf_base::data_attribute_set_t get_subset( - cudf::host_udf_base::data_attribute_set_t const& attrs) -{ - std::random_device rd; - std::mt19937 gen(rd()); - std::uniform_int_distribution size_distr(1, attrs.size() - 1); - auto const subset_size = size_distr(gen); - auto const elements = - std::vector(attrs.begin(), attrs.end()); - std::uniform_int_distribution idx_distr(0, attrs.size() - 1); - cudf::host_udf_base::data_attribute_set_t output; - while (output.size() < subset_size) { - output.insert(elements[idx_distr(gen)]); - } - return output; -} + { + auto const inp_data = get_group_offsets(); + EXPECT_TRUE((std::is_same_v, + std::decay_t>)); + } -/** - * @brief Generate a random aggregation object from {min, max, sum, product}. - */ -std::unique_ptr get_random_agg() -{ - std::random_device rd; - std::mt19937 gen(rd()); - std::uniform_int_distribution distr(1, 4); - switch (distr(gen)) { - case 1: return cudf::make_min_aggregation(); - case 2: return cudf::make_max_aggregation(); - case 3: return cudf::make_sum_aggregation(); - case 4: return cudf::make_product_aggregation(); - default: CUDF_UNREACHABLE("This should not be reached."); + { + auto const inp_data = get_group_labels(); + EXPECT_TRUE((std::is_same_v, + std::decay_t>)); + } + + // Perform tests on type of the result from computing other aggregations. + if (test_other_agg) { + auto const inp_data = compute_aggregation(get_random_agg()); + EXPECT_TRUE((std::is_same_v>)); + } + + *test_run = true; // test is run successfully + return get_empty_output(stream, mr); } - return nullptr; -} +}; } // namespace using int32s_col = cudf::test::fixed_width_column_wrapper; -// Number of randomly testing on the input data attributes. -// For each test, a subset of data attributes will be randomly generated from all the possible input -// data attributes. The input data corresponding to that subset passed from libcudf will be tested -// for correctness. -constexpr int NUM_RANDOM_TESTS = 20; - struct HostUDFTest : cudf::test::BaseFixture {}; -TEST_F(HostUDFTest, GroupbyAllInput) +TEST_F(HostUDFTest, GroupbyBuiltinInput) { bool test_run = false; auto const keys = int32s_col{0, 1, 2}; auto const vals = int32s_col{0, 1, 2}; auto agg = cudf::make_host_udf_aggregation( - std::make_unique(__LINE__, &test_run)); + std::make_unique(__LINE__, &test_run, /*test_other_agg*/ false)); std::vector requests; requests.emplace_back(); @@ -205,28 +148,22 @@ TEST_F(HostUDFTest, GroupbyAllInput) requests[0].aggregations.push_back(std::move(agg)); cudf::groupby::groupby gb_obj( cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); - [[maybe_unused]] auto const grp_result = - gb_obj.aggregate(requests, cudf::test::get_default_stream()); + [[maybe_unused]] auto const grp_result = gb_obj.aggregate( + requests, cudf::test::get_default_stream(), cudf::get_current_device_resource_ref()); EXPECT_TRUE(test_run); } -TEST_F(HostUDFTest, GroupbySomeInput) +TEST_F(HostUDFTest, GroupbyWithCallingOtherAggregations) { - auto const keys = int32s_col{0, 1, 2}; - auto const vals = int32s_col{0, 1, 2}; - auto const all_attrs = cudf::host_udf_base::data_attribute_set_t{ - cudf::host_udf_base::groupby_data_attribute::INPUT_VALUES, - cudf::host_udf_base::groupby_data_attribute::GROUPED_VALUES, - cudf::host_udf_base::groupby_data_attribute::SORTED_GROUPED_VALUES, - cudf::host_udf_base::groupby_data_attribute::NUM_GROUPS, - cudf::host_udf_base::groupby_data_attribute::GROUP_OFFSETS, - cudf::host_udf_base::groupby_data_attribute::GROUP_LABELS}; + auto const keys = int32s_col{0, 1, 2}; + auto const vals = int32s_col{0, 1, 2}; + + constexpr int NUM_RANDOM_TESTS = 20; + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { - bool test_run = false; - auto input_attrs = get_subset(all_attrs); - input_attrs.insert(get_random_agg()); - auto agg = cudf::make_host_udf_aggregation( - std::make_unique(__LINE__, &test_run, std::move(input_attrs))); + bool test_run = false; + auto agg = cudf::make_host_udf_aggregation( + std::make_unique(__LINE__, &test_run, /*test_other_agg*/ true)); std::vector requests; requests.emplace_back(); @@ -234,8 +171,8 @@ TEST_F(HostUDFTest, GroupbySomeInput) requests[0].aggregations.push_back(std::move(agg)); cudf::groupby::groupby gb_obj( cudf::table_view({keys}), cudf::null_policy::INCLUDE, cudf::sorted::NO, {}, {}); - [[maybe_unused]] auto const grp_result = - gb_obj.aggregate(requests, cudf::test::get_default_stream()); + [[maybe_unused]] auto const grp_result = gb_obj.aggregate( + requests, cudf::test::get_default_stream(), cudf::get_current_device_resource_ref()); EXPECT_TRUE(test_run); } } diff --git a/cpp/tests/reductions/host_udf_example_tests.cu b/cpp/tests/reductions/host_udf_example_tests.cu new file mode 100644 index 00000000000..67b88c5306b --- /dev/null +++ b/cpp/tests/reductions/host_udf_example_tests.cu @@ -0,0 +1,422 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +using doubles_col = cudf::test::fixed_width_column_wrapper; +using int32s_col = cudf::test::fixed_width_column_wrapper; +using int64s_col = cudf::test::fixed_width_column_wrapper; + +namespace { +/** + * @brief A host-based UDF implementation for reduction. + * + * The aggregation computes `sum(value^2, for value in group)` (this is sum of squared). + */ +struct host_udf_reduction_example : cudf::reduce_host_udf { + host_udf_reduction_example() = default; + + [[nodiscard]] std::unique_ptr operator()( + cudf::column_view const& input, + cudf::data_type output_dtype, + std::optional> init, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + return cudf::double_type_dispatcher( + input.type(), output_dtype, reduce_fn{}, input, output_dtype, init, stream, mr); + } + + [[nodiscard]] bool is_equal(host_udf_base const& other) const override + { + // Just check if the other object is also instance of this class. + return dynamic_cast(&other) != nullptr; + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(); + } + + struct reduce_fn { + // For simplicity, this example only accepts a single type input and output. + using InputType = double; + using OutputType = int64_t; + + template || !std::is_same_v)> + std::unique_ptr operator()(Args...) const + { + CUDF_FAIL("Unsupported input/output type."); + } + + template && std::is_same_v)> + [[nodiscard]] std::unique_ptr operator()( + cudf::column_view const& input, + cudf::data_type output_dtype, + std::optional> init, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + CUDF_EXPECTS(output_dtype == cudf::data_type{cudf::type_to_id()}, + "Invalid output type."); + if (input.size() == 0) { + return cudf::make_default_constructed_scalar(output_dtype, stream, mr); + } + + auto const init_value = [&]() -> InputType { + if (init.has_value() && init.value().get().is_valid(stream)) { + auto const numeric_init_scalar = + dynamic_cast const*>(&init.value().get()); + CUDF_EXPECTS(numeric_init_scalar != nullptr, "Invalid init scalar for reduction."); + return numeric_init_scalar->value(stream); + } + return InputType{0}; + }(); + + auto const input_dv_ptr = cudf::column_device_view::create(input, stream); + auto const result = thrust::transform_reduce(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(input.size()), + transform_fn{*input_dv_ptr}, + static_cast(init_value), + thrust::plus<>{}); + + auto output = cudf::make_numeric_scalar(output_dtype, stream, mr); + static_cast*>(output.get())->set_value(result, stream); + return output; + } + + struct transform_fn { + cudf::column_device_view values; + OutputType __device__ operator()(cudf::size_type idx) const + { + if (values.is_null(idx)) { return OutputType{0}; } + auto const val = static_cast(values.element(idx)); + return val * val; + } + }; + }; +}; + +} // namespace + +struct HostUDFReductionExampleTest : cudf::test::BaseFixture {}; + +TEST_F(HostUDFReductionExampleTest, SimpleInput) +{ + auto const vals = doubles_col{0.0, 1.0, 2.0, 3.0, 4.0, 5.0}; + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique()); + auto const reduced = cudf::reduce(vals, + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + EXPECT_TRUE(reduced->is_valid()); + EXPECT_EQ(cudf::type_id::INT64, reduced->type().id()); + auto const result = + static_cast*>(reduced.get())->value(cudf::get_default_stream()); + auto constexpr expected = 55; // 0^2 + 1^2 + 2^2 + 3^2 + 4^2 + 5^2 = 55 + EXPECT_EQ(expected, result); +} + +TEST_F(HostUDFReductionExampleTest, EmptyInput) +{ + auto const vals = doubles_col{}; + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique()); + auto const reduced = cudf::reduce(vals, + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + EXPECT_FALSE(reduced->is_valid()); + EXPECT_EQ(cudf::type_id::INT64, reduced->type().id()); +} + +namespace { + +/** + * @brief A host-based UDF implementation for segmented reduction. + * + * The aggregation computes `sum(value^2, for value in group)` (this is sum of squared). + */ +struct host_udf_segmented_reduction_example : cudf::segmented_reduce_host_udf { + host_udf_segmented_reduction_example() = default; + + [[nodiscard]] std::unique_ptr operator()( + cudf::column_view const& input, + cudf::device_span offsets, + cudf::data_type output_dtype, + cudf::null_policy null_handling, + std::optional> init, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + return cudf::double_type_dispatcher(input.type(), + output_dtype, + segmented_reduce_fn{}, + input, + offsets, + output_dtype, + null_handling, + init, + stream, + mr); + } + + [[nodiscard]] bool is_equal(host_udf_base const& other) const override + { + // Just check if the other object is also instance of this class. + return dynamic_cast(&other) != nullptr; + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(); + } + + struct segmented_reduce_fn { + // For simplicity, this example only accepts a single type input and output. + using InputType = double; + using OutputType = int64_t; + + template || !std::is_same_v)> + std::unique_ptr operator()(Args...) const + { + CUDF_FAIL("Unsupported input/output type."); + } + + template && std::is_same_v)> + std::unique_ptr operator()( + cudf::column_view const& input, + cudf::device_span offsets, + cudf::data_type output_dtype, + cudf::null_policy null_handling, + std::optional> init, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + CUDF_EXPECTS(output_dtype == cudf::data_type{cudf::type_to_id()}, + "Invalid output type."); + CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets."); + auto const num_segments = static_cast(offsets.size()) - 1; + + if (input.size() == 0) { + if (num_segments <= 0) { return cudf::make_empty_column(output_dtype); } + return cudf::make_numeric_column( + output_dtype, num_segments, cudf::mask_state::ALL_NULL, stream, mr); + } + + auto const init_value = [&]() -> InputType { + if (init.has_value() && init.value().get().is_valid(stream)) { + auto const numeric_init_scalar = + dynamic_cast const*>(&init.value().get()); + CUDF_EXPECTS(numeric_init_scalar != nullptr, "Invalid init scalar for reduction."); + return numeric_init_scalar->value(stream); + } + return InputType{0}; + }(); + + auto const input_dv_ptr = cudf::column_device_view::create(input, stream); + auto output = cudf::make_numeric_column( + output_dtype, num_segments, cudf::mask_state::UNALLOCATED, stream); + + // Store row index if it is valid, otherwise store a negative value denoting a null row. + rmm::device_uvector valid_idx(num_segments, stream); + + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_segments), + thrust::make_zip_iterator(output->mutable_view().begin(), valid_idx.begin()), + transform_fn{*input_dv_ptr, offsets, static_cast(init_value), null_handling}); + + auto const valid_idx_cv = cudf::column_view{ + cudf::data_type{cudf::type_id::INT32}, num_segments, valid_idx.begin(), nullptr, 0}; + return std::move(cudf::gather(cudf::table_view{{output->view()}}, + valid_idx_cv, + cudf::out_of_bounds_policy::NULLIFY, + stream, + mr) + ->release() + .front()); + } + + struct transform_fn { + cudf::column_device_view values; + cudf::device_span offsets; + OutputType init_value; + cudf::null_policy null_handling; + + thrust::tuple __device__ operator()(cudf::size_type idx) const + { + auto const start = offsets[idx]; + auto const end = offsets[idx + 1]; + + auto constexpr invalid_idx = cuda::std::numeric_limits::lowest(); + if (start == end) { return {OutputType{0}, invalid_idx}; } + + auto sum = init_value; + for (auto i = start; i < end; ++i) { + if (values.is_null(i)) { + if (null_handling == cudf::null_policy::INCLUDE) { sum += init_value * init_value; } + continue; + } + auto const val = static_cast(values.element(i)); + sum += val * val; + } + auto const segment_size = end - start; + return {static_cast(segment_size) * sum, idx}; + } + }; + }; +}; + +} // namespace + +struct HostUDFSegmentedReductionExampleTest : cudf::test::BaseFixture {}; + +TEST_F(HostUDFSegmentedReductionExampleTest, SimpleInput) +{ + double constexpr null = 0.0; + auto const vals = doubles_col{{0.0, null, 2.0, 3.0, null, 5.0, null, null, 8.0, 9.0}, + {true, false, true, true, false, true, false, false, true, true}}; + auto const offsets = int32s_col{0, 3, 5, 10}.release(); + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique()); + + // Test without init value. + { + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + std::nullopt, // init value + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + // When null_policy is set to `INCLUDE`, the null values are replaced with the init value. + // Since init value is not given, it is set to 0. + // [ 3 * (0^2 + init^2 + 2^2), 2 * (3^2 + init^2), 5 * (5^2 + init^2 + init^2 + 8^2 + 9^2) ] + auto const expected = int64s_col{{12, 18, 850}, {true, true, true}}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); + } + + // Test with init value, and include nulls. + { + auto const init_scalar = cudf::make_fixed_width_scalar(3.0); + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + *init_scalar, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + // When null_policy is set to `INCLUDE`, the null values are replaced with the init value. + // [ 3 * (3 + 0^2 + 3^2 + 2^2), 2 * (3 + 3^2 + 3^2), 5 * (3 + 5^2 + 3^2 + 3^2 + 8^2 + 9^2) ] + auto const expected = int64s_col{{48, 42, 955}, {true, true, true}}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); + } + + // Test with init value, and exclude nulls. + { + auto const init_scalar = cudf::make_fixed_width_scalar(3.0); + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::EXCLUDE, + *init_scalar, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + // [ 3 * (3 + 0^2 + 2^2), 2 * (3 + 3^2), 5 * (3 + 5^2 + 8^2 + 9^2) ] + auto const expected = int64s_col{{21, 24, 865}, {true, true, true}}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); + } +} + +TEST_F(HostUDFSegmentedReductionExampleTest, EmptySegments) +{ + auto const vals = doubles_col{}; + auto const offsets = int32s_col{0, 0, 0, 0}.release(); + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique()); + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + std::nullopt, // init value + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + auto const expected = int64s_col{{0, 0, 0}, {false, false, false}}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + +TEST_F(HostUDFSegmentedReductionExampleTest, EmptyInput) +{ + auto const vals = doubles_col{}; + auto const offsets = int32s_col{}.release(); + auto const agg = cudf::make_host_udf_aggregation( + std::make_unique()); + auto const result = cudf::segmented_reduce( + vals, + cudf::device_span(offsets->view().begin(), offsets->size()), + *agg, + cudf::data_type{cudf::type_id::INT64}, + cudf::null_policy::INCLUDE, + std::nullopt, // init value + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + auto const expected = int64s_col{}; + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} diff --git a/java/src/main/java/ai/rapids/cudf/Aggregation.java b/java/src/main/java/ai/rapids/cudf/Aggregation.java index 2276b223740..c07a58ed8a5 100644 --- a/java/src/main/java/ai/rapids/cudf/Aggregation.java +++ b/java/src/main/java/ai/rapids/cudf/Aggregation.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java index 27966ddfdd4..234a9ec1ced 100644 --- a/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/GroupByAggregation.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java b/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java index ba8ae379bae..4f047a68f06 100644 --- a/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -161,14 +161,14 @@ public static ReductionAggregation median() { /** * Aggregate to compute the specified quantiles. Uses linear interpolation by default. */ - public static ReductionAggregation quantile(double ... quantiles) { + public static ReductionAggregation quantile(double... quantiles) { return new ReductionAggregation(Aggregation.quantile(quantiles)); } /** * Aggregate to compute various quantiles. */ - public static ReductionAggregation quantile(QuantileMethod method, double ... quantiles) { + public static ReductionAggregation quantile(QuantileMethod method, double... quantiles) { return new ReductionAggregation(Aggregation.quantile(method, quantiles)); } @@ -256,7 +256,7 @@ public static ReductionAggregation collectSet() { * @param nanEquality Flag to specify whether NaN values in floating point column should be considered equal. */ public static ReductionAggregation collectSet(NullPolicy nullPolicy, - NullEquality nullEquality, NaNEquality nanEquality) { + NullEquality nullEquality, NaNEquality nanEquality) { return new ReductionAggregation(Aggregation.collectSet(nullPolicy, nullEquality, nanEquality)); } @@ -286,6 +286,15 @@ public static ReductionAggregation mergeSets(NullEquality nullEquality, NaNEqual return new ReductionAggregation(Aggregation.mergeSets(nullEquality, nanEquality)); } + /** + * Execute a reduction using a host-side user-defined function (UDF). + * @param wrapper The wrapper for the native host UDF instance. + * @return A new ReductionAggregation instance + */ + public static ReductionAggregation hostUDF(HostUDFWrapper wrapper) { + return new ReductionAggregation(Aggregation.hostUDF(wrapper)); + } + /** * Create HistogramAggregation, computing the frequencies for each unique row. * diff --git a/java/src/main/java/ai/rapids/cudf/SegmentedReductionAggregation.java b/java/src/main/java/ai/rapids/cudf/SegmentedReductionAggregation.java index 7ed150a2fec..18e7d874886 100644 --- a/java/src/main/java/ai/rapids/cudf/SegmentedReductionAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/SegmentedReductionAggregation.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,4 +101,13 @@ public static SegmentedReductionAggregation any() { public static SegmentedReductionAggregation all() { return new SegmentedReductionAggregation(Aggregation.all()); } + + /** + * Execute a reduction using a host-side user-defined function (UDF). + * @param wrapper The wrapper for the native host UDF instance. + * @return A new SegmentedReductionAggregation instance + */ + public static SegmentedReductionAggregation hostUDF(HostUDFWrapper wrapper) { + return new SegmentedReductionAggregation(Aggregation.hostUDF(wrapper)); + } }