From 27d89b4b33a3faf31a010c3a5f3831fd2fbbd4eb Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Tue, 17 Oct 2023 17:56:33 +0000 Subject: [PATCH] [GSProcessing] Add support for numerical and multi-numerical transformations. --- .../developer/input-configuration.rst | 75 +++- docs/source/index.rst | 1 + .../config_conversion/converter_base.py | 3 +- .../config_conversion/gconstruct_converter.py | 49 ++- .../config/config_parser.py | 8 +- .../config/feature_config_base.py | 11 +- .../config/numerical_configs.py | 92 +++++ .../graphstorm_processing/constants.py | 3 - .../dist_feature_transformer.py | 15 +- .../dist_transformations/__init__.py | 4 + .../dist_numerical_transformation.py | 351 ++++++++++++++++++ .../data_transformations/spark_utils.py | 36 +- .../graph_loaders/schema_utils.py | 6 +- .../gsprocessing-config.json | 136 +++++++ graphstorm-processing/tests/test_converter.py | 27 +- .../tests/test_dist_heterogenous_loader.py | 13 +- .../tests/test_dist_label_loader.py | 2 - .../test_dist_numerical_transformation.py | 299 +++++++++++++++ 18 files changed, 1061 insertions(+), 70 deletions(-) create mode 100644 graphstorm-processing/graphstorm_processing/config/numerical_configs.py create mode 100644 graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py create mode 100644 graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json create mode 100644 graphstorm-processing/tests/test_dist_numerical_transformation.py diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst index e6e2d7ae98..78784c0fe5 100644 --- a/docs/source/gs-processing/developer/input-configuration.rst +++ b/docs/source/gs-processing/developer/input-configuration.rst @@ -12,8 +12,8 @@ between other config formats, such as the one used by the single-machine GConstruct module. GSProcessing can take a GConstruct-formatted file -directly, and we also provide `a script ` -that can convert a `GConstruct ` +directly, and we also provide `a script `_ +that can convert a `GConstruct `_ input configuration file into the ``GSProcessing`` format, although this is mostly aimed at developers, users are can rely on the automatic conversion. @@ -30,11 +30,11 @@ The GSProcessing input data configuration has two top-level objects: - ``version`` (String, required): The version of configuration file being used. We include the package name to allow self-contained identification of the file format. - ``graph`` (JSON object, required): one configuration object that defines each - of the node types and edge types that describe the graph. + of the edge and node types that constitute the graph. We describe the ``graph`` object next. -``graph`` configuration object +Contents of the ``graph`` configuration object ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The ``graph`` configuration object can have two top-level objects: @@ -135,13 +135,12 @@ objects: ``source`` key, with a JSON object that contains ``{“column: String, and ”type“: String}``. - ``relation``: (JSON object, required): Describes the relation - modeled by the edges. A relation can be common among all edges, or it - can have sub-types. The top-level objects for the object are: + modeled by the edges. The top-level keys for the object are: - ``type`` (String, required): The type of the relation described by the edges. For example, for a source type ``user``, destination - ``movie`` we can have a relation type ``interacted_with`` for an - edge type ``user:interacted_with:movie``. + ``movie`` we can have a relation type ``rated`` for an + edge type ``user:rated:movie``. - ``labels`` (List of JSON objects, optional): Describes the label for the current edge type. The label object has the following @@ -171,9 +170,9 @@ objects: - ``train``: The percentage of the data with available labels to assign to the train set (0.0, 1.0]. - ``val``: The percentage of the data with available labels to - assign to the train set [0.0, 1.0). + assign to the validation set [0.0, 1.0). - ``test``: The percentage of the data with available labels to - assign to the train set [0.0, 1.0). + assign to the test set [0.0, 1.0). - ``features`` (List of JSON objects, optional)\ **:** Describes the set of features for the current edge type. See the :ref:`features-object` section for details. @@ -248,12 +247,12 @@ following top-level keys: - ``train``: The percentage of the data with available labels to assign to the train set (0.0, 1.0]. - ``val``: The percentage of the data with available labels to - assign to the train set [0.0, 1.0). + assign to the validation set [0.0, 1.0). - ``test``: The percentage of the data with available labels to - assign to the train set [0.0, 1.0). + assign to the test set [0.0, 1.0). - ``features`` (List of JSON objects, optional): Describes - the set of features for the current edge type. See the next section, :ref:`features-object` + the set of features for the current node type. See the next section, :ref:`features-object` for details. -------------- @@ -285,7 +284,7 @@ can contain the following top-level keys: } - ``column`` (String, required): The column that contains the raw - feature values in the dataset + feature values in the data. - ``transformation`` (JSON object, optional): The type of transformation that will be applied to the feature. For details on the individual transformations supported see :ref:`supported-transformations`. @@ -309,7 +308,7 @@ can contain the following top-level keys: # Example node config with multiple features { - # This is where the node structure data exist just need an id col + # This is where the node structure data exist, just need an id col in these files "data": { "format": "parquet", "files": ["path/to/node_ids"] @@ -356,7 +355,7 @@ Supported transformations In this section we'll describe the transformations we support. The name of the transformation is the value that would appear -in the ``transform['name']`` element of the feature configuration, +in the ``['transformation']['name']`` element of the feature configuration, with the attached ``kwargs`` for the transformations that support arguments. @@ -373,6 +372,35 @@ arguments. split the values in the column and create a vector column output. Example: for a separator ``'|'`` the CSV value ``1|2|3`` would be transformed to a vector, ``[1, 2, 3]``. +- ``numerical`` + + - Transforms a numerical column using a missing data imputer and an + optional normalizer. + - ``kwargs``: + + - ``imputer`` (String, optional): A method to fill in missing values in the data. + Valid values are: + ``mean`` (Default), ``median``, and ``most_frequent``. Missing values will be replaced + with the respective value computed from the data. + - ``normalizer`` (String, optional): Applies a normalization to the data, after + imputation. Can take the following values: + - ``none``: (Default) Don't normalize the numerical values during encoding. + - ``min-max``: Normalize each value by subtracting the minimum value from it, + and then dividing it by the difference between the maximum value and the minimum. + - ``standard``: Normalize each value by dividing it by the sum of all the values. +- ``multi-numerical`` + + - Column-wise transformation for vector-like numerical data using a missing data imputer and an + optional normalizer. + - ``kwargs``: + + - ``imputer`` (String, optional): Same as for ``numerical`` transformation, will + apply the ``mean`` transformation by default. + - ``normalizer`` (String, optional): Same as for ``numerical`` transformation, no + normalization is applied by default. + - ``separator`` (String, optional): Same as for ``no-op`` transformation, used to separate numerical + values in CSV input. If the input data are in Parquet format, each value in the + column is assumed to be an array of floats. -------------- @@ -403,6 +431,8 @@ OAG-Paper dataset ], "nodes" : [ { + "type": "paper", + "column": "ID", "data": { "format": "csv", "separator": ",", @@ -410,8 +440,17 @@ OAG-Paper dataset "node_feat.csv" ] }, - "type": "paper", - "column": "ID", + "features": [ + { + "column": "n_citation", + "transformation": { + "name": "numerical", + "kwargs": { + "imputer": "mean", + "normalizer": "min-max" + } + } + ] "labels": [ { "column": "field", diff --git a/docs/source/index.rst b/docs/source/index.rst index c3af321dea..eb55c766c4 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -23,6 +23,7 @@ Welcome to the GraphStorm Documentation and Tutorials gs-processing/usage/example gs-processing/usage/distributed-processing-setup gs-processing/usage/amazon-sagemaker + gs-processing/developer/input-configuration .. toctree:: :maxdepth: 1 diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/converter_base.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/converter_base.py index 8ac9a4bfa1..dd67dbc049 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_conversion/converter_base.py +++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/converter_base.py @@ -89,8 +89,7 @@ def convert_to_gsprocessing(self, input_dictionary: dict) -> dict: gsprocessing_dict: dict[str, Any] = {} - # hardcode the version number for the first version - gsprocessing_dict["version"] = "gsprocessing-1.0" + gsprocessing_dict["version"] = "gsprocessing-v1.0" gsprocessing_dict["graph"] = {} # deal with nodes diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py index 341037748d..56d5edeea0 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py +++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py @@ -13,6 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. """ +from typing import Any + from .converter_base import ConfigConverter from .meta_configuration import NodeConfig, EdgeConfig @@ -80,27 +82,40 @@ def _convert_feature(feats: list[dict]) -> list[dict]: list[dict] The feature information in the GSProcessing format """ - feats_list = [] + gsp_feats_list = [] if feats in [[], [{}]]: return [] - for ele in feats: - if "transform" in ele: - raise ValueError( - "Currently only support no-op operation, " - "we do not support any other no-op operation" - ) - feat_dict = {} - kwargs = {"name": "no-op"} - for col in ele["feature_col"]: - feat_dict = {"column": col, "transform": kwargs} - feats_list.append(feat_dict) - if "out_dtype" in ele: + for gconstruct_feat_dict in feats: + gsp_feat_dict = {} + gsp_feat_dict["column"] = gconstruct_feat_dict["feature_col"][0] + if "feature_name" in gconstruct_feat_dict: + gsp_feat_dict["name"] = gconstruct_feat_dict["feature_name"] + + gsp_transformation_dict: dict[str, Any] = {} + if "transform" in gconstruct_feat_dict: + gconstruct_transform_dict = gconstruct_feat_dict["transform"] + + if gconstruct_transform_dict["name"] == "max_min_norm": + gsp_transformation_dict["name"] = "numerical" + gsp_transformation_dict["kwargs"] = {"normalizer": "min-max", "imputer": "mean"} + # TODO: Add support for other common transformations here + else: + raise ValueError( + "Unsupported GConstruct transformation name: " + f"{gconstruct_transform_dict['name']}" + ) + else: + gsp_transformation_dict["name"] = "no-op" + + if "out_dtype" in gconstruct_feat_dict: assert ( - ele["out_dtype"] == "float32" + gconstruct_feat_dict["out_dtype"] == "float32" ), "GSProcessing currently only supports float32 features" - if "feature_name" in ele: - feat_dict["name"] = ele["feature_name"] - return feats_list + + gsp_feat_dict["transformation"] = gsp_transformation_dict + gsp_feats_list.append(gsp_feat_dict) + + return gsp_feats_list @staticmethod def convert_nodes(nodes_entries): diff --git a/graphstorm-processing/graphstorm_processing/config/config_parser.py b/graphstorm-processing/graphstorm_processing/config/config_parser.py index ae0d4f69e7..4a03ef515d 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_parser.py +++ b/graphstorm-processing/graphstorm_processing/config/config_parser.py @@ -16,12 +16,13 @@ Configuration parsing for edges and nodes """ from abc import ABC - from typing import Any, Dict, List, Optional, Sequence +import logging from graphstorm_processing.constants import SUPPORTED_FILE_TYPES from .label_config_base import LabelConfig, EdgeLabelConfig, NodeLabelConfig from .feature_config_base import FeatureConfig, NoopFeatureConfig +from .numerical_configs import MultiNumericalFeatureConfig, NumericalFeatureConfig from .data_config_base import DataStorageConfig @@ -49,9 +50,14 @@ def parse_feat_config(feature_dict: Dict) -> FeatureConfig: return NoopFeatureConfig(feature_dict) transformation_name = feature_dict["transformation"]["name"] + logging.info("Transformation name: %s", transformation_name) if transformation_name == "no-op": return NoopFeatureConfig(feature_dict) + elif transformation_name == "numerical": + return NumericalFeatureConfig(feature_dict) + elif transformation_name == "multi-numerical": + return MultiNumericalFeatureConfig(feature_dict) else: raise RuntimeError(f"Unknown transformation name: '{transformation_name}'") diff --git a/graphstorm-processing/graphstorm_processing/config/feature_config_base.py b/graphstorm-processing/graphstorm_processing/config/feature_config_base.py index d0b4bc28da..85e447bd7b 100644 --- a/graphstorm-processing/graphstorm_processing/config/feature_config_base.py +++ b/graphstorm-processing/graphstorm_processing/config/feature_config_base.py @@ -82,8 +82,13 @@ def _sanity_check(self) -> None: class NoopFeatureConfig(FeatureConfig): - """ - Feature configuration for features that do not need to be transformed. + """Feature configuration for features that do not need to be transformed. + + Supported kwargs + ---------------- + separator: str + When provided will treat the input as strings, split each value in the string using + the separator, and convert the resulting list of floats into a float-vector feature. """ def __init__(self, config: Mapping): @@ -98,4 +103,4 @@ def __init__(self, config: Mapping): def _sanity_check(self) -> None: super()._sanity_check() if self._data_config and self.value_separator and self._data_config.format != "csv": - raise RuntimeError("value_separator should only be provided for CSV data") + raise RuntimeError("separator should only be provided for CSV data") diff --git a/graphstorm-processing/graphstorm_processing/config/numerical_configs.py b/graphstorm-processing/graphstorm_processing/config/numerical_configs.py new file mode 100644 index 0000000000..5d626f0ed4 --- /dev/null +++ b/graphstorm-processing/graphstorm_processing/config/numerical_configs.py @@ -0,0 +1,92 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from typing import Mapping + +from .feature_config_base import FeatureConfig + + +class NumericalFeatureConfig(FeatureConfig): + """Feature configuration for single-column numerical features. + + Supported kwargs + ---------------- + imputer: str + A method to fill in missing values in the data. Valid values are: + "mean" (Default), "median", and "most_frequent". Missing values will be replaced + with the respective value computed from the data. + normalizer: str + A normalization to apply to each column. Valid values are + "none", "min-max", and "standard". + + The transformation applied will be: + + * "none": (Default) Don't normalize the numerical values during encoding. + * "min-max": Normalize each value by subtracting the minimum value from it, + and then dividing it by the difference between the maximum value and the minimum. + * "standard": Normalize each value by dividing it by the sum of all the values. + """ + + def __init__(self, config: Mapping): + super().__init__(config) + self.imputer = self._transformation_kwargs.get("imputer", "mean") + self.norm = self._transformation_kwargs.get("normalizer", "none") + + self._sanity_check() + + def _sanity_check(self) -> None: + super()._sanity_check() + valid_imputers = ["mean", "median", "most_frequent"] + assert ( + self.imputer in valid_imputers + ), f"Unknown imputer requested, expected one of {valid_imputers}, got {self.imputer}" + valid_normalizers = ["none", "min-max", "standard"] + assert ( + self.norm in valid_normalizers + ), f"Unknown normalizer requested, expected one of {valid_normalizers}, got {self.norm}" + + +class MultiNumericalFeatureConfig(NumericalFeatureConfig): + """Feature configuration for multi-column numerical features. + + Supported kwargs + ---------------- + imputer: str + A method to fill in missing values in the data. Valid values are: + "mean" (Default), "median", and "most_frequent". Missing values will be replaced + with the respective value computed from the data. + normalizer: str + A normalization to apply to each column. Valid values are + "none", "min-max", and "standard". + + The transformation applied will be: + + * "none": (Default) Don't normalize the numerical values during encoding. + * "min-max": Normalize each value by subtracting the minimum value from it, + and then dividing it by the difference between the maximum value and the minimum. + * "standard": Normalize each value by dividing it by the sum of all the values. + separator: str, optional + A separator to use when splitting a delimited string into multiple numerical values + as a vector. Only applicable to CSV input. Example: for a separator `'|'` the CSV + value `1|2|3` would be transformed to a vector, `[1, 2, 3]`. When `None` the expected + input format is an array of numerical values. + + """ + + def __init__(self, config: Mapping): + super().__init__(config) + self.separator = self._transformation_kwargs.get("separator", None) + + self._sanity_check() diff --git a/graphstorm-processing/graphstorm_processing/constants.py b/graphstorm-processing/graphstorm_processing/constants.py index 510d8a06bc..80c12347f5 100644 --- a/graphstorm-processing/graphstorm_processing/constants.py +++ b/graphstorm-processing/graphstorm_processing/constants.py @@ -19,9 +19,6 @@ MISSING_CATEGORY = "GSP_CONSTANT_UNKNOWN" SINGLE_CATEGORY_COL = "SINGLE_CATEGORY" -################ Multi-numerical Limits ##################### -MAX_COLUMNS_TO_IMPUTE = 50 - SUPPORTED_FILE_TYPES = ["csv", "parquet"] ################### Label Properties ######################## diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py index 3753823aec..3e2afd517e 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py @@ -13,10 +13,17 @@ See the License for the specific language governing permissions and limitations under the License. """ +import logging + from pyspark.sql import DataFrame from graphstorm_processing.config.feature_config_base import FeatureConfig -from .dist_transformations import DistributedTransformation, NoopTransformation +from .dist_transformations import ( + DistributedTransformation, + NoopTransformation, + DistNumericalTransformation, + DistMultiNumericalTransformation, +) class DistFeatureTransformer(object): @@ -32,9 +39,15 @@ def __init__(self, feature_config: FeatureConfig): self.transformation: DistributedTransformation default_kwargs = {"cols": feature_config.cols} + logging.info("Feature name: %s", feat_name) + logging.info("Transformation type: %s", feat_type) if feat_type == "no-op": self.transformation = NoopTransformation(**default_kwargs, **args_dict) + elif feat_type == "numerical": + self.transformation = DistNumericalTransformation(**default_kwargs, **args_dict) + elif feat_type == "multi-numerical": + self.transformation = DistMultiNumericalTransformation(**default_kwargs, **args_dict) else: raise NotImplementedError( f"Feature {feat_name} has type: {feat_type} that is not supported" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py index 15de48dbd9..3163b65555 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py @@ -8,3 +8,7 @@ ) from .dist_noop_transformation import NoopTransformation from .dist_label_transformation import DistSingleLabelTransformation, DistMultiLabelTransformation +from .dist_numerical_transformation import ( + DistMultiNumericalTransformation, + DistNumericalTransformation, +) diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py new file mode 100644 index 0000000000..46121a5f5a --- /dev/null +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py @@ -0,0 +1,351 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import logging +from typing import Optional, Sequence + +from pyspark.sql import DataFrame +from pyspark.sql import functions as F +from pyspark.sql.types import ArrayType, FloatType +from pyspark.ml.feature import MinMaxScaler, Imputer, VectorAssembler, ElementwiseProduct +from pyspark.ml.linalg import DenseVector +from pyspark.ml.stat import Summarizer +from pyspark.ml import Pipeline +from pyspark.ml.functions import array_to_vector, vector_to_array + +import numpy as np + +from graphstorm_processing.constants import SPECIAL_CHARACTERS +from .base_dist_transformation import DistributedTransformation +from ..spark_utils import rename_multiple_cols + + +def apply_imputation(cols: Sequence[str], shared_imputation: str, input_df: DataFrame) -> DataFrame: + """ + Applies a single imputation to input dataframe, individually to each of the columns + provided in the cols argument. + """ + assert shared_imputation in [ + "mean", + "median", + "mode", + "none", + ], f"Unsupported imputation strategy requested: {shared_imputation}" + if shared_imputation == "none": + imputed_df = input_df + else: + imputed_col_names = [col_name + "_imputed" for col_name in cols] + imputer = Imputer(strategy=shared_imputation, inputCols=cols, outputCols=imputed_col_names) + model = imputer.fit(input_df) + + # Create transformed columns and drop originals, then rename transformed cols to original + input_df = model.transform(input_df).drop(*cols) + imputed_df, _ = rename_multiple_cols(input_df, imputed_col_names, cols) + + return imputed_df + + +def apply_norm(cols: Sequence[str], shared_norm: str, imputed_df: DataFrame) -> DataFrame: + """ + Applies a single normalizer to the imputed dataframe, individually to each of the columns + provided in the cols argument. + """ + other_cols = list(set(imputed_df.columns).difference(cols)) + + def single_vec_to_float(vec): + return float(vec[0]) + + vec_udf = F.udf(single_vec_to_float, FloatType()) + + if shared_norm == "none": + scaled_df = imputed_df + elif shared_norm == "min-max": + # Because the scalers expect Vector input, we need to use VectorAssembler on each, + # creating one (scaled) vector per normalizer type + # TODO: See if it's possible to have all features under one assembler and scaler, + # speeding up the process. Then do the "disentaglement" on the caller side. + assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in cols] + scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in cols] + + vector_cols = [col + "_vec" for col in cols] + scaled_cols = [col + "_scaled" for col in cols] + pipeline = Pipeline(stages=assemblers + scalers) + scaler_model = pipeline.fit(imputed_df) + scaled_df = scaler_model.transform(imputed_df).drop(*vector_cols).drop(*cols) + + scaled_df = scaled_df.select( + *[ + (vec_udf(F.col(scaled_col_name))).alias(orig_col) + for scaled_col_name, orig_col in zip(scaled_cols, cols) + ] + + other_cols + ) + elif shared_norm == "standard": + col_sums = imputed_df.agg({col: "sum" for col in cols}).collect()[0].asDict() + # TODO: See if it's possible to exclude NaN values from the sum + for col, val in col_sums.items(): + if np.isinf(val) or np.isnan(val): + col_sums[col] = 0 + scaled_df = imputed_df.select( + [(F.col(c) / col_sums[f"sum({c})"]).alias(c) for c in cols] + other_cols + ) + else: + raise RuntimeError(f"Uknown normalizer type requested for cols {cols}: {shared_norm}") + + return scaled_df + + +class DistNumericalTransformation(DistributedTransformation): + """Transformation to apply missing value imputation and various forms of normalization + to a numerical input. + + Parameters + ---------- + cols : Sequence[str] + The list of columns to apply the transformations on. + normalizer : str + The normalization to apply to the columns. + Valid values are "none", "min-max", and "standard". + imputer : str + The type of missing value imputation to apply to the column. + Valid values are "mean", "median" and "most_frequent". + """ + + def __init__(self, cols: Sequence[str], normalizer: str, imputer: str) -> None: + super().__init__(cols) + self.cols = cols + self.shared_norm = normalizer + # Spark uses 'mode' for the most frequent element + self.shared_imputation = "mode" if imputer == "most_frequent" else imputer + + def apply(self, input_df: DataFrame) -> DataFrame: + logging.debug( + "Applying normalizer: %s, imputation: %s", self.shared_norm, self.shared_imputation + ) + + imputed_df = apply_imputation(self.cols, self.shared_imputation, input_df) + scaled_df = apply_norm(self.cols, self.shared_norm, imputed_df) + + # TODO: Figure out why the transformation is producing Double values, and switch to float + return scaled_df + + @staticmethod + def get_transformation_name() -> str: + return "DistNumericalTransformation" + + +class DistMultiNumericalTransformation(DistNumericalTransformation): + """Transformation to apply missing value imputation and various forms of normalization + to a multi-column numerical input, where the input is a string separated by a delimiter. + + Parameters + ---------- + cols : Sequence[str] + The list of columns to apply the transformations on. + separator: str + The separator string that divides the string values. + normalizer : str + The normalization to apply to the columns. + Valid values are "none", "min-max", and "standard". + imputer : str + The type of missing value imputation to apply to the column. + Valid values are "mean", "median" and "most_frequent". + """ + + def __init__( + self, cols: Sequence[str], separator: Optional[str], normalizer: str, imputer: str + ) -> None: + assert ( + len(cols) == 1 + ), "DistMultiNumericalTransformation only supports one column at a time." + super().__init__(cols, normalizer, imputer) + self.multi_column = cols[0] + + self.separator = separator + # Keep the original separator to split in pure Python + self.original_separator = self.separator + # Spark's split function uses a regexp so we need to escape + # special chars to be used as separators + if self.separator in SPECIAL_CHARACTERS: + self.separator = f"\\{self.separator}" + + @staticmethod + def get_transformation_name() -> str: + return "DistMultiNumericalTransformation" + + @staticmethod + def apply_norm_vector(vector_col: str, shared_norm: str, vector_df: DataFrame) -> DataFrame: + """ + Applies normalizer column-wise with a single vector column as input. + """ + other_cols = list(set(vector_df.columns).difference([vector_col])) + + if shared_norm == "none": + scaled_df = vector_df + elif shared_norm == "min-max": + min_max_scaler = MinMaxScaler(inputCol=vector_col, outputCol=vector_col + "_scaled") + + scaler_model = min_max_scaler.fit(vector_df) + scaled_df = scaler_model.transform(vector_df).drop(vector_col) + + scaled_df = scaled_df.withColumnRenamed(vector_col + "_scaled", vector_col) + scaled_df = scaled_df.select([vector_col] + other_cols) + elif shared_norm == "standard": + col_sums_df = vector_df.select(Summarizer.sum(vector_df[vector_col]).alias("sum")) + + col_sums = col_sums_df.collect()[0]["sum"] # type: DenseVector + + col_sums_array = col_sums.toArray() + for i, col_sum in enumerate(col_sums_array): + if np.isinf(col_sum) or np.isnan(col_sum) or col_sum == 0: + col_sums_array[i] = 0.0 + else: + col_sums_array[i] = 1.0 / col_sums_array[i] + + elwise_divider = ElementwiseProduct( + scalingVec=DenseVector(col_sums_array), + inputCol=vector_col, + outputCol=f"{vector_col}_scaled", + ) + + scaled_df = elwise_divider.transform(vector_df).drop(vector_col) + + scaled_df = scaled_df.withColumnRenamed(vector_col + "_scaled", vector_col) + scaled_df = scaled_df.select([vector_col] + other_cols) + else: + raise RuntimeError(f"Unknown normalizer requested for col {vector_col}: {shared_norm}") + + return scaled_df + + def apply(self, input_df: DataFrame) -> DataFrame: + logging.info("Calling apply in MultiNumericalTransformation") + + def replace_empty_with_nan(x): + return F.when(x == "", "NaN").otherwise(x) + + def convert_multistring_to_vector_df( + multi_string_df: DataFrame, separator: str + ) -> DataFrame: + """ + Convert the provided DataFrame that is assumed to have one string + column named with the value of self.multi_column, + to a single-column DenseVector DF with the same column name. + """ + # Hint: read the transformation comments inside-out, starting with split + vector_df = multi_string_df.select( + # transform outputs to an array so we convert to a DenseVector + array_to_vector( + # After split, replace empty strings with 'NaN' and cast to Array + F.transform( + F.split( + multi_string_df[self.multi_column], separator + ), # Split along the separator + replace_empty_with_nan, + ).cast(ArrayType(FloatType(), True)) + ).alias(self.multi_column) + ) + return vector_df + + def vector_df_has_nan(vector_df: DataFrame, vector_col: str) -> bool: + """ + Returns true if there exists at least one NaN value in `vector_df[vector_col]` + """ + sum_vector_df = vector_df.select(Summarizer.mean(vector_df[vector_col]).alias("sum")) + sums_vector = sum_vector_df.take(1)[0]["sum"] + for val in sums_vector: + if np.isnan(val): + return True + return False + + # Convert the input column from either a delimited string or array to a Vector column + multi_col_type = input_df.schema.jsonValue()["fields"][0]["type"] + if multi_col_type == "string": + assert self.separator + vector_df = convert_multistring_to_vector_df(input_df, self.separator) + else: + vector_df = input_df.select( + array_to_vector(F.col(self.multi_column)).alias(self.multi_column) + ) + + if self.shared_imputation != "none": + # First we check if any NaN values exist in the input DF + # TODO: Replace column-level NaN search with row-level missing which should be faster. + input_has_nan = vector_df_has_nan(vector_df, self.multi_column) + + if input_has_nan: + # If the input has NaN, check how values exist in the vectors + feat_row = input_df.take(1)[0] + + len_array = len(feat_row[self.multi_column].split(self.original_separator)) + + # Doing the column splitting in memory can be expensive + if len_array > 50: + logging.warning( + "Attempting imputation on %d columns can lead to OOM errors", len_array + ) + + # Splitting the vectors requires an array DF + if multi_col_type == "string": + assert self.separator, "Separator must be provided for string-separated vectors" + split_array_df = input_df.select( + # After split, replace empty strings with 'NaN' and cast to Array + F.transform( + F.split( + input_df[self.multi_column], self.separator + ), # Split along the separator + replace_empty_with_nan, + ) + .cast(ArrayType(FloatType(), True)) + .alias(self.multi_column) + ) + else: + split_array_df = input_df.select( + F.col(self.multi_column) + .cast(ArrayType(FloatType(), True)) + .alias(self.multi_column) + ) + + # Split the values into separate columns + split_col_df = split_array_df.select( + [F.col(self.multi_column)[i] for i in range(len_array)] + ) + + # Set the newly created columns as the ones to be processed, + # and call the base numerical transformer + imputed_df = apply_imputation( + split_col_df.columns, self.shared_imputation, split_col_df + ) + + # Assemble the separate columns back into a single vector column + assembler = VectorAssembler( + inputCols=imputed_df.columns, outputCol=self.multi_column, handleInvalid="keep" + ) + + imputed_df = assembler.transform(imputed_df).drop(*imputed_df.columns) + else: + # If there input_df had no NaN values, we just pass the vector df to the scaler + imputed_df = vector_df + else: + # If the user did not request any imputation, just pass to the scaler + imputed_df = vector_df + + scaled_df = self.apply_norm_vector(self.multi_column, self.shared_norm, imputed_df) + + # Convert DenseVector data to List[float] + scaled_df = scaled_df.select( + vector_to_array(scaled_df[self.multi_column]).alias(self.multi_column) + ) + + return scaled_df diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/spark_utils.py b/graphstorm-processing/graphstorm_processing/data_transformations/spark_utils.py index 600ee37304..4cc60a43fa 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/spark_utils.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/spark_utils.py @@ -10,11 +10,11 @@ """ import logging import uuid -from typing import Tuple +from typing import Tuple, Sequence import psutil -from pyspark.sql import SparkSession, DataFrame +from pyspark.sql import SparkSession, DataFrame, functions as F from graphstorm_processing import constants try: @@ -172,8 +172,8 @@ def safe_rename_column( Returns ------- - DataFrame - The modified dataframe. + tuple[DataFrame, str] + The modified dataframe and the new column name. """ if old_colum_name in dataframe.columns: if new_column_name in dataframe.columns: @@ -190,3 +190,31 @@ def safe_rename_column( else: logging.warning("Column %s not found in dataframe. Skipping renaming.", old_colum_name) return dataframe, new_column_name + + +def rename_multiple_cols( + df: DataFrame, old_cols: Sequence[str], new_cols: Sequence[str] +) -> Tuple[DataFrame, Sequence[str]]: + """Safely renames multiple columns at once. All columns not listed in the passed args are left as is. + + Parameters + ---------- + df : DataFrame + Input DataFrame + old_cols : Sequence[str] + List of column names to change + new_cols : Sequence[str] + List of new column names. + + Returns + ------- + Tuple[DataFrame, Sequence[str]] + DataFrame with renamed columns, and a list of the new column names. + """ + assert len(old_cols) == len(new_cols) + safe_new_cols = [] + for old_name, new_name in zip(old_cols, new_cols): + _, safe_new_name = safe_rename_column(df, old_name, new_name) + safe_new_cols.append(safe_new_name) + mapping = dict(zip(old_cols, safe_new_cols)) + return df.select([F.col(c).alias(mapping.get(c, c)) for c in df.columns]), safe_new_cols diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py index 2fb37963bb..a37ea81606 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py @@ -92,11 +92,9 @@ def determine_spark_feature_type(feature_type: str) -> Type[DataType]: In case an unsupported feature_type is provided. """ # TODO: Replace with pattern matching after moving to Python 3.10? - if feature_type in [ - "no-op", - ] or feature_type.startswith("text"): + if feature_type in ["no-op", "multi-numerical"] or feature_type.startswith("text"): return StringType - if feature_type in ["numerical", "bucket_numerical", "none"]: + if feature_type in ["numerical", "none"]: return FloatType else: raise NotImplementedError(f"Unknown feature type: {feature_type}") diff --git a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json new file mode 100644 index 0000000000..48b3b2deb8 --- /dev/null +++ b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json @@ -0,0 +1,136 @@ +{ + "version": "gsprocessing-v1.0", + "graph": { + "nodes": [ + { + "data": { + "format": "csv", + "files": [ + "nodes/genre.csv" + ], + "separator": "," + }, + "type": "genre", + "column": "~id" + }, + { + "data": { + "format": "csv", + "files": [ + "nodes/movie.csv" + ], + "separator": "," + }, + "type": "movies", + "column": "~id" + }, + { + "data": { + "format": "csv", + "files": [ + "nodes/user.csv" + ], + "separator": "," + }, + "type": "user", + "column": "~id", + "features": [ + { + "column": "age", + "transformation": { + "name": "numerical", + "kwargs": { + "normalizer": "none", + "imputer": "mean" + } + } + }, + { + "column": "multi", + "transformation": { + "name": "multi-numerical", + "kwargs": { + "normalizer": "standard", + "imputer": "mean", + "separator": "|" + } + } + } + ], + "labels": [ + { + "column": "gender", + "type": "classification", + "split_rate": { + "train": 0.8, + "val": 0.1, + "test": 0.1 + } + } + ] + } + ], + "edges": [ + { + "data": { + "format": "csv", + "files": [ + "edges/movie-included_in-genre.csv" + ], + "separator": "," + }, + "source": { + "column": "~from", + "type": "movie" + }, + "dest": { + "column": "~to", + "type": "genre" + }, + "relation": { + "type": "included_in" + } + }, + { + "data": { + "format": "csv", + "files": [ + "edges/user-rated-movie.csv" + ], + "separator": "," + }, + "source": { + "column": "~from", + "type": "user" + }, + "dest": { + "column": "~to", + "type": "movie" + }, + "relation": { + "type": "rated" + } + }, + { + "data": { + "format": "csv", + "files": [ + "edges/director-directed-movie.csv" + ], + "separator": "," + }, + "source": { + "column": "~from", + "type": "director" + }, + "dest": { + "column": "~to", + "type": "movie" + }, + "relation": { + "type": "directed" + } + } + ] + } +} \ No newline at end of file diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py index ecd23e7961..e2935160c9 100644 --- a/graphstorm-processing/tests/test_converter.py +++ b/graphstorm-processing/tests/test_converter.py @@ -52,12 +52,11 @@ def test_try_read_file_with_wildcard( def test_try_read_unsupported_feature(converter: GConstructConfigConverter, node_dict: dict): - """We currently only support no-op features, so should error out otherwise.""" + """We currently only support no-op and numerical features, so should error out otherwise.""" node_dict["nodes"][0]["features"] = [ { - "feature_col": ["citation_time"], - "feature_name": "feat", - "transform": {"name": "max_min_norm"}, + "feature_col": ["paper_title"], + "transform": {"name": "tokenize_hf"}, } ] @@ -101,7 +100,7 @@ def test_read_node_gconstruct(converter: GConstructConfigConverter, node_dict: d assert node_config.separator is None assert node_config.column == "node_id" assert node_config.features == [ - {"column": "citation_time", "transform": {"name": "no-op"}, "name": "feat"} + {"column": "citation_time", "transformation": {"name": "no-op"}, "name": "feat"} ] assert node_config.labels == [ { @@ -174,7 +173,7 @@ def test_read_edge_gconstruct(converter: GConstructConfigConverter): assert edge_config.separator is None assert edge_config.relation == "writing" assert edge_config.features == [ - {"column": "author", "transform": {"name": "no-op"}, "name": "feat"} + {"column": "author", "transformation": {"name": "no-op"}, "name": "feat"} ] assert edge_config.labels == [ { @@ -206,7 +205,10 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "files": ["/tmp/acm_raw/nodes/paper.parquet"], "separator": ",", "node_id_col": "node_id", - "features": [{"feature_col": ["citation_time"], "feature_name": "feat"}], + "features": [ + {"feature_col": ["citation_time"], "feature_name": "feat"}, + {"feature_col": ["num_citations"], "transform": {"name": "max_min_norm"}}, + ], "labels": [ {"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]} ], @@ -242,7 +244,14 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): assert nodes_output["type"] == "paper" assert nodes_output["column"] == "node_id" assert nodes_output["features"] == [ - {"column": "citation_time", "transform": {"name": "no-op"}, "name": "feat"} + {"column": "citation_time", "transformation": {"name": "no-op"}, "name": "feat"}, + { + "column": "num_citations", + "transformation": { + "name": "numerical", + "kwargs": {"normalizer": "min-max", "imputer": "mean"}, + }, + }, ] assert nodes_output["labels"] == [ { @@ -260,7 +269,7 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): assert edges_output["dest"] == {"column": "~to", "type": "paper"} assert edges_output["relation"] == {"type": "writing"} assert edges_output["features"] == [ - {"column": "author", "transform": {"name": "no-op"}, "name": "feat"} + {"column": "author", "transformation": {"name": "no-op"}, "name": "feat"} ] assert edges_output["labels"] == [ { diff --git a/graphstorm-processing/tests/test_dist_heterogenous_loader.py b/graphstorm-processing/tests/test_dist_heterogenous_loader.py index 13e5038f9f..975a03e7ea 100644 --- a/graphstorm-processing/tests/test_dist_heterogenous_loader.py +++ b/graphstorm-processing/tests/test_dist_heterogenous_loader.py @@ -58,12 +58,13 @@ def tempdir_fixture(): @pytest.fixture(scope="function", name="data_configs_with_label") def data_configs_with_label_fixture(): - """Create data configuration object that contain labels""" - config_path = os.path.join(_ROOT, "resources/small_heterogeneous_graph/gconstruct-config.json") + """Create data configuration object that contain features and labels""" + config_path = os.path.join( + _ROOT, "resources/small_heterogeneous_graph/gsprocessing-config.json" + ) with open(config_path, "r", encoding="utf-8") as conf_file: - gconstruct_config = json.load(conf_file) - gsprocessing_config = GConstructConfigConverter().convert_to_gsprocessing(gconstruct_config) + gsprocessing_config = json.load(conf_file) data_configs_dict = create_config_objects(gsprocessing_config["graph"]) @@ -230,7 +231,7 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade metadata = json.load(mfile) graphinfo_updates = { - "nfeat_size": {"user": {"age": 1}}, + "nfeat_size": {"user": {"age": 1, "multi": 2}}, "etype_label": [], "etype_label_property": [], "ntype_label": ["user"], @@ -245,7 +246,7 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade verify_integ_test_output(metadata, dghl_loader, graphinfo_updates) expected_node_data = { - "user": {"gender", "train_mask", "val_mask", "test_mask", "age"}, + "user": {"gender", "train_mask", "val_mask", "test_mask", "age", "multi"}, } for node_type in metadata["node_data"]: diff --git a/graphstorm-processing/tests/test_dist_label_loader.py b/graphstorm-processing/tests/test_dist_label_loader.py index 2df3e2cefb..3ba747d538 100644 --- a/graphstorm-processing/tests/test_dist_label_loader.py +++ b/graphstorm-processing/tests/test_dist_label_loader.py @@ -46,10 +46,8 @@ def test_dist_classification_label(spark: SparkSession, check_df_schema): label_transformer = DistLabelLoader(LabelConfig(classification_config), spark) transformed_labels = label_transformer.process_label(names_df) - transformed_labels.show() label_map = label_transformer.label_map - print(label_map) assert set(label_map.keys()) == {"mark", "john", "tara", "jen"} diff --git a/graphstorm-processing/tests/test_dist_numerical_transformation.py b/graphstorm-processing/tests/test_dist_numerical_transformation.py new file mode 100644 index 0000000000..4cabb9710f --- /dev/null +++ b/graphstorm-processing/tests/test_dist_numerical_transformation.py @@ -0,0 +1,299 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import os + +import pytest +import pandas as pd +from numpy.testing import assert_array_equal, assert_array_almost_equal +from pyspark.sql import SparkSession, DataFrame, functions as F +from pyspark.sql.types import ArrayType, FloatType, StructField, StructType, StringType + +from graphstorm_processing.data_transformations.dist_transformations import ( + DistNumericalTransformation, + DistMultiNumericalTransformation, +) + +_ROOT = os.path.abspath(os.path.dirname(__file__)) + + +@pytest.fixture(scope="function", name="multi_num_df_with_missing") +def multi_num_df_with_missing_fixture(spark: SparkSession): + """Create a multi-numerical text DataFrame with missing data""" + data = [("1|10",), ("5|2",), ("3|",), ("1|3",), ("5|2",)] + + schema = StructType([StructField("ratings", StringType(), True)]) + df = spark.createDataFrame(data, schema=schema) + + yield df + + +@pytest.fixture(scope="function", name="multi_num_df_without_missing") +def multi_num_df_without_missing_fixture(spark: SparkSession): + """Create a multi-numerical text DataFrame without missing data""" + data = [("1|10",), ("5|2",), ("3|2",), ("1|3",), ("5|2",)] + + schema = StructType([StructField("ratings", StringType(), True)]) + df = spark.createDataFrame(data, schema=schema) + + yield df + + +def test_numerical_transformation_with_mode_imputer(input_df: DataFrame): + """Test numerical mode imputer""" + dist_numerical_transformation = DistNumericalTransformation( + ["salary", "age"], imputer="most_frequent", normalizer="none" + ) + + transformed_df = dist_numerical_transformation.apply(input_df) + + transformed_rows = transformed_df.collect() + + for row in transformed_rows: + if row["name"] == "mark": + assert row["salary"] == 10000 + elif row["name"] == "john": + assert row["age"] == 40 + else: + assert row["salary"] in {10000, 20000, 40000} + assert row["age"] in {20, 40, 60} + + +def test_numerical_transformation_with_minmax_scaler(input_df: DataFrame): + """Test numerical min-max normalizer""" + no_na_df = input_df.na.fill(0) + dist_numerical_transformation = DistNumericalTransformation( + ["age", "salary"], imputer="none", normalizer="min-max" + ) + + transformed_df = dist_numerical_transformation.apply(no_na_df) + + transformed_rows = transformed_df.collect() + + for row in transformed_rows: + if row["name"] == "kate": + assert row["salary"] == 1.0 + elif row["name"] == "mark": + assert row["salary"] == 0.0 + else: + assert row["salary"] < 1.0 and row["salary"] > 0.0 + + +def test_numerical_transformation_without_transformation(input_df: DataFrame, check_df_schema): + """Test numerical transformation without any transformation applied""" + no_na_df = input_df.select(["age", "salary"]).na.fill(0) + + dist_numerical_transformation = DistNumericalTransformation( + ["age", "salary"], imputer="none", normalizer="none" + ) + + transformed_df = dist_numerical_transformation.apply(no_na_df) + + check_df_schema(transformed_df) + + transformed_rows = transformed_df.collect() + + expected_salaries = [0, 10000, 20000, 10000, 40000] + + for row, expected_salary in zip(transformed_rows, expected_salaries): + assert row["salary"] == expected_salary + + +def test_numerical_transformation_with_median_imputer_and_std_norm( + input_df: DataFrame, check_df_schema +): + """Test numerical standard normalizer with median imputation""" + input_df = input_df.select(["age", "salary"]) + dist_numerical_transformation = DistNumericalTransformation( + ["age", "salary"], imputer="median", normalizer="standard" + ) + + transformed_df = dist_numerical_transformation.apply(input_df) + + check_df_schema(transformed_df) + + transformed_rows = transformed_df.collect() + + expected_imputed_std_ages = [0.2, 0.2, 0.1, 0.3, 0.2] + + for row, expected_val in zip(transformed_rows, expected_imputed_std_ages): + assert row["age"] == expected_val + + +def test_multi_numerical_transformation_without_norm_and_imputer(input_df: DataFrame): + """Test multi-numerical transformation without any transformation applied""" + dist_multi_numerical_transormation = DistMultiNumericalTransformation( + cols=["ratings"], separator="|", normalizer="none", imputer="none" + ) + + transformed_df = dist_multi_numerical_transormation.apply(input_df) + + transformed_rows = transformed_df.collect() + + for i, row in zip(range(1, 10, 2), transformed_rows): + assert row["ratings"][0] == float(i) + assert row["ratings"][1] == float(i + 1) + + +@pytest.mark.parametrize("delimiter", ["\\", "+", "^", "."]) +def test_multi_numerical_transformation_with_special_delimiter( + spark: SparkSession, delimiter: str, check_df_schema +): + """Test multi-num with various special delimiters""" + data = [ + (f"1{delimiter}2",), + (f"3{delimiter}4",), + (f"5{delimiter}6",), + (f"7{delimiter}8",), + (f"9{delimiter}10",), + ] + + schema = StructType([StructField("ratings", StringType(), True)]) + df = spark.createDataFrame(data, schema=schema) + + dist_multi_numerical_transormation = DistMultiNumericalTransformation( + cols=["ratings"], separator=delimiter, normalizer="none", imputer="mean" + ) + + transformed_df = dist_multi_numerical_transormation.apply(df) + + check_df_schema(transformed_df) + + transformed_rows = transformed_df.collect() + + for i, row in zip(range(1, 10, 2), transformed_rows): + assert row["ratings"][0] == float(i) + assert row["ratings"][1] == float(i + 1) + + +def test_multi_numerical_transformation_with_imputer( + multi_num_df_with_missing: DataFrame, check_df_schema +): + """Test multi-num with mean imputation""" + dist_multi_numerical_transormation = DistMultiNumericalTransformation( + cols=["ratings"], separator="|", normalizer="none", imputer="mean" + ) + + transformed_df = dist_multi_numerical_transormation.apply(multi_num_df_with_missing) + + check_df_schema(transformed_df) + + transformed_rows = transformed_df.collect() + + for row in transformed_rows: + # One missing val for row [3, None] with expected val (10+2+3+2)/4 + if row["ratings"][0] == 3.0: + assert row["ratings"][1] == 4.25 + + +def test_multi_numerical_transformation_with_minmax_scaler( + multi_num_df_without_missing: DataFrame, check_df_schema +): + """Test multi-num with min-max normalizer""" + dist_multi_numerical_transormation = DistMultiNumericalTransformation( + cols=["ratings"], separator="|", normalizer="min-max", imputer="none" + ) + + transformed_df = dist_multi_numerical_transormation.apply(multi_num_df_without_missing) + + check_df_schema(transformed_df) + + transformed_rows = transformed_df.collect() + + expected_vals = [[0.0, 1.0], [1.0, 0.0], [0.5, 0.0], [0.0, 0.125], [1.0, 0.0]] + assert len(expected_vals) == len(transformed_rows) + for row, expected_vector in zip(transformed_rows, expected_vals): + assert_array_equal(row["ratings"], expected_vector) + + +def test_multi_numerical_transformation_with_standard_scaler( + multi_num_df_without_missing: DataFrame, +): + """Test multi-num with min-max normalizer""" + dist_multi_numerical_transormation = DistMultiNumericalTransformation( + cols=["ratings"], separator="|", normalizer="standard", imputer="none" + ) + + transformed_df = dist_multi_numerical_transormation.apply(multi_num_df_without_missing) + + # Expected: arr/arr.sum(axis=0) + expected_vals = [ + [0.06666667, 0.52631579], + [0.33333333, 0.10526316], + [0.2, 0.10526316], + [0.06666667, 0.15789474], + [0.33333333, 0.10526316], + ] + + transformed_rows = transformed_df.collect() + assert len(expected_vals) == len(transformed_rows) + for row, expected_vector in zip(transformed_rows, expected_vals): + assert_array_almost_equal(row["ratings"], expected_vector, decimal=3) + + +def test_multi_numerical_transformation_with_long_vectors(spark: SparkSession, check_df_schema): + """Test multi-num with long string vectors (768 dim)""" + data_path = os.path.join(_ROOT, "resources/multi_num_numerical/multi_num.csv") + long_vector_df = spark.read.csv(data_path, sep=",", header=True) + dist_multi_numerical_transormation = DistMultiNumericalTransformation( + cols=["feat"], separator=";", normalizer="standard", imputer="mean" + ) + + pd_df = pd.read_csv(data_path, delimiter=",") + # Select feature vector col and convert to Series + col_df = pd_df["feat"].apply(lambda x: [float(val) for val in x.split(";")]).apply(pd.Series) + arr = col_df.to_numpy() + # Divide by sum of col to get standardized vals + expected_vals = arr / arr.sum(axis=0) + + transformed_df = dist_multi_numerical_transormation.apply(long_vector_df) + check_df_schema(transformed_df) + + transformed_rows = transformed_df.collect() + for row, expected_vector in zip(transformed_rows, expected_vals): + assert_array_almost_equal(row["feat"], expected_vector, decimal=3) + + +# TODO: Add tests for imputer requested but no NaNs + + +def test_multi_numerical_transformation_with_array_input(spark: SparkSession, check_df_schema): + """Test multi-num with long array vectors (768 dim)""" + feat_col = "feat" + data_path = os.path.join(_ROOT, "resources/multi_num_numerical/multi_num.csv") + # Convert the string list to an array of floats before transforming + long_vector_df = spark.read.csv( + data_path, sep=",", schema="id STRING, feat STRING", header=True + ) + long_vector_df = long_vector_df.select( + F.split(F.col(feat_col), ";").cast(ArrayType(FloatType(), True)).alias(feat_col) + ) + dist_multi_numerical_transormation = DistMultiNumericalTransformation( + cols=["feat"], separator=None, normalizer="standard", imputer="mean" + ) + + pd_df = pd.read_csv(data_path) + # Select feature vector col and convert to Series + col_df = pd_df["feat"].apply(lambda x: [float(val) for val in x.split(";")]).apply(pd.Series) + arr = col_df.to_numpy() + # Divide by sum of col to get standardized vals + expected_vals = arr / arr.sum(axis=0) + + transformed_df = dist_multi_numerical_transormation.apply(long_vector_df) + check_df_schema(transformed_df) + + transformed_rows = transformed_df.collect() + for row, expected_vector in zip(transformed_rows, expected_vals): + assert_array_almost_equal(row["feat"], expected_vector, decimal=3)