diff --git a/shapiq/games/imputer/base.py b/shapiq/games/imputer/base.py index 8ea400ab..891d4894 100644 --- a/shapiq/games/imputer/base.py +++ b/shapiq/games/imputer/base.py @@ -19,6 +19,12 @@ class Imputer(Game): with shape ``(n_samples, n_features)``. categorical_features: A list of indices of the categorical features in the background data. random_state: The random state to use for sampling. Defaults to ``None``. + + Attributes: + n_features: The number of features in the data (equals the number of players in the game). + data: The background data to use for the imputer. + model: The model to impute missing values for as a callable function. + sample_size: The number of samples to draw from the background data. """ @abstractmethod @@ -33,7 +39,7 @@ def __init__( ) -> None: if callable(model): self._predict_function = utils.predict_callable - else: # shapiq.Explainer + else: # shapiq.Explainer adds a predict function to the model to make it callable self._predict_function = model._predict_function self.model = model # check if data is a vector @@ -41,7 +47,7 @@ def __init__( data = data.reshape(1, data.shape[0]) self.data = data self.sample_size = sample_size - self._n_features = self.data.shape[1] + self.n_features = self.data.shape[1] self._cat_features: list = [] if categorical_features is None else categorical_features self._random_state = random_state self._rng = np.random.default_rng(self._random_state) @@ -52,7 +58,7 @@ def __init__( self.fit(x) # the normalization_value needs to be set in the subclass - super().__init__(n_players=self._n_features, normalize=False) + super().__init__(n_players=self.n_features, normalize=False) @property def x(self) -> Optional[np.ndarray]: @@ -73,7 +79,7 @@ def fit(self, x: np.ndarray) -> "Imputer": Returns: The fitted imputer. 
""" - if x.ndim == 1: - x = x.reshape(1, x.shape[0]) self._x = x.copy() + if self._x.ndim == 1: + self._x = self._x.reshape(1, x.shape[0]) return self diff --git a/shapiq/games/imputer/baseline_imputer.py b/shapiq/games/imputer/baseline_imputer.py index 6e621398..618aca13 100644 --- a/shapiq/games/imputer/baseline_imputer.py +++ b/shapiq/games/imputer/baseline_imputer.py @@ -1,5 +1,6 @@ """Implementation of the baseline imputer.""" +import warnings from typing import Optional import numpy as np @@ -45,7 +46,7 @@ def __init__( super().__init__(model, data, x, 1, categorical_features, random_state) # setup attributes - self.baseline_values: np.ndarray = np.zeros((1, self._n_features)) # will be overwritten + self.baseline_values: np.ndarray = np.zeros((1, self.n_features)) # will be overwritten self.init_background(self.data) # set empty value and normalization @@ -76,28 +77,46 @@ def init_background(self, data: np.ndarray) -> "BaselineImputer": Args: data: The background data to use for the imputer. Either a vector of baseline values - of shape ``(1, n_features)`` or a matrix of shape ``(n_samples, n_features)``. + of shape ``(n_features,)`` or a matrix of shape ``(n_samples, n_features)``. If the data is a matrix, the baseline values are calculated from the data. Returns: The initialized imputer. 
+ + Examples: + >>> import numpy as np + >>> from shapiq.games.imputer import BaselineImputer + >>> data = np.array([[1, 2, "a"], [2, 3, "a"], [2, 4, "b"]], dtype=object) + >>> x = np.array([1, 2, 3]) + >>> imputer = BaselineImputer(model=lambda x: np.sum(x, axis=1), data=data, x=x) + >>> imputer.baseline_values + array([[1.66, 3, 'a']], dtype=object) # computed from data + >>> baseline_vector = np.array([0, 0, 0]) + >>> imputer.init_background(baseline_vector) + >>> imputer.baseline_values + array([[0, 0, 0]]) # given as input """ if data.ndim == 1 or data.shape[0] == 1: # data is a vector -> use as baseline values - self.baseline_values = data.reshape(1, self._n_features) + self.baseline_values = data.reshape(1, self.n_features) return self # data is a matrix -> calculate baseline values as mean or mode - self.baseline_values = np.zeros((1, self._n_features), dtype=object) - for feature in range(self._n_features): + self.baseline_values = np.zeros((1, self.n_features), dtype=object) + for feature in range(self.n_features): feature_column = data[:, feature] if feature in self._cat_features: # get mode for categorical features - counts = np.unique(feature_column, return_counts=True) - summarized_feature = counts[0][np.argmax(counts[1])] + values, counts = np.unique(feature_column, return_counts=True) + summarized_feature = values[np.argmax(counts)] else: try: # try to use mean for numerical features summarized_feature = np.mean(feature_column) except TypeError: # fallback to mode for potentially string features - counts = np.unique(feature_column, return_counts=True) - summarized_feature = counts[0][np.argmax(counts[1])] + values, counts = np.unique(feature_column, return_counts=True) + summarized_feature = values[np.argmax(counts)] + # add feature to categorical features + warnings.warn( + f"Feature {feature} is not numerical. Adding it to categorical features." 
+ ) + self._cat_features.append(feature) self.baseline_values[0, feature] = summarized_feature return self @@ -108,4 +127,7 @@ def _calc_empty_prediction(self) -> float: The empty prediction. """ empty_predictions = self.predict(self.baseline_values) - return float(empty_predictions[0]) + empty_prediction = float(empty_predictions[0]) + if self.normalize: # reset the normalization value + self.normalization_value = empty_prediction + return empty_prediction diff --git a/shapiq/games/imputer/conditional_imputer.py b/shapiq/games/imputer/conditional_imputer.py index 12b9cbc8..063246c1 100644 --- a/shapiq/games/imputer/conditional_imputer.py +++ b/shapiq/games/imputer/conditional_imputer.py @@ -134,20 +134,15 @@ def _sample_background_data(self) -> np.ndarray: The sampled replacement values. The shape of the array is (sample_size, n_subsets, n_features). """ - try: - x_embedded = self._tree_embedder.apply(self._x) - except ValueError: # not correct shape - x_embedded = self._tree_embedder.apply(self._x.reshape(1, -1)) + x_embedded = self._tree_embedder.apply(self._x) distances = hamming_distance(self._data_embedded, x_embedded) conditional_data = self.data[ distances <= np.quantile(distances, self.conditional_threshold) ] if self.sample_size < conditional_data.shape[0]: idc = self._rng.choice(conditional_data.shape[0], size=self.sample_size, replace=False) - background_data = conditional_data[idc, :] - else: - background_data = conditional_data - return background_data + return conditional_data[idc, :] + return conditional_data def _calc_empty_prediction(self) -> float: """Runs the model on empty data points (all features missing) to get the empty prediction. 
diff --git a/shapiq/games/imputer/marginal_imputer.py b/shapiq/games/imputer/marginal_imputer.py index 06c312dc..26cd7fb9 100644 --- a/shapiq/games/imputer/marginal_imputer.py +++ b/shapiq/games/imputer/marginal_imputer.py @@ -7,6 +7,15 @@ from shapiq.games.imputer.base import Imputer +_too_large_sample_size_warning = ( + "The sample size is larger than the number of data points in the background set. " + "Reducing the sample size to the number of background samples." +) +_deprecated_sample_replacements_warning = ( + "The 'sample_replacements' argument is deprecated and will be removed in the next release. " + "The marginal imputer now always samples from the background data." +) + class MarginalImputer(Imputer): """The marginal imputer for the shapiq package. @@ -31,8 +40,8 @@ class MarginalImputer(Imputer): random_state: The random state to use for sampling. Defaults to ``None``. Attributes: - replacement_data: The data to use for imputation. Either samples from the background data - or the mean / median of the background data. + replacement_data: The data to use for imputation. To change the data, use the + ``init_background`` method. empty_prediction: The model's prediction on an empty data point (all features missing). """ @@ -44,19 +53,17 @@ def __init__( sample_replacements: bool = True, sample_size: int = 100, categorical_features: list[int] = None, + joint_marginal_distribution: bool = False, normalize: bool = True, random_state: Optional[int] = None, ) -> None: if not sample_replacements: - warnings.warn( - "The 'sample_replacements' argument is deprecated and will be removed in the next " - "release. 
The marginal imputer now always samples from the background data.", - DeprecationWarning, - ) + warnings.warn(DeprecationWarning(_deprecated_sample_replacements_warning)) super().__init__(model, data, x, sample_size, categorical_features, random_state) # setup attributes - self.replacement_data: np.ndarray = np.zeros((1, self._n_features)) # will be overwritten + self.joint_marginal_distribution: bool = joint_marginal_distribution + self.replacement_data: np.ndarray = np.zeros((1, self.n_features)) # will be overwritten self.init_background(self.data) # set empty value and normalization @@ -76,52 +83,58 @@ def value_function(self, coalitions: np.ndarray) -> np.ndarray: ``(n_subsets, n_outputs)``. """ n_coalitions = coalitions.shape[0] - data = np.tile(np.copy(self._x), (n_coalitions, 1)) - # sampling from background returning array of shape (sample_size, n_subsets, n_features) - replacement_data = self._sample_replacement_values(coalitions) - outputs = np.zeros((self.sample_size, n_coalitions)) - for i in range(self.sample_size): - replacements = replacement_data[i].reshape(n_coalitions, self._n_features) - data[~coalitions] = replacements[~coalitions] - outputs[i] = self.predict(data) + replacement_data = self._sample_replacement_values(self.sample_size) + sample_size = replacement_data.shape[0] + outputs = np.zeros((sample_size, n_coalitions)) + imputed_data = np.tile(np.copy(self._x), (n_coalitions, 1)) + for j in range(sample_size): + for i in range(n_coalitions): + imputed_data[i, ~coalitions[i]] = replacement_data[j, ~coalitions[i]] + predictions = self.predict(imputed_data) + outputs[j] = predictions outputs = np.mean(outputs, axis=0) # average over the samples return outputs def init_background(self, data: np.ndarray) -> "MarginalImputer": """Initializes the imputer to the background data. + The background data is used to sample replacement values for the missing features. + To change the background data, use this method. 
+ Args: data: The background data to use for the imputer. The shape of the array must be ``(n_samples, n_features)``. Returns: The initialized imputer. + + Examples: + >>> model = lambda x: np.sum(x, axis=1) + >>> data = np.random.rand(10, 3) + >>> imputer = MarginalImputer(model=model, data=data, x=data[0]) + >>> new_data = np.random.rand(10, 3) + >>> imputer.init_background(data=new_data) """ self.replacement_data = data + if self.sample_size > self.replacement_data.shape[0]: + warnings.warn(UserWarning(_too_large_sample_size_warning)) + self.sample_size = self.replacement_data.shape[0] return self - def _sample_replacement_values(self, coalitions: np.ndarray) -> np.ndarray: - """Samples replacement values from the background data. - - Args: - coalitions: A boolean array indicating which features are present (``True``) and which are - missing (``False``). The shape of the array must be ``(n_subsets, n_features)``. - - Returns: - The sampled replacement values. The shape of the array is ``(sample_size, n_subsets, - n_features)``. 
- """ - n_coalitions = coalitions.shape[0] - replacement_data = np.zeros( - (self.sample_size, n_coalitions, self._n_features), dtype=object - ) - for feature in range(self._n_features): - sampled_feature_values = self._rng.choice( - self.replacement_data[:, feature], - size=(self.sample_size, n_coalitions), - replace=True, - ) - replacement_data[:, :, feature] = sampled_feature_values + def _sample_replacement_values(self, sample_size: int) -> np.ndarray: + """Samples replacement values from the background data.""" + replacement_data = np.copy(self.replacement_data) + rng = np.random.default_rng(self._random_state) + if not self.joint_marginal_distribution: + for feature in range(self.n_features): + rng.shuffle(replacement_data[:, feature]) + # sample replacement values + n_samples = replacement_data.shape[0] + if sample_size > n_samples: + sample_size = n_samples + warnings.warn(UserWarning(_too_large_sample_size_warning)) + replacement_idx = rng.choice(n_samples, size=sample_size, replace=False) + replacement_data = replacement_data[replacement_idx] return replacement_data def _calc_empty_prediction(self) -> float: @@ -131,5 +144,7 @@ def _calc_empty_prediction(self) -> float: The empty prediction. 
""" empty_predictions = self.predict(self.replacement_data) - empty_prediction = np.mean(empty_predictions) + empty_prediction = float(np.mean(empty_predictions)) + if self.normalize: # reset the normalization value + self.normalization_value = empty_prediction return empty_prediction diff --git a/tests/conftest.py b/tests/conftest.py index b950c60d..2fa77af4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -103,16 +103,16 @@ def rf_clf_binary_model() -> RandomForestClassifier: @pytest.fixture -def background_reg_data() -> tuple[np.ndarray, np.ndarray]: +def background_reg_data() -> np.ndarray: """Return a simple background dataset.""" - X, y = make_regression(n_samples=100, n_features=7, random_state=42) + X, _ = make_regression(n_samples=100, n_features=7, random_state=42) return X @pytest.fixture -def background_clf_data() -> tuple[np.ndarray, np.ndarray]: +def background_clf_data() -> np.ndarray: """Return a simple background dataset.""" - X, y = make_classification( + X, _ = make_classification( n_samples=100, n_features=7, random_state=42, diff --git a/tests/test_abstract_classes.py b/tests/test_abstract_classes.py index 3e60902c..2f5808de 100644 --- a/tests/test_abstract_classes.py +++ b/tests/test_abstract_classes.py @@ -43,7 +43,7 @@ def model(x): imputer = concreter(Imputer)(model, data) assert imputer.model == model assert np.all(imputer.data == data) - assert imputer._n_features == 3 + assert imputer.n_features == 3 assert imputer._cat_features == [] assert imputer._random_state is None assert imputer._rng is not None diff --git a/tests/tests_imputer/test_base_imputer.py b/tests/tests_imputer/test_base_imputer.py deleted file mode 100644 index 91da0401..00000000 --- a/tests/tests_imputer/test_base_imputer.py +++ /dev/null @@ -1,61 +0,0 @@ -"""This test module contains all tests for the baseline imputer module of the shapiq package.""" - -import numpy as np - -from shapiq.games.imputer import BaselineImputer - - -def 
test_baseline_imputer_init(): - """Test the initialization of the marginal imputer.""" - - def model(x: np.ndarray) -> np.ndarray: - return np.sum(x, axis=1) - - # get np data set of 10 rows and 3 columns of random numbers - n_features = 3 - data = np.random.rand(10, n_features) - - # init with a baseline vector - imputer = BaselineImputer( - model=model, - data=np.zeros((n_features,)), # baseline vector of shape (n_features,) - x=np.ones((1, n_features)), - random_state=42, - ) - assert imputer.sample_size == 1 # sample size is always 1 for baseline imputer - assert imputer._random_state == 42 - assert imputer._n_features == 3 - - # call with two inputs - imputed_values = imputer(np.array([[False, False, False], [True, False, True]])) - assert len(imputed_values) == 2 - assert imputed_values[0] == imputer.empty_prediction - - # test without x - x = np.random.rand(1, 3) - imputer = BaselineImputer( - model=model, - data=data, - x=None, - random_state=42, - ) - assert imputer._x is None - imputer.fit(x) - assert np.array_equal(imputer.x, x) - assert imputer._n_features == 3 - assert imputer._random_state == 42 - imputer.fit(x=np.ones((n_features,))) # test with vector - assert np.array_equal(imputer.x, np.ones((1, n_features))) - - def model_cat(x: np.ndarray) -> np.ndarray: - return np.zeros(x.shape[0]) - - data = np.asarray([["a", "b", 1], ["c", "d", 2], ["e", "f", 3]]) - categorical_features = [0] # only first specified - imputer = BaselineImputer( - model=model_cat, - data=data, - categorical_features=categorical_features, - random_state=42, - ) - assert imputer._cat_features == [0] diff --git a/tests/tests_imputer/test_baseline_imputer.py b/tests/tests_imputer/test_baseline_imputer.py new file mode 100644 index 00000000..5bb9dab1 --- /dev/null +++ b/tests/tests_imputer/test_baseline_imputer.py @@ -0,0 +1,134 @@ +"""This test module contains all tests for the baseline imputer module of the shapiq package.""" + +import numpy as np +import pytest + +from 
shapiq.games.imputer import BaselineImputer + + + def test_baseline_init_background(): + """Test the initialization of the baseline imputer.""" + + def model(x: np.ndarray) -> np.ndarray: + return np.zeros(x.shape[0]) + + data = np.array([[1, 2, "a"], [2, 3, "a"], [3, 4, "b"]], dtype=object) + x = np.array([1, 2, 3]) + imputer = BaselineImputer( + model=model, + data=data, + x=x, + random_state=42, + ) + assert np.array_equal(imputer.baseline_values, np.array([[2, 3, "a"]], dtype=object)) + + baseline_values = np.zeros((1, 3)) + imputer.init_background(data=baseline_values) + assert np.array_equal(imputer.baseline_values, baseline_values) + + + def test_baseline_imputer_with_model(dt_reg_model, background_reg_dataset): + """Test the baseline imputer with a real model.""" + # create the coalitions + data, target = background_reg_dataset + x = data[0] + coalitions = [ + [False for _ in range(data.shape[1])], + [False for _ in range(data.shape[1])], + [True for _ in range(data.shape[1])], + ] + coalitions[1][0] = True # first feature is present + coalitions = np.array(coalitions) + + imputer = BaselineImputer( + model=dt_reg_model.predict, + data=data, + x=x, + random_state=42, + normalize=False, + ) + assert np.array_equal(imputer.x[0], x) + assert imputer.sample_size == 1 + assert imputer._random_state == 42 + assert imputer.n_features == data.shape[1] + imputed_values = imputer(coalitions) + assert len(imputed_values) == 3 + + + def test_baseline_imputer_background_computation(background_reg_dataset): + """Checks whether the baseline values from data are computed correctly with the mean/mode.""" + + # set up a dummy model function that returns zeros + def model_cat(x: np.ndarray) -> np.ndarray: + return np.zeros(x.shape[0]) + + # make a feature into a categorical feature + data, target = background_reg_dataset + data = np.copy(data).astype(object) + data[:, 0] = np.random.choice(["a", "b", "c"], size=data.shape[0]) + data[:, 1] = np.random.choice(["a", "b", "c"], 
size=data.shape[0]) + x = data[0] + + with pytest.warns(UserWarning): # we expect the warning that feature 1 is not numerical + imputer = BaselineImputer( + model=model_cat, + data=data, + x=x, + random_state=42, + categorical_features=[0], # only tell the imputer the first feature is categorical + ) + + # check if the categorical feature is correctly identified + assert imputer._cat_features == [0, 1] + + # check if modes are correct + for feature in imputer._cat_features: + val, count = np.unique(data[:, feature], return_counts=True) + mode = val[np.argmax(count)] + assert imputer.baseline_values[0, feature] == mode + + # check if the mean is correct + assert imputer.baseline_values[0, 2] == np.mean(data[:, 2]) + + + def test_baseline_imputer_init(): + """Test the initialization of the baseline imputer.""" + + def model(x: np.ndarray) -> np.ndarray: + return np.sum(x, axis=1) + + # get np data set of 10 rows and 3 columns of random numbers + n_features = 3 + data = np.random.rand(10, n_features) + + # init with a baseline vector + imputer = BaselineImputer( + model=model, + data=np.zeros((n_features,)), # baseline vector of shape (n_features,) + x=np.ones((1, n_features)), + random_state=42, + ) + assert imputer.sample_size == 1 # sample size is always 1 for baseline imputer + assert imputer._random_state == 42 + assert imputer.n_features == 3 + + # call with two inputs + imputed_values = imputer(np.array([[False, False, False], [True, False, True]])) + assert len(imputed_values) == 2 + assert imputed_values[0] == imputer.empty_prediction + + # test without x + x = np.random.rand(1, 3) + imputer = BaselineImputer( + model=model, + data=data, + x=None, + random_state=42, + ) + assert imputer._x is None + imputer.fit(x) + assert np.array_equal(imputer.x, x) + assert imputer.n_features == 3 + assert imputer._random_state == 42 + imputer.fit(x=np.ones((n_features,))) # test with vector + assert np.array_equal(imputer.x, np.ones((1, n_features))) diff --git 
a/tests/tests_imputer/test_conditional_imputer.py b/tests/tests_imputer/test_conditional_imputer.py index f9b163f4..b61dc2b0 100644 --- a/tests/tests_imputer/test_conditional_imputer.py +++ b/tests/tests_imputer/test_conditional_imputer.py @@ -12,8 +12,9 @@ def test_conditional_imputer_init(): def model(x: np.ndarray) -> np.ndarray: return np.sum(x, axis=1) - data = np.random.rand(10, 3) - x = np.random.rand(1, 3) + rng = np.random.default_rng(42) + data = rng.random((100, 3)) + x = rng.random((1, 3)) imputer = ConditionalImputer( model=model, @@ -25,7 +26,7 @@ def model(x: np.ndarray) -> np.ndarray: assert np.array_equal(imputer._x, x) assert imputer.sample_size == 9 assert imputer._random_state == 42 - assert imputer._n_features == 3 + assert imputer.n_features == 3 # test raise warning with non generative method with pytest.raises(ValueError): @@ -38,6 +39,20 @@ def model(x: np.ndarray) -> np.ndarray: method="not_generative", ) + # test with conditional sample size higher than 2**n_features + with pytest.warns(UserWarning): + imputer = ConditionalImputer( + model=model, + data=data, + x=x, + sample_size=1, + conditional_budget=2 ** data.shape[1] + 1, # budget for warning here + random_state=42, + conditional_threshold=0.5, # increases the conditional samples drawn + ) + coalitions = np.zeros((1, data.shape[1]), dtype=bool) + imputer(coalitions) + def test_conditional_imputer_value_function(): def model(x: np.ndarray) -> np.ndarray: diff --git a/tests/tests_imputer/test_marginal_imputer.py b/tests/tests_imputer/test_marginal_imputer.py index 5b4e07eb..9d7f9875 100644 --- a/tests/tests_imputer/test_marginal_imputer.py +++ b/tests/tests_imputer/test_marginal_imputer.py @@ -23,7 +23,7 @@ def model(x: np.ndarray) -> np.ndarray: ) assert imputer.sample_size == 10 assert imputer._random_state == 42 - assert imputer._n_features == 3 + assert imputer.n_features == 3 # test with x x = np.random.rand(1, 3) @@ -34,7 +34,7 @@ def model(x: np.ndarray) -> np.ndarray: 
random_state=42, ) assert np.array_equal(imputer._x, x) - assert imputer._n_features == 3 + assert imputer.n_features == 3 assert imputer._random_state == 42 # check with categorical features and a wrong numerical feature @@ -64,7 +64,7 @@ def model(x: np.ndarray) -> np.ndarray: model=model, data=data, x=np.ones((1, 3)), - sample_size=10, + sample_size=8, random_state=42, )