#!/usr/bin/env python

######################################
# Imports
######################################

from typing import List, Optional

import bentoml
import pandas as pd
from bentoml.io import JSON, PandasDataFrame
from pydantic import BaseModel

######################################
# Constants
######################################

# The model registry task name; also used as the BentoML service name.
TASK = "surrogate"

######################################
# Main
######################################

# Serve the latest registered surrogate MLflow model as a Bento runner.
runner = bentoml.mlflow.get(f"{TASK}:latest").to_runner()

svc = bentoml.Service(TASK, runners=[runner])


class SimulationFeatures(BaseModel):
    """The simulation features data model.

    Each field is one root-simulation parameter; defaults mirror the
    simulator's defaults so partial payloads are accepted.
    """

    random_seed: Optional[int] = 100
    max_order: Optional[int] = 3
    root_ratio: Optional[float] = 0.5
    fine_root_threshold: Optional[float] = 0.06
    outer_root_num: Optional[int] = 10
    inner_root_num: Optional[int] = 8
    min_primary_length: Optional[float] = 20
    max_primary_length: Optional[float] = 30
    base_diameter: Optional[float] = 0.11
    diameter_reduction: Optional[float] = 0.2
    apex_diameter: Optional[float] = 0.02
    min_sec_root_num: Optional[int] = 1
    max_sec_root_num: Optional[int] = 3
    growth_sec_root: Optional[float] = 0.2
    min_sec_root_length: Optional[float] = 100
    max_sec_root_length: Optional[float] = 220
    segments_per_root: Optional[int] = 50
    length_reduction: Optional[float] = 0.5
    root_vary: Optional[float] = 30
    interbranch_distance: Optional[float] = 0.0078
    mechanical_constraints: Optional[float] = 0.5
    root_tissue_density: Optional[float] = 0.05
    gravitropism: Optional[float] = 7.5
    origin_min: Optional[float] = 1e-3
    origin_max: Optional[float] = 1e-2
    enable_soil: Optional[bool] = False
    soil_layer_height: Optional[float] = 0
    soil_layer_width: Optional[float] = 0
    soil_n_layers: Optional[int] = 0
    soil_n_cols: Optional[int] = 0
    max_val_attempts: Optional[int] = 50
    simulation_tag: Optional[str] = "default"
    no_root_zone: Optional[float] = 1e-4
    floor_threshold: Optional[float] = 0.4
    ceiling_threshold: Optional[float] = 0.9


class SurrogateFeatures(BaseModel):
    """The Surrogate features data model."""

    # A batch of simulation parameter sets to score.
    data: List[SimulationFeatures]


input_spec = JSON(pydantic_model=SurrogateFeatures)


@svc.api(input=input_spec, output=PandasDataFrame())
def predict(inputs: SurrogateFeatures) -> pd.DataFrame:
    """Get the surrogate model predictions.

    Args:
        inputs (SurrogateFeatures):
            The simulation parameter data.

    Returns:
        pd.DataFrame:
            The surrogate model predictions; empty when no
            simulation parameters were supplied.
    """
    # Bug fix: an empty payload previously raised a NameError because
    # input_list was only bound inside the non-empty branch but used
    # unconditionally afterwards.
    if len(inputs.data) == 0:
        return pd.DataFrame()

    input_list = [simulation.dict() for simulation in inputs.data]
    # A single record needs an explicit index for DataFrame construction.
    index = [0] if len(input_list) == 1 else None
    input_df = pd.DataFrame(input_list, index=index)
    return runner.predict.run(input_df)
+ """ time_now = get_datetime_now() outdir = get_outdir() diff --git a/app/flows/run_surrogate.py b/app/flows/run_surrogate.py index 410e445..386b5af 100644 --- a/app/flows/run_surrogate.py +++ b/app/flows/run_surrogate.py @@ -26,32 +26,27 @@ from prefect import flow, task from prefect.artifacts import create_table_artifact from prefect.task_runners import SequentialTaskRunner -from sklearn.preprocessing import MinMaxScaler, StandardScaler from torch.optim.lr_scheduler import StepLR -from torch.utils.data import DataLoader, TensorDataset from deeprootgen.calibration import ( - AbcModel, EarlyStopper, + SingleTaskVariationalGPModel, + SurrogateModel, calculate_summary_statistic_discrepancy, get_calibration_summary_stats, lhc_sample, log_model, + prepare_surrogate_data, run_calibration_simulation, + training_loop, ) -from deeprootgen.data_model import ( - RootCalibrationModel, - StatisticsComparisonModel, - SummaryStatisticsModel, -) -from deeprootgen.io import save_graph_to_db +from deeprootgen.data_model import RootCalibrationModel, SummaryStatisticsModel from deeprootgen.pipeline import ( begin_experiment, get_datetime_now, get_outdir, log_config, log_experiment_details, - log_simulation, ) from deeprootgen.statistics import DistanceMetricBase @@ -91,28 +86,6 @@ def prepare_task(input_parameters: RootCalibrationModel) -> tuple: return sample_df, distances, statistics_list -class GPModel(gpytorch.models.ApproximateGP): - def __init__(self, inducing_points): - variational_distribution = gpytorch.variational.CholeskyVariationalDistribution( - inducing_points.size(0) - ) - variational_strategy = gpytorch.variational.VariationalStrategy( - self, - inducing_points, - variational_distribution, - learn_inducing_locations=True, - ) - super().__init__(variational_strategy) - self.mean_module = gpytorch.means.ConstantMean() - self.covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel()) - self.likelihood = gpytorch.likelihoods.GaussianLikelihood() - - def 
forward(self, x): - mean_x = self.mean_module(x) - covar_x = self.covar_module(x) - return gpytorch.distributions.MultivariateNormal(mean_x, covar_x) - - class MultitaskVariationalGPModel(gpytorch.models.ApproximateGP): def __init__(self, n_features, num_latents, num_tasks): inducing_points = torch.rand(num_latents, n_features, n_features) @@ -159,7 +132,23 @@ def perform_summary_stat_task( sample_df: pd.DataFrame, distances: list[DistanceMetricBase], statistics_list: list[SummaryStatisticsModel], -) -> None: +) -> tuple: + """Train a surrogate model using cost emulation. + + Args: + input_parameters (RootCalibrationModel): + The root calibration data model. + sample_df (pd.DataFrame): + The data produced using Latin Hypercube sampling. + distances (list[DistanceMetricBase]): + The list of distance metrics. + statistics_list (list[SummaryStatisticsModel]): + The list of summary statistics. + + Returns: + tuple: + The trained surrogate. + """ discrepancies = [] parameter_intervals = input_parameters.parameter_intervals.dict() for sample_row in sample_df.to_dict("records"): @@ -171,114 +160,91 @@ def perform_summary_stat_task( parameter_specs, input_parameters, statistics_list, distances ) discrepancies.append(discrepancy) - sample_df["discrepancy"] = discrepancies - training_df = sample_df.loc[:, sample_df.nunique() != 1] - splits = [] - for i in range(3): - split_df = training_df.query(f"training_data_split == {i}").drop( - columns=["training_data_split", "sim_ids"] - ) - X = split_df.drop(columns="discrepancy").values - y = split_df["discrepancy"].values.reshape(-1, 1) - splits.append((X, y)) - - train, val, test = splits - X_train, y_train = train - X_scaler = MinMaxScaler() - X_scaler.fit(X_train) - y_scaler = MinMaxScaler() - y_scaler.fit(y_train) + loaders, scalers, training_df, num_data = prepare_surrogate_data(sample_df) + train_loader, val_loader, _ = loaders calibration_parameters = input_parameters.calibration_parameters num_inducing_points = 
calibration_parameters["num_inducing_points"] - - def prepare_data(split: tuple, shuffle: bool = True) -> tuple: - X, y = split - X = torch.Tensor(X_scaler.transform(X)).double() - y = torch.Tensor(y).double() - y = torch.Tensor(y_scaler.transform(y)).double() - dataset = TensorDataset(X, y) - loader = DataLoader(dataset, batch_size=num_inducing_points, shuffle=shuffle) - return loader - - train_loader = prepare_data(train) - val_loader = prepare_data(val) - test_loader = prepare_data(test, shuffle=False) - early_stopper = EarlyStopper(patience=5, min_delta=0) - - n_epochs = calibration_parameters["n_epochs"] - lr = calibration_parameters["lr"] - - # model = MultitaskVariationalGPModel( - # n_features= X_train.shape[-1], - # num_latents = num_inducing_points, - # num_tasks = y_train.shape[-1] - # ).train().double() - - for X_batch, _ in train_loader: - model = GPModel(inducing_points=X_batch).double() + for inducing_points, _ in train_loader: + inducing_points = inducing_points[:num_inducing_points, :] break + model = SingleTaskVariationalGPModel(inducing_points).double() + likelihood = gpytorch.likelihoods.GaussianLikelihood().double() + model.train() + likelihood.train() - mll = gpytorch.mlls.VariationalELBO( - model.likelihood, model, num_data=y_train.shape[0] + variational_ngd_optimizer = gpytorch.optim.NGD( + model.variational_parameters(), num_data=num_data, lr=0.1 ) - - # variational_ngd_optimizer = gpytorch.optim.NGD( - # model.variational_parameters(), - # num_data=y_train.shape[0], - # lr = 0.1 - # ) - hyperparameter_optimizer = torch.optim.Adam( + lr = calibration_parameters["lr"] + optimizer = torch.optim.Adam( [ - {"params": model.parameters()}, - # {"params": model.hyperparameters()}, + {"params": model.hyperparameters()}, + {"params": likelihood.parameters()}, ], lr=lr, ) + mll = gpytorch.mlls.VariationalELBO(likelihood, model, num_data=num_data) + + n_epochs = calibration_parameters["n_epochs"] scheduler = StepLR( - hyperparameter_optimizer, + 
optimizer, step_size=int(n_epochs * 0.7), gamma=0.1, # type: ignore[operator] ) - validation_check: int = np.ceil(n_epochs * 0.05).astype("int") # type: ignore[operator] - for epoch in range(n_epochs): # type: ignore - model.train() - for X_batch, y_batch in train_loader: - # variational_ngd_optimizer.zero_grad() - hyperparameter_optimizer.zero_grad() - out = model(X_batch) - loss = -mll(out, y_batch).mean() - loss.backward() - # variational_ngd_optimizer.step() - hyperparameter_optimizer.step() - scheduler.step() - - if (epoch % validation_check) == 0: - model.eval() - with torch.no_grad(), gpytorch.settings.fast_pred_var(): - validation_loss = 0 - for X_batch, y_batch in val_loader: - out = model(X_batch) - validation_loss += -mll(out, y_batch).mean().item() - if early_stopper.early_stop(validation_loss): - break - print( - f"Epoch: {epoch}, Training loss: {loss.item()} Validation loss: {validation_loss}", - ) + patience = calibration_parameters["early_stopping_patience"] + early_stopper = EarlyStopper(patience=patience, min_delta=0) + training_loop( + model, + train_loader, + val_loader, + n_epochs, + mll, + optimizer, + variational_ngd_optimizer, + scheduler, + early_stopper, + ) + + return model, likelihood, sample_df, loaders, scalers, training_df, inducing_points + - model.eval() - X_test = [] - y_test = [] - for X_batch, y_batch in test_loader: - X_test.extend(X_batch) - y_test.extend(y_batch) - X_test = torch.stack(X_test) - y_test = torch.stack(y_test) +@task +def log_summary_stat_task( + input_parameters: RootCalibrationModel, + model: gpytorch.models.ApproximateGP, + likelihood: gpytorch.likelihoods.GaussianLikelihood, + sample_df: pd.DataFrame, + loaders: tuple, + scalers: tuple, + training_df: pd.DataFrame, + inducing_points: torch.Tensor, + simulation_uuid: str, +) -> None: + """Log the surrogate model results. + Args: + input_parameters (RootCalibrationModel): + The root calibration data model. 
+ model (gpytorch.models.ApproximateGP): + The surrogate model. + likelihood (gpytorch.likelihoods.GaussianLikelihood): + The surrogate model likelihood. + sample_df (pd.DataFrame): + The sample simulation data. + loaders (tuple): + The data loaders. + scalers (tuple): + The data scalers. + training_df (pd.DataFrame): + The model training data. + simulation_uuid (str): + The simulation UUID. + """ time_now = get_datetime_now() outdir = get_outdir() @@ -286,49 +252,151 @@ def prepare_data(split: tuple, shuffle: bool = True) -> tuple: sample_df.to_csv(outfile, index=False) mlflow.log_artifact(outfile) - with torch.no_grad(), gpytorch.settings.fast_pred_var(): - predictions = model.likelihood(model(X_test)) - mean = predictions.mean.cpu().numpy() - lower, upper = predictions.confidence_region() - lower, upper = lower.cpu().numpy(), upper.cpu().numpy() - - metrics = [] - for metric_func in [ - gpytorch.metrics.mean_standardized_log_loss, - gpytorch.metrics.mean_squared_error, - gpytorch.metrics.mean_absolute_error, - gpytorch.metrics.quantile_coverage_error, - gpytorch.metrics.negative_log_predictive_density, + train_loader, val_loader, test_loader = loaders + X_scaler, y_scaler = scalers + + def log_performance(loader: torch.utils.data.DataLoader, split_name: str) -> None: + model.eval() + X_data = [] + y_data = [] + + for X_batch, y_batch in loader: + X_data.extend(X_batch) + y_data.extend(y_batch) + + X_data = torch.stack(X_data) + y_data = torch.stack(y_data) + + with torch.no_grad(), gpytorch.settings.fast_pred_var(): + predictions = likelihood(model(X_data)) + mean = predictions.mean.detach().cpu().numpy() + lower, upper = predictions.confidence_region() + lower, upper = lower.detach().cpu().numpy(), upper.detach().cpu().numpy() + + metrics = [] + for metric_func in [ + gpytorch.metrics.mean_standardized_log_loss, + gpytorch.metrics.mean_squared_error, + gpytorch.metrics.mean_absolute_error, + gpytorch.metrics.quantile_coverage_error, + 
gpytorch.metrics.negative_log_predictive_density, + ]: + metric_name = metric_func.__name__ + metric_score = ( + metric_func(predictions, y_data).cpu().detach().numpy().item() + ) + mlflow.log_metric(f"{split_name}_{metric_name}", metric_score) + metrics.append({"metric_name": metric_name, "metric_score": metric_score}) + + metric_df = pd.DataFrame(metrics) + outfile = osp.join(outdir, f"{time_now}-{TASK}_{split_name}_metrics.csv") + metric_df.to_csv(outfile, index=False) + mlflow.log_artifact(outfile) + + create_table_artifact( + key=f"surrogate-metrics-{split_name}-data", + table=metric_df.to_dict(orient="records"), + description=f"# Surrogate metrics on {split_name} data.", + ) + + y_data = y_data.detach().cpu().numpy().reshape(-1, 1) + mean = y_scaler.inverse_transform(mean.reshape(-1, 1)).flatten() + actual = y_scaler.inverse_transform(y_data).flatten() + lower = y_scaler.inverse_transform(lower.reshape(-1, 1)).flatten() + upper = y_scaler.inverse_transform(upper.reshape(-1, 1)).flatten() + index = np.arange(len(mean)) + + first_col = sample_df.columns[0] + X_data = X_scaler.inverse_transform(X_data.detach().cpu().numpy()) + x = X_data[:, 0] + out_df = pd.DataFrame( + { + f"{split_name}_predicted": mean, + f"{split_name}_actual": actual, + f"{split_name}_{first_col}": x, + "index": index, + } + ) + out_df.plot.scatter(f"{split_name}_actual", f"{split_name}_predicted") + outfile = osp.join( + outdir, f"{time_now}-{TASK}_{split_name}_actual_predicted.png" + ) + plt.tight_layout() + plt.savefig(outfile) + mlflow.log_artifact(outfile) + + out_df.plot.scatter( + f"{split_name}_{first_col}", + f"{split_name}_actual", + title=f"{split_name}_actual", + ) + outfile = osp.join(outdir, f"{time_now}-{TASK}_{split_name}_actual.png") + plt.tight_layout() + plt.savefig(outfile) + mlflow.log_artifact(outfile) + + fig = out_df.plot.scatter( + f"{split_name}_{first_col}", + f"{split_name}_predicted", + title=f"{split_name}_predicted", + ) + fig.fill_between(mean, lower, upper, 
alpha=0.5) + outfile = osp.join(outdir, f"{time_now}-{TASK}_{split_name}_predicted.png") + plt.tight_layout() + plt.savefig(outfile) + mlflow.log_artifact(outfile) + + outfile = osp.join( + outdir, f"{time_now}-{TASK}_{split_name}_actual_predicted.csv" + ) + out_df.to_csv(outfile, index=False) + mlflow.log_artifact(outfile) + + return mean, lower, upper + + log_performance(val_loader, "validation") + log_performance(train_loader, "train") + mean, lower, upper = log_performance(test_loader, "test") + + mlflow.set_tag("surrogate_type", "cost emulation") + artifacts = {} + for obj, name in [ + (model.state_dict(), "state_dict"), + (inducing_points, "inducing_points"), ]: - metric_name = metric_func.__name__ - metric_score = metric_func(predictions, y_test).cpu().detach().numpy().item() - mlflow.log_metric(metric_name, metric_score) - metrics.append({"metric_name": metric_name, "metric_score": metric_score}) - - metric_df = pd.DataFrame(metrics) - outfile = osp.join(outdir, f"{time_now}-{TASK}_metrics.csv") - metric_df.to_csv(outfile, index=False) - mlflow.log_artifact(outfile) + outfile = osp.join(outdir, f"{time_now}-{TASK}_{name}.pth") + artifacts[name] = outfile + torch.save(obj, outfile) - create_table_artifact( - key="surrogate-metrics", - table=metric_df.to_dict(orient="records"), - description="# Surrogate metrics.", + drop_columns = ["training_data_split", "sim_ids", "discrepancy"] + column_names = training_df.drop(columns=drop_columns).columns + + for obj, name in [ + (X_scaler, "X_scaler"), + (y_scaler, "Y_scaler"), + (column_names, "column_names"), + ]: + outfile = osp.join(outdir, f"{time_now}-{TASK}_{name}.pkl") + artifacts[name] = outfile + calibrator_dump(obj, outfile) + + signature_x = training_df.drop(columns=drop_columns).head(5).to_dict("records") + signature_y = pd.DataFrame( + {"discrepancy": mean, "lower_bound": lower, "upper_bound": upper} ) - mean = y_scaler.inverse_transform(mean).flatten() - actual = 
y_scaler.inverse_transform(y_test.cpu().numpy()).flatten() - lower = y_scaler.inverse_transform(lower).flatten() - upper = y_scaler.inverse_transform(upper).flatten() - index = np.arange(len(mean)) - out_df = pd.DataFrame( - {"test_predicted": mean, "test_actual": actual, "index": index} + calibration_model = SurrogateModel() + + log_model( + TASK, + input_parameters, + calibration_model, + artifacts, + simulation_uuid, + signature_x, + signature_y, + {"surrogate_type": "cost_emulator", "n_tasks": 1}, ) - out_df.plot.scatter("test_actual", "test_predicted") - outfile = osp.join(outdir, f"{time_now}-{TASK}_actual_predicted.png") - plt.tight_layout() - plt.savefig(outfile) - mlflow.log_artifact(outfile) @task @@ -350,8 +418,22 @@ def run_surrogate(input_parameters: RootCalibrationModel, simulation_uuid: str) input_parameters.statistics_comparison.use_summary_statistics # type: ignore ) if use_summary_statistics: - perform_summary_stat_task( - input_parameters, sample_df, distances, statistics_list + model, likelihood, sample_df, loaders, scalers, training_df, inducing_points = ( + perform_summary_stat_task( + input_parameters, sample_df, distances, statistics_list + ) + ) + + log_summary_stat_task( + input_parameters, + model, + likelihood, + sample_df, + loaders, + scalers, + training_df, + inducing_points, + simulation_uuid, ) config = input_parameters.dict() diff --git a/deeprootgen/calibration/__init__.py b/deeprootgen/calibration/__init__.py index 3478842..41d165e 100644 --- a/deeprootgen/calibration/__init__.py +++ b/deeprootgen/calibration/__init__.py @@ -4,6 +4,7 @@ OptimisationModel, SensitivityAnalysisModel, SnpeModel, + SurrogateModel, log_model, ) from .parameters import PriorCollection, get_simulation_parameters, lhc_sample @@ -13,4 +14,9 @@ get_calibration_summary_stats, run_calibration_simulation, ) -from .surrogates import EarlyStopper +from .surrogates import ( + EarlyStopper, + SingleTaskVariationalGPModel, + prepare_surrogate_data, + training_loop, 
+) diff --git a/deeprootgen/calibration/model_versioning.py b/deeprootgen/calibration/model_versioning.py index 890cf41..a987cae 100644 --- a/deeprootgen/calibration/model_versioning.py +++ b/deeprootgen/calibration/model_versioning.py @@ -6,6 +6,7 @@ from typing import Any import bentoml +import gpytorch import mlflow import numpy as np import pandas as pd @@ -14,6 +15,7 @@ import torch.nn as nn import torch.nn.functional as F import zuko +from gpytorch.models import ApproximateGP from lampe.inference import NPE from mlflow.client import MlflowClient from mlflow.models import infer_signature @@ -22,6 +24,7 @@ from torch_geometric.nn.pool import global_max_pool from ..data_model import RootCalibrationModel +from .surrogates import SingleTaskVariationalGPModel def log_model( @@ -32,6 +35,7 @@ def log_model( simulation_uuid: str, signature_x: pd.DataFrame | np.ndarray | list | None = None, signature_y: pd.DataFrame | np.ndarray | list | None = None, + model_config: dict = None, # type: ignore [assignment] ) -> None: """Log the calibrator model to the registry. @@ -49,7 +53,9 @@ def log_model( signature_x (pd.DataFrame | np.ndarray | list | None, optional): The signature for data inputs. Defaults to None. signature_y (pd.DataFrame | np.ndarray | list | None, optional): - The signature for data outputs. Defaults to None.. Defaults to None. + The signature for data outputs. Defaults to None. + model_config (dict, optional): + The model configuration. Defaults to None. 
class SurrogateModel(mlflow.pyfunc.PythonModel):
    """A surrogate calibration model.

    An MLflow pyfunc wrapper around the trained variational GP surrogate.
    Expected artifacts: the GP ``state_dict``, the ``inducing_points``
    tensor, the fitted ``X_scaler``/``Y_scaler``, and ``column_names``.
    """

    def __init__(self) -> None:
        """The SurrogateModel constructor."""
        self.task = "surrogate"
        self.state_dict = None
        self.X_scaler = None
        self.Y_scaler = None
        self.model = None
        self.likelihood = None
        self.column_names = None

    def load_context(self, context: Context) -> None:
        """Load the model context.

        Args:
            context (Context):
                The model context.
        """
        import joblib

        def load_data(k: str) -> Any:
            # Unpickle a joblib-serialised artifact by key.
            artifact = context.artifacts[k]
            return joblib.load(artifact)

        state_dict_path = context.artifacts["state_dict"]
        self.state_dict = torch.load(state_dict_path)

        if context.model_config["surrogate_type"] == "cost_emulator":
            inducing_points_path = context.artifacts["inducing_points"]
            inducing_points = torch.load(inducing_points_path).double()
            self.model = SingleTaskVariationalGPModel(inducing_points).double()
            # Bug fix: the saved state dict was loaded from disk but never
            # applied, so predictions previously came from an untrained GP.
            self.model.load_state_dict(self.state_dict)
            self.likelihood = gpytorch.likelihoods.GaussianLikelihood().double()
            # Put the likelihood in eval mode alongside the model.
            self.likelihood.eval()

        self.model.eval()
        self.X_scaler = load_data("X_scaler")
        self.Y_scaler = load_data("Y_scaler")
        self.column_names = load_data("column_names")

    def predict(
        self, context: Context, model_input: pd.DataFrame, params: dict | None = None
    ) -> pd.DataFrame:
        """Make a model prediction.

        Args:
            context (Context):
                The model context.
            model_input (pd.DataFrame):
                The model input data.
            params (dict, optional):
                Optional model parameters. Defaults to None.

        Raises:
            ValueError:
                Error raised when the calibrator has not been loaded.

        Returns:
            pd.DataFrame:
                The model prediction with mean discrepancy and
                confidence-region bounds.
        """
        for prop in [
            self.state_dict,
            self.X_scaler,
            self.Y_scaler,
            self.model,
            self.likelihood,
            self.column_names,
        ]:
            if prop is None:
                raise ValueError(f"The {self.task} calibrator has not been loaded.")

        # Restrict and order input columns to match the training features.
        filtered_df = model_input[self.column_names]
        X = torch.Tensor(self.X_scaler.transform(filtered_df.values)).double()

        # Inference only: disable autograd and use fast predictive variances,
        # matching how the model is evaluated during training and logging.
        with torch.no_grad(), gpytorch.settings.fast_pred_var():
            predictions = self.likelihood(self.model(X))
            mean = predictions.mean.detach().cpu().numpy()
            lower, upper = predictions.confidence_region()
            lower = lower.detach().cpu().numpy()
            upper = upper.detach().cpu().numpy()

        # Map the scaled outputs back onto the original discrepancy scale.
        mean = self.Y_scaler.inverse_transform(mean.reshape(-1, 1)).flatten()
        lower = self.Y_scaler.inverse_transform(lower.reshape(-1, 1)).flatten()
        upper = self.Y_scaler.inverse_transform(upper.reshape(-1, 1)).flatten()

        return pd.DataFrame(
            {"discrepancy": mean, "lower_bound": lower, "upper_bound": upper}
        )
import gpytorch
import numpy as np
import pandas as pd
import torch
from gpytorch.models import ApproximateGP
from gpytorch.variational import NaturalVariationalDistribution, VariationalStrategy
from sklearn.preprocessing import MinMaxScaler
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, TensorDataset


class SingleTaskVariationalGPModel(ApproximateGP):
    """A variational Gaussian process for single task regression."""

    def __init__(self, inducing_points: torch.Tensor) -> None:
        """SingleTaskVariationalGPModel constructor.

        Args:
            inducing_points (torch.Tensor):
                The inducing points for training.
        """
        # Natural variational distribution enables NGD updates of the
        # variational parameters during training.
        variational_distribution = NaturalVariationalDistribution(
            inducing_points.size(0)
        )
        variational_strategy = VariationalStrategy(
            self,
            inducing_points,
            variational_distribution,
            learn_inducing_locations=True,
        )
        super().__init__(variational_strategy)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel())

    def forward(self, x: torch.Tensor) -> gpytorch.distributions.MultivariateNormal:
        """The forward pass.

        Args:
            x (torch.Tensor):
                The input data.

        Returns:
            gpytorch.distributions.MultivariateNormal:
                A multivariate Gaussian distribution.
        """
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)


def prepare_surrogate_data(
    sample_df: pd.DataFrame,
    output_names: list[str] | None = None,
    batch_size: int = 1024,
) -> tuple:
    """Prepare data for training the surrogate model.

    Args:
        sample_df (pd.DataFrame):
            The dataframe of sample simulation data. Assumes the columns
            ``training_data_split`` (values 0/1/2 for train/val/test) and
            ``sim_ids`` are present.
        output_names (list[str], optional):
            The list of output names. Defaults to None ("discrepancy").
        batch_size (int, optional):
            The tensor data batch size. Defaults to 1024.

    Returns:
        tuple:
            The (train, val, test) loaders, the fitted (X, y) scalers,
            the training dataframe, and the number of training rows.
    """
    # Drop constant columns: they carry no signal and break scaling.
    training_df = sample_df.loc[:, sample_df.nunique() != 1]

    if output_names is None:
        output_names = ["discrepancy"]

    splits = []
    for split_id in range(3):
        split_df = training_df.query(f"training_data_split == {split_id}").drop(
            columns=["training_data_split", "sim_ids"]
        )
        X = split_df.drop(columns=output_names).values
        y = split_df[output_names].values
        if y.shape[-1] == 1:
            y = y.reshape(-1, 1)
        splits.append((X, y))

    train, val, test = splits
    X_train, y_train = train
    # Scalers are fitted on the training split only to avoid leakage.
    X_scaler = MinMaxScaler()
    X_scaler.fit(X_train)
    y_scaler = MinMaxScaler()
    y_scaler.fit(y_train)

    def build_loader(split: tuple) -> DataLoader:
        # Scale a split and wrap it in a deterministic (unshuffled) loader.
        X, y = split
        X = torch.Tensor(X_scaler.transform(X)).double()
        y = torch.Tensor(y_scaler.transform(y)).double()
        if y.shape[-1] == 1:
            y = y.squeeze()
        return DataLoader(TensorDataset(X, y), batch_size=batch_size, shuffle=False)

    train_loader = build_loader(train)
    val_loader = build_loader(val)
    test_loader = build_loader(test)

    num_data = y_train.shape[0]
    return (
        (train_loader, val_loader, test_loader),
        (X_scaler, y_scaler),
        training_df,
        num_data,
    )


def training_loop(
    model: ApproximateGP,
    train_loader: DataLoader,
    val_loader: DataLoader,
    n_epochs: int,
    mll: gpytorch.mlls.VariationalELBO,
    optimizer: torch.optim.Adam,
    variational_ngd_optimizer: gpytorch.optim.NGD | None = None,
    scheduler: StepLR | None = None,
    early_stopper: EarlyStopper | None = None,
    silent: bool = False,
) -> ApproximateGP:
    """Train a variational GP surrogate with periodic validation.

    Args:
        model (ApproximateGP):
            The variational GP model to train.
        train_loader (DataLoader):
            The training data loader.
        val_loader (DataLoader):
            The validation data loader.
        n_epochs (int):
            The number of training epochs.
        mll (gpytorch.mlls.VariationalELBO):
            The marginal log likelihood objective.
        optimizer (torch.optim.Adam):
            The hyperparameter optimizer.
        variational_ngd_optimizer (gpytorch.optim.NGD, optional):
            The natural-gradient optimizer for the variational
            parameters. Defaults to None.
        scheduler (StepLR, optional):
            The learning-rate scheduler. Defaults to None.
        early_stopper (EarlyStopper, optional):
            The early-stopping monitor. Defaults to None.
        silent (bool, optional):
            Whether to suppress progress printing. Defaults to False.

    Returns:
        ApproximateGP:
            The trained model.
    """
    # Validate roughly every 5% of the epoch budget (at least every epoch).
    validation_check: int = np.ceil(n_epochs * 0.05).astype("int")
    for epoch in range(n_epochs):
        model.train()
        for x_batch, y_batch in train_loader:
            optimizer.zero_grad()
            if variational_ngd_optimizer is not None:
                variational_ngd_optimizer.zero_grad()

            output = model(x_batch)
            loss = -mll(output, y_batch)
            loss.backward()

            optimizer.step()
            if variational_ngd_optimizer is not None:
                variational_ngd_optimizer.step()

        if scheduler is not None:
            scheduler.step()

        if (epoch % validation_check) == 0:
            model.eval()
            with torch.no_grad(), gpytorch.settings.fast_pred_var():
                validation_loss = 0
                for X_batch, y_batch in val_loader:
                    out = model(X_batch)
                    validation_loss += -mll(out, y_batch).mean().item()

            if early_stopper is not None and early_stopper.early_stop(
                validation_loss
            ):
                return model

            if not silent:
                print(
                    f"""
                    Epoch: {epoch}
                    Training loss: {loss.item()}
                    Validation loss: {validation_loss}
                    """
                )

    return model