Skip to content

Commit

Permalink
finishes baseline imputer and adds joint-marginal-dist. to MarginalImputer
Browse files Browse the repository at this point in the history
  • Loading branch information
mmschlk committed Oct 29, 2024
1 parent 93bcf63 commit 60e8f79
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 134 deletions.
16 changes: 11 additions & 5 deletions shapiq/games/imputer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ class Imputer(Game):
with shape ``(n_samples, n_features)``.
categorical_features: A list of indices of the categorical features in the background data.
random_state: The random state to use for sampling. Defaults to ``None``.
Attributes:
n_features: The number of features in the data (equals the number of players in the game).
data: The background data to use for the imputer.
model: The model to impute missing values for as a callable function.
sample_size: The number of samples to draw from the background data.
"""

@abstractmethod
Expand All @@ -33,15 +39,15 @@ def __init__(
) -> None:
if callable(model):
self._predict_function = utils.predict_callable
else: # shapiq.Explainer
else: # shapiq.Explainer adds a predict function to the model to make it callable
self._predict_function = model._predict_function
self.model = model
# check if data is a vector
if data.ndim == 1:
data = data.reshape(1, data.shape[0])
self.data = data
self.sample_size = sample_size
self._n_features = self.data.shape[1]
self.n_features = self.data.shape[1]
self._cat_features: list = [] if categorical_features is None else categorical_features
self._random_state = random_state
self._rng = np.random.default_rng(self._random_state)
Expand All @@ -52,7 +58,7 @@ def __init__(
self.fit(x)

# the normalization_value needs to be set in the subclass
super().__init__(n_players=self._n_features, normalize=False)
super().__init__(n_players=self.n_features, normalize=False)

@property
def x(self) -> Optional[np.ndarray]:
Expand All @@ -73,7 +79,7 @@ def fit(self, x: np.ndarray) -> "Imputer":
Returns:
The fitted imputer.
"""
if x.ndim == 1:
x = x.reshape(1, x.shape[0])
self._x = x.copy()
if self._x.ndim == 1:
self._x = self._x.reshape(1, x.shape[0])
return self
42 changes: 32 additions & 10 deletions shapiq/games/imputer/baseline_imputer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Implementation of the baseline imputer."""

import warnings
from typing import Optional

import numpy as np
Expand Down Expand Up @@ -45,7 +46,7 @@ def __init__(
super().__init__(model, data, x, 1, categorical_features, random_state)

# setup attributes
self.baseline_values: np.ndarray = np.zeros((1, self._n_features)) # will be overwritten
self.baseline_values: np.ndarray = np.zeros((1, self.n_features)) # will be overwritten
self.init_background(self.data)

# set empty value and normalization
Expand Down Expand Up @@ -76,28 +77,46 @@ def init_background(self, data: np.ndarray) -> "BaselineImputer":
Args:
data: The background data to use for the imputer. Either a vector of baseline values
of shape ``(1, n_features)`` or a matrix of shape ``(n_samples, n_features)``.
of shape ``(n_features,)`` or a matrix of shape ``(n_samples, n_features)``.
If the data is a matrix, the baseline values are calculated from the data.
Returns:
The initialized imputer.
Examples:
>>> import numpy as np
>>> from shapiq.games.imputer import BaselineImputer
>>> data = np.array([[1, 2, "a"], [2, 3, "a"], [2, 4, "b"]], dtype=object)
>>> x = np.array([1, 2, 3])
>>> imputer = BaselineImputer(model=lambda x: np.sum(x, axis=1), data=data, x=x)
>>> imputer.baseline_values
array([[1.66, 3, 'a']], dtype=object) # computed from data
>>> baseline_vector = np.array([0, 0, 0])
>>> imputer.init_background(baseline_vector)
>>> imputer.baseline_values
array([[0, 0, 0]]) # given as input
"""
if data.ndim == 1 or data.shape[0] == 1: # data is a vector -> use as baseline values
self.baseline_values = data.reshape(1, self._n_features)
self.baseline_values = data.reshape(1, self.n_features)
return self
# data is a matrix -> calculate baseline values as mean or mode
self.baseline_values = np.zeros((1, self._n_features), dtype=object)
for feature in range(self._n_features):
self.baseline_values = np.zeros((1, self.n_features), dtype=object)
for feature in range(self.n_features):
feature_column = data[:, feature]
if feature in self._cat_features: # get mode for categorical features
counts = np.unique(feature_column, return_counts=True)
summarized_feature = counts[0][np.argmax(counts[1])]
values, counts = np.unique(feature_column, return_counts=True)
summarized_feature = values[np.argmax(counts)]
else:
try: # try to use mean for numerical features
summarized_feature = np.mean(feature_column)
except TypeError: # fallback to mode for potentially string features
counts = np.unique(feature_column, return_counts=True)
summarized_feature = counts[0][np.argmax(counts[1])]
values, counts = np.unique(feature_column, return_counts=True)
summarized_feature = values[np.argmax(counts)]
# add feature to categorical features
warnings.warn(
f"Feature {feature} is not numerical. Adding it to categorical features."
)
self._cat_features.append(feature)
self.baseline_values[0, feature] = summarized_feature
return self

Expand All @@ -108,4 +127,7 @@ def _calc_empty_prediction(self) -> float:
The empty prediction.
"""
empty_predictions = self.predict(self.baseline_values)
return float(empty_predictions[0])
empty_prediction = float(empty_predictions[0])
if self.normalize: # reset the normalization value
self.normalization_value = empty_prediction
return empty_prediction
11 changes: 3 additions & 8 deletions shapiq/games/imputer/conditional_imputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,15 @@ def _sample_background_data(self) -> np.ndarray:
The sampled replacement values. The shape of the array is (sample_size, n_subsets,
n_features).
"""
try:
x_embedded = self._tree_embedder.apply(self._x)
except ValueError: # not correct shape
x_embedded = self._tree_embedder.apply(self._x.reshape(1, -1))
x_embedded = self._tree_embedder.apply(self._x)
distances = hamming_distance(self._data_embedded, x_embedded)
conditional_data = self.data[
distances <= np.quantile(distances, self.conditional_threshold)
]
if self.sample_size < conditional_data.shape[0]:
idc = self._rng.choice(conditional_data.shape[0], size=self.sample_size, replace=False)
background_data = conditional_data[idc, :]
else:
background_data = conditional_data
return background_data
return conditional_data[idc, :]
return conditional_data

def _calc_empty_prediction(self) -> float:
"""Runs the model on empty data points (all features missing) to get the empty prediction.
Expand Down
93 changes: 54 additions & 39 deletions shapiq/games/imputer/marginal_imputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@

from shapiq.games.imputer.base import Imputer

_too_large_sample_size_warning = (
"The sample size is larger than the number of data points in the background set. "
"Reducing the sample size to the number of background samples."
)
_deprecated_sample_replacements_warning = (
"The 'sample_replacements' argument is deprecated and will be removed in the next release. "
"The marginal imputer now always samples from the background data."
)


class MarginalImputer(Imputer):
"""The marginal imputer for the shapiq package.
Expand All @@ -31,8 +40,8 @@ class MarginalImputer(Imputer):
random_state: The random state to use for sampling. Defaults to ``None``.
Attributes:
replacement_data: The data to use for imputation. Either samples from the background data
or the mean / median of the background data.
replacement_data: The data to use for imputation. To change the data, use the
``init_background`` method.
empty_prediction: The model's prediction on an empty data point (all features missing).
"""

Expand All @@ -44,19 +53,17 @@ def __init__(
sample_replacements: bool = True,
sample_size: int = 100,
categorical_features: list[int] = None,
joint_marginal_distribution: bool = False,
normalize: bool = True,
random_state: Optional[int] = None,
) -> None:
if not sample_replacements:
warnings.warn(
"The 'sample_replacements' argument is deprecated and will be removed in the next "
"release. The marginal imputer now always samples from the background data.",
DeprecationWarning,
)
warnings.warn(DeprecationWarning(_deprecated_sample_replacements_warning))
super().__init__(model, data, x, sample_size, categorical_features, random_state)

# setup attributes
self.replacement_data: np.ndarray = np.zeros((1, self._n_features)) # will be overwritten
self.joint_marginal_distribution: bool = joint_marginal_distribution
self.replacement_data: np.ndarray = np.zeros((1, self.n_features)) # will be overwritten
self.init_background(self.data)

# set empty value and normalization
Expand All @@ -76,52 +83,58 @@ def value_function(self, coalitions: np.ndarray) -> np.ndarray:
``(n_subsets, n_outputs)``.
"""
n_coalitions = coalitions.shape[0]
data = np.tile(np.copy(self._x), (n_coalitions, 1))
# sampling from background returning array of shape (sample_size, n_subsets, n_features)
replacement_data = self._sample_replacement_values(coalitions)
outputs = np.zeros((self.sample_size, n_coalitions))
for i in range(self.sample_size):
replacements = replacement_data[i].reshape(n_coalitions, self._n_features)
data[~coalitions] = replacements[~coalitions]
outputs[i] = self.predict(data)
replacement_data = self._sample_replacement_values(self.sample_size)
sample_size = replacement_data.shape[0]
outputs = np.zeros((sample_size, n_coalitions))
imputed_data = np.tile(np.copy(self._x), (n_coalitions, 1))
for j in range(sample_size):
for i in range(n_coalitions):
imputed_data[i, ~coalitions[i]] = replacement_data[j, ~coalitions[i]]
predictions = self.predict(imputed_data)
outputs[j] = predictions
outputs = np.mean(outputs, axis=0) # average over the samples
return outputs

def init_background(self, data: np.ndarray) -> "MarginalImputer":
"""Initializes the imputer to the background data.
The background data is used to sample replacement values for the missing features.
To change the background data, use this method.
Args:
data: The background data to use for the imputer. The shape of the array must
be ``(n_samples, n_features)``.
Returns:
The initialized imputer.
Examples:
>>> model = lambda x: np.sum(x, axis=1)
>>> data = np.random.rand(10, 3)
>>> imputer = MarginalImputer(model=model, data=data, x=data[0])
>>> new_data = np.random.rand(10, 3)
>>> imputer.init_background(data=new_data)
"""
self.replacement_data = data
if self.sample_size > self.replacement_data.shape[0]:
warnings.warn(UserWarning(_too_large_sample_size_warning))
self.sample_size = self.replacement_data.shape[0]
return self

def _sample_replacement_values(self, coalitions: np.ndarray) -> np.ndarray:
"""Samples replacement values from the background data.
Args:
coalitions: A boolean array indicating which features are present (``True``) and which are
missing (``False``). The shape of the array must be ``(n_subsets, n_features)``.
Returns:
The sampled replacement values. The shape of the array is ``(sample_size, n_subsets,
n_features)``.
"""
n_coalitions = coalitions.shape[0]
replacement_data = np.zeros(
(self.sample_size, n_coalitions, self._n_features), dtype=object
)
for feature in range(self._n_features):
sampled_feature_values = self._rng.choice(
self.replacement_data[:, feature],
size=(self.sample_size, n_coalitions),
replace=True,
)
replacement_data[:, :, feature] = sampled_feature_values
def _sample_replacement_values(self, sample_size: int) -> np.ndarray:
"""Samples replacement values from the background data."""
replacement_data = np.copy(self.replacement_data)
rng = np.random.default_rng(self._random_state)
if not self.joint_marginal_distribution:
for feature in range(self.n_features):
rng.shuffle(replacement_data[:, feature])
# sample replacement values
n_samples = replacement_data.shape[0]
if sample_size > n_samples:
sample_size = n_samples
warnings.warn(UserWarning(_too_large_sample_size_warning))
replacement_idx = rng.choice(n_samples, size=sample_size, replace=False)
replacement_data = replacement_data[replacement_idx]
return replacement_data

def _calc_empty_prediction(self) -> float:
Expand All @@ -131,5 +144,7 @@ def _calc_empty_prediction(self) -> float:
The empty prediction.
"""
empty_predictions = self.predict(self.replacement_data)
empty_prediction = np.mean(empty_predictions)
empty_prediction = float(np.mean(empty_predictions))
if self.normalize: # reset the normalization value
self.normalization_value = empty_prediction
return empty_prediction
8 changes: 4 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,16 @@ def rf_clf_binary_model() -> RandomForestClassifier:


@pytest.fixture
def background_reg_data() -> tuple[np.ndarray, np.ndarray]:
def background_reg_data() -> np.ndarray:
"""Return a simple background dataset."""
X, y = make_regression(n_samples=100, n_features=7, random_state=42)
X, _ = make_regression(n_samples=100, n_features=7, random_state=42)
return X


@pytest.fixture
def background_clf_data() -> tuple[np.ndarray, np.ndarray]:
def background_clf_data() -> np.ndarray:
"""Return a simple background dataset."""
X, y = make_classification(
X, _ = make_classification(
n_samples=100,
n_features=7,
random_state=42,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_abstract_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def model(x):
imputer = concreter(Imputer)(model, data)
assert imputer.model == model
assert np.all(imputer.data == data)
assert imputer._n_features == 3
assert imputer.n_features == 3
assert imputer._cat_features == []
assert imputer._random_state is None
assert imputer._rng is not None
Expand Down
61 changes: 0 additions & 61 deletions tests/tests_imputer/test_base_imputer.py

This file was deleted.

Loading

0 comments on commit 60e8f79

Please sign in to comment.