From 8c3c463e1c48150c2dcd29176d0fd2f545678506 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Wed, 28 Feb 2024 21:11:58 -0800 Subject: [PATCH 01/14] WIP - initial parallelization of PIT metric. Not fully tested yet. --- src/qp/metrics/base_metric_classes.py | 8 +++ src/qp/metrics/concrete_metric_classes.py | 69 +++++++++++++++++++++- src/qp/metrics/pit.py | 70 +++++++++++++---------- tests/qp/test_metrics.py | 1 - tests/qp/test_pit.py | 1 - 5 files changed, 114 insertions(+), 35 deletions(-) diff --git a/src/qp/metrics/base_metric_classes.py b/src/qp/metrics/base_metric_classes.py index 5e29e80..5efa360 100644 --- a/src/qp/metrics/base_metric_classes.py +++ b/src/qp/metrics/base_metric_classes.py @@ -139,6 +139,14 @@ class DistToPointMetric(BaseMetric): def evaluate(self, estimate, reference): raise NotImplementedError() + def initialize(self): #pragma: no cover + pass + + def accumulate(self, estimate, reference): #pragma: no cover + raise NotImplementedError() + + def finalize(self): #pragma: no cover + raise NotImplementedError() class PointToPointMetric(BaseMetric): """A base class for metrics that require a point estimate as input for both diff --git a/src/qp/metrics/concrete_metric_classes.py b/src/qp/metrics/concrete_metric_classes.py index 1e1daf7..c9cb57d 100644 --- a/src/qp/metrics/concrete_metric_classes.py +++ b/src/qp/metrics/concrete_metric_classes.py @@ -20,6 +20,50 @@ ) from qp.metrics.pit import PIT +from pytdigest import TDigest +from functools import reduce +from operator import add + + +class DistToPointMetricDigester(DistToPointMetric): + + def __init__(self, tdigest_compression: int = 1000, **kwargs) -> None: + super().__init__() + self._tdigest_compression = tdigest_compression + + def initialize(self): + pass + + def accumulate(self, estimate, reference): #pragma: no cover + raise NotImplementedError() + + def finalize(self, centroids: np.ndarray = []): + """This function combines all the centroids that were calculated for the + input estimate and reference subsets and returns the resulting TDigest + object. + + Parameters + ---------- + centroids : Numpy 2d array, optional + The output collected from prior calls to `accumulate`, by default [] + + Returns + ------- + float + The result of the specific metric calculation defined in the subclasses + `compute_from_digest` method. + """ + digests = ( + TDigest.of_centroids(np.array(centroid), compression=self._tdigest_compression) + for centroid in centroids + ) + digest = reduce(add, digests) + + return self.compute_from_digest(digest) + + def compute_from_digest(self, digest): #pragma: no cover + raise NotImplementedError() + class MomentMetric(SingleEnsembleMetric): """Class wrapper around the `calculate_moment` function.""" @@ -204,8 +248,7 @@ def evaluate(self, estimate, reference) -> list: ) -#! Confirm metric output type - perhaps a new type is appropriate ??? 
-class PITMetric(DistToPointMetric): +class PITMetric(DistToPointMetricDigester): """Class wrapper for the PIT Metric class.""" metric_name = "pit" @@ -220,6 +263,28 @@ def evaluate(self, estimate, reference) -> Ensemble: pit_object = PIT(estimate, reference, self._eval_grid) return pit_object.pit + def accumulate(self, estimate, reference): + pit_samples = PIT(estimate, reference, self._eval_grid)._gather_pit_samples(estimate, reference) + digest = TDigest.compute(pit_samples, compression=self._tdigest_compression) + centroids = digest.get_centroids() + return centroids + + def compute_from_digest(self, digest): + # Since we use equal weights for all the values in the digest + # digest.weight is the total number of values, it is stored as a float, + # so we cast to int. + total_samples = int(digest.weight) + n_pit = np.min([total_samples, len(eval_grid)]) + if n_pit < len(eval_grid): + #! TODO: Determine what the appropriate style of logging is going to be for metrics. + print( + "Number of pit samples is smaller than the evaluation grid size. " + "Will create a new evaluation grid with size = number of pit samples" + ) + eval_grid = np.linspace(0, 1, n_pit) + + data_quants = digest.inverse_cdf(eval_grid) + PIT()._produce_output_ensemble(data_quants, eval_grid) class CDELossMetric(DistToPointMetric): """Conditional density loss""" diff --git a/src/qp/metrics/pit.py b/src/qp/metrics/pit.py index f434240..448ce8d 100644 --- a/src/qp/metrics/pit.py +++ b/src/qp/metrics/pit.py @@ -42,25 +42,8 @@ def __init__(self, qp_ens, true_vals, eval_grid=DEFAULT_QUANTS): eval_grid : [float], optional A strictly increasing array-like sequence in the range [0,1], by default DEFAULT_QUANTS """ - self._true_vals = true_vals - # For each distribution in the Ensemble, calculate the CDF where x = known_true_value - self._pit_samps = np.array( - [ - qp_ens[i].cdf(self._true_vals[i])[0][0] - for i in range(len(self._true_vals)) - ] - ) - - # These two lines set all `NaN` values to 0. This may or may not make sense - # Alternatively if it's better to simply remove the `NaN`, this can be done - # efficiently on line 61 with `data_quants = np.nanquantile(...)`.` - samp_mask = np.isfinite(self._pit_samps) - self._pit_samps[~samp_mask] = 0 - if not np.all(samp_mask): #pragma: no cover - logging.warning( - "Some PIT samples were `NaN`. They have been replacd with 0." 
- ) + self._pit_samps = self._gather_pit_samples(qp_ens, true_vals) n_pit = np.min([len(self._pit_samps), len(eval_grid)]) if n_pit < len(eval_grid): @@ -72,19 +55,7 @@ def __init__(self, qp_ens, true_vals, eval_grid=DEFAULT_QUANTS): data_quants = np.quantile(self._pit_samps, eval_grid) - # Remove duplicates values as well as values outside the range (0,1) - _, unique_indices = np.unique(data_quants, return_index=True) - unique_data_quants = data_quants[unique_indices] - unique_eval_grid = eval_grid[unique_indices] - quant_mask = self._create_quant_mask(unique_data_quants) - - self._pit = qp.Ensemble( - qp.quant, - data=dict( - quants=unique_eval_grid[quant_mask], - locs=np.atleast_2d(unique_data_quants[quant_mask]), - ), - ) + self._pit = self._produce_output_ensemble(data_quants, eval_grid) @property def pit_samps(self): @@ -201,6 +172,43 @@ def evaluate_PIT_outlier_rate(self, pit_min=0.0001, pit_max=0.9999): """ return calculate_outlier_rate(self._pit, pit_min, pit_max)[0] + @classmethod + def _gather_pit_samples(cls, qp_ens, true_vals): + pit_samples = np.array( + [ + qp_ens[i].cdf(true_vals[i])[0][0] + for i in range(len(true_vals)) + ] + ) + + # These two lines set all `NaN` values to 0. This may or may not make sense + # Alternatively if it's better to simply remove the `NaN`, this can be done + # efficiently on line 61 with `data_quants = np.nanquantile(...)`.` + sample_mask = np.isfinite(pit_samples) + pit_samples[~sample_mask] = 0 + if not np.all(sample_mask): #pragma: no cover + logging.warning( + "Some PIT samples were `NaN`. They have been replacd with 0." + ) + + return pit_samples + + @classmethod + def _produce_output_ensemble(cls, data_quants, eval_grid): + # Remove duplicates values as well as values outside the range (0,1) + _, unique_indices = np.unique(data_quants, return_index=True) + unique_data_quants = data_quants[unique_indices] + unique_eval_grid = eval_grid[unique_indices] + quant_mask = cls._create_quant_mask(unique_data_quants) + + return qp.Ensemble( + qp.quant, + data=dict( + quants=unique_eval_grid[quant_mask], + locs=np.atleast_2d(unique_data_quants[quant_mask]), + ), + ) + @classmethod def _create_quant_mask(cls, data_quants): """Create a numpy mask such that, when applied only values greater than diff --git a/tests/qp/test_metrics.py b/tests/qp/test_metrics.py index 0ccb762..e7a7ef4 100644 --- a/tests/qp/test_metrics.py +++ b/tests/qp/test_metrics.py @@ -299,7 +299,6 @@ def test_calculate_brier(self): brier_class = BrierMetric(limits=limits) brier_class.initialize() class_result = brier_class.evaluate(self.ens_n, truth) - brier_class.finalize() assert np.all(result == class_result) def test_calculate_brier_mismatched_number_of_truths(self): diff --git a/tests/qp/test_pit.py b/tests/qp/test_pit.py index e8126bc..d060b22 100644 --- a/tests/qp/test_pit.py +++ b/tests/qp/test_pit.py @@ -121,7 +121,6 @@ def test_pit_metric_class_matches_original_pit_class(self): pit_metric = PITMetric(eval_grid=quant_grid) pit_metric.initialize() class_result = pit_metric.evaluate(self.grid_ens, self.true_zs) - pit_metric.finalize() eval_grid = np.linspace(0, 3, 100) assert np.all(class_result.pdf(eval_grid) == pit_obj.pit.pdf(eval_grid)) From 39c9c6f6d501245e46faf9fe64298b25815ec72b Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Thu, 29 Feb 2024 14:34:25 -0800 Subject: [PATCH 02/14] Replacing for loop with vectorized approach in the pit metric calculation. 
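For context, the loop removed here evaluated each distribution's CDF at its matching true value one at a time; stacking the true values into an (npdf, 1) column lets a single Ensemble.cdf call return the same npdf PIT values, which is what the new np.squeeze(qp_ens.cdf(np.vstack(true_vals))) line does. A rough standalone sketch of the same idea, using scipy.stats.norm as a stand-in for a qp Ensemble (the names and sizes are illustrative only, not part of qp):

    import numpy as np
    from scipy import stats

    rng = np.random.default_rng(42)
    mus = rng.normal(size=5)          # one "distribution" per entry
    true_vals = rng.normal(size=5)    # one true value per distribution

    # loop form: one CDF evaluation per distribution
    looped = np.array([stats.norm(loc=mus[i]).cdf(true_vals[i]) for i in range(5)])

    # vectorized form: scipy broadcasts loc against x elementwise, so the i-th
    # distribution is evaluated at the i-th true value in a single call
    vectorized = stats.norm(loc=mus).cdf(true_vals)

    assert np.allclose(looped, vectorized)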
--- src/qp/metrics/concrete_metric_classes.py | 1 + src/qp/metrics/pit.py | 7 +------ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/qp/metrics/concrete_metric_classes.py b/src/qp/metrics/concrete_metric_classes.py index c9cb57d..b92825b 100644 --- a/src/qp/metrics/concrete_metric_classes.py +++ b/src/qp/metrics/concrete_metric_classes.py @@ -69,6 +69,7 @@ class MomentMetric(SingleEnsembleMetric): """Class wrapper around the `calculate_moment` function.""" metric_name = "moment" + metric_output_type = MetricOutputType.one_value_per_distribution def __init__( self, diff --git a/src/qp/metrics/pit.py b/src/qp/metrics/pit.py index 448ce8d..b83586b 100644 --- a/src/qp/metrics/pit.py +++ b/src/qp/metrics/pit.py @@ -174,12 +174,7 @@ def evaluate_PIT_outlier_rate(self, pit_min=0.0001, pit_max=0.9999): @classmethod def _gather_pit_samples(cls, qp_ens, true_vals): - pit_samples = np.array( - [ - qp_ens[i].cdf(true_vals[i])[0][0] - for i in range(len(true_vals)) - ] - ) + pit_samples = np.squeeze(qp_ens.cdf(np.vstack(true_vals))) # These two lines set all `NaN` values to 0. This may or may not make sense # Alternatively if it's better to simply remove the `NaN`, this can be done From 1d9a9726da901fdee4d91925ad5bca0f6b435add Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Thu, 29 Feb 2024 15:59:28 -0800 Subject: [PATCH 03/14] Parallelized CDELossMetric. --- src/qp/metrics/concrete_metric_classes.py | 18 +++++++++++++++++- tests/qp/test_point_metrics.py | 5 +++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/qp/metrics/concrete_metric_classes.py b/src/qp/metrics/concrete_metric_classes.py index b92825b..225beb4 100644 --- a/src/qp/metrics/concrete_metric_classes.py +++ b/src/qp/metrics/concrete_metric_classes.py @@ -287,7 +287,7 @@ def compute_from_digest(self, digest): data_quants = digest.inverse_cdf(eval_grid) PIT()._produce_output_ensemble(data_quants, eval_grid) -class CDELossMetric(DistToPointMetric): +class CDELossMetric(DistToPointMetricDigester): """Conditional density loss""" metric_name = "cdeloss" @@ -314,3 +314,19 @@ def evaluate(self, estimate, reference): term2 = np.mean(pdfs[range(npdf), nns]) cdeloss = term1 - 2 * term2 return cdeloss + + def accumulate(self, estimate, reference): + pdfs = estimate.pdf(self._xvals) + npdf = estimate.npdf + term1_sum = np.sum(np.trapz(pdfs**2, x=self._xvals)) + + nns = [np.argmin(np.abs(self._xvals - z)) for z in reference] + term2_sum = np.sum(pdfs[range(npdf), nns]) + + return (term1_sum, term2_sum, npdf) + + def finalize(self, tuples): + summed_terms = np.sum(tuples, axis=0) + term1 = summed_terms[0] / summed_terms[2] + term2 = summed_terms[1] / summed_terms[2] + return term1 - 2 * term2 diff --git a/tests/qp/test_point_metrics.py b/tests/qp/test_point_metrics.py index abec9db..a0b2e73 100644 --- a/tests/qp/test_point_metrics.py +++ b/tests/qp/test_point_metrics.py @@ -109,3 +109,8 @@ def test_cde_loss_metric(self): cde_loss_class = CDELossMetric(zgrid) result = cde_loss_class.evaluate(pdf_ens, zspec) assert np.isclose(result, CDEVAL) + + chunk_output = cde_loss_class.accumulate(pdf_ens, zspec) + chunked_result = cde_loss_class.finalize([chunk_output]) + + assert np.isclose(chunked_result, CDEVAL) From 8fe0a6a08292cf2783e7c6979687e761d7b33e50 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Thu, 29 Feb 2024 21:26:38 -0800 Subject: [PATCH 04/14] Parallelizing Brier metric. 
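The parallel form follows the same accumulate/finalize split used for the CDE loss: each chunk reports its summed squared differences between the gridded PDFs and the one-hot truth array together with its PDF count, and the mean is only formed after all pairs are summed. A toy sketch of the reduction step (the numbers are made up, purely illustrative):

    import numpy as np

    # one (brier_sum, npdf) tuple per processed chunk
    chunk_results = [(12.5, 10), (7.0, 5), (21.25, 17)]

    summed = np.sum(np.atleast_2d(chunk_results), axis=0)
    mean_brier = summed[0] / summed[1]   # same value as a single-pass mean over all 32 PDFs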
--- src/qp/metrics/brier.py | 10 ++++ src/qp/metrics/concrete_metric_classes.py | 20 +++++-- src/qp/metrics/metrics.py | 63 +++++++++++++---------- tests/qp/test_metrics.py | 6 +++ 4 files changed, 69 insertions(+), 30 deletions(-) diff --git a/src/qp/metrics/brier.py b/src/qp/metrics/brier.py index 3e51fdf..5dbca79 100644 --- a/src/qp/metrics/brier.py +++ b/src/qp/metrics/brier.py @@ -36,6 +36,11 @@ def evaluate(self): self._validate_data() return self._calculate_metric() + def accumulate(self): + self._manipulate_data() + self._validate_data() + return self._calculate_metric_for_accumulation() + def _manipulate_data(self): """ Placeholder for data manipulation as required. i.e. converting from @@ -95,3 +100,8 @@ def _calculate_metric(self): return np.mean( np.sum((self._prediction - self._truth) ** 2, axis=self._axis_for_summation) ) + + def _calculate_metric_for_accumulation(self): + return np.sum( + np.sum((self._prediction - self._truth) ** 2, axis=self._axis_for_summation) + ) diff --git a/src/qp/metrics/concrete_metric_classes.py b/src/qp/metrics/concrete_metric_classes.py index 225beb4..640d226 100644 --- a/src/qp/metrics/concrete_metric_classes.py +++ b/src/qp/metrics/concrete_metric_classes.py @@ -11,6 +11,7 @@ ) from qp.metrics.metrics import ( calculate_brier, + calculate_brier_for_accumulation, calculate_goodness_of_fit, calculate_kld, calculate_moment, @@ -28,7 +29,7 @@ class DistToPointMetricDigester(DistToPointMetric): def __init__(self, tdigest_compression: int = 1000, **kwargs) -> None: - super().__init__() + super().__init__(**kwargs) self._tdigest_compression = tdigest_compression def initialize(self): @@ -125,7 +126,7 @@ def evaluate(self, estimate) -> list: #! Should this be implemented as `DistToPointMetric` or `DistToDistMetric` ??? -class BrierMetric(DistToPointMetric): +class BrierMetric(DistToPointMetricDigester): """Class wrapper around the calculate_brier function. (Which itself is a wrapper around the `Brier` metric evaluator class). """ @@ -134,11 +135,23 @@ class BrierMetric(DistToPointMetric): metric_output_type = MetricOutputType.one_value_per_distribution def __init__(self, limits: tuple = (0.0, 3.0), dx: float = 0.01, **kwargs) -> None: - super().__init__(limits, dx) + kwargs.update({"limits": limits, "dx": dx}) + super().__init__(**kwargs) def evaluate(self, estimate, reference) -> list: return calculate_brier(estimate, reference, self._limits, self._dx) + def accumulate(self, estimate, reference): + brier_sum_npdf_tuple = calculate_brier_for_accumulation(estimate, reference, self._limits, self._dx) + return brier_sum_npdf_tuple + + def finalize(self, tuples): + # tuples is a list of tuples. 
The first value in the tuple is the Brier sum + # The second value is the number of PDFs + summed_terms = np.sum(tuples, axis=0) + + # calculate the mean from the summed terms + return summed_terms[0] / summed_terms[1] class OutlierMetric(SingleEnsembleMetric): """Class wrapper around the outlier calculation metric.""" @@ -287,6 +300,7 @@ def compute_from_digest(self, digest): data_quants = digest.inverse_cdf(eval_grid) PIT()._produce_output_ensemble(data_quants, eval_grid) + class CDELossMetric(DistToPointMetricDigester): """Conditional density loss""" diff --git a/src/qp/metrics/metrics.py b/src/qp/metrics/metrics.py index ea1ec47..f06dbfc 100644 --- a/src/qp/metrics/metrics.py +++ b/src/qp/metrics/metrics.py @@ -217,32 +217,7 @@ def evaluate_pdf_at_z(z, dist): return np.array(rbpes) -def calculate_brier(p, truth, limits, dx=0.01): - """This function will do the following: - - 1) Generate a Mx1 sized grid based on `limits` and `dx`. - 2) Produce an NxM array by evaluating the pdf for each of the N distribution objects - in the Ensemble p on the grid. - 3) Produce an NxM truth_array using the input truth and the generated grid. All values will be 0 or 1. - 4) Create a Brier metric evaluation object - 5) Return the result of the Brier metric calculation. - - Parameters - ---------- - p: qp.Ensemble object - of N distributions probability distribution functions that will be gridded and compared against truth. - truth: Nx1 sequence - the list of true values, 1 per distribution in p. - limits: 2-tuple of floats - endpoints grid to evaluate the PDFs for the distributions in p - dx: float - resolution of the grid Defaults to 0.01. - - Returns - ------- - Brier_metric: float - """ - +def _prepare_for_brier(p, truth, limits, dx=0.01): # Ensure that the number of distributions objects in the Ensemble # is equal to the length of the truth array if p.npdf != len(truth): @@ -270,11 +245,45 @@ def calculate_brier(p, truth, limits, dx=0.01): truth_array = np.squeeze([np.histogram(t, grid.hist_bin_edges)[0] for t in truth]) # instantiate the Brier metric object - brier_metric_evaluation = Brier(pdf_values, truth_array) + return Brier(pdf_values, truth_array) + +def calculate_brier(p, truth, limits, dx=0.01): + """This function will do the following: + + 1) Generate a Mx1 sized grid based on `limits` and `dx`. + 2) Produce an NxM array by evaluating the pdf for each of the N distribution objects + in the Ensemble p on the grid. + 3) Produce an NxM truth_array using the input truth and the generated grid. All values will be 0 or 1. + 4) Create a Brier metric evaluation object + 5) Return the result of the Brier metric calculation. + + Parameters + ---------- + p: qp.Ensemble object + of N distributions probability distribution functions that will be gridded and compared against truth. + truth: Nx1 sequence + the list of true values, 1 per distribution in p. + limits: 2-tuple of floats + endpoints grid to evaluate the PDFs for the distributions in p + dx: float + resolution of the grid Defaults to 0.01. 
+ + Returns + ------- + Brier_metric: float + """ + + brier_metric_evaluation = _prepare_for_brier(p, truth, limits, dx) # return the results of evaluating the Brier metric return brier_metric_evaluation.evaluate() +def calculate_brier_for_accumulation(p, truth, limits, dx=0.01): + brier_metric_evaluation = _prepare_for_brier(p, truth, limits, dx) + + brier_sum = brier_metric_evaluation.accumulate() + + return (brier_sum, p.npdf) @deprecated( reason=""" diff --git a/tests/qp/test_metrics.py b/tests/qp/test_metrics.py index e7a7ef4..b88c507 100644 --- a/tests/qp/test_metrics.py +++ b/tests/qp/test_metrics.py @@ -301,6 +301,12 @@ def test_calculate_brier(self): class_result = brier_class.evaluate(self.ens_n, truth) assert np.all(result == class_result) + brier_class_accumulate = BrierMetric(limits=limits) + brier_class_accumulate.initialize() + sum_tuple = brier_class_accumulate.accumulate(self.ens_n, truth) + accumulated_result = brier_class_accumulate.finalize([sum_tuple]) + assert np.all(result == accumulated_result) + def test_calculate_brier_mismatched_number_of_truths(self): """Expect an exception when number of truth values doesn't match number of distributions""" truth = 2 * (np.random.uniform(size=(10, 1)) - 0.5) From 6a1dbae1ee945c31ab21db38d51a3317d76bca2c Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Fri, 1 Mar 2024 14:29:41 -0800 Subject: [PATCH 05/14] Adding unit tests and cleaning code. --- src/qp/metrics/concrete_metric_classes.py | 5 +++-- tests/qp/test_pit.py | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/qp/metrics/concrete_metric_classes.py b/src/qp/metrics/concrete_metric_classes.py index 640d226..c7a5f42 100644 --- a/src/qp/metrics/concrete_metric_classes.py +++ b/src/qp/metrics/concrete_metric_classes.py @@ -270,7 +270,7 @@ class PITMetric(DistToPointMetricDigester): default_eval_grid = np.linspace(0, 1, 100) def __init__(self, eval_grid: list = default_eval_grid, **kwargs) -> None: - super().__init__() + super().__init__(**kwargs) self._eval_grid = eval_grid def evaluate(self, estimate, reference) -> Ensemble: @@ -287,6 +287,7 @@ def compute_from_digest(self, digest): # Since we use equal weights for all the values in the digest # digest.weight is the total number of values, it is stored as a float, # so we cast to int. 
+ eval_grid = self._eval_grid total_samples = int(digest.weight) n_pit = np.min([total_samples, len(eval_grid)]) if n_pit < len(eval_grid): @@ -298,7 +299,7 @@ def compute_from_digest(self, digest): eval_grid = np.linspace(0, 1, n_pit) data_quants = digest.inverse_cdf(eval_grid) - PIT()._produce_output_ensemble(data_quants, eval_grid) + return PIT._produce_output_ensemble(data_quants, eval_grid) class CDELossMetric(DistToPointMetricDigester): diff --git a/tests/qp/test_pit.py b/tests/qp/test_pit.py index d060b22..3d0379c 100644 --- a/tests/qp/test_pit.py +++ b/tests/qp/test_pit.py @@ -124,3 +124,17 @@ def test_pit_metric_class_matches_original_pit_class(self): eval_grid = np.linspace(0, 3, 100) assert np.all(class_result.pdf(eval_grid) == pit_obj.pit.pdf(eval_grid)) + + def test_pit_metric_parallelization(self): + """ This test primarily ensures that the machinery of the parallelization + works as expected, but does not verify the correctness of the parallelization""" + + quant_grid = np.linspace(0, 1, 101) + pit_obj = PIT(self.grid_ens, self.true_zs, quant_grid) + + configuration = {'tdigest_compression': 100000} + pit_metric = PITMetric(eval_grid=quant_grid, **configuration) + pit_metric.initialize() + centroids = pit_metric.accumulate(self.grid_ens, self.true_zs) + chunked_class_results = pit_metric.finalize([centroids]) + assert chunked_class_results.npdf == 1 \ No newline at end of file From 7c39546861ad223e4a4ac0f3e1e29edcb80991d5 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Fri, 1 Mar 2024 16:39:58 -0800 Subject: [PATCH 06/14] Forcing the sum over a 2d array, otherwise the sum does not do what we want. --- src/qp/metrics/concrete_metric_classes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/qp/metrics/concrete_metric_classes.py b/src/qp/metrics/concrete_metric_classes.py index c7a5f42..723fab4 100644 --- a/src/qp/metrics/concrete_metric_classes.py +++ b/src/qp/metrics/concrete_metric_classes.py @@ -148,7 +148,7 @@ def accumulate(self, estimate, reference): def finalize(self, tuples): # tuples is a list of tuples. The first value in the tuple is the Brier sum # The second value is the number of PDFs - summed_terms = np.sum(tuples, axis=0) + summed_terms = np.sum(np.atleast_2d(tuples), axis=0) # calculate the mean from the summed terms return summed_terms[0] / summed_terms[1] @@ -341,7 +341,7 @@ def accumulate(self, estimate, reference): return (term1_sum, term2_sum, npdf) def finalize(self, tuples): - summed_terms = np.sum(tuples, axis=0) + summed_terms = np.sum(np.atleast_2d(tuples), axis=0) term1 = summed_terms[0] / summed_terms[2] term2 = summed_terms[1] / summed_terms[2] return term1 - 2 * term2 From 1484e8fe567c760a38df676e5aa8cbd16732acc3 Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Fri, 1 Mar 2024 16:58:29 -0800 Subject: [PATCH 07/14] pragma no cover a corner case --- src/qp/metrics/concrete_metric_classes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/qp/metrics/concrete_metric_classes.py b/src/qp/metrics/concrete_metric_classes.py index 723fab4..5e2264e 100644 --- a/src/qp/metrics/concrete_metric_classes.py +++ b/src/qp/metrics/concrete_metric_classes.py @@ -290,7 +290,7 @@ def compute_from_digest(self, digest): eval_grid = self._eval_grid total_samples = int(digest.weight) n_pit = np.min([total_samples, len(eval_grid)]) - if n_pit < len(eval_grid): + if n_pit < len(eval_grid): # pragma: no cover #! TODO: Determine what the appropriate style of logging is going to be for metrics. 
print( "Number of pit samples is smaller than the evaluation grid size. " From 4ae1d0390797ae60d4dfbc296b6224b10707a30f Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Mon, 4 Mar 2024 20:52:54 -0800 Subject: [PATCH 08/14] WIP - Initial commit to support writing out a dictionary collection of ensembles. --- src/qp/__init__.py | 1 + src/qp/factory.py | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/src/qp/__init__.py b/src/qp/__init__.py index f9cfb46..4bb575c 100644 --- a/src/qp/__init__.py +++ b/src/qp/__init__.py @@ -24,6 +24,7 @@ data_length, from_tables, is_qp_file, + write_dict, ) from .lazy_modules import * diff --git a/src/qp/factory.py b/src/qp/factory.py index a8ef2c0..7295ee7 100644 --- a/src/qp/factory.py +++ b/src/qp/factory.py @@ -357,6 +357,16 @@ def concatenate(ensembles): data[k] = np.squeeze(v) return Ensemble(gen_func, data, ancil) + @staticmethod + def write_dict(filename, ensemble_dict, **kwargs): + output_tables = {} + for key, val in ensemble_dict.items(): + # check that val is a qp.Ensemble + if not isinstance(val, Ensemble): + raise ValueError("All values in ensemble_dict must be qp.Ensemble") + + output_tables[key] = val.build_tables() + io.writeDictsToHdf5(output_tables, filename, **kwargs) _FACTORY = Factory() @@ -377,3 +387,4 @@ def instance(): data_length = _FACTORY.data_length from_tables = _FACTORY.from_tables is_qp_file = _FACTORY.is_qp_file +write_dict = _FACTORY.write_dict \ No newline at end of file From f794451f4669497145bf73c74c8cad31ac9bce9b Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Tue, 5 Mar 2024 12:52:35 -0800 Subject: [PATCH 09/14] Implementing a function to read dictionaries of qp ensembles from hdf5 files. --- src/qp/__init__.py | 1 + src/qp/factory.py | 29 ++++++++++++- src/qp/metrics/tdigest_example.py | 67 ++++++++++++++++++++++++++++++ test_normancil.pq | Bin 0 -> 1876 bytes 4 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 src/qp/metrics/tdigest_example.py create mode 100644 test_normancil.pq diff --git a/src/qp/__init__.py b/src/qp/__init__.py index 4bb575c..f23f038 100644 --- a/src/qp/__init__.py +++ b/src/qp/__init__.py @@ -25,6 +25,7 @@ from_tables, is_qp_file, write_dict, + read_dict, ) from .lazy_modules import * diff --git a/src/qp/factory.py b/src/qp/factory.py index 7295ee7..f1fbf02 100644 --- a/src/qp/factory.py +++ b/src/qp/factory.py @@ -368,6 +368,32 @@ def write_dict(filename, ensemble_dict, **kwargs): output_tables[key] = val.build_tables() io.writeDictsToHdf5(output_tables, filename, **kwargs) + @staticmethod + def read_dict(filename): + """Assume that filename is an HDF5 file, containing multiple qp.Ensembles + that have been stored at nparrays.""" + results = {} + + # retrieve all the top level groups. Assume each top level group + # corresponds to an ensemble. + top_level_groups = io.readHdf5GroupNames(filename) + + # for each top level group, convert the subgroups (data, meta, ancil) into + # a dictionary of dictionaries and pass the result to `from_tables`. 
+ for top_level_group in top_level_groups: + tables = {} + keys = io.readHdf5GroupNames(filename, top_level_group) + for key_name in keys: + # retrieve the hdf5 group object + group_object, _ = io.readHdf5Group(filename, f"{top_level_group}/{key_name}") + + # use the hdf5 group object to gather data into a dictionary + tables[key_name] = io.readHdf5GroupToDict(group_object) + + results[top_level_group] = from_tables(tables) + + return results + _FACTORY = Factory() @@ -387,4 +413,5 @@ def instance(): data_length = _FACTORY.data_length from_tables = _FACTORY.from_tables is_qp_file = _FACTORY.is_qp_file -write_dict = _FACTORY.write_dict \ No newline at end of file +write_dict = _FACTORY.write_dict +read_dict = _FACTORY.read_dict diff --git a/src/qp/metrics/tdigest_example.py b/src/qp/metrics/tdigest_example.py new file mode 100644 index 0000000..a145265 --- /dev/null +++ b/src/qp/metrics/tdigest_example.py @@ -0,0 +1,67 @@ +from functools import reduce +from itertools import chain +from operator import add +import time + +import numpy as np +import tqdm +from mpi4py import MPI + +from pytdigest import TDigest + + +comm = MPI.COMM_WORLD +rank = comm.Get_rank() + +# tdigest compression +COMPRESSION = 1000 + +# properties of lognormal distribution to be sampled +MEAN = 1 +SIGMA = 1 + +# data volume +TOTAL_SIZE = 1_000_000 +CHUNK_SIZE = TOTAL_SIZE // comm.size +N_BATCH = 10 +BATCH_SIZE = CHUNK_SIZE // N_BATCH + + +def accumulate_digest_centroids(xs, compression=100): + digest = TDigest.compute(xs, compression=compression) + centroids = digest.get_centroids() + + return centroids + + +def main(): + rng = np.random.default_rng() + + start = time.time() + centroids = [] + for i_batch in range(N_BATCH): + xs = rng.lognormal(mean=MEAN, sigma=SIGMA, size=BATCH_SIZE) + _centroids = accumulate_digest_centroids(xs, compression=COMPRESSION) + centroids.append(_centroids) + end = time.time() + print(f"[MPI {rank}] processed {N_BATCH * BATCH_SIZE} objects in {end - start:.3g} seconds") + + centroids = comm.gather(centroids, root=0) + + if rank == 0: + start = time.time() + digests = ( + TDigest.of_centroids(centroid, compression=COMPRESSION) + for centroid in chain.from_iterable(centroids) + ) + td = reduce(add, digests) + end = time.time() + print(f"[MPI {rank}] reduced {N_BATCH * BATCH_SIZE * comm.size} objects in {end - start:.3g} seconds") + + # quantiles = [0., 0.25, 0.5, 0.75, 1.] 
+ # print(td.inverse_cdf(quantiles)) + print(f"true median: {np.exp(MEAN)}") + print(f"approx median: {td.inverse_cdf(0.5)}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_normancil.pq b/test_normancil.pq new file mode 100644 index 0000000000000000000000000000000000000000..d5f787095cb1679e7881916d8c6e475a176ec285 GIT binary patch literal 1876 zcmcgt&2QpX5PxQYqA07PYK=hR5Ls%P14%I-X;@Vv4-$weWD~N%nAIxs7hqx=7i>Nf zMe4bKLEFEg>ZyN3EA`mpo_gw~QdL!|RCV67VMEnJd+C#~=grLf&2Q$-;0xU5*)aPx z!+y>**#wK;L#T^Tgo(W=_an^qPv!5wW0>uq%U^xTFzA=3@$j$Z$Om!sd%2KjnAjiX zK_rX*e7X}x$B}pU`k`Y)SNNDkY=+Hb05Z--*=RZ$TUl1pn{oo}{eu?#63;^9i6YUx zC*epu_j2~d7DBr}hSKa-su)Mn)KTQl8f$vkG^TFYaCM6fB@&6>LWv(jyWcRovHub; zm4ON%HiY02hZjO@IMpnoWOS+ybxQ_SF_iQJvLQCw{1{w{;Uo=Cj%~dkN>UOikw~Re z>Hm)?$vCkagRGw+l*%Ndj%;i4H2Lk5d(N;meQ~3XP0zBY-1D7R_uMV$K;kB{JYHQPJuJNbmyAfkd;$|)cE=M+(lnHbJXsQs&QhLC#&d!;(4v&oo#e+QefRfv|-F_iH_j@n-**hPSRUA|?#+UFT>l2I%xH7;sj0d4SvGkTJ zLN|9lQ_ecBVz-PJMn%z=jfo7t!e~4Y?8Z>BG*d9<1Hmc|74e$*c$&DFHmlfB^X;K( zUxE(LgZ)gc3WvhU=_1&h{=Qb~UJ;BSmRWKXt8MD_&K-#(=BI{Y)!g0{oUIpmr9Phu z)rKXDooTOD@+6TriJxM3rb-oX>d)@XBu(Hy+Zi^MyWlyYSmAnw-ws0i4OVz%m@t`gsH)h(B z;^cbW#+@vdJev0i?rwy=R<%*0cev);GgbTCQE~-}(Yh+|%~8wf`<+aBk~StjKY=Dc zBHh=5rpB0b`>GX*uixa<6x1cKU;5_(35?Lc5_BzwhNXvB^HRYJqQNS8(4a&J@a1mX zRKOGF$v^MUQ!o-3qvVbwPd@c@cVBg!eLC1X!QjIGBN^#bIt`zM!ze-*@ICztsv`$U literal 0 HcmV?d00001 From 1d5fd3d43e43511a70db2f5ec28666613ee4f831 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Wed, 6 Mar 2024 16:04:53 -0800 Subject: [PATCH 10/14] Removing depdency on qp.Ensemble that was only used for type hinting. --- src/qp/metrics/concrete_metric_classes.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/qp/metrics/concrete_metric_classes.py b/src/qp/metrics/concrete_metric_classes.py index 5e2264e..f45ccde 100644 --- a/src/qp/metrics/concrete_metric_classes.py +++ b/src/qp/metrics/concrete_metric_classes.py @@ -2,7 +2,6 @@ import numpy as np -from qp.ensemble import Ensemble from qp.metrics.base_metric_classes import ( MetricOutputType, DistToDistMetric, @@ -273,7 +272,7 @@ def __init__(self, eval_grid: list = default_eval_grid, **kwargs) -> None: super().__init__(**kwargs) self._eval_grid = eval_grid - def evaluate(self, estimate, reference) -> Ensemble: + def evaluate(self, estimate, reference): pit_object = PIT(estimate, reference, self._eval_grid) return pit_object.pit From ec80fcc450bddc35c3ce728fefd82df40adb4ca6 Mon Sep 17 00:00:00 2001 From: Drew Oldag <47493171+drewoldag@users.noreply.github.com> Date: Wed, 6 Mar 2024 15:57:03 -0800 Subject: [PATCH 11/14] Support reading and writing a dictionary of Ensembles to hdf5 files (#218) * WIP - Initial commit to support writing out a dictionary collection of ensembles. * Implementing a function to read dictionaries of qp ensembles from hdf5 files. * Removing files that were accidentally committed. * Adding test coverage for read and write dict. * Marking test as skipped until the required tables_io work is released. * Unskipping a test now that tables_io has new release. * Adding pragma no cover to value error. 
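A minimal usage sketch of the new round trip, mirroring the unit test below; the qp.interp constructor keys (xvals/yvals) and the file name are assumptions for illustration, and the top-level qp.write_dict / qp.read_dict aliases come from the factory bindings added earlier in this series:

    import numpy as np
    import qp

    xvals = np.linspace(0.0, 3.0, 100)
    yvals = np.exp(-0.5 * ((xvals - np.array([[1.0], [1.5]])) / 0.2) ** 2)
    ens_a = qp.Ensemble(qp.interp, data=dict(xvals=xvals, yvals=yvals))
    ens_b = qp.Ensemble(qp.interp, data=dict(xvals=xvals, yvals=yvals[::-1]))

    qp.write_dict("ensembles.hdf5", {"a": ens_a, "b": ens_b})  # one top-level HDF5 group per key
    loaded = qp.read_dict("ensembles.hdf5")                    # {'a': Ensemble, 'b': Ensemble}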
--- src/qp/factory.py | 2 +- tests/qp/test_ensemble.py | 30 +++++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/qp/factory.py b/src/qp/factory.py index f1fbf02..d00240c 100644 --- a/src/qp/factory.py +++ b/src/qp/factory.py @@ -363,7 +363,7 @@ def write_dict(filename, ensemble_dict, **kwargs): for key, val in ensemble_dict.items(): # check that val is a qp.Ensemble if not isinstance(val, Ensemble): - raise ValueError("All values in ensemble_dict must be qp.Ensemble") + raise ValueError("All values in ensemble_dict must be qp.Ensemble") # pragma: no cover output_tables[key] = val.build_tables() io.writeDictsToHdf5(output_tables, filename, **kwargs) diff --git a/tests/qp/test_ensemble.py b/tests/qp/test_ensemble.py index 46a0cbe..73d6a5e 100644 --- a/tests/qp/test_ensemble.py +++ b/tests/qp/test_ensemble.py @@ -6,7 +6,7 @@ import unittest import numpy as np - +from numpy.testing import assert_array_equal, assert_array_almost_equal import qp from qp import test_data from qp.plotting import init_matplotlib @@ -235,6 +235,34 @@ def test_mixmod_with_negative_weights(self): with self.assertRaises(ValueError): _ = qp.mixmod(weights=weights, means=means, stds=sigmas) + def test_dictionary_output(self): + """Test that writing and reading a dictionary of ensembles works as expected.""" + key = "hist" + qp.hist_gen.make_test_data() + cls_test_data = qp.hist_gen.test_data[key] + ens_h = build_ensemble(cls_test_data) + + key = "interp" + qp.interp_gen.make_test_data() + cls_test_data = qp.interp_gen.test_data[key] + ens_i = build_ensemble(cls_test_data) + + output_dict = { + 'hist': ens_h, + 'interp': ens_i, + } + + qp.factory.write_dict('test_dict.hdf5', output_dict) + + input_dict = qp.factory.read_dict('test_dict.hdf5') + + assert input_dict.keys() == output_dict.keys() + + XVALS = np.linspace(0,3,100) + for ens_type in ["hist", "interp"]: + assert_array_equal(input_dict[ens_type].metadata()['pdf_name'], output_dict[ens_type].metadata()['pdf_name']) + + assert_array_almost_equal(input_dict[ens_type].pdf(XVALS), output_dict[ens_type].pdf(XVALS)) if __name__ == "__main__": unittest.main() From 7f583c30b85e3d5818de5d6753dbc1b80ff594be Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Mon, 4 Mar 2024 20:52:54 -0800 Subject: [PATCH 12/14] WIP - Initial commit to support writing out a dictionary collection of ensembles. --- src/qp/factory.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/qp/factory.py b/src/qp/factory.py index d00240c..9212a25 100644 --- a/src/qp/factory.py +++ b/src/qp/factory.py @@ -413,5 +413,9 @@ def instance(): data_length = _FACTORY.data_length from_tables = _FACTORY.from_tables is_qp_file = _FACTORY.is_qp_file +<<<<<<< HEAD write_dict = _FACTORY.write_dict read_dict = _FACTORY.read_dict +======= +write_dict = _FACTORY.write_dict +>>>>>>> 4ae1d03 (WIP - Initial commit to support writing out a dictionary collection of ensembles.) From 3bb5ea345df47b6f63f9c9f3f565859a3b69f453 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Tue, 5 Mar 2024 12:52:35 -0800 Subject: [PATCH 13/14] Implementing a function to read dictionaries of qp ensembles from hdf5 files. 
--- src/qp/factory.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/qp/factory.py b/src/qp/factory.py index 9212a25..d00240c 100644 --- a/src/qp/factory.py +++ b/src/qp/factory.py @@ -413,9 +413,5 @@ def instance(): data_length = _FACTORY.data_length from_tables = _FACTORY.from_tables is_qp_file = _FACTORY.is_qp_file -<<<<<<< HEAD write_dict = _FACTORY.write_dict read_dict = _FACTORY.read_dict -======= -write_dict = _FACTORY.write_dict ->>>>>>> 4ae1d03 (WIP - Initial commit to support writing out a dictionary collection of ensembles.) From 515f40eb7bc85e957dc84cbe805491f6eb8d615c Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Wed, 6 Mar 2024 16:15:54 -0800 Subject: [PATCH 14/14] Removing files that accidentally got into the repo. --- src/qp/metrics/tdigest_example.py | 67 ------------------------------ test_normancil.pq | Bin 1876 -> 0 bytes 2 files changed, 67 deletions(-) delete mode 100644 src/qp/metrics/tdigest_example.py delete mode 100644 test_normancil.pq diff --git a/src/qp/metrics/tdigest_example.py b/src/qp/metrics/tdigest_example.py deleted file mode 100644 index a145265..0000000 --- a/src/qp/metrics/tdigest_example.py +++ /dev/null @@ -1,67 +0,0 @@ -from functools import reduce -from itertools import chain -from operator import add -import time - -import numpy as np -import tqdm -from mpi4py import MPI - -from pytdigest import TDigest - - -comm = MPI.COMM_WORLD -rank = comm.Get_rank() - -# tdigest compression -COMPRESSION = 1000 - -# properties of lognormal distribution to be sampled -MEAN = 1 -SIGMA = 1 - -# data volume -TOTAL_SIZE = 1_000_000 -CHUNK_SIZE = TOTAL_SIZE // comm.size -N_BATCH = 10 -BATCH_SIZE = CHUNK_SIZE // N_BATCH - - -def accumulate_digest_centroids(xs, compression=100): - digest = TDigest.compute(xs, compression=compression) - centroids = digest.get_centroids() - - return centroids - - -def main(): - rng = np.random.default_rng() - - start = time.time() - centroids = [] - for i_batch in range(N_BATCH): - xs = rng.lognormal(mean=MEAN, sigma=SIGMA, size=BATCH_SIZE) - _centroids = accumulate_digest_centroids(xs, compression=COMPRESSION) - centroids.append(_centroids) - end = time.time() - print(f"[MPI {rank}] processed {N_BATCH * BATCH_SIZE} objects in {end - start:.3g} seconds") - - centroids = comm.gather(centroids, root=0) - - if rank == 0: - start = time.time() - digests = ( - TDigest.of_centroids(centroid, compression=COMPRESSION) - for centroid in chain.from_iterable(centroids) - ) - td = reduce(add, digests) - end = time.time() - print(f"[MPI {rank}] reduced {N_BATCH * BATCH_SIZE * comm.size} objects in {end - start:.3g} seconds") - - # quantiles = [0., 0.25, 0.5, 0.75, 1.] 
- # print(td.inverse_cdf(quantiles)) - print(f"true median: {np.exp(MEAN)}") - print(f"approx median: {td.inverse_cdf(0.5)}") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_normancil.pq b/test_normancil.pq deleted file mode 100644 index d5f787095cb1679e7881916d8c6e475a176ec285..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1876 zcmcgt&2QpX5PxQYqA07PYK=hR5Ls%P14%I-X;@Vv4-$weWD~N%nAIxs7hqx=7i>Nf zMe4bKLEFEg>ZyN3EA`mpo_gw~QdL!|RCV67VMEnJd+C#~=grLf&2Q$-;0xU5*)aPx z!+y>**#wK;L#T^Tgo(W=_an^qPv!5wW0>uq%U^xTFzA=3@$j$Z$Om!sd%2KjnAjiX zK_rX*e7X}x$B}pU`k`Y)SNNDkY=+Hb05Z--*=RZ$TUl1pn{oo}{eu?#63;^9i6YUx zC*epu_j2~d7DBr}hSKa-su)Mn)KTQl8f$vkG^TFYaCM6fB@&6>LWv(jyWcRovHub; zm4ON%HiY02hZjO@IMpnoWOS+ybxQ_SF_iQJvLQCw{1{w{;Uo=Cj%~dkN>UOikw~Re z>Hm)?$vCkagRGw+l*%Ndj%;i4H2Lk5d(N;meQ~3XP0zBY-1D7R_uMV$K;kB{JYHQPJuJNbmyAfkd;$|)cE=M+(lnHbJXsQs&QhLC#&d!;(4v&oo#e+QefRfv|-F_iH_j@n-**hPSRUA|?#+UFT>l2I%xH7;sj0d4SvGkTJ zLN|9lQ_ecBVz-PJMn%z=jfo7t!e~4Y?8Z>BG*d9<1Hmc|74e$*c$&DFHmlfB^X;K( zUxE(LgZ)gc3WvhU=_1&h{=Qb~UJ;BSmRWKXt8MD_&K-#(=BI{Y)!g0{oUIpmr9Phu z)rKXDooTOD@+6TriJxM3rb-oX>d)@XBu(Hy+Zi^MyWlyYSmAnw-ws0i4OVz%m@t`gsH)h(B z;^cbW#+@vdJev0i?rwy=R<%*0cev);GgbTCQE~-}(Yh+|%~8wf`<+aBk~StjKY=Dc zBHh=5rpB0b`>GX*uixa<6x1cKU;5_(35?Lc5_BzwhNXvB^HRYJqQNS8(4a&J@a1mX zRKOGF$v^MUQ!o-3qvVbwPd@c@cVBg!eLC1X!QjIGBN^#bIt`zM!ze-*@ICztsv`$U
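As an overall reference for the series, the digester pattern reduces to: compress each chunk of samples (PIT values, in the metric classes) into t-digest centroids in accumulate, then rebuild and merge the digests before reading quantiles off the combined digest in finalize. A minimal single-process sketch using the same pytdigest calls that appear in the patches (the sizes and the lognormal test distribution are illustrative):

    from functools import reduce
    from operator import add

    import numpy as np
    from pytdigest import TDigest

    rng = np.random.default_rng(0)
    chunks = [rng.lognormal(mean=1.0, sigma=1.0, size=10_000) for _ in range(4)]

    # accumulate: each chunk collapses to a small centroid array that is cheap to gather
    centroid_sets = [TDigest.compute(chunk, compression=1000).get_centroids() for chunk in chunks]

    # finalize: rebuild a digest from each centroid array and merge them with `+`
    merged = reduce(add, (TDigest.of_centroids(c, compression=1000) for c in centroid_sets))

    print("approx median:", merged.inverse_cdf(0.5))
    print("true median:  ", np.exp(1.0))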