Skip to content

Commit

Permalink
refactored SOUM and UnanimityGame into Game with slightly updated docs
Browse files Browse the repository at this point in the history
  • Loading branch information
mmschlk committed Apr 10, 2024
1 parent 7c896eb commit 6accf66
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 86 deletions.
41 changes: 24 additions & 17 deletions shapiq/approximator/moebius_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class MoebiusConverter:
n_interactions: A pre-computed array containing the number of interactions up to the size of the index, e.g. n_interactions[4] is the number of interactions up to size 4.
"""

def __init__(self, N: dict, moebius_coefficients: InteractionValues):
def __init__(self, N: set, moebius_coefficients: InteractionValues):
self.N = N
self.n = len(N)
self.moebius_coefficients: InteractionValues = moebius_coefficients
Expand Down Expand Up @@ -136,6 +136,9 @@ def get_moebius_distribution_weight(
)
)
if index == "k-SII":
raise NotImplementedError(
"This does not work currently, workaround is implemented via SII + base_aggregation"
)
# This does not work currently, workaround is implemented via SII and base_aggregation
if moebius_size <= order:
return 1
Expand Down Expand Up @@ -165,10 +168,10 @@ def moebius_to_base_interaction(self, order: int, index: str):
distribution_weights = np.zeros((self.n + 1, order + 1))
for moebius_size in range(1, self.n + 1):
for interaction_size in range(1, min(order, moebius_size) + 1):
distribution_weights[
moebius_size, interaction_size
] = self.get_moebius_distribution_weight(
moebius_size, interaction_size, order, index
distribution_weights[moebius_size, interaction_size] = (
self.get_moebius_distribution_weight(
moebius_size, interaction_size, order, index
)
)

for moebius_set, moebius_val in zip(
Expand Down Expand Up @@ -224,10 +227,10 @@ def stii_routine(self, order: int):

for moebius_size in range(1, self.n + 1):
for interaction_size in range(1, min(order, moebius_size) + 1):
distribution_weights[
moebius_size, interaction_size
] = self.get_moebius_distribution_weight(
moebius_size, interaction_size, order, index
distribution_weights[moebius_size, interaction_size] = (
self.get_moebius_distribution_weight(
moebius_size, interaction_size, order, index
)
)

for moebius_set, moebius_val in zip(
Expand Down Expand Up @@ -288,18 +291,19 @@ def fsii_routine(self, order: int):
distribution_weights = np.zeros((self.n + 1, order + 1))
for moebius_size in range(1, self.n + 1):
for interaction_size in range(1, min(order, moebius_size) + 1):
distribution_weights[
moebius_size, interaction_size
] = self.get_moebius_distribution_weight(
moebius_size, interaction_size, order, index
distribution_weights[moebius_size, interaction_size] = (
self.get_moebius_distribution_weight(
moebius_size, interaction_size, order, index
)
)

for moebius_set, moebius_val in zip(
self.moebius_coefficients.interaction_lookup,
self.moebius_coefficients.values,
):
moebius_size = len(moebius_set)
# For higher-order Möbius sets (size > order) distribute the value among all contained interactions
# For higher-order Möbius sets (size > order) distribute the value among all
# contained interactions
for interaction in powerset(moebius_set, min_size=1, max_size=order):
val_distributed = distribution_weights[moebius_size, len(interaction)]
# Check if Möbius value is distributed onto this interaction
Expand Down Expand Up @@ -338,13 +342,16 @@ def moebius_to_shapley_interaction(self, order, index):

if index == "STII":
shapley_interactions = self.stii_routine(order)
if index == "FSII":
elif index == "FSII":
shapley_interactions = self.fsii_routine(order)
if index == "k-SII":
# The distribution formula for k-SII is not correct. We therefore compute SII and aggregate the values.
elif index == "k-SII":
# The distribution formula for k-SII is not correct. We therefore compute SII and
# aggregate the values.
base_interactions = self.moebius_to_base_interaction(order=order, index="SII")
shapley_interactions = self.base_aggregation(
base_interactions=base_interactions, order=order
)
else:
raise ValueError(f"Index {index} not supported. Please choose from STII, FSII, k-SII.")

return shapley_interactions
132 changes: 68 additions & 64 deletions shapiq/games/soum.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,56 @@
"""This module contains the SOUM class. The SOUM class is constructed from a linear combination of the UnanimityGame Class.
"""
"""This module contains the SOUM class. The SOUM class is constructed from a linear combination of
the UnanimityGame Class."""

from typing import Optional

import numpy as np

from shapiq.games import Game
from shapiq.interaction_values import InteractionValues


def _transform_dict_to_interaction_values(rslt_dict):
rslt = np.zeros(len(rslt_dict))
rslt_pos = {}
for i, (set, val) in enumerate(rslt_dict.items()):
rslt[i] = val
rslt_pos[set] = i
return rslt, rslt_pos


class UnanimityGame:
class UnanimityGame(Game):
"""Unanimity game as basis game in cooperative game theory based on single interaction.
When called, it returns 1, if the coalition contains the interaction, and 0 otherwise.
Args:
budget: The budget for the approximation (i.e., the number of game evaluations).
game: The game function as a callable that takes a set of players and returns the value.
batch_size: The size of the batch. If None, the batch size is set to `budget`.
Defaults to None.
pairing: Whether to use pairwise sampling (`True`) or not (`False`). Defaults to `True`.
Paired sampling can increase the approximation quality.
replacement: Whether to sample with replacement (`True`) or without replacement
(`False`). Defaults to `True`.
interaction_binary: The interaction encoded in a binary vector of shape (n,).
Attributes:
n: The number of players.
N: A set of n players
n_players: The number of players.
interaction_binary: The interaction encoded in a binary vector
interaction: The interaction encoded as a tuple
Examples:
>>> game = UnanimityGame(np.array([0, 1, 0, 1]))
>>> game.n_players
4
>>> coalitions = [[0, 0, 0, 0], [1, 0, 0, 0], [1, 1, 0, 1], [1, 1, 1, 1]]
>>> coalitions = np.array(coalitions).astype(bool)
>>> game(coalitions)
array([0., 0., 1., 1.])
"""

def __init__(self, interaction_binary: np.ndarray):
self.n = len(interaction_binary)
self.N = set(range(self.n))
n = len(interaction_binary)
self.interaction_binary: np.ndarray = interaction_binary
self.interaction: tuple = tuple(np.where(self.interaction_binary == 1)[0])
self.interaction: tuple[int, ...] = tuple(np.where(self.interaction_binary == 1)[0])
super().__init__(n_players=n, normalize=False) # call super class which handles calls

def __call__(self, coalition: np.ndarray) -> np.ndarray[float]:
"""Returns 1, if the coalition contains self.interaction, and zero otherwise.
def value_function(self, coalitions: np.ndarray) -> np.ndarray[float]:
"""Returns 1, if the coalition contains the UnanimityGame's interaction, and zero otherwise.
Args:
coalition: The coalition as a binary vector of shape (n,) or (batch_size, n).
coalitions: The coalition as a binary vector of shape (coalition_size, n_players).
Returns:
The worth of the coalition.
"""
if coalition.ndim == 1:
coalition = coalition.reshape((1, self.n))
worth = np.prod(coalition >= self.interaction_binary, 1)
worth = np.prod(coalitions >= self.interaction_binary, 1)
return worth


class SOUM:
class SOUM(Game):
"""The SOUM constructs a game based on linear combinations of instances of UnanimityGames.
When called, it returns 1, if the coalition contains the interaction, and 0 otherwise.
Expand All @@ -68,97 +61,108 @@ class SOUM:
max_interaction_size: Highest interaction size, if None then set to n
Attributes:
n: The number of players.
N: The set of players (starting from 0 to n - 1).
n_players: The number of players.
n_basis_games: The number of Unanimity gams
unanimity_games: A dictionary containing instances of UnanimityGame
linear_coefficients: A numpy array with coefficients between -1 and 1 for the unanimity games
min_interaction_size: The smallest interaction size
max_interaction_size: The highest interaction size.
moebius_coefficients: The list of non-zero Möbius coefficients used to compute ground truth values
moebius_coefficients: The list of non-zero Möbius coefficients used to compute ground truth
values
Examples:
>>> game = SOUM(4, 3)
>>> game.n_players
4
>>> game.n_basis_games
3
coalitions = [[0, 0, 0, 0], [1, 0, 0, 0], [1, 1, 0, 1], [1, 1, 1, 1]]
>>> coalitions = np.array(coalitions).astype(bool)
>>> game(coalitions)
array([0., 0.25, 1.5, 2.]) # depending on the random linear coefficients this can vary
>>> game.moebius_coefficients
InteractionValues(values=array([0.25, 0.25, 0.25]), index='Moebius', max_order=4, min_order=0, ...)
"""

def __init__(
self,
n,
n: int,
n_basis_games: int,
min_interaction_size: int = None,
max_interaction_size: int = None,
min_interaction_size: Optional[int] = None,
max_interaction_size: Optional[int] = None,
):
self.n = n
self.N = set(range(self.n))
if min_interaction_size is None:
self.min_interaction_size: int = 0
else:
self.min_interaction_size: int = min_interaction_size
if max_interaction_size is None:
self.max_interaction_size: int = self.n
else:
self.max_interaction_size: int = max_interaction_size
# init base game
super().__init__(n_players=n, normalize=False)

self.n_basis_games: int = n_basis_games
# set min_interaction_size and max_interaction_size to 0 and n if not specified
self.min_interaction_size = min_interaction_size if min_interaction_size is not None else 0
self.max_interaction_size = max_interaction_size if max_interaction_size is not None else n

# setup basis games
self.n_basis_games: int = n_basis_games
self.unanimity_games = {}
self.linear_coefficients = np.random.random(size=self.n_basis_games) * 2 - 1
# Compute interaction sizes (exclude size 0)
interaction_sizes = np.random.randint(
low=self.min_interaction_size, high=self.max_interaction_size, size=self.n_basis_games
)
for i, size in enumerate(interaction_sizes):
interaction = np.random.choice(tuple(self.N), size, replace=False)
interaction_binary = np.zeros(self.n)
interaction = np.random.choice(tuple(range(self.n_players)), size, replace=False)
interaction_binary = np.zeros(self.n_players, dtype=int)
interaction_binary[interaction] = 1
self.unanimity_games[i] = UnanimityGame(interaction_binary)

# Compute the Möbius transform
self.moebius_coefficients = self.moebius_transform()

def __call__(self, coalition: np.ndarray) -> np.ndarray[float]:
"""Computes the worth of the coalition for the SOUM, i.e. sums up all linear coefficients, if coalition contains the interaction of the corresponding unanimity game.
def value_function(self, coalitions: np.ndarray[bool]) -> np.ndarray[float]:
"""Computes the worth of the coalition for the SOUM, i.e. sums up all linear coefficients,
if coalition contains the interaction of the corresponding unanimity game.
Args:
coalition: The coalition as a binary vector of shape (n,) or (batch_size, n).
coalitions: The coalition as a binary vector of shape (coalition_size, n).
Returns:
The worth of the coalition.
"""
if coalition.ndim == 1:
coalition = coalition.reshape((1, self.n))

worth = 0
worth = np.zeros(coalitions.shape[0])
for i, game in self.unanimity_games.items():
worth += self.linear_coefficients[i] * game(coalition)
worth += self.linear_coefficients[i] * game(coalitions)
return worth

def moebius_transform(self):
"""Computes the (sparse) Möbius transform of the SOUM from the UnanimityGames. Used for ground truth calculations for interaction inidices.
"""Computes the (sparse) Möbius transform of the SOUM from the UnanimityGames. Used for
ground truth calculations for interaction indices.
Args:
Returns:
An InteractionValues object containing all non-zero Möbius coefficients of the SOUM.
"""
# fill the moebius coefficients dict from the game
moebius_coefficients_dict = {}
for i, game in self.unanimity_games.items():
if game.interaction in moebius_coefficients_dict:
try:
moebius_coefficients_dict[game.interaction] += self.linear_coefficients[i]
else:
except KeyError:
moebius_coefficients_dict[game.interaction] = self.linear_coefficients[i]

# generate the lookup for the moebius values
moebius_coefficients_values = np.zeros(len(moebius_coefficients_dict))
moebius_coefficients_lookup = {}
for i, (key, val) in enumerate(moebius_coefficients_dict.items()):
moebius_coefficients_values[i] = val
moebius_coefficients_lookup[key] = i

# handle baseline value and set to 0 if no empty set is present
baseline_value = 0 if tuple() not in moebius_coefficients_dict else None

moebius_coefficients = InteractionValues(
values=moebius_coefficients_values,
index="Moebius",
max_order=self.n,
max_order=self.n_players,
min_order=0,
n_players=self.n,
n_players=self.n_players,
interaction_lookup=moebius_coefficients_lookup,
estimated=False,
baseline_value=baseline_value,
Expand Down
2 changes: 1 addition & 1 deletion shapiq/interaction_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class InteractionValues:
max_order: int
min_order: int
n_players: int
interaction_lookup: dict[tuple[int], int] = None
interaction_lookup: dict[tuple[int, ...], int] = None
estimated: bool = True
estimation_budget: Optional[int] = None
baseline_value: Optional[float] = None
Expand Down
5 changes: 5 additions & 0 deletions tests/test_base_interaction_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ def test_initialization(index, n, min_order, max_order, estimation_budget, estim
assert interaction_values.estimated == estimated
assert interaction_values.interaction_lookup == interaction_lookup

# test dict_values property
assert interaction_values.dict_values == {
interaction: value for interaction, value in zip(interaction_lookup, values)
}

# check that default values are set correctly
interaction_values_2 = InteractionValues(
values=np.random.rand(len(interaction_lookup)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


def test_soum_moebius_conversion():
for i in range(100):
for i in range(10):
n = np.random.randint(low=2, high=20)
order = np.random.randint(low=1, high=min(n, 5))
n_basis_games = np.random.randint(low=1, high=100)
Expand All @@ -14,7 +14,8 @@ def test_soum_moebius_conversion():
predicted_value = soum(np.ones(n))[0]
emptyset_prediction = soum(np.zeros(n))[0]

moebius_converter = MoebiusConverter(soum.N, soum.moebius_coefficients)
player_set: set = set(range(soum.n_players))
moebius_converter = MoebiusConverter(player_set, soum.moebius_coefficients)

shapley_interactions = {}
for index in ["STII", "k-SII", "FSII"]:
Expand Down
6 changes: 4 additions & 2 deletions tests/tests_games/test_games_soum.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""This test module tests the SOUM Game class"""

from shapiq.games.soum import SOUM, UnanimityGame
import numpy as np


def test_soum_interations():
"""Test SOUM interactions."""

for i in range(100):
for i in range(10):
# run 100 times
n = np.random.randint(low=2, high=30)
M = np.random.randint(low=1, high=150)
Expand All @@ -16,7 +18,7 @@ def test_soum_interations():
coalition_matrix = np.random.randint(2, size=(M, n))
u_game_values = u_game(coalition_matrix)
assert len(u_game_values) == M
assert u_game.n == n
assert u_game.n_players == n
assert np.sum(u_game.interaction_binary) == len(u_game.interaction)
assert np.sum(interaction) == len(u_game.interaction)

Expand Down

0 comments on commit 6accf66

Please sign in to comment.