diff --git a/doc/conf.py b/doc/conf.py index 4dbd3009..002a7bf0 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -85,6 +85,7 @@ nb_execution_mode = "force" nb_execution_raise_on_error = True nb_execution_show_tb = True +nb_execution_timeout = 90 # max. seconds/cell source_suffix = { ".rst": "restructuredtext", diff --git a/doc/example/distributions.ipynb b/doc/example/distributions.ipynb index 86235fe1..fd251213 100644 --- a/doc/example/distributions.ipynb +++ b/doc/example/distributions.ipynb @@ -33,39 +33,73 @@ "\n", "from petab.v1.C import *\n", "from petab.v1.priors import Prior\n", + "from petab.v1.parameters import scale, unscale\n", + "\n", "\n", "sns.set_style(None)\n", "\n", "\n", - "def plot(prior: Prior, ax=None):\n", + "def plot(prior: Prior):\n", " \"\"\"Visualize a distribution.\"\"\"\n", + " fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))\n", + " sample = prior.sample(20_000, x_scaled=True)\n", + "\n", + " fig.suptitle(str(prior))\n", + "\n", + " plot_single(prior, ax=ax1, sample=sample, scaled=False)\n", + " plot_single(prior, ax=ax2, sample=sample, scaled=True)\n", + " plt.tight_layout()\n", + " plt.show()\n", + "\n", + "def plot_single(prior: Prior, scaled: bool = False, ax=None, sample: np.array = None):\n", + " fig = None\n", " if ax is None:\n", " fig, ax = plt.subplots()\n", "\n", - " sample = prior.sample(10000)\n", - "\n", - " # pdf\n", - " xmin = min(sample.min(), prior.lb_scaled if prior.bounds is not None else sample.min())\n", - " xmax = max(sample.max(), prior.ub_scaled if prior.bounds is not None else sample.max())\n", + " if sample is None:\n", + " sample = prior.sample(20_000)\n", + "\n", + " # assuming scaled sample\n", + " if not scaled:\n", + " sample = unscale(sample, prior.transformation)\n", + " bounds = prior.bounds\n", + " else:\n", + " bounds = (prior.lb_scaled, prior.ub_scaled) if prior.bounds is not None else None\n", + "\n", + " # plot pdf\n", + " xmin = min(sample.min(), bounds[0] if prior.bounds is not None else sample.min())\n", + " xmax = max(sample.max(), bounds[1] if prior.bounds is not None else sample.max())\n", + " padding = 0.1 * (xmax - xmin)\n", + " xmin -= padding\n", + " xmax += padding\n", " x = np.linspace(xmin, xmax, 500)\n", - " y = prior.pdf(x)\n", + " y = prior.pdf(x, x_scaled=scaled, rescale=scaled)\n", " ax.plot(x, y, color='red', label='pdf')\n", "\n", " sns.histplot(sample, stat='density', ax=ax, label=\"sample\")\n", "\n", - " # bounds\n", + " # plot bounds\n", " if prior.bounds is not None:\n", - " for bound in (prior.lb_scaled, prior.ub_scaled):\n", + " for bound in bounds:\n", " if bound is not None and np.isfinite(bound):\n", " ax.axvline(bound, color='black', linestyle='--', label='bound')\n", "\n", - " ax.set_title(str(prior))\n", - " ax.set_xlabel('Parameter value on the parameter scale')\n", + " if fig is not None:\n", + " ax.set_title(str(prior))\n", + "\n", + " if scaled:\n", + " ax.set_xlabel(f'Parameter value on parameter scale ({prior.transformation})')\n", + " ax.set_ylabel(\"Rescaled density\")\n", + " else:\n", + " ax.set_xlabel('Parameter value')\n", + "\n", " ax.grid(False)\n", " handles, labels = ax.get_legend_handles_labels()\n", " unique_labels = dict(zip(labels, handles))\n", " ax.legend(unique_labels.values(), unique_labels.keys())\n", - " plt.show()" + "\n", + " if ax is None:\n", + " plt.show()\n" ], "id": "initial_id", "outputs": [], @@ -81,11 +115,11 @@ "metadata": {}, "cell_type": "code", "source": [ - "plot(Prior(UNIFORM, (0, 1)))\n", - "plot(Prior(NORMAL, (0, 1)))\n", - "plot(Prior(LAPLACE, (0, 1)))\n", - "plot(Prior(LOG_NORMAL, (0, 1)))\n", - "plot(Prior(LOG_LAPLACE, (1, 0.5)))" + "plot_single(Prior(UNIFORM, (0, 1)))\n", + "plot_single(Prior(NORMAL, (0, 1)))\n", + "plot_single(Prior(LAPLACE, (0, 1)))\n", + "plot_single(Prior(LOG_NORMAL, (0, 1)))\n", + "plot_single(Prior(LOG_LAPLACE, (1, 0.5)))" ], "id": "4f09e50a3db06d9f", "outputs": [], @@ -94,7 +128,7 @@ { "metadata": {}, "cell_type": "markdown", - "source": "If a parameter scale is specified (`parameterScale=lin|log|log10` not a `parameterScale*`-type distribution), the sample is transformed accordingly (but not the distribution parameters):\n", + "source": "If a parameter scale is specified (`parameterScale=lin|log|log10`) and the chosen distribution is not a `parameterScale*`-type distribution, then the distribution parameters are taken as is, i.e., the `parameterScale` is not applied to the distribution parameters. In the context of PEtab prior distributions, `parameterScale` will only be used for the start point sampling for optimization, where the sample will be transformed accordingly. This is demonstrated below. The left plot always shows the prior distribution for unscaled parameter values, and the right plot shows the prior distribution for scaled parameter values. Note that in the objective function, the prior is always on the unscaled parameters.\n", "id": "dab4b2d1e0f312d8" }, { @@ -131,18 +165,20 @@ { "metadata": {}, "cell_type": "markdown", - "source": "Prior distributions can also be defined on the parameter scale by using the types `parameterScaleUniform`, `parameterScaleNormal` or `parameterScaleLaplace`. In these cases, 1) the distribution parameter are interpreted on the transformed parameter scale, and 2) a sample from the given distribution is used directly, without applying any transformation according to `parameterScale` (this implies, that for `parameterScale=lin`, there is no difference between `parameterScaleUniform` and `uniform`):", + "source": "Prior distributions can also be defined on the scaled parameters (i.e., transformed according to `parameterScale`) by using the types `parameterScaleUniform`, `parameterScaleNormal` or `parameterScaleLaplace`. In these cases, the distribution parameter are interpreted on the transformed parameter scale (but not the parameter bounds, see below). This implies, that for `parameterScale=lin`, there is no difference between `parameterScaleUniform` and `uniform`.", "id": "263c9fd31156a4d5" }, { "metadata": {}, "cell_type": "code", "source": [ + "# different, because transformation!=LIN\n", "plot(Prior(UNIFORM, (0.01, 2), transformation=LOG10))\n", "plot(Prior(PARAMETER_SCALE_UNIFORM, (0.01, 2), transformation=LOG10))\n", "\n", + "# same, because transformation=LIN\n", "plot(Prior(UNIFORM, (0.01, 2), transformation=LIN))\n", - "plot(Prior(PARAMETER_SCALE_UNIFORM, (0.01, 2), transformation=LIN))\n" + "plot(Prior(PARAMETER_SCALE_UNIFORM, (0.01, 2), transformation=LIN))" ], "id": "5ca940bc24312fc6", "outputs": [], @@ -151,15 +187,18 @@ { "metadata": {}, "cell_type": "markdown", - "source": "To prevent the sampled parameters from exceeding the bounds, the sampled parameters are clipped to the bounds. The bounds are defined in the parameter table. Note that the current implementation does not support sampling from a truncated distribution. Instead, the samples are clipped to the bounds. This may introduce unwanted bias, and thus, should only be used with caution (i.e., the bounds should be chosen wide enough):", + "source": "The given distributions are truncated at the bounds defined in the parameter table:", "id": "b1a8b17d765db826" }, { "metadata": {}, "cell_type": "code", "source": [ - "plot(Prior(NORMAL, (0, 1), bounds=(-4, 4))) # negligible clipping-bias at 4 sigma\n", - "plot(Prior(UNIFORM, (0, 1), bounds=(0.1, 0.9))) # significant clipping-bias" + "plot(Prior(NORMAL, (0, 1), bounds=(-2, 2)))\n", + "plot(Prior(UNIFORM, (0, 1), bounds=(0.1, 0.9)))\n", + "plot(Prior(UNIFORM, (1e-8, 1), bounds=(0.1, 0.9), transformation=LOG10))\n", + "plot(Prior(LAPLACE, (0, 1), bounds=(-0.5, 0.5)))\n", + "plot(Prior(PARAMETER_SCALE_UNIFORM, (-3, 1), bounds=(1e-2, 1), transformation=LOG10))" ], "id": "4ac42b1eed759bdd", "outputs": [], @@ -175,9 +214,11 @@ "metadata": {}, "cell_type": "code", "source": [ - "plot(Prior(NORMAL, (10, 1), bounds=(6, 14), transformation=\"log10\"))\n", - "plot(Prior(PARAMETER_SCALE_NORMAL, (10, 1), bounds=(10**6, 10**14), transformation=\"log10\"))\n", - "plot(Prior(LAPLACE, (10, 2), bounds=(6, 14)))" + "plot(Prior(NORMAL, (10, 1), bounds=(6, 11), transformation=\"log10\"))\n", + "plot(Prior(PARAMETER_SCALE_NORMAL, (2, 1), bounds=(10**0, 10**3), transformation=\"log10\"))\n", + "plot(Prior(LAPLACE, (10, 2), bounds=(6, 14)))\n", + "plot(Prior(LOG_LAPLACE, (1, 0.5), bounds=(0.5, 8)))\n", + "plot(Prior(LOG_NORMAL, (2, 1), bounds=(0.5, 8)))" ], "id": "581e1ac431860419", "outputs": [], diff --git a/petab/v1/C.py b/petab/v1/C.py index be044a5c..c7e7cbdd 100644 --- a/petab/v1/C.py +++ b/petab/v1/C.py @@ -207,6 +207,13 @@ PARAMETER_SCALE_LAPLACE, ] +#: parameterScale*-type prior distributions +PARAMETER_SCALE_PRIOR_TYPES = [ + PARAMETER_SCALE_UNIFORM, + PARAMETER_SCALE_NORMAL, + PARAMETER_SCALE_LAPLACE, +] + #: Supported noise distributions NOISE_MODELS = [NORMAL, LAPLACE] diff --git a/petab/v1/distributions.py b/petab/v1/distributions.py index 418f5b44..0bac259a 100644 --- a/petab/v1/distributions.py +++ b/petab/v1/distributions.py @@ -28,15 +28,65 @@ class Distribution(abc.ABC): If a float, the distribution is transformed to its corresponding log distribution with the given base (e.g., Normal -> Log10Normal). If ``False``, no transformation is applied. + :param trunc: The truncation points (lower, upper) of the distribution + or ``None`` if the distribution is not truncated. """ - def __init__(self, log: bool | float = False): + def __init__( + self, *, log: bool | float = False, trunc: tuple[float, float] = None + ): if log is True: log = np.exp(1) + + if trunc == (-np.inf, np.inf): + trunc = None + + if trunc is not None and trunc[0] >= trunc[1]: + raise ValueError( + "The lower truncation limit must be smaller " + "than the upper truncation limit." + ) + self._logbase = log + self._trunc = trunc + + self._cd_low = None + self._cd_high = None + self._truncation_normalizer = 1 + + if self._trunc is not None: + try: + # the cumulative density of the transformed distribution at the + # truncation limits + self._cd_low = self._cdf_transformed_untruncated( + self.trunc_low + ) + self._cd_high = self._cdf_transformed_untruncated( + self.trunc_high + ) + # normalization factor for the PDF/CDF of the transformed + # distribution to account for truncation + self._truncation_normalizer = 1 / ( + self._cd_high - self._cd_low + ) + except NotImplementedError: + pass + + @property + def trunc_low(self) -> float: + """The lower truncation limit of the transformed distribution.""" + return self._trunc[0] if self._trunc else -np.inf - def _undo_log(self, x: np.ndarray | float) -> np.ndarray | float: - """Undo the log transformation. + @property + def trunc_high(self) -> float: + """The upper truncation limit of the transformed distribution.""" + return self._trunc[1] if self._trunc else np.inf + + def _exp(self, x: np.ndarray | float) -> np.ndarray | float: + """Exponentiate / undo the log transformation if applicable. + + Exponentiate if a log transformation is applied to the distribution. + Otherwise, return the input. :param x: The sample to transform. :return: The transformed sample @@ -45,8 +95,11 @@ def _undo_log(self, x: np.ndarray | float) -> np.ndarray | float: return x return self._logbase**x - def _apply_log(self, x: np.ndarray | float) -> np.ndarray | float: - """Apply the log transformation. + def _log(self, x: np.ndarray | float) -> np.ndarray | float: + """Apply the log transformation if enabled. + + Compute the log of x with the specified base if a log transformation + is applied to the distribution. Otherwise, return the input. :param x: The value to transform. :return: The transformed value. @@ -55,40 +108,47 @@ def _apply_log(self, x: np.ndarray | float) -> np.ndarray | float: return x return np.log(x) / np.log(self._logbase) - def sample(self, shape=None) -> np.ndarray: + def sample(self, shape=None) -> np.ndarray | float: """Sample from the distribution. :param shape: The shape of the sample. :return: A sample from the distribution. """ - sample = self._sample(shape) - return self._undo_log(sample) + sample = ( + self._exp(self._sample(shape)) + if self._trunc is None + else self._inverse_transform_sample(shape) + ) + + return sample @abc.abstractmethod - def _sample(self, shape=None) -> np.ndarray: + def _sample(self, shape=None) -> np.ndarray | float: """Sample from the underlying distribution. :param shape: The shape of the sample. :return: A sample from the underlying distribution, - before applying, e.g., the log transformation. + before applying, e.g., the log transformation or truncation. """ ... - def pdf(self, x): + def pdf(self, x) -> np.ndarray | float: """Probability density function at x. :param x: The value at which to evaluate the PDF. :return: The value of the PDF at ``x``. """ - # handle the log transformation; see also: - # https://en.wikipedia.org/wiki/Probability_density_function#Scalar_to_scalar - chain_rule_factor = ( - (1 / (x * np.log(self._logbase))) if self._logbase else 1 + if self._trunc is None: + return self._pdf_transformed_untruncated(x) + + return np.where( + (x >= self.trunc_low) & (x <= self.trunc_high), + self._pdf_transformed_untruncated(x) * self._truncation_normalizer, + 0, ) - return self._pdf(self._apply_log(x)) * chain_rule_factor @abc.abstractmethod - def _pdf(self, x): + def _pdf_untransformed_untruncated(self, x) -> np.ndarray | float: """Probability density function of the underlying distribution at x. :param x: The value at which to evaluate the PDF. @@ -96,6 +156,30 @@ def _pdf(self, x): """ ... + def _pdf_transformed_untruncated(self, x) -> np.ndarray | float: + """Probability density function of the transformed, but untruncated + distribution at x. + + :param x: The value at which to evaluate the PDF. + :return: The value of the PDF at ``x``. + """ + if self.logbase is False: + return self._pdf_untransformed_untruncated(x) + + # handle the log transformation; see also: + # https://en.wikipedia.org/wiki/Probability_density_function#Scalar_to_scalar + with np.errstate(invalid="ignore", divide="ignore"): + chain_rule_factor = ( + (1 / (x * np.log(self._logbase))) if self._logbase else 1 + ) + + return np.where( + x > 0, + self._pdf_untransformed_untruncated(self._log(x)) + * chain_rule_factor, + 0, + ) + @property def logbase(self) -> bool | float: """The base of the log transformation. @@ -104,13 +188,94 @@ def logbase(self) -> bool | float: """ return self._logbase + def cdf(self, x) -> np.ndarray | float: + """Cumulative distribution function at x. + + :param x: The value at which to evaluate the CDF. + :return: The value of the CDF at ``x``. + """ + if self._trunc is None: + return self._cdf_transformed_untruncated(x) + return ( + self._cdf_transformed_untruncated(x) - self._cd_low + ) * self._truncation_normalizer + + def _cdf_transformed_untruncated(self, x) -> np.ndarray | float: + """Cumulative distribution function of the transformed, but untruncated + distribution at x. + + :param x: The value at which to evaluate the CDF. + :return: The value of the CDF at ``x``. + """ + if not self.logbase: + return self._cdf_untransformed_untruncated(x) + + with np.errstate(invalid="ignore"): + return np.where( + x < 0, 0, self._cdf_untransformed_untruncated(self._log(x)) + ) + + def _cdf_untransformed_untruncated(self, x) -> np.ndarray | float: + """Cumulative distribution function of the underlying + (untransformed, untruncated) distribution at x. + + :param x: The value at which to evaluate the CDF. + :return: The value of the CDF at ``x``. + """ + raise NotImplementedError + + def _ppf_untransformed_untruncated(self, q) -> np.ndarray | float: + """Percent point function of the underlying + (untransformed, untruncated) distribution at q. + + :param q: The quantile at which to evaluate the PPF. + :return: The value of the PPF at ``q``. + """ + raise NotImplementedError + + def _ppf_transformed_untruncated(self, q) -> np.ndarray | float: + """Percent point function of the transformed, but untruncated + distribution at q. + + :param q: The quantile at which to evaluate the PPF. + :return: The value of the PPF at ``q``. + """ + return self._exp(self._ppf_untransformed_untruncated(q)) + + def ppf(self, q) -> np.ndarray | float: + """Percent point function at q. + + :param q: The quantile at which to evaluate the PPF. + :return: The value of the PPF at ``q``. + """ + if self._trunc is None: + return self._ppf_transformed_untruncated(q) + + # Adjust quantiles to account for truncation + adjusted_q = self._cd_low + q * (self._cd_high - self._cd_low) + return self._ppf_transformed_untruncated(adjusted_q) + + def _inverse_transform_sample(self, shape) -> np.ndarray | float: + """Generate an inverse transform sample from the transformed and + truncated distribution. + + :param shape: The shape of the sample. + :return: The sample. + """ + uniform_sample = np.random.uniform( + low=self._cd_low, high=self._cd_high, size=shape + ) + return self._ppf_transformed_untruncated(uniform_sample) + class Normal(Distribution): """A (log-)normal distribution. :param loc: The location parameter of the distribution. :param scale: The scale parameter of the distribution. - :param truncation: The truncation limits of the distribution. + :param trunc: The truncation limits of the distribution. + ``None`` if the distribution is not truncated. The truncation limits + are the truncation limits of the transformed distribution. :param log: If ``True``, the distribution is transformed to a log-normal distribution. If a float, the distribution is transformed to a log-normal distribution with the given base. @@ -124,35 +289,37 @@ def __init__( self, loc: float, scale: float, - truncation: tuple[float, float] | None = None, + trunc: tuple[float, float] | None = None, log: bool | float = False, ): - super().__init__(log=log) self._loc = loc self._scale = scale - self._truncation = truncation - - if truncation is not None: - raise NotImplementedError("Truncation is not yet implemented.") + super().__init__(log=log, trunc=trunc) def __repr__(self): - trunc = f", truncation={self._truncation}" if self._truncation else "" + trunc = f", trunc={self._trunc}" if self._trunc else "" log = f", log={self._logbase}" if self._logbase else "" return f"Normal(loc={self._loc}, scale={self._scale}{trunc}{log})" - def _sample(self, shape=None): + def _sample(self, shape=None) -> np.ndarray | float: return np.random.normal(loc=self._loc, scale=self._scale, size=shape) - def _pdf(self, x): + def _pdf_untransformed_untruncated(self, x) -> np.ndarray | float: return norm.pdf(x, loc=self._loc, scale=self._scale) + def _cdf_untransformed_untruncated(self, x) -> np.ndarray | float: + return norm.cdf(x, loc=self._loc, scale=self._scale) + + def _ppf_untransformed_untruncated(self, q) -> np.ndarray | float: + return norm.ppf(q, loc=self._loc, scale=self._scale) + @property - def loc(self): + def loc(self) -> float: """The location parameter of the underlying distribution.""" return self._loc @property - def scale(self): + def scale(self) -> float: """The scale parameter of the underlying distribution.""" return self._scale @@ -177,27 +344,35 @@ def __init__( *, log: bool | float = False, ): - super().__init__(log=log) self._low = low self._high = high + super().__init__(log=log) def __repr__(self): log = f", log={self._logbase}" if self._logbase else "" return f"Uniform(low={self._low}, high={self._high}{log})" - def _sample(self, shape=None): + def _sample(self, shape=None) -> np.ndarray | float: return np.random.uniform(low=self._low, high=self._high, size=shape) - def _pdf(self, x): + def _pdf_untransformed_untruncated(self, x) -> np.ndarray | float: return uniform.pdf(x, loc=self._low, scale=self._high - self._low) + def _cdf_untransformed_untruncated(self, x) -> np.ndarray | float: + return uniform.cdf(x, loc=self._low, scale=self._high - self._low) + + def _ppf_untransformed_untruncated(self, q) -> np.ndarray | float: + return uniform.ppf(q, loc=self._low, scale=self._high - self._low) + class Laplace(Distribution): """A (log-)Laplace distribution. :param loc: The location parameter of the distribution. :param scale: The scale parameter of the distribution. - :param truncation: The truncation limits of the distribution. + :param trunc: The truncation limits of the distribution. + ``None`` if the distribution is not truncated. The truncation limits + are the truncation limits of the transformed distribution. :param log: If ``True``, the distribution is transformed to a log-Laplace distribution. If a float, the distribution is transformed to a log-Laplace distribution with the given base. @@ -211,33 +386,36 @@ def __init__( self, loc: float, scale: float, - truncation: tuple[float, float] | None = None, + trunc: tuple[float, float] | None = None, log: bool | float = False, ): - super().__init__(log=log) self._loc = loc self._scale = scale - self._truncation = truncation - if truncation is not None: - raise NotImplementedError("Truncation is not yet implemented.") + super().__init__(log=log, trunc=trunc) def __repr__(self): - trunc = f", truncation={self._truncation}" if self._truncation else "" + trunc = f", trunc={self._trunc}" if self._trunc else "" log = f", log={self._logbase}" if self._logbase else "" return f"Laplace(loc={self._loc}, scale={self._scale}{trunc}{log})" - def _sample(self, shape=None): + def _sample(self, shape=None) -> np.ndarray | float: return np.random.laplace(loc=self._loc, scale=self._scale, size=shape) - def _pdf(self, x): + def _pdf_untransformed_untruncated(self, x) -> np.ndarray | float: return laplace.pdf(x, loc=self._loc, scale=self._scale) + def _cdf_untransformed_untruncated(self, x) -> np.ndarray | float: + return laplace.cdf(x, loc=self._loc, scale=self._scale) + + def _ppf_untransformed_untruncated(self, q) -> np.ndarray | float: + return laplace.ppf(q, loc=self._loc, scale=self._scale) + @property - def loc(self): + def loc(self) -> float: """The location parameter of the underlying distribution.""" return self._loc @property - def scale(self): + def scale(self) -> float: """The scale parameter of the underlying distribution.""" return self._scale diff --git a/petab/v1/priors.py b/petab/v1/priors.py index e1263946..b66ec484 100644 --- a/petab/v1/priors.py +++ b/petab/v1/priors.py @@ -58,6 +58,14 @@ class Prior: on the `parameter_scale` scale). :param bounds: The untransformed bounds of the sample (lower, upper). :param transformation: The transformation of the distribution. + :param bounds_truncate: Whether the generated prior will be truncated + at the bounds. + If ``True``, the probability density will be rescaled + accordingly and the sample is generated from the truncated + distribution. + If ``False``, the probability density will not be rescaled + accordingly, but the sample will be generated from the truncated + distribution. """ def __init__( @@ -66,6 +74,7 @@ def __init__( parameters: tuple, bounds: tuple = None, transformation: str = C.LIN, + bounds_truncate: bool = True, ): if transformation not in C.PARAMETER_SCALES: raise ValueError( @@ -87,27 +96,51 @@ def __init__( self._parameters = parameters self._bounds = bounds self._transformation = transformation + self._bounds_truncate = bounds_truncate + + truncation = bounds + if truncation is not None: + # for uniform, we don't want to implement truncation and just + # adapt the distribution parameters + if type_ == C.PARAMETER_SCALE_UNIFORM: + parameters = ( + max(parameters[0], scale(truncation[0], transformation)), + min(parameters[1], scale(truncation[1], transformation)), + ) + elif type_ == C.UNIFORM: + parameters = ( + max(parameters[0], truncation[0]), + min(parameters[1], truncation[1]), + ) # create the underlying distribution match type_, transformation: case (C.UNIFORM, _) | (C.PARAMETER_SCALE_UNIFORM, C.LIN): self.distribution = Uniform(*parameters) case (C.NORMAL, _) | (C.PARAMETER_SCALE_NORMAL, C.LIN): - self.distribution = Normal(*parameters) + self.distribution = Normal(*parameters, trunc=truncation) case (C.LAPLACE, _) | (C.PARAMETER_SCALE_LAPLACE, C.LIN): - self.distribution = Laplace(*parameters) + self.distribution = Laplace(*parameters, trunc=truncation) case (C.PARAMETER_SCALE_UNIFORM, C.LOG): self.distribution = Uniform(*parameters, log=True) case (C.LOG_NORMAL, _) | (C.PARAMETER_SCALE_NORMAL, C.LOG): - self.distribution = Normal(*parameters, log=True) + self.distribution = Normal( + *parameters, log=True, trunc=truncation + ) case (C.LOG_LAPLACE, _) | (C.PARAMETER_SCALE_LAPLACE, C.LOG): - self.distribution = Laplace(*parameters, log=True) + self.distribution = Laplace( + *parameters, log=True, trunc=truncation + ) case (C.PARAMETER_SCALE_UNIFORM, C.LOG10): self.distribution = Uniform(*parameters, log=10) case (C.PARAMETER_SCALE_NORMAL, C.LOG10): - self.distribution = Normal(*parameters, log=10) + self.distribution = Normal( + *parameters, log=10, trunc=truncation + ) case (C.PARAMETER_SCALE_LAPLACE, C.LOG10): - self.distribution = Laplace(*parameters, log=10) + self.distribution = Laplace( + *parameters, log=10, trunc=truncation + ) case _: raise ValueError( "Unsupported distribution type / transformation: " @@ -123,69 +156,55 @@ def __repr__(self): ) @property - def type(self): + def type(self) -> str: return self._type @property - def parameters(self): + def parameters(self) -> tuple: + """The parameters of the distribution.""" return self._parameters @property - def bounds(self): + def bounds(self) -> tuple[float, float] | None: + """The non-scaled bounds of the distribution.""" return self._bounds @property - def transformation(self): + def transformation(self) -> str: + """The `parameterScale`.""" return self._transformation - def sample(self, shape=None) -> np.ndarray: + def sample(self, shape=None, x_scaled=False) -> np.ndarray | float: """Sample from the distribution. :param shape: The shape of the sample. + :param x_scaled: Whether the sample should be on the parameter scale. :return: A sample from the distribution. """ raw_sample = self.distribution.sample(shape) - return self._clip_to_bounds(self._scale_sample(raw_sample)) + if x_scaled: + return self._scale_sample(raw_sample) + else: + return raw_sample def _scale_sample(self, sample): """Scale the sample to the parameter space""" - # if self.on_parameter_scale: - # return sample - + # we also need to scale parameterScale* distributions, because + # internally, they are handled as (unscaled) log-distributions return scale(sample, self.transformation) - def _clip_to_bounds(self, x): - """Clip `x` values to bounds. - - :param x: The values to clip. Assumed to be on the parameter scale. - """ - # TODO: replace this by proper truncation - if self.bounds is None: - return x - - return np.maximum( - np.minimum(self.ub_scaled, x), - self.lb_scaled, - ) - @property - def lb_scaled(self): + def lb_scaled(self) -> float: """The lower bound on the parameter scale.""" return scale(self.bounds[0], self.transformation) @property - def ub_scaled(self): + def ub_scaled(self) -> float: """The upper bound on the parameter scale.""" return scale(self.bounds[1], self.transformation) - def pdf(self, x): - """Probability density function at x. - - :param x: The value at which to evaluate the PDF. - ``x`` is assumed to be on the parameter scale. - :return: The value of the PDF at ``x``. Note that the PDF does - currently not account for the clipping at the bounds. - """ + def _chain_rule_coeff(self, x) -> np.ndarray | float: + """The chain rule coefficient for the transformation at x.""" x = unscale(x, self.transformation) # scale the PDF to the parameter scale @@ -198,25 +217,63 @@ def pdf(self, x): else: raise ValueError(f"Unknown transformation: {self.transformation}") - return self.distribution.pdf(x) * coeff + return coeff + + def pdf( + self, x, x_scaled: bool = False, rescale=False + ) -> np.ndarray | float: + """Probability density function at x. + + This accounts for truncation, independent of the `bounds_truncate` + parameter. + + :param x: The value at which to evaluate the PDF. + ``x`` is assumed to be on the parameter scale. + :param x_scaled: Whether ``x`` is on the parameter scale. + :param rescale: Whether to rescale the PDF to integrate to 1 on the + parameter scale. Only used if ``x_scaled`` is ``True``. + :return: The value of the PDF at ``x``. + """ + if x_scaled: + coeff = self._chain_rule_coeff(x) if rescale else 1 + x = unscale(x, self.transformation) + return self.distribution.pdf(x) * coeff + + return self.distribution.pdf(x) - def neglogprior(self, x): + def neglogprior( + self, x: np.array | float, x_scaled: bool = False + ) -> np.ndarray | float: """Negative log-prior at x. :param x: The value at which to evaluate the negative log-prior. - ``x`` is assumed to be on the parameter scale. + :param x_scaled: Whether ``x`` is on the parameter scale. + Note that the prior is always evaluated on the non-scaled + parameters. :return: The negative log-prior at ``x``. """ - return -np.log(self.pdf(x)) + if self._bounds_truncate: + # the truncation is handled by the distribution + # the prior is always evaluated on the non-scaled parameters + return -np.log(self.pdf(x, x_scaled=x_scaled, rescale=False)) + + # we want to evaluate the prior on the untruncated distribution + if x_scaled: + x = unscale(x, self.transformation) + return -np.log(self.distribution._pdf_transformed_untruncated(x)) @staticmethod def from_par_dict( - d, type_=Literal["initialization", "objective"] + d, + type_=Literal["initialization", "objective"], + bounds_truncate: bool = True, ) -> Prior: """Create a distribution from a row of the parameter table. :param d: A dictionary representing a row of the parameter table. :param type_: The type of the distribution. + :param bounds_truncate: Whether the generated prior will be truncated + at the bounds. :return: A distribution object. """ dist_type = d.get(f"{type_}PriorType", C.PARAMETER_SCALE_UNIFORM) @@ -242,6 +299,7 @@ def from_par_dict( parameters=params, bounds=(d[C.LOWER_BOUND], d[C.UPPER_BOUND]), transformation=pscale, + bounds_truncate=bounds_truncate, ) @@ -294,6 +352,7 @@ def priors_to_measurements(problem: Problem): return new_problem def scaled_observable_formula(parameter_id, parameter_scale): + # The location parameter of the prior if parameter_scale == LIN: return parameter_id if parameter_scale == LOG: @@ -322,6 +381,12 @@ def scaled_observable_formula(parameter_id, parameter_scale): # offset raise NotImplementedError("Uniform priors are not supported.") + if prior_type not in (C.NORMAL, C.LAPLACE): + # we can't (easily) handle parameterScale* priors or log*-priors + raise NotImplementedError( + f"Objective prior type {prior_type} is not implemented." + ) + parameter_id = row.name prior_parameters = tuple( map( @@ -346,7 +411,9 @@ def scaled_observable_formula(parameter_id, parameter_scale): OBSERVABLE_ID: new_obs_id, OBSERVABLE_FORMULA: scaled_observable_formula( parameter_id, - parameter_scale if "parameterScale" in prior_type else LIN, + parameter_scale + if prior_type in C.PARAMETER_SCALE_PRIOR_TYPES + else LIN, ), NOISE_FORMULA: f"noiseParameter1_{new_obs_id}", } @@ -355,12 +422,13 @@ def scaled_observable_formula(parameter_id, parameter_scale): elif OBSERVABLE_TRANSFORMATION in new_problem.observable_df: # only set default if the column is already present new_observable[OBSERVABLE_TRANSFORMATION] = LIN - + # type of the underlying distribution if prior_type in (NORMAL, PARAMETER_SCALE_NORMAL, LOG_NORMAL): new_observable[NOISE_DISTRIBUTION] = NORMAL elif prior_type in (LAPLACE, PARAMETER_SCALE_LAPLACE, LOG_LAPLACE): new_observable[NOISE_DISTRIBUTION] = LAPLACE else: + # we can't (easily) handle uniform priors in PEtab v1 raise NotImplementedError( f"Objective prior type {prior_type} is not implemented." ) diff --git a/petab/v1/sampling.py b/petab/v1/sampling.py index a046879f..f96bd1a0 100644 --- a/petab/v1/sampling.py +++ b/petab/v1/sampling.py @@ -28,7 +28,11 @@ def sample_from_prior( # unpack info p_type, p_params, scaling, bounds = prior prior = Prior( - p_type, tuple(p_params), bounds=tuple(bounds), transformation=scaling + p_type, + tuple(p_params), + bounds=tuple(bounds), + transformation=scaling, + bounds_truncate=True, ) return prior.sample(shape=(n_starts,)) @@ -74,7 +78,9 @@ def sample_parameter_startpoints( # get types and parameters of priors from dataframe return np.array( [ - Prior.from_par_dict(row, type_="initialization").sample(n_starts) + Prior.from_par_dict( + row, type_="initialization", bounds_truncate=True + ).sample(n_starts, x_scaled=True) for row in par_to_estimate.to_dict("records") ] ).T diff --git a/tests/v1/test_distributions.py b/tests/v1/test_distributions.py index 9df830fa..e06d9edc 100644 --- a/tests/v1/test_distributions.py +++ b/tests/v1/test_distributions.py @@ -1,3 +1,5 @@ +import sys + import numpy as np import pytest from numpy.testing import assert_allclose @@ -27,6 +29,11 @@ Uniform(2, 4, log=10), Laplace(1, 2), Laplace(1, 0.5, log=True), + Normal(2, 1, trunc=(1, 2)), + Normal(2, 1, log=True, trunc=(0.5, 8)), + Normal(2, 1, log=10), + Laplace(1, 2, trunc=(1, 2)), + Laplace(1, 0.5, log=True, trunc=(0.5, 8)), ], ) def test_sample_matches_pdf(distribution): @@ -51,36 +58,54 @@ def cdf(x): assert p > 0.05, (p, distribution) + # check min/max of CDF at the bounds + assert np.isclose( + distribution.cdf( + distribution.trunc_low + if not distribution.logbase + else max(sys.float_info.min, distribution.trunc_low) + ), + 0, + atol=1e-16, + rtol=0, + ) + assert np.isclose( + distribution.cdf(distribution.trunc_high), 1, atol=1e-14, rtol=0 + ) + # Test samples match scipy CDFs reference_pdf = None - if isinstance(distribution, Normal) and distribution.logbase is False: - reference_pdf = norm.pdf(sample, distribution.loc, distribution.scale) - elif isinstance(distribution, Uniform) and distribution.logbase is False: - reference_pdf = uniform.pdf( - sample, distribution._low, distribution._high - distribution._low - ) - elif isinstance(distribution, Laplace) and distribution.logbase is False: - reference_pdf = laplace.pdf( - sample, distribution.loc, distribution.scale - ) - elif isinstance(distribution, Normal) and distribution.logbase == np.exp( - 1 - ): - reference_pdf = lognorm.pdf( - sample, scale=np.exp(distribution.loc), s=distribution.scale - ) - elif isinstance(distribution, Uniform) and distribution.logbase == np.exp( - 1 - ): - reference_pdf = loguniform.pdf( - sample, np.exp(distribution._low), np.exp(distribution._high) - ) - elif isinstance(distribution, Laplace) and distribution.logbase == np.exp( - 1 - ): - reference_pdf = loglaplace.pdf( - sample, c=1 / distribution.scale, scale=np.exp(distribution.loc) - ) + if distribution._trunc is None and distribution.logbase is False: + if isinstance(distribution, Normal): + reference_pdf = norm.pdf( + sample, distribution.loc, distribution.scale + ) + elif isinstance(distribution, Uniform): + reference_pdf = uniform.pdf( + sample, + distribution._low, + distribution._high - distribution._low, + ) + elif isinstance(distribution, Laplace): + reference_pdf = laplace.pdf( + sample, distribution.loc, distribution.scale + ) + + if distribution._trunc is None and distribution.logbase == np.exp(1): + if isinstance(distribution, Normal): + reference_pdf = lognorm.pdf( + sample, scale=np.exp(distribution.loc), s=distribution.scale + ) + elif isinstance(distribution, Uniform): + reference_pdf = loguniform.pdf( + sample, np.exp(distribution._low), np.exp(distribution._high) + ) + elif isinstance(distribution, Laplace): + reference_pdf = loglaplace.pdf( + sample, + c=1 / distribution.scale, + scale=np.exp(distribution.loc), + ) if reference_pdf is not None: assert_allclose( distribution.pdf(sample), reference_pdf, rtol=1e-10, atol=1e-14 diff --git a/tests/v1/test_priors.py b/tests/v1/test_priors.py index d98879e3..5cb2ad3a 100644 --- a/tests/v1/test_priors.py +++ b/tests/v1/test_priors.py @@ -6,7 +6,7 @@ import numpy as np import pandas as pd import pytest -from scipy.integrate import cumulative_trapezoid +from scipy.integrate import cumulative_trapezoid, quad from scipy.stats import kstest import petab.v1 @@ -20,9 +20,39 @@ get_simulation_conditions, get_simulation_df, ) +from petab.v1.calculate import calculate_single_llh from petab.v1.priors import Prior, priors_to_measurements +def test_priors_to_measurements_simple(): + """Test the conversion of priors to measurements. + + Illustrates & tests the conversion of a prior to a measurement. + """ + # parameter value at which we evaluate the prior + par_value = 2.5 + # location and scale parameters of the prior + prior_loc = 3 + prior_scale = 3 + + for prior_type in [C.NORMAL, C.LAPLACE]: + # evaluate the orignal prior + prior = Prior( + prior_type, (prior_loc, prior_scale), transformation=C.LIN + ) + logprior = -prior.neglogprior(par_value, x_scaled=False) + + # evaluate the alternative implementation as a measurement + llh = calculate_single_llh( + measurement=prior_loc, + simulation=par_value, + scale=C.LIN, + noise_distribution=prior_type, + noise_value=prior_scale, + ) + assert np.isclose(llh, logprior, rtol=1e-12, atol=1e-16) + + @pytest.mark.parametrize( "problem_id", ["Schwen_PONE2014", "Isensee_JCB2018", "Raimundez_PCB2020"] ) @@ -59,8 +89,13 @@ def test_priors_to_measurements(problem_id): ) ) - # convert priors to measurements - petab_problem_measurements = priors_to_measurements(petab_problem_priors) + try: + # convert priors to measurements + petab_problem_measurements = priors_to_measurements( + petab_problem_priors + ) + except NotImplementedError as e: + pytest.skip(str(e)) # check that the original problem is not modified for attr in [ @@ -121,9 +156,12 @@ def apply_parameter_values(row): # apply the parameter values to the observable formula for the prior if row[OBSERVABLE_ID].startswith("prior_"): parameter_id = row[OBSERVABLE_ID].removeprefix("prior_") - if original_problem.parameter_df.loc[ - parameter_id, OBJECTIVE_PRIOR_TYPE - ].startswith("parameterScale"): + if ( + original_problem.parameter_df.loc[ + parameter_id, OBJECTIVE_PRIOR_TYPE + ] + in C.PARAMETER_SCALE_PRIOR_TYPES + ): row[SIMULATION] = x_scaled_dict[parameter_id] else: row[SIMULATION] = x_unscaled_dict[parameter_id] @@ -156,13 +194,17 @@ def apply_parameter_values(row): ] priors = [ Prior.from_par_dict( - petab_problem_priors.parameter_df.loc[par_id], type_="objective" + petab_problem_priors.parameter_df.loc[par_id], + type_="objective", + bounds_truncate=False, ) for par_id in parameter_ids ] prior_contrib = 0 for parameter_id, prior in zip(parameter_ids, priors, strict=True): - prior_contrib -= prior.neglogprior(x_scaled_dict[parameter_id]) + prior_contrib -= prior.neglogprior( + x_scaled_dict[parameter_id], x_scaled=True + ) assert np.isclose( llh_priors + prior_contrib, llh_measurements, rtol=1e-8, atol=1e-16 @@ -194,21 +236,47 @@ def test_sample_matches_pdf(prior_args, transform): """Test that the sample matches the PDF.""" np.random.seed(1) N_SAMPLES = 10_000 + prior = Prior(*prior_args, transformation=transform) - sample = prior.sample(N_SAMPLES) - # pdf -> cdf - def cdf(x): - return cumulative_trapezoid(prior.pdf(x), x) + for x_scaled in [False, True]: + sample = prior.sample(N_SAMPLES, x_scaled=x_scaled) + + # pdf -> cdf + def cdf(x): + return cumulative_trapezoid( + prior.pdf( + x, + x_scaled=x_scaled, # noqa B208 + rescale=x_scaled, # noqa B208 + ), + x, + ) + + # Kolmogorov-Smirnov test to check if the sample is drawn from the CDF + _, p = kstest(sample, cdf) - # Kolmogorov-Smirnov test to check if the sample is drawn from the CDF - _, p = kstest(sample, cdf) + if p < 0.05: + import matplotlib.pyplot as plt - # if p < 0.05: - # import matplotlib.pyplot as plt - # plt.hist(sample, bins=100, density=True) - # x = np.linspace(min(sample), max(sample), 100) - # plt.plot(x, distribution.pdf(x)) - # plt.show() + plt.hist(sample, bins=100, density=True) + x = np.linspace(min(sample), max(sample), 100) + plt.plot(x, prior.pdf(x, x_scaled=x_scaled, rescale=x_scaled)) + plt.xlabel(("scaled" if x_scaled else "unscaled") + " x") + plt.ylabel(("rescaled " if x_scaled else "") + "density") + plt.title(str(prior)) + plt.show() + print() - assert p > 0.05, (p, prior) + assert p > 0.05, (p, prior) + + # check that the integral of the PDF is 1 on the parameter scale + integral, abserr = quad( + lambda x: prior.pdf(x, x_scaled=False), + -np.inf, + np.inf, + limit=100, + epsabs=1e-10, + epsrel=0, + ) + assert np.isclose(integral, 1, rtol=0, atol=10 * abserr)