WIP - Parallelization of DistToPoint metrics #217

Merged 15 commits on Mar 7, 2024
8 changes: 8 additions & 0 deletions src/qp/metrics/base_metric_classes.py
@@ -139,6 +139,14 @@ class DistToPointMetric(BaseMetric):
def evaluate(self, estimate, reference):
raise NotImplementedError()

def initialize(self): #pragma: no cover
pass

def accumulate(self, estimate, reference): #pragma: no cover
raise NotImplementedError()

def finalize(self): #pragma: no cover
raise NotImplementedError()
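
The three new hooks above define the chunked-evaluation contract that the parallelized metrics follow: `initialize` sets up state, `accumulate` turns one (estimate, reference) subset into a partial result, and `finalize` combines the partial results. A minimal, self-contained sketch of that contract (a toy mean-absolute-error metric, not a class from this PR):

```python
import numpy as np

class ChunkedMAE:
    """Toy illustration of the initialize/accumulate/finalize contract."""

    def initialize(self):
        pass

    def accumulate(self, estimate, reference):
        # Return a partial result for one chunk: (sum of |error|, chunk size).
        return (np.sum(np.abs(estimate - reference)), len(reference))

    def finalize(self, tuples):
        # Combine the per-chunk partial results into the global mean.
        summed = np.sum(np.atleast_2d(tuples), axis=0)
        return summed[0] / summed[1]

rng = np.random.default_rng(0)
est, ref = rng.normal(size=100), rng.normal(size=100)

metric = ChunkedMAE()
metric.initialize()
partials = [metric.accumulate(est[i:i + 25], ref[i:i + 25]) for i in range(0, 100, 25)]
assert np.isclose(metric.finalize(partials), np.mean(np.abs(est - ref)))
```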

class PointToPointMetric(BaseMetric):
"""A base class for metrics that require a point estimate as input for both
10 changes: 10 additions & 0 deletions src/qp/metrics/brier.py
@@ -36,6 +36,11 @@ def evaluate(self):
self._validate_data()
return self._calculate_metric()

def accumulate(self):
self._manipulate_data()
self._validate_data()
return self._calculate_metric_for_accumulation()

def _manipulate_data(self):
"""
Placeholder for data manipulation as required. i.e. converting from
@@ -95,3 +100,8 @@ def _calculate_metric(self):
return np.mean(
np.sum((self._prediction - self._truth) ** 2, axis=self._axis_for_summation)
)

def _calculate_metric_for_accumulation(self):
return np.sum(
np.sum((self._prediction - self._truth) ** 2, axis=self._axis_for_summation)
)
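
The only difference from `_calculate_metric` is the outer reduction: a sum over distributions instead of a mean, so that per-subset sums can later be divided by the total number of PDFs (see `BrierMetric.finalize` below). A quick NumPy check of that equivalence, using made-up per-PDF terms rather than real Brier scores:

```python
import numpy as np

# Hypothetical per-PDF squared-error terms, already summed over the grid axis.
per_pdf_terms = np.array([0.8, 1.1, 0.9, 1.3, 0.7])

# Single-pass result (what `_calculate_metric` averages).
single_pass = np.mean(per_pdf_terms)

# Chunked result: per-chunk sums divided by the total PDF count.
chunks = [per_pdf_terms[:2], per_pdf_terms[2:]]
chunked = sum(np.sum(c) for c in chunks) / sum(len(c) for c in chunks)

assert np.isclose(single_pass, chunked)
```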
114 changes: 105 additions & 9 deletions src/qp/metrics/concrete_metric_classes.py
@@ -2,7 +2,6 @@

import numpy as np

from qp.ensemble import Ensemble
from qp.metrics.base_metric_classes import (
MetricOutputType,
DistToDistMetric,
@@ -11,6 +10,7 @@
)
from qp.metrics.metrics import (
calculate_brier,
calculate_brier_for_accumulation,
calculate_goodness_of_fit,
calculate_kld,
calculate_moment,
@@ -20,11 +20,56 @@
)
from qp.metrics.pit import PIT

from pytdigest import TDigest
from functools import reduce
from operator import add


class DistToPointMetricDigester(DistToPointMetric):
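"""Base class for DistToPoint metrics that support chunked, parallel
evaluation: each subset's `accumulate` produces TDigest centroids, which
are merged into a single digest in `finalize`."""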

def __init__(self, tdigest_compression: int = 1000, **kwargs) -> None:
super().__init__(**kwargs)
self._tdigest_compression = tdigest_compression

def initialize(self):
pass

def accumulate(self, estimate, reference): #pragma: no cover
raise NotImplementedError()

def finalize(self, centroids: np.ndarray = []):
"""This function combines all the centroids that were calculated for the
input estimate and reference subsets and returns the resulting TDigest
object.

Parameters
----------
centroids : iterable of Numpy 2d arrays, optional
The centroid arrays collected from prior calls to `accumulate`, by default []

Returns
-------
float
The result of the specific metric calculation defined in the subclass's
`compute_from_digest` method.
"""
digests = (
TDigest.of_centroids(np.array(centroid), compression=self._tdigest_compression)
for centroid in centroids
)
digest = reduce(add, digests)

return self.compute_from_digest(digest)

def compute_from_digest(self, digest): #pragma: no cover
raise NotImplementedError()
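
Not part of the diff, but a sketch of what `finalize` expects and does, using only the `pytdigest` calls that appear in this class: each worker's `accumulate` hands back a centroid array, and merging the rebuilt digests approximates the statistics of the full, un-chunked sample.

```python
import numpy as np
from functools import reduce
from operator import add
from pytdigest import TDigest

rng = np.random.default_rng(42)
chunks = [rng.uniform(size=1_000) for _ in range(4)]

# What a subclass's `accumulate` is expected to return: one centroid array per chunk.
centroids = [TDigest.compute(chunk, compression=1000).get_centroids() for chunk in chunks]

# What `finalize` does with them: rebuild the per-chunk digests and merge.
digests = (TDigest.of_centroids(np.array(c), compression=1000) for c in centroids)
merged = reduce(add, digests)

# Quantiles of the merged digest approximate those of the full sample.
grid = np.array([0.25, 0.5, 0.75])
print(np.round(merged.inverse_cdf(grid), 3))
print(np.round(np.quantile(np.concatenate(chunks), grid), 3))
```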


class MomentMetric(SingleEnsembleMetric):
"""Class wrapper around the `calculate_moment` function."""

metric_name = "moment"
metric_output_type = MetricOutputType.one_value_per_distribution

def __init__(
self,
@@ -80,7 +125,7 @@ def evaluate(self, estimate) -> list:


#! Should this be implemented as `DistToPointMetric` or `DistToDistMetric` ???
class BrierMetric(DistToPointMetric):
class BrierMetric(DistToPointMetricDigester):
"""Class wrapper around the calculate_brier function. (Which itself is a
wrapper around the `Brier` metric evaluator class).
"""
@@ -89,11 +134,23 @@ class BrierMetric(DistToPointMetric):
metric_output_type = MetricOutputType.one_value_per_distribution

def __init__(self, limits: tuple = (0.0, 3.0), dx: float = 0.01, **kwargs) -> None:
super().__init__(limits, dx)
kwargs.update({"limits": limits, "dx": dx})
super().__init__(**kwargs)

def evaluate(self, estimate, reference) -> list:
return calculate_brier(estimate, reference, self._limits, self._dx)

def accumulate(self, estimate, reference):
brier_sum_npdf_tuple = calculate_brier_for_accumulation(estimate, reference, self._limits, self._dx)
return brier_sum_npdf_tuple

def finalize(self, tuples):
# tuples is a list of tuples. The first value in the tuple is the Brier sum
# The second value is the number of PDFs
summed_terms = np.sum(np.atleast_2d(tuples), axis=0)

# calculate the mean from the summed terms
return summed_terms[0] / summed_terms[1]

class OutlierMetric(SingleEnsembleMetric):
"""Class wrapper around the outlier calculation metric."""
@@ -204,24 +261,47 @@ def evaluate(self, estimate, reference) -> list:
)


#! Confirm metric output type - perhaps a new type is appropriate ???
class PITMetric(DistToPointMetric):
class PITMetric(DistToPointMetricDigester):
"""Class wrapper for the PIT Metric class."""

metric_name = "pit"
metric_output_type = MetricOutputType.single_distribution
default_eval_grid = np.linspace(0, 1, 100)

def __init__(self, eval_grid: list = default_eval_grid, **kwargs) -> None:
super().__init__()
super().__init__(**kwargs)
self._eval_grid = eval_grid

def evaluate(self, estimate, reference) -> Ensemble:
def evaluate(self, estimate, reference):
pit_object = PIT(estimate, reference, self._eval_grid)
return pit_object.pit


class CDELossMetric(DistToPointMetric):
def accumulate(self, estimate, reference):
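"""Compute the PIT samples for this subset, digest them, and return the
digest's centroids so they can be merged by the base-class `finalize`."""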
pit_samples = PIT(estimate, reference, self._eval_grid)._gather_pit_samples(estimate, reference)
digest = TDigest.compute(pit_samples, compression=self._tdigest_compression)
centroids = digest.get_centroids()
return centroids

def compute_from_digest(self, digest):
# Since we use equal weights for all the values in the digest
# digest.weight is the total number of values, it is stored as a float,
# so we cast to int.
eval_grid = self._eval_grid
total_samples = int(digest.weight)
n_pit = np.min([total_samples, len(eval_grid)])
if n_pit < len(eval_grid): # pragma: no cover
#! TODO: Determine what the appropriate style of logging is going to be for metrics.
print(
"Number of pit samples is smaller than the evaluation grid size. "
"Will create a new evaluation grid with size = number of pit samples"
)
eval_grid = np.linspace(0, 1, n_pit)

data_quants = digest.inverse_cdf(eval_grid)
return PIT._produce_output_ensemble(data_quants, eval_grid)
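
A small sketch (not from the PR) of the two facts this method relies on: with unit weights, `digest.weight` is the number of ingested samples stored as a float, and `inverse_cdf` on a uniform grid recovers approximately the same quantiles that the non-parallel `PIT` class computes with `np.quantile`:

```python
import numpy as np
from pytdigest import TDigest

samples = np.random.default_rng(1).uniform(size=500)
digest = TDigest.compute(samples, compression=1000)

# Equal weights: total weight equals the sample count, hence the int() cast above.
print(digest.weight, int(digest.weight) == len(samples))

# Digest quantiles track np.quantile of the raw PIT samples.
grid = np.linspace(0, 1, 11)
print(np.round(digest.inverse_cdf(grid) - np.quantile(samples, grid), 3))
```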


class CDELossMetric(DistToPointMetricDigester):
"""Conditional density loss"""

metric_name = "cdeloss"
@@ -248,3 +328,19 @@ def evaluate(self, estimate, reference):
term2 = np.mean(pdfs[range(npdf), nns])
cdeloss = term1 - 2 * term2
return cdeloss

def accumulate(self, estimate, reference):
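"""Return the summed first and second CDE-loss terms for this subset, along
with the number of PDFs, for combination across subsets in `finalize`."""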
pdfs = estimate.pdf(self._xvals)
npdf = estimate.npdf
term1_sum = np.sum(np.trapz(pdfs**2, x=self._xvals))

nns = [np.argmin(np.abs(self._xvals - z)) for z in reference]
term2_sum = np.sum(pdfs[range(npdf), nns])

return (term1_sum, term2_sum, npdf)

def finalize(self, tuples):
summed_terms = np.sum(np.atleast_2d(tuples), axis=0)
term1 = summed_terms[0] / summed_terms[2]
term2 = summed_terms[1] / summed_terms[2]
return term1 - 2 * term2
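
As a sanity check on the algebra (not code from the PR), the chunked `accumulate`/`finalize` path reproduces the single-pass `evaluate` result, since both loss terms are simple means over PDFs. A self-contained NumPy sketch with synthetic Gaussian PDFs standing in for `estimate.pdf(self._xvals)`:

```python
import numpy as np

rng = np.random.default_rng(3)
xvals = np.linspace(0.0, 3.0, 301)
# Synthetic stand-ins for `estimate.pdf(xvals)` and the reference redshifts.
mus = rng.uniform(0.5, 2.5, size=8)
pdfs = np.exp(-0.5 * ((xvals - mus[:, None]) / 0.2) ** 2) / (0.2 * np.sqrt(2 * np.pi))
ztrue = mus + rng.normal(scale=0.05, size=8)

def evaluate(pdfs, ztrue):  # mirrors CDELossMetric.evaluate
    nns = [np.argmin(np.abs(xvals - z)) for z in ztrue]
    term1 = np.mean(np.trapz(pdfs**2, x=xvals))
    term2 = np.mean(pdfs[range(len(ztrue)), nns])
    return term1 - 2 * term2

def accumulate(pdfs, ztrue):  # mirrors CDELossMetric.accumulate
    nns = [np.argmin(np.abs(xvals - z)) for z in ztrue]
    return (np.sum(np.trapz(pdfs**2, x=xvals)),
            np.sum(pdfs[range(len(ztrue)), nns]),
            len(ztrue))

tuples = [accumulate(pdfs[:4], ztrue[:4]), accumulate(pdfs[4:], ztrue[4:])]
summed = np.sum(np.atleast_2d(tuples), axis=0)
chunked = summed[0] / summed[2] - 2 * summed[1] / summed[2]
assert np.isclose(chunked, evaluate(pdfs, ztrue))
```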
63 changes: 36 additions & 27 deletions src/qp/metrics/metrics.py
@@ -217,32 +217,7 @@ def evaluate_pdf_at_z(z, dist):
return np.array(rbpes)


def calculate_brier(p, truth, limits, dx=0.01):
"""This function will do the following:

1) Generate a Mx1 sized grid based on `limits` and `dx`.
2) Produce an NxM array by evaluating the pdf for each of the N distribution objects
in the Ensemble p on the grid.
3) Produce an NxM truth_array using the input truth and the generated grid. All values will be 0 or 1.
4) Create a Brier metric evaluation object
5) Return the result of the Brier metric calculation.

Parameters
----------
p: qp.Ensemble object
of N distributions probability distribution functions that will be gridded and compared against truth.
truth: Nx1 sequence
the list of true values, 1 per distribution in p.
limits: 2-tuple of floats
endpoints grid to evaluate the PDFs for the distributions in p
dx: float
resolution of the grid Defaults to 0.01.

Returns
-------
Brier_metric: float
"""

def _prepare_for_brier(p, truth, limits, dx=0.01):
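"""Shared setup for `calculate_brier` and `calculate_brier_for_accumulation`:
grid the distributions in `p`, build the matching 0/1 truth array, and return
a `Brier` evaluation object."""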
# Ensure that the number of distributions objects in the Ensemble
# is equal to the length of the truth array
if p.npdf != len(truth):
@@ -270,11 +245,45 @@ def calculate_brier(p, truth, limits, dx=0.01):
truth_array = np.squeeze([np.histogram(t, grid.hist_bin_edges)[0] for t in truth])

# instantiate the Brier metric object
brier_metric_evaluation = Brier(pdf_values, truth_array)
return Brier(pdf_values, truth_array)

def calculate_brier(p, truth, limits, dx=0.01):
"""This function will do the following:

1) Generate an Mx1 sized grid based on `limits` and `dx`.
2) Produce an NxM array by evaluating the pdf for each of the N distribution objects
in the Ensemble p on the grid.
3) Produce an NxM truth_array using the input truth and the generated grid. All values will be 0 or 1.
4) Create a Brier metric evaluation object
5) Return the result of the Brier metric calculation.

Parameters
----------
p: qp.Ensemble object
of N probability distribution functions that will be gridded and compared against truth.
truth: Nx1 sequence
the list of true values, 1 per distribution in p.
limits: 2-tuple of floats
endpoints of the grid used to evaluate the PDFs of the distributions in p
dx: float
resolution of the grid. Defaults to 0.01.

Returns
-------
Brier_metric: float
"""

brier_metric_evaluation = _prepare_for_brier(p, truth, limits, dx)

# return the results of evaluating the Brier metric
return brier_metric_evaluation.evaluate()

def calculate_brier_for_accumulation(p, truth, limits, dx=0.01):
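"""Same setup as `calculate_brier`, but returns the accumulated Brier sum for
the subset `p` together with its number of PDFs, so that results from several
subsets can be combined into a mean by `BrierMetric.finalize`."""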
brier_metric_evaluation = _prepare_for_brier(p, truth, limits, dx)

brier_sum = brier_metric_evaluation.accumulate()

return (brier_sum, p.npdf)

@deprecated(
reason="""
65 changes: 34 additions & 31 deletions src/qp/metrics/pit.py
@@ -42,25 +42,8 @@ def __init__(self, qp_ens, true_vals, eval_grid=DEFAULT_QUANTS):
eval_grid : [float], optional
A strictly increasing array-like sequence in the range [0,1], by default DEFAULT_QUANTS
"""
self._true_vals = true_vals

# For each distribution in the Ensemble, calculate the CDF where x = known_true_value
self._pit_samps = np.array(
[
qp_ens[i].cdf(self._true_vals[i])[0][0]
for i in range(len(self._true_vals))
]
)

# These two lines set all `NaN` values to 0. This may or may not make sense
# Alternatively if it's better to simply remove the `NaN`, this can be done
# efficiently on line 61 with `data_quants = np.nanquantile(...)`.`
samp_mask = np.isfinite(self._pit_samps)
self._pit_samps[~samp_mask] = 0
if not np.all(samp_mask): #pragma: no cover
logging.warning(
"Some PIT samples were `NaN`. They have been replacd with 0."
)
self._pit_samps = self._gather_pit_samples(qp_ens, true_vals)

n_pit = np.min([len(self._pit_samps), len(eval_grid)])
if n_pit < len(eval_grid):
@@ -72,19 +55,7 @@

data_quants = np.quantile(self._pit_samps, eval_grid)

# Remove duplicates values as well as values outside the range (0,1)
_, unique_indices = np.unique(data_quants, return_index=True)
unique_data_quants = data_quants[unique_indices]
unique_eval_grid = eval_grid[unique_indices]
quant_mask = self._create_quant_mask(unique_data_quants)

self._pit = qp.Ensemble(
qp.quant,
data=dict(
quants=unique_eval_grid[quant_mask],
locs=np.atleast_2d(unique_data_quants[quant_mask]),
),
)
self._pit = self._produce_output_ensemble(data_quants, eval_grid)

@property
def pit_samps(self):
@@ -201,6 +172,38 @@ def evaluate_PIT_outlier_rate(self, pit_min=0.0001, pit_max=0.9999):
"""
return calculate_outlier_rate(self._pit, pit_min, pit_max)[0]

@classmethod
def _gather_pit_samples(cls, qp_ens, true_vals):
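"""Evaluate the CDF of each distribution in `qp_ens` at its matching true
value and return the resulting PIT samples, with any non-finite values
replaced by 0."""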
pit_samples = np.squeeze(qp_ens.cdf(np.vstack(true_vals)))

# These two lines set all `NaN` values to 0. This may or may not make sense
# Alternatively, if it's better to simply remove the `NaN` values, this can be
# done efficiently where the quantiles are computed, with `data_quants = np.nanquantile(...)`.
sample_mask = np.isfinite(pit_samples)
pit_samples[~sample_mask] = 0
if not np.all(sample_mask): #pragma: no cover
logging.warning(
"Some PIT samples were `NaN`. They have been replacd with 0."
)

return pit_samples
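
The refactor replaces the old per-distribution Python loop with a single vectorized `cdf` call on a column vector of true values. A sketch of the same pattern using `scipy.stats` frozen distributions as a stand-in for the qp Ensemble (the array shapes, not the qp API, are the point here):

```python
import numpy as np
from scipy import stats

locs = np.array([0.2, 0.5, 0.8, 1.1])
true_vals = np.array([0.25, 0.4, 0.85, 1.0])
ens = stats.norm(loc=locs[:, None], scale=0.1)  # one distribution per row

# Old approach: one CDF call per distribution.
loop_pit = np.array([stats.norm(loc=locs[i], scale=0.1).cdf(true_vals[i])
                     for i in range(len(true_vals))])

# New approach: a single call on a column vector, then squeeze the extra axis.
vec_pit = np.squeeze(ens.cdf(np.vstack(true_vals)))

assert np.allclose(loop_pit, vec_pit)
```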

@classmethod
def _produce_output_ensemble(cls, data_quants, eval_grid):
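"""Build the quantile-parameterized output Ensemble from the PIT quantiles,
after removing duplicate values and values outside the range (0, 1)."""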
# Remove duplicates values as well as values outside the range (0,1)
_, unique_indices = np.unique(data_quants, return_index=True)
unique_data_quants = data_quants[unique_indices]
unique_eval_grid = eval_grid[unique_indices]
quant_mask = cls._create_quant_mask(unique_data_quants)

return qp.Ensemble(
qp.quant,
data=dict(
quants=unique_eval_grid[quant_mask],
locs=np.atleast_2d(unique_data_quants[quant_mask]),
),
)

@classmethod
def _create_quant_mask(cls, data_quants):
"""Create a numpy mask such that, when applied only values greater than