diff --git a/examples/hpo/cartpole/config_overrides/cartpole_stab.yaml b/examples/hpo/cartpole/config_overrides/cartpole_stab.yaml new file mode 100644 index 000000000..b65e2466f --- /dev/null +++ b/examples/hpo/cartpole/config_overrides/cartpole_stab.yaml @@ -0,0 +1,68 @@ +task_config: + seed: 42 + info_in_reset: true + ctrl_freq: 15 + pyb_freq: 750 + physics: pyb + + init_state_randomization_info: + init_x: + distrib: uniform + high: 1.0 + low: 1.0 + init_x_dot: + distrib: uniform + high: 0.05 + low: -0.05 + init_theta: + distrib: uniform + high: 0.05 + low: -0.05 + init_theta_dot: + distrib: uniform + high: 0.05 + low: -0.05 + # disturbances: + # observation: + # - disturbance_func: white_noise + # std: 0.0001 + inertial_prop: + cart_mass: 1.0 + pole_length: 0.5 + pole_mass: 0.1 + inertial_prop_randomization_info: null + randomized_inertial_prop: false + + + # normalized_rl_action_space: false + + task: stabilization + task_info: + stabilization_goal: [0.0, 0.0, 0.0, 0.0] + # stabilization_goal_tolerance: 0.01 + stabilization_goal_tolerance: 0.0 + use_constraint_penalty: false + + episode_len_sec: 5 + cost: quadratic + done_on_violation: false + done_on_out_of_bound: false + + obs_goal_horizon: 1 + + # RL Reward + rew_state_weight: [1, 1, 1, 1] + rew_act_weight: [0.1] + rew_exponential: True + + constraints: + - constraint_form: default_constraint + constrained_variable: input + upper_bounds: [3.0] + lower_bounds: [-3.0] + - constraint_form: default_constraint + constrained_variable: state + # upper_bounds: [1.5, 0.45, 0.1, 0.3] + # lower_bounds: [-1.5, -0.45, -0.1, -0.3] + upper_bounds: [1.5, 0.45, 0.5, 0.5] + lower_bounds: [-1.5, -0.45, -0.5, -0.5] diff --git a/examples/hpo/cartpole/config_overrides/gp_mpc_cartpole_hpo.yaml b/examples/hpo/cartpole/config_overrides/gp_mpc_cartpole_hpo.yaml new file mode 100644 index 000000000..9996a5b1c --- /dev/null +++ b/examples/hpo/cartpole/config_overrides/gp_mpc_cartpole_hpo.yaml @@ -0,0 +1,33 @@ +hpo_config: + + load_if_exists: True # this should set to True if hpo is run in parallel + objective: [exponentiated_rmse] # [other metrics defined in base_experiment.py] + objective_bounds: [[0.0, 1.0]] # [bounds for each objective]. 
Worse value will be assigned if objective evaluation is None + direction: [maximize] # [minimize, maximize] + repetitions: 5 # number of samples of performance for each objective query + n_episodes: 5 # number of episodes to evaluate each policy + use_gpu: True + seed: 24 + save_n_best_hps: 1 + # budget + trials: 40 + + # hyperparameters + hps_config: + horizon: 20 + learning_rate: + - 0.01 + - 0.01 + - 0.01 + - 0.01 + optimization_iterations: + - 3000 + - 3000 + - 3000 + - 3000 + kernel: Matern + n_ind_points: 30 + num_epochs: 5 + num_samples: 75 + q_mpc: [1, 1, 1, 1] + r_mpc: [0.1] diff --git a/examples/hpo/gp_mpc/config_overrides/cartpole/gp_mpc_cartpole_150.yaml b/examples/hpo/cartpole/config_overrides/gp_mpc_cartpole_stab_200.yaml similarity index 79% rename from examples/hpo/gp_mpc/config_overrides/cartpole/gp_mpc_cartpole_150.yaml rename to examples/hpo/cartpole/config_overrides/gp_mpc_cartpole_stab_200.yaml index 0da50d191..65872d25b 100644 --- a/examples/hpo/gp_mpc/config_overrides/cartpole/gp_mpc_cartpole_150.yaml +++ b/examples/hpo/cartpole/config_overrides/gp_mpc_cartpole_stab_200.yaml @@ -4,6 +4,7 @@ algo_config: deque_size: 10 eval_batch_size: 10 gp_approx: mean_eq + # gp_model_path: ./gp_mpc/results/ gp_model_path: null horizon: 20 prior_info: @@ -27,19 +28,19 @@ algo_config: - 3000 - 3000 overwrite_saved_data: false - prior_param_coeff: 1.5 + prior_param_coeff: 2.0 prob: 0.95 q_mpc: - 1 + - 0.1 - 1 - - 1 - - 1 + - 0.1 r_mpc: - 0.1 kernel: Matern - sparse_gp: True + sparse_gp: true n_ind_points: 40 - inducing_point_selection_method: 'kmeans' + inducing_point_selection_method: kmeans recalc_inducing_points_at_every_step: false soft_constraints: gp_soft_constraints: false @@ -49,18 +50,18 @@ algo_config: target_mask: null train_iterations: null test_data_ratio: 0.2 - use_prev_start: true warmstart: true - num_epochs: 5 - num_samples: 75 - num_test_episodes_per_epoch: 2 - num_train_episodes_per_epoch: 2 + num_epochs: 2 + num_samples: 50 + num_test_episodes_per_epoch: 1 + num_train_episodes_per_epoch: 1 same_test_initial_state: true same_train_initial_state: false rand_data_selection: false terminate_train_on_done: True terminate_test_on_done: False + # parallel: False parallel: True -device: cpu +device: cuda restore: null diff --git a/examples/hpo/cartpole/config_overrides/ilqr_cartpole_hpo.yaml b/examples/hpo/cartpole/config_overrides/ilqr_cartpole_hpo.yaml new file mode 100644 index 000000000..78ea87b97 --- /dev/null +++ b/examples/hpo/cartpole/config_overrides/ilqr_cartpole_hpo.yaml @@ -0,0 +1,22 @@ +hpo_config: + + load_if_exists: True # this should set to True if hpo is run in parallel + objective: [exponentiated_rmse] # [other metrics defined in base_experiment.py] + objective_bounds: [[0.0, 1.0]] # [bounds for each objective]. 
Worse value will be assigned if objective evaluation is None + direction: [maximize] # [minimize, maximize] + repetitions: 5 # number of samples of performance for each objective query + n_episodes: 5 # number of episodes to evaluate each policy + use_gpu: True + seed: 24 + save_n_best_hps: 1 + # budget + trials: 40 + + # hyperparameters + hps_config: + max_iterations: 15 + lamb_factor: 10 + lamb_max: 1000 + epsilon: 0.01 + q_lqr: [1, 1, 1, 1] + r_lqr: [0.1] diff --git a/examples/hpo/cartpole/config_overrides/ilqr_cartpole_stab_100.yaml b/examples/hpo/cartpole/config_overrides/ilqr_cartpole_stab_100.yaml new file mode 100644 index 000000000..842529247 --- /dev/null +++ b/examples/hpo/cartpole/config_overrides/ilqr_cartpole_stab_100.yaml @@ -0,0 +1,24 @@ +algo: ilqr +algo_config: + # Cost parameters + q_lqr: [1, 1, 1, 1] + r_lqr: [0.1] + + # Model arguments + # Note: Higher simulation frequency is required if using controller designed + # based on the continuous-time model + discrete_dynamics: True + + # iLQR arguments + max_iterations: 15 + lamb_factor: 10 + lamb_max: 1000 + epsilon: 0.01 + + # prior info + prior_param_coeff: 1.0 + prior_info: + prior_prop: + cart_mass: 1.0 + pole_length: 0.5 + pole_mass: 0.1 diff --git a/examples/hpo/cartpole/config_overrides/linear_mpsc_cartpole_stab_100.yaml b/examples/hpo/cartpole/config_overrides/linear_mpsc_cartpole_stab_100.yaml new file mode 100644 index 000000000..9485bf733 --- /dev/null +++ b/examples/hpo/cartpole/config_overrides/linear_mpsc_cartpole_stab_100.yaml @@ -0,0 +1,34 @@ +safety_filter: linear_mpsc +sf_config: + # LQR controller parameters + r_lin: + - 0.1 + q_lin: + - 1 + - 0.1 + - 1 + - 0.1 + + # MPC Parameters + horizon: 20 + warmstart: True + integration_algo: rk4 + # use_terminal_set: False + use_terminal_set: True + + # Prior info + prior_info: + prior_prop: null + randomize_prior_prop: False + prior_prop_rand_info: null + + # Safe set calculation + n_samples: 600 + n_samples_terminal_set: 100 + learn_terminal_set: False + + # Tau parameter for the calcuation of the RPI + tau: 0.95 + + # Cost function + cost_function: one_step_cost diff --git a/examples/hpo/cartpole/config_overrides/ppo_cartpole_hpo.yaml b/examples/hpo/cartpole/config_overrides/ppo_cartpole_hpo.yaml new file mode 100644 index 000000000..5e2872996 --- /dev/null +++ b/examples/hpo/cartpole/config_overrides/ppo_cartpole_hpo.yaml @@ -0,0 +1,39 @@ +hpo_config: + + load_if_exists: True # this should set to True if hpo is run in parallel + objective: [exponentiated_rmse] # [other metrics defined in base_experiment.py] + objective_bounds: [[0.0, 1.0]] # [bounds for each objective]. 
Worse value will be assigned if objective evaluation is None + direction: [maximize] # [minimize, maximize] + repetitions: 5 # number of samples of performance for each objective query + n_episodes: 5 # number of episodes to evaluate each policy + use_gpu: True + seed: 24 + save_n_best_hps: 1 + # budget + trials: 40 + + # hyperparameters + hps_config: + # model args + hidden_dim: 64 + activation: relu + + # loss args + gamma: 0.99 + gae_lambda: 0.95 + clip_param: 0.2 + target_kl: 0.01 + entropy_coef: 0.01 + + # optim args + opt_epochs: 10 + mini_batch_size: 64 + actor_lr: 0.0003 + critic_lr: 0.001 + + # runner args + max_env_steps: 72000 + + # objective + rew_state_weight: [1, 1, 1, 1] + rew_act_weight: [0.1] diff --git a/examples/hpo/rl/ppo/config_overrides/cartpole/ppo_cartpole.yaml b/examples/hpo/cartpole/config_overrides/ppo_cartpole_stab_.yaml similarity index 100% rename from examples/hpo/rl/ppo/config_overrides/cartpole/ppo_cartpole.yaml rename to examples/hpo/cartpole/config_overrides/ppo_cartpole_stab_.yaml diff --git a/examples/hpo/gp_mpc/config_overrides/cartpole/cartpole_stab.yaml b/examples/hpo/gp_mpc/config_overrides/cartpole/cartpole_stab.yaml deleted file mode 100644 index 01d3ce77b..000000000 --- a/examples/hpo/gp_mpc/config_overrides/cartpole/cartpole_stab.yaml +++ /dev/null @@ -1,67 +0,0 @@ -task_config: - constraints: - - constraint_form: default_constraint - constrained_variable: input - - constraint_form: default_constraint - constrained_variable: state - upper_bounds: - - 100 - - 100 - - 100 - - 100 - lower_bounds: - - -100 - - -100 - - -100 - - -100 - cost: quadratic - ctrl_freq: 15 - disturbances: - observation: - - disturbance_func: white_noise - std: 0.0001 - done_on_violation: false - episode_len_sec: 10 - gui: false - inertial_prop: - cart_mass: 1.0 - pole_length: 0.5 - pole_mass: 0.1 - inertial_prop_randomization_info: null - info_in_reset: false - init_state: - init_x: 0.0 - init_x_dot: 0.0 - init_theta: 0.0 - init_theta_dot: 0.0 - init_state_randomization_info: - init_x: - distrib: 'uniform' - low: -0.1 - high: 0.1 - init_x_dot: - distrib: 'uniform' - low: -0.1 - high: 0.1 - init_theta: - distrib: 'uniform' - low: -0.2 - high: 0.2 - init_theta_dot: - distrib: 'uniform' - low: -0.1 - high: 0.1 - normalized_rl_action_space: false - prior_prop: - cart_mass: 1.0 - pole_length: 0.5 - pole_mass: 0.1 - pyb_freq: 750 - randomized_inertial_prop: false - randomized_init: true - task: stabilization - task_info: - stabilization_goal: [0] - stabilization_goal_tolerance: 0.005 - use_constraint_penalty: false - verbose: false diff --git a/examples/hpo/gp_mpc/config_overrides/cartpole/gp_mpc_cartpole_hpo.yaml b/examples/hpo/gp_mpc/config_overrides/cartpole/gp_mpc_cartpole_hpo.yaml deleted file mode 100644 index 3953fbde9..000000000 --- a/examples/hpo/gp_mpc/config_overrides/cartpole/gp_mpc_cartpole_hpo.yaml +++ /dev/null @@ -1,35 +0,0 @@ -hpo_config: - hpo: True # do hyperparameter optimization - load_if_exists: True # this should set to True if hpo is run in parallel - use_database: False # this is set to true if MySQL is used - objective: [exponentiated_avg_return] # [other metrics defined in base_experiment.py] - direction: [maximize] # [maximize, maximize] - dynamical_runs: False # if True, dynamically increase runs - warm_trials: 20 # number of trials to run before dyamical runs - approximation_threshold: 5 # this is only used when dynamical_runs is True - repetitions: 5 # number of samples of performance for each objective query - alpha: 1 # significance level for 
CVaR - use_gpu: True - dashboard: False - seed: 24 - save_n_best_hps: 3 - # budget - trials: 40 - - # hyperparameters - hps_config: - horizon: 20 - learning_rate: - - 0.01 - - 0.01 - - 0.01 - - 0.01 - optimization_iterations: - - 3000 - - 3000 - - 3000 - - 3000 - kernel: Matern - n_ind_points: 35 - num_epochs: 5 - num_samples: 75 diff --git a/examples/hpo/gp_mpc/config_overrides/cartpole/optimized_hyperparameters.yaml b/examples/hpo/gp_mpc/config_overrides/cartpole/optimized_hyperparameters.yaml deleted file mode 100644 index 6c47f2197..000000000 --- a/examples/hpo/gp_mpc/config_overrides/cartpole/optimized_hyperparameters.yaml +++ /dev/null @@ -1,7 +0,0 @@ -horizon: 35 -kernel: 'RBF' -n_ind_points: 40 -num_epochs: 5 -num_samples: 75 -optimization_iterations: [2800, 2800, 2800, 2800] -learning_rate: [0.023172075157730145, 0.023172075157730145, 0.023172075157730145, 0.023172075157730145] diff --git a/examples/hpo/hpo.sh b/examples/hpo/hpo.sh new file mode 100755 index 000000000..99b7af72f --- /dev/null +++ b/examples/hpo/hpo.sh @@ -0,0 +1,152 @@ +#!/bin/bash + +######## NOTE ######## +# This script is used to run HPO in parallel. +# 1. Adjust hpo config. +# 2. Remove or backup the database if needed. +# 3. Create a screen session screen, and detach it Ctrl+a d. +# 4. Run this script by giving experiment name as the first arg, seed as the second, and number of parallel jobs as the third arg. +# 5. If you want to kill them, run pkill -f "python ./.py". +##################### + +cd ~/safe-control-gym + +experiment_name=$1 +seed1=$2 +parallel_jobs=$3 # Number of parallel jobs +sampler=$4 # optuna or vizier +localOrHost=$5 +sys=$6 # cartpole, or quadrotor_2D_attitude +sys_name=${sys%%_*} # cartpole, or quadrotor +algo=$7 # ilqr, gpmpc_acados +prior=$8 +safety_filter=$9 # True or False +task=${10} # stab, or tracking +resume=${11} # True or False + + +# activate the environment +if [ "$localOrHost" == 'local' ]; then + source /home/tsung/anaconda3/etc/profile.d/conda.sh +elif [ "$localOrHost" == 'host0' ]; then + source /home/tueilsy-st01/anaconda3/etc/profile.d/conda.sh +elif [ "$localOrHost" == 'hostx' ]; then + source /home/tsung/miniconda3/etc/profile.d/conda.sh +else + echo "Please specify the machine to run the experiment." + exit 1 +fi + +conda activate safe + +# echo config path +echo "task config path: ./examples/hpo/${sys_name}/config_overrides/${sys}_${task}.yaml" +echo "algo config path: ./examples/hpo/${sys_name}/config_overrides/${algo}_${sys}_${task}_${prior}.yaml" +echo "hpo config path: ./examples/hpo/${sys_name}/config_overrides/${algo}_${sys}_hpo.yaml" + +# Adjust the seed for each parallel job +seeds=() +for ((i=0; i 1 else self.get_episode_rmse()[0], 'rmse_std': np.asarray(self.get_episode_rmse()).std(), + 'exponentiated_rmse': np.asarray(self.get_episode_exponentiated_rmse()).mean(), 'worst_case_rmse_at_0.5': compute_cvar(np.asarray(self.get_episode_rmse()), 0.5, lower_range=False), 'failure_rate': np.asarray(self.get_episode_constraint_violations()).mean(), 'average_constraint_violation': np.asarray(self.get_episode_constraint_violation_steps()).mean(), @@ -457,6 +458,11 @@ def get_episode_returns(self): ''' return self.get_episode_data('reward', postprocess_func=sum) + def get_episode_exponentiated_rmse(self): + '''Total exponentiated rmse of episodes.''' + + return self.get_episode_data('mse', postprocess_func=lambda x: float(np.exp(-np.sqrt(np.mean(x))))) + def get_episode_rmse(self): '''Root mean square error of episodes. 
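(Note on the new metric added in `base_experiment.py` above: `get_episode_exponentiated_rmse` maps each episode's RMSE through exp(-rmse), so the resulting `exponentiated_rmse` objective lies in (0, 1]. This is why the HPO configs introduced in this patch set `objective_bounds: [[0.0, 1.0]]` with `direction: [maximize]`, and why `BaseHPO.none_handler()` falls back to the lower bound 0.0 when an evaluation fails. A minimal, self-contained sketch of the computation follows; the helper name below is illustrative and not part of the patch.)

```python
# Minimal sketch of the exponentiated-RMSE objective from base_experiment.py above,
# assuming each episode contributes a list of per-step MSE values.
import numpy as np

def exponentiated_rmse(episode_mse_lists):  # illustrative helper, not part of this patch
    # exp(-rmse) maps rmse in [0, inf) into (0, 1], matching objective_bounds [0.0, 1.0].
    per_episode = [float(np.exp(-np.sqrt(np.mean(mse)))) for mse in episode_mse_lists]
    return float(np.mean(per_episode))

print(exponentiated_rmse([[0.01, 0.04], [0.02, 0.03]]))  # ~0.854: lower RMSE gives a value closer to 1
```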
diff --git a/safe_control_gym/hyperparameters/README.md b/safe_control_gym/hyperparameters/README.md index 383692d06..d1e577285 100644 --- a/safe_control_gym/hyperparameters/README.md +++ b/safe_control_gym/hyperparameters/README.md @@ -8,23 +8,7 @@ Create the environment by following the steps presented in the main [`README.md`](README.md) -To perform hyperparmeter optimization, you may need `MySQL` database (optional): - -```bash -sudo apt-get install mysql-server # MySQL server starts automatically after the installation -``` - -To set up, run the following commands sequentially: - -```bash -sudo mysql -CREATE USER optuna@"%"; # this creates the user name 'optuna' accessible by any host -CREATE DATABASE {algo}_hpo; -GRANT ALL ON {algo}_hpo.* TO optuna@"%"; -exit -``` - -You may replace `{algo}` with `ppo` or `sac` in order to run the scripts. +You may replace `{algo}` with, e.g., `ppo`, in order to run the scripts. ### Usages @@ -37,30 +21,12 @@ python ./examples/hpo/hpo_experiment.py \ --algo, {algo}, \ --task, {sys}, \ --overrides, \ - ./examples/hpo/rl/config_overrides/{sys}/{sys}_{task}.yaml, \ - ./examples/hpo/rl/{algo}/config_overrides/{sys}/{algo}_{sys}.yaml, \ - ./examples/hpo/rl/{algo}/config_overrides/{sys}/{algo}_{sys}_hpo.yaml, \ + ./examples/hpo/{sys}/config_overrides/{sys}_{task}.yaml, \ + ./examples/hpo/{sys}/{algo}/config_overrides/{algo}_{sys}_{task}_{prior}.yaml, \ + ./examples/hpo/{sys}/{algo}/config_overrides/{algo}_{sys}_hpo.yaml, \ --output_dir, {output_dir}, \ --seed, 7, \ --use_gpu, True ``` -You may replace `{sys}` with `cartpole` in order to run the script. - -#### Train policy using optimized hyperparameters - -Execute the following command if optimized hyperparameters are given - -```bash -python ./examples/hpo/hpo_experiment.py \ - --algo {algo} \ - --overrides \ - ./examples/hpo/rl/{algo}/config_overrides/cartpole/{algo}_{sys}.yaml \ - ./examples/hpo/rl/config_overrides/{sys}/{sys}_{task}.yaml \ - --output_dir {output_dir} \ - --tag {run_name} \ - --opt_hps {best_hp_file} \ - --task {sys} --seed 2 --use_gpu True -``` - -You may replace `{sys}` with `cartpole` in order to run the script. As an example, `{best_hp_file}` can be replaced with `examples/hpo/rl/sac/config_overrides/cartpole/optimized_hyperparameters.yaml` for `sac`. +You may replace `{sys}` with `cartpole` and `{prior}` with `100`, `200`, or `''`, respectively, in order to run the script. diff --git a/safe_control_gym/hyperparameters/base_hpo.py b/safe_control_gym/hyperparameters/base_hpo.py new file mode 100644 index 000000000..71f642764 --- /dev/null +++ b/safe_control_gym/hyperparameters/base_hpo.py @@ -0,0 +1,415 @@ +'''Base hyerparameter optimization class.''' + + +import os +from abc import ABC, abstractmethod +from copy import deepcopy +from functools import partial + +import numpy as np + +from safe_control_gym.experiments.base_experiment import BaseExperiment +from safe_control_gym.hyperparameters.hpo_search_space import HYPERPARAMS_DICT +from safe_control_gym.utils.logging import ExperimentLogger +from safe_control_gym.utils.registration import make +from safe_control_gym.utils.utils import mkdirs + + +class BaseHPO(ABC): + + def __init__(self, + hpo_config, + task_config, + algo_config, + algo='ilqr', + task='stabilization', + output_dir='./results', + safety_filter=None, + sf_config=None, + load_study=False): + """ + Base class for Hyperparameter Optimization (HPO). + + Args: + hpo_config (dict): Configuration specific to hyperparameter optimization. 
+ task_config (dict): Configuration for the task. + algo_config (dict): Algorithm configuration. + algo (str): Algorithm name. + task (str): The task/environment the agent will interact with. + output_dir (str): Directory where results and models will be saved. + safety_filter (str): Safety filter to be applied (optional). + sf_config: Safety filter configuration (optional). + load_study (bool): Load existing study if True. + """ + self.algo = algo + self.task = task + self.output_dir = output_dir + self.task_config = task_config + self.hpo_config = hpo_config + self.algo_config = algo_config + self.safety_filter = safety_filter + self.sf_config = sf_config + assert self.safety_filter is None or self.sf_config is not None, 'Safety filter config must be provided if safety filter is not None' + self.search_space_key = 'ilqr_sf' if self.safety_filter == 'linear_mpsc' and self.algo == 'ilqr' else self.algo + self.logger = ExperimentLogger(output_dir) + self.load_study = load_study + self.study_name = algo + '_hpo' + self.hps_config = hpo_config.hps_config + self.n_episodes = hpo_config.n_episodes + self.objective_bounds = hpo_config.objective_bounds + + env_func = partial(make, self.task, output_dir=self.output_dir, **self.task_config) + env = env_func() + + self.state_dim = env.state_dim + self.action_dim = env.action_dim + + self.append_hps_config() + self.check_hyperparmeter_config() + + assert len(hpo_config.objective) == len(hpo_config.direction), 'objective and direction must have the same length' + assert len(hpo_config.objective) == 1, 'Only single-objective optimization is supported' + + def append_hps_config(self): + """ + Append hyperparameters (self.hps_config) if safety filter is not None. + + """ + if self.safety_filter is not None: + for hp in HYPERPARAMS_DICT[self.search_space_key]: + if hp in self.sf_config: + self.hps_config[hp] = self.sf_config[hp] + + def special_handle(self, param_name, param_value): + """ + Special handling for specific hyperparameters, e.g., learning_rate and optimization_iterations, which + have list types in configs but only one DoF in the search space. Special handling can be added here in + the future if needed. + + Args: + param_name (str): Name of the hyperparameter. + param_value (Any): Sampled value of the hyperparameter. + + Returns: + Valid (bool): True if the hyperparameter is valid, False otherwise. + param_value (Any): If valid, sampled value of the hyperparameter cast to the appropriate type based on self.hps_config. + """ + + # special cases: learning_rate and optimization_iterations for gp_mpc + valid = False + if self.algo == 'gp_mpc': + if param_name == 'learning_rate' or param_name == 'optimization_iterations': + if type(param_value) is not type(self.hps_config[param_name]): + param_value = len(self.hps_config[param_name]) * [param_value] + if type(param_value) is type(self.hps_config[param_name]): + valid = True + return valid, param_value + + return valid, param_value + + def check_hyperparmeter_config(self): + """ + Check if the hyperparameter configuration (self.hps_config) is valid, e.g., if types match what defined in hpo_search_space.py. 
+ """ + valid = True + for param in self.hps_config: + if HYPERPARAMS_DICT[self.search_space_key][param]['type'] is not type(self.hps_config[param]): + valid = False + valid, _ = self.special_handle(param, self.hps_config[param]) + assert valid, f'Hyperparameter {param} should be of type {HYPERPARAMS_DICT[self.search_space_key][param]["type"]}' + + @abstractmethod + def setup_problem(self): + """ + Setup hyperparameter optimization, e.g., search space, study, algorithm, etc. + Needs to be implemented by subclasses. + """ + raise NotImplementedError + + @abstractmethod + def warm_start(self, params): + """ + Warm start the study. + + Args: + params (dict): Specified hyperparameters. + objective (float): Objective value. + """ + raise NotImplementedError + + def cast_to_original_type_from_config(self, param_name, param_value): + """ + Cast the parameter to its original type based on the existing task, algo, or safty filter config. + + Args: + param_name (str): Name of the hyperparameter. + param_value (Any): Sampled value of the hyperparameter. + + Returns: + Any: The parameter value cast to the appropriate type. + """ + # Check if the parameter exists in task_config, algo_config, or sf_config + if param_name in self.task_config: + current_value = self.task_config[param_name] + valid, res_value = self.special_handle(param_name, param_value) + elif param_name in self.algo_config: + current_value = self.algo_config[param_name] + valid, res_value = self.special_handle(param_name, param_value) + elif self.safety_filter is not None and param_name in self.sf_config: + current_value = self.sf_config[param_name] + valid, res_value = self.special_handle(param_name, param_value) + else: + raise ValueError(f'Unknown parameter {param_name} - cannot cast to original type from config') + + if valid: + return res_value + else: + return type(current_value)(param_value) + + def cast_to_original_type_from_hyperparams_dict(self, param_name, param_value): + """ + Cast the parameter to its original type based on HYPERPARAMS_DICT. + + Args: + param_name (str): Name of the hyperparameter. + param_value (Any): Sampled value of the hyperparameter. + + Returns: + Any: The parameter value cast to the appropriate type. + """ + return HYPERPARAMS_DICT[self.search_space_key][param_name]['type'](param_value) + + def param_to_config(self, params): + """ + Convert sampled hyperparameters to configurations (self.task_config, self.algo_config, and self.sf_config). + + Args: + params (dict): Sampled hyperparameters. 
+ + """ + # Iterate through the params dictionary + for param_name, param_value in params.items(): + # Handle multidimensional hyperparameters (e.g., q_mpc_0, q_mpc_1) + base_param, index_str = param_name.rsplit('_', 1) if '_' in param_name else (None, '') + if index_str.isdigit(): + if (base_param in self.algo_config or base_param in self.task_config or (self.safety_filter is not None and base_param in self.sf_config)): + # If base parameter exists in algo_config as a list/array + index = int(index_str) + if base_param in self.algo_config: + self.algo_config[base_param][index] = param_value + elif base_param in self.task_config: + self.task_config[base_param][index] = param_value + elif base_param in self.sf_config: + self.sf_config[base_param][index] = param_value + # Handle the mapping based on the parameter name + elif param_name in self.algo_config: + # If the param is related to the algorithm, update the algo_config + self.algo_config[param_name] = self.cast_to_original_type_from_config(param_name, param_value) + elif param_name in self.task_config: + # If the param is related to the task, update the task_config + self.task_config[param_name] = self.cast_to_original_type_from_config(param_name, param_value) + elif self.safety_filter is not None and param_name in self.sf_config: + # If the param is related to the safety filter, update the sf_config + self.sf_config[param_name] = self.cast_to_original_type_from_config(param_name, param_value) + else: + # If the parameter doesn't map to a known config, log or handle it + print(f'Warning: Unknown parameter {param_name} - not mapped to any configuration') + + def config_to_param(self, params): + """ + Convert configuration to hyperparameters (mainly to handle multidimensional hyperparameters) as the input to add_trial. + + Args: + params (dict): User specified hyperparameters (self.hps_config). + + Returns: + dict: Hyperparameter representation in the problem. + """ + params = deepcopy(params) + for param in list(params.keys()): + is_list = isinstance(params[param], list) + if is_list: + # multi-dimensional hyperparameters + if list == HYPERPARAMS_DICT[self.search_space_key][param]['type']: + for i, value in enumerate(params[param]): + new_param = f'{param}_{i}' + params[new_param] = value + del params[param] + # single-dimensional hyperparameters but in list format + else: + params[param] = params[param][0] + + return params + + def post_process_best_hyperparams(self, params): + """ + Post-process the best hyperparameters after optimization (mainly to handle multidimensional hyperparameters). + + Args: + params (dict): Best hyperparameters. + + Returns: + params (dict): Post-processed hyperparameters. 
+ """ + aggregated_params = {} + for param_name, param_value in params.items(): + # Handle multidimensional hyperparameters (e.g., q_mpc_0, q_mpc_1) + base_param, index_str = param_name.rsplit('_', 1) if '_' in param_name else (None, '') + if index_str.isdigit(): + if base_param in self.algo_config or base_param in self.task_config or (self.safety_filter is not None and base_param in self.sf_config): + # If base parameter exists in algo_config as a list/array + index = int(index_str) + if base_param in aggregated_params: + aggregated_params[base_param][index] = param_value + else: + aggregated_params[base_param] = {index: param_value} + + for param_name, param_value in aggregated_params.items(): + for hp in list(params.keys()): # Create a list of keys to iterate over + if param_name in hp: + del params[hp] + params[param_name] = [param_value[i] for i in range(len(param_value))] + + for hp in params: + params[hp] = self.cast_to_original_type_from_config(hp, params[hp]) + + return params + + def evaluate(self, params): + """ + Evaluation of hyperparameters. + + Args: + params (dict): Hyperparameters to be evaluated. + + Returns: + Sampled objective value (list) + """ + sampled_hyperparams = params + + returns, seeds = [], [] + for i in range(self.hpo_config.repetitions): + + seed = np.random.randint(0, 10000) + + # update the agent config with sample candidate hyperparameters + # pass the hyperparameters to config + self.param_to_config(sampled_hyperparams) + + seeds.append(seed) + self.logger.info('Sample hyperparameters: {}'.format(sampled_hyperparams)) + self.logger.info('Seeds: {}'.format(seeds)) + + try: + self.env_func = partial(make, self.task, output_dir=self.output_dir, **self.task_config) + # using deepcopy(self.algo_config) prevent setting being overwritten + self.agent = make(self.algo, + self.env_func, + training=True, + checkpoint_path=os.path.join(self.output_dir, 'model_latest.pt'), + output_dir=os.path.join(self.output_dir, 'hpo'), + use_gpu=self.hpo_config.use_gpu, + seed=seed, + **deepcopy(self.algo_config)) + + self.agent.reset() + eval_env = self.env_func(seed=seed * 111) + # Setup safety filter + if self.safety_filter is not None: + env_func_filter = partial(make, + self.task, + **self.task_config) + safety_filter = make(self.safety_filter, + env_func_filter, + **self.sf_config) + safety_filter.reset() + try: + safety_filter.learn() + except Exception as e: + self.logger.info(f'Exception occurs when constructing safety filter: {e}') + self.logger.info('Safety filter config: {}'.format(self.sf_config)) + self.logger.std_out_logger.logger.exception('Full exception traceback') + self.agent.close() + del self.agent + del self.env_func + return self.none_handler() + mkdirs(f'{self.output_dir}/models/') + safety_filter.save(path=f'{self.output_dir}/models/{self.safety_filter}.pkl') + experiment = BaseExperiment(eval_env, self.agent, safety_filter=safety_filter) + else: + experiment = BaseExperiment(eval_env, self.agent) + except Exception as e: + # catch exception + self.logger.info(f'Exception occurs when constructing agent: {e}') + self.logger.std_out_logger.logger.exception('Full exception traceback') + if hasattr(self, 'agent'): + self.agent.close() + del self.agent + del self.env_func + return self.none_handler() + + # return objective estimate + try: + # self.agent.learn() + experiment.launch_training() + except Exception as e: + # catch the NaN generated by the sampler + self.agent.close() + del self.agent + del self.env_func + self.logger.info(f'Exception occurs 
during learning: {e}') + self.logger.std_out_logger.logger.exception('Full exception traceback') + print(e) + print('Sampled hyperparameters:') + print(sampled_hyperparams) + return self.none_handler() + + # TODO: add n_episondes to the config + try: + _, metrics = experiment.run_evaluation(n_episodes=self.n_episodes, n_steps=None, done_on_max_steps=False) + except Exception as e: + self.agent.close() + # delete instances + del self.agent + del self.env_func + del experiment + self.logger.info(f'Exception occurs during evaluation: {e}') + self.logger.std_out_logger.logger.exception('Full exception traceback') + print(e) + print('Sampled hyperparameters:') + print(sampled_hyperparams) + return self.none_handler() + + # at the moment, only single-objective optimization is supported + returns.append(metrics[self.hpo_config.objective[0]]) + self.logger.info('Sampled objectives: {}'.format(returns)) + + self.agent.close() + # delete instances + del self.agent + del self.env_func + + return returns + + def none_handler(self): + """ + Assign worse objective values (based on objective bound) to None returns. + """ + if self.hpo_config.direction[0] == 'maximize': + return self.objective_bounds[0][0] + else: + return self.objective_bounds[0][1] + + @abstractmethod + def hyperparameter_optimization(self): + """ + Hyperparameter optimization loop. Should be implemented by subclasses. + """ + raise NotImplementedError + + @abstractmethod + def checkpoint(self): + """ + Save checkpoints, results, and logs during optimization. + """ + raise NotImplementedError diff --git a/safe_control_gym/hyperparameters/hpo.py b/safe_control_gym/hyperparameters/hpo.py deleted file mode 100644 index 0c92c107d..000000000 --- a/safe_control_gym/hyperparameters/hpo.py +++ /dev/null @@ -1,295 +0,0 @@ -''' The implementation of HPO class - -Reference: - * stable baselines3 https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/rl_zoo3/hyperparams_opt.py - * Optuna: https://optuna.org -''' -import os -from copy import deepcopy -from functools import partial - -import matplotlib.pyplot as plt -import numpy as np -import optuna -import yaml -from optuna.samplers import RandomSampler, TPESampler -from optuna.study import MaxTrialsCallback -from optuna.trial import FrozenTrial, TrialState -from optuna.visualization.matplotlib import plot_optimization_history, plot_param_importances -from optuna_dashboard import run_server - -from safe_control_gym.experiments.base_experiment import BaseExperiment -from safe_control_gym.hyperparameters.hpo_sampler import HYPERPARAMS_SAMPLER -from safe_control_gym.utils.logging import ExperimentLogger -from safe_control_gym.utils.registration import make - - -class HPO(object): - - def __init__(self, algo, task, sampler, load_study, output_dir, task_config, hpo_config, **algo_config): - ''' Hyperparameter optimization class - - Args: - algo: algo name - env_func: environment that the agent will interact with - output_dir: output directory - hpo_config: hyperparameter optimization configuration - algo_config: algorithm configuration - ''' - - self.algo = algo - self.study_name = algo + '_hpo' - self.task = task - self.load_study = load_study - self.task_config = task_config - self.hpo_config = hpo_config - self.hps_config = hpo_config.hps_config - self.output_dir = output_dir - self.algo_config = algo_config - self.logger = ExperimentLogger(output_dir, log_file_out=False) - self.total_runs = 0 - # init sampler - if sampler == 'RandomSampler': - self.sampler = RandomSampler(seed=self.hpo_config.seed) 
- elif sampler == 'TPESampler': - self.sampler = TPESampler(seed=self.hpo_config.seed) - else: - raise ValueError('Unknown sampler.') - - assert len(hpo_config.objective) == len(hpo_config.direction), 'objective and direction must have the same length' - - def objective(self, trial: optuna.Trial) -> float: - ''' The stochastic objective function for a HPO tool to optimize over - - Args: - trial: A single trial object that contains the hyperparameters to be evaluated - ''' - - # sample candidate hyperparameters - sampled_hyperparams = HYPERPARAMS_SAMPLER[self.algo](self.hps_config, trial) - - # log trial number - self.logger.info(f'Trial number: {trial.number}') - - # flag for increasing runs - increase_runs = True - first_iteration = True - - # do repetition - returns, seeds = [], [] - while increase_runs: - increase_runs = False - if first_iteration: - Gs = np.inf - for i in range(self.hpo_config.repetitions): - - seed = np.random.randint(0, 10000) - # update the agent config with sample candidate hyperparameters - # new agent with the new hps - for hp in sampled_hyperparams: - self.algo_config[hp] = sampled_hyperparams[hp] - - seeds.append(seed) - self.logger.info(f'Sample hyperparameters: {sampled_hyperparams}') - self.logger.info(f'Seeds: {seeds}') - - try: - self.env_func = partial(make, self.task, output_dir=self.output_dir, **self.task_config) - # using deepcopy(self.algo_config) prevent setting being overwritten - self.agent = make(self.algo, - self.env_func, - training=True, - checkpoint_path=os.path.join(self.output_dir, 'model_latest.pt'), - output_dir=os.path.join(self.output_dir, 'hpo'), - use_gpu=self.hpo_config.use_gpu, - seed=seed, - **deepcopy(self.algo_config)) - - self.agent.reset() - eval_env = self.env_func(seed=seed * 111) - experiment = BaseExperiment(eval_env, self.agent) - except Exception as e: - # catch exception - self.logger.info(f'Exception occurs when constructing agent: {e}') - - # return objective estimate - # TODO: report intermediate results to Optuna for pruning - try: - # self.agent.learn() - experiment.launch_training() - self.total_runs += 1 - - except Exception as e: - # catch the NaN generated by the sampler - self.agent.close() - del self.agent - del self.env_func - del experiment - self.logger.info(f'Exception occurs during learning: {e}') - print(e) - print('Sampled hyperparameters:') - print(sampled_hyperparams) - returns.append(0.0) - break - - # avg_return = self.agent._run() - _, metrics = experiment.run_evaluation(n_episodes=5, n_steps=None, done_on_max_steps=True) - - # at the moment, only single-objective optimization is supported - returns.append(metrics[self.hpo_config.objective[0]]) - self.logger.info(f'Sampled objectives: {returns}') - - self.agent.close() - # delete instances - del self.agent - del self.env_func - - Gss = self._compute_cvar(np.array(returns), self.hpo_config.alpha) - - # if the current objective is better than the best objective, trigger more runs to avoid maximization bias - if self.hpo_config.warm_trials < len(self.study.trials) and self.hpo_config.dynamical_runs: - if Gss > self.study.best_value or first_iteration is False: - if abs(Gs - Gss) > self.hpo_config.approximation_threshold: - increase_runs = True - first_iteration = False - Gs = Gss - self.logger.info('Trigger more runs') - else: - increase_runs = False - - self.logger.info(f'Returns: {Gss}') - - return Gss - - def hyperparameter_optimization(self) -> None: - if self.load_study: - self.study = optuna.load_study(study_name=self.study_name, 
storage=f'mysql+pymysql://optuna@localhost/{self.study_name}') - elif self.hpo_config.use_database is False: - # single-objective optimization - if len(self.hpo_config.direction) == 1: - self.study = optuna.create_study( - direction=self.hpo_config.direction[0], - sampler=self.sampler, - pruner=optuna.pruners.MedianPruner(n_warmup_steps=10), - study_name=self.study_name, - ) - # multi-objective optimization - else: - self.study = optuna.create_study( - directions=self.hpo_config.direction, - sampler=self.sampler, - pruner=optuna.pruners.MedianPruner(n_warmup_steps=10), - study_name=self.study_name, - ) - else: - # single-objective optimization - if len(self.hpo_config.direction) == 1: - self.study = optuna.create_study( - direction=self.hpo_config.direction[0], - sampler=self.sampler, - pruner=optuna.pruners.MedianPruner(n_warmup_steps=10), - study_name=self.study_name, - storage=f'mysql+pymysql://optuna@localhost/{self.study_name}', - load_if_exists=self.hpo_config.load_if_exists - ) - # multi-objective optimization - else: - self.study = optuna.create_study( - directions=self.hpo_config.direction, - sampler=self.sampler, - pruner=optuna.pruners.MedianPruner(n_warmup_steps=10), - study_name=self.study_name, - storage=f'mysql+pymysql://optuna@localhost/{self.study_name}', - load_if_exists=self.hpo_config.load_if_exists - ) - - self.study.optimize(self.objective, - catch=(RuntimeError,), - callbacks=[MaxTrialsCallback(self.hpo_config.trials, states=(TrialState.COMPLETE,))], - ) - - output_dir = os.path.join(self.output_dir, 'hpo') - # save meta data - self.study.trials_dataframe().to_csv(output_dir + '/trials.csv') - - # save top-n best hyperparameters - if len(self.hpo_config.direction) == 1: - trials = self.study.trials - if self.hpo_config.direction[0] == 'minimize': - trials.sort(key=self._value_key) - else: - trials.sort(key=self._value_key, reverse=True) - for i in range(min(self.hpo_config.save_n_best_hps, len(self.study.trials))): - params = trials[i].params - with open(f'{output_dir}/hyperparameters_{trials[i].value:.4f}.yaml', 'w')as f: - yaml.dump(params, f, default_flow_style=False) - else: - best_trials = self.study.best_trials - for i in range(len(self.study.best_trials)): - params = best_trials[i].params - with open(f'{output_dir}/best_hyperparameters_[{best_trials[i].values[0]:.4f},{best_trials[i].values[1]:.4f}].yaml', 'w')as f: - yaml.dump(params, f, default_flow_style=False) - - # dashboard - if self.hpo_config.dashboard and self.hpo_config.use_database: - run_server(f'mysql+pymysql://optuna@localhost/{self.study_name}') - - # save plot - try: - if len(self.hpo_config.objective) == 1: - plot_param_importances(self.study) - plt.tight_layout() - plt.savefig(output_dir + '/param_importances.png') - # plt.show() - plt.close() - plot_optimization_history(self.study) - plt.tight_layout() - plt.savefig(output_dir + '/optimization_history.png') - # plt.show() - plt.close() - else: - for i in range(len(self.hpo_config.objective)): - plot_param_importances(self.study, target=lambda t: t.values[i]) - plt.tight_layout() - plt.savefig(output_dir + f'/param_importances_{self.hpo_config.objective[i]}.png') - # plt.show() - plt.close() - plot_optimization_history(self.study, target=lambda t: t.values[i]) - plt.tight_layout() - plt.savefig(output_dir + f'/optimization_history_{self.hpo_config.objective[i]}.png') - # plt.show() - plt.close() - except Exception as e: - print(e) - print('Plotting failed.') - - self.logger.info(f'Total runs: {self.total_runs}') - self.logger.close() - - 
return - - def _value_key(self, trial: FrozenTrial) -> float: - ''' Returns value of trial object for sorting.''' - if trial.value is None: - if self.hpo_config.direction[0] == 'minimize': - return float('inf') - else: - return float('-inf') - else: - return trial.value - - def _compute_cvar(self, returns: np.ndarray, alpha: float = 0.2) -> float: - ''' Compute CVaR.''' - assert returns.ndim == 1, 'returns must be 1D array' - sorted_returns = np.sort(returns) - n = len(sorted_returns) - VaR_idx = int(alpha * n) - if VaR_idx == 0: - VaR_idx = 1 - - if self.hpo_config.direction[0] == 'minimize': - CVaR = sorted_returns[-VaR_idx:].mean() - else: - CVaR = sorted_returns[:VaR_idx].mean() - - return CVaR diff --git a/safe_control_gym/hyperparameters/hpo_sampler.py b/safe_control_gym/hyperparameters/hpo_sampler.py deleted file mode 100644 index 97844c92c..000000000 --- a/safe_control_gym/hyperparameters/hpo_sampler.py +++ /dev/null @@ -1,223 +0,0 @@ -'''Sampler for hyperparameters for different algorithms - -Reference: - * stable baselines3 https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/rl_zoo3/hyperparams_opt.py -''' - -from typing import Any, Dict - -import optuna - -# define the categorical choice or real interval for each hyperparameter -PPO_dict = { - 'categorical': { - 'hidden_dim': [8, 16, 32, 64, 128, 256], - 'activation': ['tanh', 'relu', 'leaky_relu'], - 'gamma': [0.9, 0.95, 0.98, 0.99, 0.995, 0.999, 0.9999], - 'gae_lambda': [0.8, 0.9, 0.92, 0.95, 0.98, 0.99, 1.0], - 'clip_param': [0.1, 0.2, 0.3, 0.4], - 'opt_epochs': [1, 5, 10, 20], - 'mini_batch_size': [32, 64, 128], - 'rollout_steps': [50, 100, 150, 200], - 'max_env_steps': [30000, 72000, 216000], # to make sure having the checkpoint at these steps [30000, 72000, 216000] - }, - 'float': { # note that in float type, you must specify the upper and lower bound - 'target_kl': [0.00000001, 0.8], - 'entropy_coef': [0.00000001, 0.1], - 'actor_lr': [1e-5, 1], - 'critic_lr': [1e-5, 1], - } -} -SAC_dict = { - 'categorical': { - 'hidden_dim': [32, 64, 128, 256, 512], - 'activation': ['tanh', 'relu', 'leaky_relu'], - 'gamma': [0.9, 0.95, 0.98, 0.99, 0.995, 0.999, 0.9999], - 'train_interval': [10, 100, 1000], # should be divisible by max_env_steps - 'train_batch_size': [32, 64, 128, 256, 512], - 'max_env_steps': [30000, 72000, 216000], # to make sure having the checkpoint at these steps [30000, 72000, 216000] - 'warm_up_steps': [500, 1000, 2000, 4000], - }, - 'float': { # note that in float type, you must specify the upper and lower bound - 'tau': [0.005, 1.0], - 'actor_lr': [1e-5, 1], - 'critic_lr': [1e-5, 1], - } -} - -GPMPC_dict = { - 'categorical': { - 'horizon': [10, 15, 20, 25, 30, 35], - 'kernel': ['Matern', 'RBF'], - 'n_ind_points': [30, 40, 50], # number should lower 0.8 * MIN(num_samples) if 0,2 is test_data_ratio - 'num_epochs': [4, 5, 6, 7, 8], - 'num_samples': [70, 75, 80, 85], - 'optimization_iterations': [2800, 3000, 3200], # to make sure having the same checkpoint at these steps [30000, 54000, 72000] - }, - 'float': { # note that in float type, you must specify the upper and lower bound - 'learning_rate': [5e-4, 0.5], - } -} - - -def ppo_sampler(hps_dict: Dict[str, Any], trial: optuna.Trial) -> Dict[str, Any]: - '''Sampler for PPO hyperparameters. 
- - Args: - hps_dict: the dict of hyperparameters that will be optimized over - trial: budget variable - ''' - - # TODO: conditional hyperparameters - - # model args - hidden_dim = trial.suggest_categorical('hidden_dim', PPO_dict['categorical']['hidden_dim']) - activation = trial.suggest_categorical('activation', PPO_dict['categorical']['activation']) - - # loss args - gamma = trial.suggest_categorical('gamma', PPO_dict['categorical']['gamma']) - # Factor for trade-off of bias vs variance for Generalized Advantage Estimator - gae_lambda = trial.suggest_categorical('gae_lambda', PPO_dict['categorical']['gae_lambda']) - clip_param = trial.suggest_categorical('clip_param', PPO_dict['categorical']['clip_param']) - # Limit the KL divergence between updates - target_kl = trial.suggest_float('target_kl', PPO_dict['float']['target_kl'][0], PPO_dict['float']['target_kl'][1], log=True) - # Entropy coefficient for the loss calculation - entropy_coef = trial.suggest_float('entropy_coef', PPO_dict['float']['entropy_coef'][0], PPO_dict['float']['entropy_coef'][1], log=True) - - # optim args - opt_epochs = trial.suggest_categorical('opt_epochs', PPO_dict['categorical']['opt_epochs']) - mini_batch_size = trial.suggest_categorical('mini_batch_size', PPO_dict['categorical']['mini_batch_size']) - actor_lr = trial.suggest_float('actor_lr', PPO_dict['float']['actor_lr'][0], PPO_dict['float']['actor_lr'][1], log=True) - critic_lr = trial.suggest_float('critic_lr', PPO_dict['float']['critic_lr'][0], PPO_dict['float']['critic_lr'][1], log=True) - # The maximum value for the gradient clipping - # max_grad_norm = trial.suggest_categorical('max_grad_norm', [0.3, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 2, 5]) - - # The number of steps to run for each environment per update - # Note: rollout_steps * n_envs should be greater than mini_batch_size - # The value is set in this way for the sake of evluation and checkpoint - # e.g. total_steps will be incremented by 4 *[50, 100, 150, 250] = [200, 400, 600, 1000] - # then eval_inverval can be set to 6000 if we want to capture learning efficiency and intermediate performance - rollout_steps = trial.suggest_categorical('rollout_steps', PPO_dict['categorical']['rollout_steps']) - max_env_steps = trial.suggest_categorical('max_env_steps', PPO_dict['categorical']['max_env_steps']) - - # Orthogonal initialization - # ortho_init = False - # ortho_init = trial.suggest_categorical('ortho_init', [False, True]) - - hps_suggestion = { - 'hidden_dim': hidden_dim, - 'activation': activation, - 'gamma': gamma, - 'gae_lambda': gae_lambda, - 'clip_param': clip_param, - 'target_kl': target_kl, - 'entropy_coef': entropy_coef, - 'opt_epochs': opt_epochs, - 'mini_batch_size': mini_batch_size, - 'actor_lr': actor_lr, - 'critic_lr': critic_lr, - # 'max_grad_norm': max_grad_norm, (currently not implemented in PPO controller) - 'max_env_steps': max_env_steps, - 'rollout_steps': rollout_steps, - } - - assert len(hps_suggestion) == len(hps_dict), ValueError('We are optimizing over different number of HPs as you listed.') - - return hps_suggestion - - -def sac_sampler(hps_dict: Dict[str, Any], trial: optuna.Trial) -> Dict[str, Any]: - '''Sampler for SAC hyperparameters. 
- - Args: - hps_dict: the dict of hyperparameters that will be optimized over - trial: budget variable - ''' - - # TODO: conditional hyperparameters - - # model args - hidden_dim = trial.suggest_categorical('hidden_dim', SAC_dict['categorical']['hidden_dim']) - activation = trial.suggest_categorical('activation', SAC_dict['categorical']['activation']) - - # loss args - gamma = trial.suggest_categorical('gamma', SAC_dict['categorical']['gamma']) - tau = trial.suggest_float('tau', SAC_dict['float']['tau'][0], SAC_dict['float']['tau'][1], log=False) - - # optim args - train_interval = trial.suggest_categorical('train_interval', SAC_dict['categorical']['train_interval']) - train_batch_size = trial.suggest_categorical('train_batch_size', SAC_dict['categorical']['train_batch_size']) - actor_lr = trial.suggest_float('actor_lr', SAC_dict['float']['actor_lr'][0], SAC_dict['float']['actor_lr'][1], log=True) - critic_lr = trial.suggest_float('critic_lr', SAC_dict['float']['critic_lr'][0], SAC_dict['float']['critic_lr'][1], log=True) - - max_env_steps = trial.suggest_categorical('max_env_steps', SAC_dict['categorical']['max_env_steps']) - warm_up_steps = trial.suggest_categorical('warm_up_steps', SAC_dict['categorical']['warm_up_steps']) - - hps_suggestion = { - 'hidden_dim': hidden_dim, - 'activation': activation, - 'gamma': gamma, - 'tau': tau, - 'train_interval': train_interval, - 'train_batch_size': train_batch_size, - 'actor_lr': actor_lr, - 'critic_lr': critic_lr, - 'max_env_steps': max_env_steps, - 'warm_up_steps': warm_up_steps, - } - - assert len(hps_suggestion) == len(hps_dict), ValueError('We are optimizing over different number of HPs as you listed.') - - return hps_suggestion - - -def gpmpc_sampler(hps_dict: Dict[str, Any], trial: optuna.Trial) -> Dict[str, Any]: - '''Sampler for PPO hyperparameters. - - Args: - hps_dict: the dict of hyperparameters that will be optimized over - trial: budget variable - ''' - - horizon = trial.suggest_categorical('horizon', GPMPC_dict['categorical']['horizon']) - kernel = trial.suggest_categorical('kernel', GPMPC_dict['categorical']['kernel']) - n_ind_points = trial.suggest_categorical('n_ind_points', GPMPC_dict['categorical']['n_ind_points']) - num_epochs = trial.suggest_categorical('num_epochs', GPMPC_dict['categorical']['num_epochs']) - num_samples = trial.suggest_categorical('num_samples', GPMPC_dict['categorical']['num_samples']) - - # get dimensions of the dynamics - d = len(hps_dict['learning_rate']) - assert d == len(hps_dict['optimization_iterations']), 'The number of optimization iterations must be the same as the number of learning rates.' 
- - # use same setting for all dimensions for simplicity - optimization_iterations, learning_rate = [], [] - - optimization_iterations = d * [trial.suggest_categorical('optimization_iterations', GPMPC_dict['categorical']['optimization_iterations'])] - learning_rate = d * [trial.suggest_float('learning_rate', GPMPC_dict['float']['learning_rate'][0], GPMPC_dict['float']['learning_rate'][1], log=True)] - - hps_suggestion = { - 'horizon': horizon, - 'kernel': kernel, - 'n_ind_points': n_ind_points, - 'num_epochs': num_epochs, - 'num_samples': num_samples, - 'optimization_iterations': optimization_iterations, - 'learning_rate': learning_rate, - } - - assert len(hps_suggestion) == len(hps_dict), ValueError('We are optimizing over different number of HPs as you listed.') - - return hps_suggestion - - -HYPERPARAMS_SAMPLER = { - 'ppo': ppo_sampler, - 'sac': sac_sampler, - 'gp_mpc': gpmpc_sampler, -} - -HYPERPARAMS_DICT = { - 'ppo': PPO_dict, - 'sac': SAC_dict, - 'gp_mpc': GPMPC_dict, -} diff --git a/safe_control_gym/hyperparameters/hpo_search_space.py b/safe_control_gym/hyperparameters/hpo_search_space.py new file mode 100644 index 000000000..bf305c4b0 --- /dev/null +++ b/safe_control_gym/hyperparameters/hpo_search_space.py @@ -0,0 +1,97 @@ +"""Defines the hyperparameter search space for each algorithm.""" + +from typing import Any, Dict + +PPO_dict = { + 'hidden_dim': {'values': [8, 16, 32, 64, 128, 256, 512], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'gamma': {'values': [0.9, 0.95, 0.98, 0.99, 0.995, 0.999, 0.9999], 'scale': 'uniform', 'type': float, 'cat': 'discrete'}, + 'gae_lambda': {'values': [0.8, 0.9, 0.92, 0.95, 0.98, 0.99, 1.0], 'scale': 'uniform', 'type': float, 'cat': 'discrete'}, + 'clip_param': {'values': [0.1, 0.2, 0.3, 0.4], 'scale': 'uniform', 'type': float, 'cat': 'discrete'}, + 'opt_epochs': {'values': [1, 5, 10, 20, 25], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'mini_batch_size': {'values': [32, 64, 128, 256], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'max_env_steps': {'values': [30000, 72000, 114000, 156000, 216000, 276000, 336000], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'activation': {'values': ['tanh', 'relu', 'leaky_relu'], 'scale': 'uniform', 'type': str, 'cat': 'categorical'}, + 'target_kl': {'values': [0.00000001, 0.8], 'scale': 'uniform', 'type': float, 'cat': 'float'}, + 'entropy_coef': {'values': [0.00000001, 0.1], 'scale': 'log', 'type': float, 'cat': 'float'}, # log-scaled + 'actor_lr': {'values': [1e-5, 1], 'scale': 'log', 'type': float, 'cat': 'float'}, # log-scaled + 'critic_lr': {'values': [1e-5, 1], 'scale': 'log', 'type': float, 'cat': 'float'}, # log-scaled + 'rew_state_weight': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, + 'rew_act_weight': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, +} + +SAC_dict = { + 'hidden_dim': {'values': [32, 64, 128, 256, 512], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'gamma': {'values': [0.9, 0.95, 0.98, 0.99, 0.995, 0.999, 0.9999], 'scale': 'uniform', 'type': float, 'cat': 'discrete'}, + 'train_interval': {'values': [10, 100, 1000], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'train_batch_size': {'values': [32, 64, 128, 256, 512, 1024], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'max_env_steps': {'values': [30000, 72000, 114000, 156000, 216000, 276000, 336000], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'warm_up_steps': {'values': [500, 1000, 2000, 4000], 'scale': 
'uniform', 'type': int, 'cat': 'discrete'}, + 'max_buffer_size': {'values': [10000, 50000, 100000, 200000], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'activation': {'values': ['tanh', 'relu', 'leaky_relu'], 'scale': 'uniform', 'type': str, 'cat': 'categorical'}, + 'tau': {'values': [0.005, 1.0], 'scale': 'uniform', 'type': float, 'cat': 'float'}, + 'init_temperature': {'values': [0.01, 1], 'scale': 'uniform', 'type': float, 'cat': 'float'}, + 'actor_lr': {'values': [1e-5, 1], 'scale': 'log', 'type': float, 'cat': 'float'}, # log-scaled + 'critic_lr': {'values': [1e-5, 1], 'scale': 'log', 'type': float, 'cat': 'float'}, # log-scaled + 'entropy_lr': {'values': [1e-5, 1], 'scale': 'log', 'type': float, 'cat': 'float'}, # log-scaled + 'rew_state_weight': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, + 'rew_act_weight': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, +} + +GPMPC_dict = { + 'horizon': {'values': [10, 15, 20, 25, 30, 35, 40], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'n_ind_points': {'values': [30, 40, 50], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'num_epochs': {'values': [2, 3, 4, 5, 6, 7, 8], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'num_samples': {'values': [70, 75, 80, 85], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'optimization_iterations': {'values': [2400, 2600, 2800, 3000, 3200], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, # type belongs to int due to 1 DoF + 'kernel': {'values': ['Matern', 'RBF'], 'scale': 'uniform', 'type': str, 'cat': 'categorical'}, + 'learning_rate': {'values': [5e-4, 0.5], 'scale': 'log', 'type': float, 'cat': 'float'}, # type belongs to float due to 1 DoF + 'q_mpc': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, + 'r_mpc': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, +} + +iLQR_dict = { + 'max_iterations': {'values': [5, 10, 15, 20], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'lamb_factor': {'values': [5, 10, 15], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'lamb_max': {'values': [1000, 1500, 2000], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'epsilon': {'values': [0.01, 0.005, 0.001], 'scale': 'uniform', 'type': float, 'cat': 'discrete'}, + 'q_lqr': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, + 'r_lqr': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, +} + +iLQR_SF_dict = { + 'max_iterations': {'values': [5, 10, 15, 20], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'lamb_factor': {'values': [5, 10, 15], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'lamb_max': {'values': [1000, 1500, 2000], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'epsilon': {'values': [0.01, 0.005, 0.001], 'scale': 'uniform', 'type': float, 'cat': 'discrete'}, + 'q_lqr': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, + 'r_lqr': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, + 'horizon': {'values': [10, 15, 20, 25, 30, 35, 40], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'n_samples': {'values': [400, 600, 800, 1000], 'scale': 'uniform', 'type': int, 'cat': 'discrete'}, + 'tau': {'values': [0.95, 0.99], 'scale': 'uniform', 'type': float, 'cat': 'discrete'}, + 'q_lin': {'values': [0.001, 10], 'scale': 'uniform', 'type': list, 'cat': 'float'}, + 'r_lin': {'values': [0.001, 10], 'scale': 'uniform', 'type': 
list, 'cat': 'float'}, +} + + +HYPERPARAMS_DICT = { + 'ppo': PPO_dict, + 'sac': SAC_dict, + 'gp_mpc': GPMPC_dict, + 'gpmpc_acados': GPMPC_dict, + 'ilqr': iLQR_dict, + 'ilqr_sf': iLQR_SF_dict, +} + + +def is_log_scale(param: Dict[str, Any]) -> bool: + """Check whether the hyperparameter uses a log scale. + + args: + param (dict): the hyperparameter dictionary + + returns: + bool: True if the hyperparameter is log-scaled, False otherwise + + """ + + return param['scale'] == 'log' diff --git a/safe_control_gym/hyperparameters/optuna/hpo_optuna.py b/safe_control_gym/hyperparameters/optuna/hpo_optuna.py new file mode 100644 index 000000000..fe8e8d840 --- /dev/null +++ b/safe_control_gym/hyperparameters/optuna/hpo_optuna.py @@ -0,0 +1,248 @@ +""" The implementation of HPO class using Optuna + +Reference: + * stable baselines3 https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/rl_zoo3/hyperparams_opt.py + * Optuna: https://optuna.org + +""" +import os + +import matplotlib.pyplot as plt +import numpy as np +import optuna +import yaml +from optuna.samplers import TPESampler +from optuna.study import MaxTrialsCallback +from optuna.trial import FrozenTrial, TrialState +from optuna.visualization.matplotlib import plot_optimization_history, plot_param_importances + +from safe_control_gym.hyperparameters.base_hpo import BaseHPO +from safe_control_gym.hyperparameters.optuna.hpo_optuna_utils import HYPERPARAMS_SAMPLER + + +class HPO_Optuna(BaseHPO): + + def __init__(self, + hpo_config, + task_config, + algo_config, + algo='ilqr', + task='stabilization', + output_dir='./results', + safety_filter=None, + sf_config=None, + load_study=False): + """ + Hyperparameter Optimization (HPO) class using package Optuna. + + Args: + hpo_config: Configuration specific to hyperparameter optimization. + task_config: Configuration for the task. + algo_config: Algorithm configuration. + algo (str): Algorithm name. + task (str): The task/environment the agent will interact with. + output_dir (str): Directory where results and models will be saved. + safety_filter (str): Safety filter to be applied (optional). + sf_config: Safety filter configuration (optional). + load_study (bool): Load existing study if True. + """ + super().__init__(hpo_config, task_config, algo_config, algo, task, output_dir, safety_filter, sf_config, load_study) + self.setup_problem() + + def setup_problem(self): + """ Setup hyperparameter optimization, e.g., search space, study, algorithm, etc.""" + + # init sampler + self.sampler = TPESampler(seed=self.hpo_config.seed) + + def objective(self, trial: optuna.Trial) -> float: + """ The stochastic objective function for an HPO tool to optimize over + + args: + trial: A single trial object that contains the hyperparameters to be evaluated + + """ + + # sample candidate hyperparameters + sampled_hyperparams = HYPERPARAMS_SAMPLER[self.search_space_key](trial, self.state_dim, self.action_dim) + + # log trial number + self.logger.info('Trial number: {}'.format(trial.number)) + + returns = self.evaluate(sampled_hyperparams) + Gss = np.array(returns).mean() + + self.logger.info('Returns: {}'.format(Gss)) + + if len(self.study.trials) > 0: + self.checkpoint() + + self.objective_value = Gss + # wandb.log({self.hpo_config.objective[0]: Gss}) + return Gss + + def hyperparameter_optimization(self) -> None: + """ Hyperparameter optimization. 
+ """ + if self.load_study: + self.study = optuna.load_study(study_name=self.study_name, storage=f'sqlite:///{self.study_name}_optuna.db') + else: + # single-objective optimization + if len(self.hpo_config.direction) == 1: + self.study = optuna.create_study( + direction=self.hpo_config.direction[0], + sampler=self.sampler, + pruner=optuna.pruners.MedianPruner(n_warmup_steps=10), + study_name=self.study_name, + storage='sqlite:///{}_optuna.db'.format(self.study_name), + load_if_exists=self.hpo_config.load_if_exists + ) + # multi-objective optimization + else: + self.study = optuna.create_study( + directions=self.hpo_config.direction, + sampler=self.sampler, + pruner=optuna.pruners.MedianPruner(n_warmup_steps=10), + study_name=self.study_name, + storage='sqlite:///{}_optuna.db'.format(self.study_name), + load_if_exists=self.hpo_config.load_if_exists + ) + self.warm_start(self.config_to_param(self.hps_config)) + + self.study.optimize(self.objective, + catch=(RuntimeError,), + callbacks=[MaxTrialsCallback(self.hpo_config.trials, states=(TrialState.COMPLETE,)), + self._warn_unused_parameter_callback], + ) + + self.checkpoint() + + self.logger.close() + + return + + def warm_start(self, params): + """ + Warm start the study. + + Args: + params (dict): Specified hyperparameters to be evaluated. + """ + if hasattr(self, 'study'): + self.study.enqueue_trial(params, skip_if_exists=True) + + def checkpoint(self): + """ + Save checkpoints, results, and logs during optimization. + """ + output_dir = os.path.join(self.output_dir, 'hpo') + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + try: + # save meta data + self.study.trials_dataframe().to_csv(output_dir + '/trials.csv') + except Exception as e: + print(e) + print('Saving trials.csv failed.') + + try: + # save top-n best hyperparameters + if len(self.hpo_config.direction) == 1: + trials = self.study.get_trials(deepcopy=True, states=(TrialState.COMPLETE,)) + if self.hpo_config.direction[0] == 'minimize': + trials.sort(key=self._value_key) + else: + trials.sort(key=self._value_key, reverse=True) + for i in range(min(self.hpo_config.save_n_best_hps, len(self.study.trials))): + params = trials[i].params + params = self.post_process_best_hyperparams(params) + with open(f'{output_dir}/hyperparameters_trial{len(trials)}_{trials[i].value:.4f}.yaml', 'w')as f: + yaml.dump(params, f, default_flow_style=False) + else: + best_trials = self.study.best_trials + for i in range(len(self.study.best_trials)): + params = best_trials[i].params + params = self.post_process_best_hyperparams(params) + with open(f'{output_dir}/best_hyperparameters_[{best_trials[i].values[0]:.4f},{best_trials[i].values[1]:.4f}].yaml', 'w')as f: + yaml.dump(params, f, default_flow_style=False) + except Exception as e: + print(e) + print('Saving best hyperparameters failed.') + + # save plot + try: + if len(self.hpo_config.objective) == 1: + plot_param_importances(self.study) + plt.tight_layout() + plt.savefig(output_dir + '/param_importances.png') + # plt.show() + plt.close() + plot_optimization_history(self.study) + plt.tight_layout() + plt.savefig(output_dir + '/optimization_history.png') + # plt.show() + plt.close() + else: + for i in range(len(self.hpo_config.objective)): + plot_param_importances(self.study, target=lambda t: t.values[i]) + plt.tight_layout() + plt.savefig(output_dir + '/param_importances_{}.png'.format(self.hpo_config.objective[i])) + # plt.show() + plt.close() + plot_optimization_history(self.study, target=lambda t: t.values[i]) + plt.tight_layout() + 
plt.savefig(output_dir + '/optimization_history_{}.png'.format(self.hpo_config.objective[i])) + # plt.show() + plt.close() + except Exception as e: + print(e) + print('Plotting failed.') + + def _value_key(self, trial: FrozenTrial) -> float: + """ Returns the value of a trial object for sorting + + """ + if trial.value is None: + if self.hpo_config.direction[0] == 'minimize': + return self.objective_bounds[0][1] + else: + return self.objective_bounds[0][0] + else: + return trial.value + + def _compute_cvar(self, returns: np.ndarray, alpha: float = 0.2) -> float: + """ Compute CVaR + + """ + assert returns.ndim == 1, 'returns must be 1D array' + sorted_returns = np.sort(returns) + n = len(sorted_returns) + VaR_idx = int(alpha * n) + if VaR_idx == 0: + VaR_idx = 1 + + if self.hpo_config.direction[0] == 'minimize': + CVaR = sorted_returns[-VaR_idx:].mean() + else: + CVaR = sorted_returns[:VaR_idx].mean() + + return CVaR + + def _warn_unused_parameter_callback(self, study: optuna.Study, trial: FrozenTrial) -> None: + """User-defined callback to warn about unused parameters.""" + fixed_params = trial.system_attrs.get('fixed_params') + if fixed_params is None: + return + + for param_name, param_value in fixed_params.items(): + distribution = trial.distributions.get(param_name) + if distribution is None: + # Or you can raise an exception here. + self.logger.info(f"Parameter '{param_name}' is not used at trial {trial.number}.") + continue + + param_value_internal_repr = distribution.to_internal_repr(param_value) + if not distribution._contains(param_value_internal_repr): + # Or you can raise an exception here. + self.logger.info(f"Parameter '{param_name}' is not used at trial {trial.number}.") diff --git a/safe_control_gym/hyperparameters/optuna/hpo_optuna_utils.py b/safe_control_gym/hyperparameters/optuna/hpo_optuna_utils.py new file mode 100644 index 000000000..fc0476a64 --- /dev/null +++ b/safe_control_gym/hyperparameters/optuna/hpo_optuna_utils.py @@ -0,0 +1,276 @@ +""" Utils for Optuna hyperparameter optimization. """ + +from typing import Any, Dict + +import optuna + +from safe_control_gym.hyperparameters.hpo_search_space import (GPMPC_dict, PPO_dict, SAC_dict, iLQR_dict, + iLQR_SF_dict, is_log_scale) + + +def ppo_sampler(trial: optuna.Trial, state_dim: int, action_dim: int) -> Dict[str, Any]: + """Sampler for PPO hyperparameters. 
+ + args: + hps_dict: the dict of hyperparameters that will be optimized over + trial: budget variable + state_dim: dimension of the state space + action_dim: dimension of the action space + + """ + + # TODO: conditional hyperparameters + + # model args + hidden_dim = trial.suggest_categorical('hidden_dim', PPO_dict['hidden_dim']['values']) + activation = trial.suggest_categorical('activation', PPO_dict['activation']['values']) + + # loss args + gamma = trial.suggest_categorical('gamma', PPO_dict['gamma']['values']) + # Factor for trade-off of bias vs variance for Generalized Advantage Estimator + gae_lambda = trial.suggest_categorical('gae_lambda', PPO_dict['gae_lambda']['values']) + clip_param = trial.suggest_categorical('clip_param', PPO_dict['clip_param']['values']) + # Limit the KL divergence between updates + target_kl = trial.suggest_float('target_kl', PPO_dict['target_kl']['values'][0], PPO_dict['target_kl']['values'][1], log=is_log_scale(PPO_dict['target_kl'])) + # Entropy coefficient for the loss calculation + entropy_coef = trial.suggest_float('entropy_coef', PPO_dict['entropy_coef']['values'][0], PPO_dict['entropy_coef']['values'][1], log=is_log_scale(PPO_dict['entropy_coef'])) + + # optim args + opt_epochs = trial.suggest_categorical('opt_epochs', PPO_dict['opt_epochs']['values']) + mini_batch_size = trial.suggest_categorical('mini_batch_size', PPO_dict['mini_batch_size']['values']) + actor_lr = trial.suggest_float('actor_lr', PPO_dict['actor_lr']['values'][0], PPO_dict['actor_lr']['values'][1], log=is_log_scale(PPO_dict['actor_lr'])) + critic_lr = trial.suggest_float('critic_lr', PPO_dict['critic_lr']['values'][0], PPO_dict['critic_lr']['values'][1], log=is_log_scale(PPO_dict['critic_lr'])) + max_env_steps = trial.suggest_categorical('max_env_steps', PPO_dict['max_env_steps']['values']) + + # cost parameters + state_weight = [ + trial.suggest_float(f'rew_state_weight_{i}', PPO_dict['rew_state_weight']['values'][0], PPO_dict['rew_state_weight']['values'][1], log=is_log_scale(PPO_dict['rew_state_weight'])) + for i in range(state_dim) + ] + action_weight = [ + trial.suggest_float(f'rew_act_weight_{i}', PPO_dict['rew_act_weight']['values'][0], PPO_dict['rew_act_weight']['values'][1], log=is_log_scale(PPO_dict['rew_act_weight'])) + for i in range(action_dim) + ] + + hps_suggestion = { + 'hidden_dim': hidden_dim, + 'activation': activation, + 'gamma': gamma, + 'gae_lambda': gae_lambda, + 'clip_param': clip_param, + 'target_kl': target_kl, + 'entropy_coef': entropy_coef, + 'opt_epochs': opt_epochs, + 'mini_batch_size': mini_batch_size, + 'actor_lr': actor_lr, + 'critic_lr': critic_lr, + 'max_env_steps': max_env_steps, + 'rew_state_weight': state_weight, + 'rew_act_weight': action_weight, + } + + return hps_suggestion + + +def sac_sampler(trial: optuna.Trial, state_dim: int, action_dim: int) -> Dict[str, Any]: + """Sampler for SAC hyperparameters. 
+ + args: + hps_dict: the dict of hyperparameters that will be optimized over + trial: budget variable + state_dim: dimension of the state space + action_dim: dimension of the action space + """ + + # TODO: conditional hyperparameters + + # model args + hidden_dim = trial.suggest_categorical('hidden_dim', SAC_dict['hidden_dim']['values']) + activation = trial.suggest_categorical('activation', SAC_dict['activation']['values']) + + # loss args + gamma = trial.suggest_categorical('gamma', SAC_dict['gamma']['values']) + tau = trial.suggest_float('tau', SAC_dict['tau']['values'][0], SAC_dict['tau']['values'][1], log=is_log_scale(SAC_dict['tau'])) + + # optim args + train_interval = trial.suggest_categorical('train_interval', SAC_dict['train_interval']['values']) + train_batch_size = trial.suggest_categorical('train_batch_size', SAC_dict['train_batch_size']['values']) + actor_lr = trial.suggest_float('actor_lr', SAC_dict['actor_lr']['values'][0], SAC_dict['actor_lr']['values'][1], log=is_log_scale(SAC_dict['actor_lr'])) + critic_lr = trial.suggest_float('critic_lr', SAC_dict['critic_lr']['values'][0], SAC_dict['critic_lr']['values'][1], log=is_log_scale(SAC_dict['critic_lr'])) + + max_env_steps = trial.suggest_categorical('max_env_steps', SAC_dict['max_env_steps']['values']) + warm_up_steps = trial.suggest_categorical('warm_up_steps', SAC_dict['warm_up_steps']['values']) + + # cost parameters + state_weight = [ + trial.suggest_float(f'rew_state_weight_{i}', SAC_dict['rew_state_weight']['values'][0], SAC_dict['rew_state_weight']['values'][1], log=is_log_scale(SAC_dict['rew_state_weight'])) + for i in range(state_dim) + ] + action_weight = [ + trial.suggest_float(f'rew_act_weight_{i}', SAC_dict['rew_act_weight']['values'][0], SAC_dict['rew_act_weight']['values'][1], log=is_log_scale(SAC_dict['rew_act_weight'])) + for i in range(action_dim) + ] + + hps_suggestion = { + 'hidden_dim': hidden_dim, + 'activation': activation, + 'gamma': gamma, + 'tau': tau, + 'train_interval': train_interval, + 'train_batch_size': train_batch_size, + 'actor_lr': actor_lr, + 'critic_lr': critic_lr, + 'max_env_steps': max_env_steps, + 'warm_up_steps': warm_up_steps, + 'rew_state_weight': state_weight, + 'rew_act_weight': action_weight, + } + + return hps_suggestion + + +def gpmpc_sampler(trial: optuna.Trial, state_dim: int, action_dim: int) -> Dict[str, Any]: + """Sampler for GP-MPC hyperparameters. 
+ + args: + hps_dict: the dict of hyperparameters that will be optimized over + trial: budget variable + state_dim: dimension of the state space + action_dim: dimension of the action space + """ + + horizon = trial.suggest_categorical('horizon', GPMPC_dict['horizon']['values']) + kernel = trial.suggest_categorical('kernel', GPMPC_dict['kernel']['values']) + n_ind_points = trial.suggest_categorical('n_ind_points', GPMPC_dict['n_ind_points']['values']) + num_epochs = trial.suggest_categorical('num_epochs', GPMPC_dict['num_epochs']['values']) + num_samples = trial.suggest_categorical('num_samples', GPMPC_dict['num_samples']['values']) + + optimization_iterations = trial.suggest_categorical('optimization_iterations', GPMPC_dict['optimization_iterations']['values']) + learning_rate = trial.suggest_float('learning_rate', GPMPC_dict['learning_rate']['values'][0], GPMPC_dict['learning_rate']['values'][1], log=is_log_scale(GPMPC_dict['learning_rate'])) + + # objective + state_weight = [ + trial.suggest_float(f'q_mpc_{i}', GPMPC_dict['q_mpc']['values'][0], GPMPC_dict['q_mpc']['values'][1], log=is_log_scale(GPMPC_dict['q_mpc'])) + for i in range(state_dim) + ] + action_weight = [ + trial.suggest_float(f'r_mpc_{i}', GPMPC_dict['r_mpc']['values'][0], GPMPC_dict['r_mpc']['values'][1], log=is_log_scale(GPMPC_dict['r_mpc'])) + for i in range(action_dim) + ] + + hps_suggestion = { + 'horizon': horizon, + 'kernel': kernel, + 'n_ind_points': n_ind_points, + 'num_epochs': num_epochs, + 'num_samples': num_samples, + 'optimization_iterations': optimization_iterations, + 'learning_rate': learning_rate, + 'q_mpc': state_weight, + 'r_mpc': action_weight, + } + + return hps_suggestion + + +def ilqr_sampler(trial: optuna.Trial, state_dim: int, action_dim: int) -> Dict[str, Any]: + """Sampler for iLQR hyperparameters. + + args: + hps_dict: the dict of hyperparameters that will be optimized over + trial: budget variable + state_dim: dimension of the state space + action_dim: dimension of the action space + """ + + max_iterations = trial.suggest_categorical('max_iterations', iLQR_dict['max_iterations']['values']) + lamb_factor = trial.suggest_categorical('lamb_factor', iLQR_dict['lamb_factor']['values']) + lamb_max = trial.suggest_categorical('lamb_max', iLQR_dict['lamb_max']['values']) + epsilon = trial.suggest_categorical('epsilon', iLQR_dict['epsilon']['values']) + + # cost parameters + state_weight = [ + trial.suggest_float(f'q_lqr_{i}', iLQR_dict['q_lqr']['values'][0], iLQR_dict['q_lqr']['values'][1], log=is_log_scale(iLQR_dict['q_lqr'])) + for i in range(state_dim) + ] + action_weight = [ + trial.suggest_float(f'r_lqr_{i}', iLQR_dict['r_lqr']['values'][0], iLQR_dict['r_lqr']['values'][1], log=is_log_scale(iLQR_dict['r_lqr'])) + for i in range(action_dim) + ] + + hps_suggestion = { + 'max_iterations': max_iterations, + 'lamb_factor': lamb_factor, + 'lamb_max': lamb_max, + 'epsilon': epsilon, + 'q_lqr': state_weight, + 'r_lqr': action_weight, + } + + return hps_suggestion + + +def ilqr_sf_sampler(trial: optuna.Trial, state_dim: int, action_dim: int) -> Dict[str, Any]: + """Sampler for iLQR hyperparameters with safety filter. 
+ + args: + hps_dict: the dict of hyperparameters that will be optimized over + trial: budget variable + state_dim: dimension of the state space + action_dim: dimension of the action space + """ + + max_iterations = trial.suggest_categorical('max_iterations', iLQR_SF_dict['max_iterations']['values']) + lamb_factor = trial.suggest_categorical('lamb_factor', iLQR_SF_dict['lamb_factor']['values']) + lamb_max = trial.suggest_categorical('lamb_max', iLQR_SF_dict['lamb_max']['values']) + epsilon = trial.suggest_categorical('epsilon', iLQR_SF_dict['epsilon']['values']) + + # safety filter + horizon = trial.suggest_categorical('horizon', iLQR_SF_dict['horizon']['values']) + n_samples = trial.suggest_categorical('n_samples', iLQR_SF_dict['n_samples']['values']) + + # cost parameters + state_weight = [ + trial.suggest_float(f'q_lqr_{i}', iLQR_SF_dict['q_lqr']['values'][0], iLQR_SF_dict['q_lqr']['values'][1], log=is_log_scale(iLQR_SF_dict['q_lqr'])) + for i in range(state_dim) + ] + action_weight = [ + trial.suggest_float(f'r_lqr_{i}', iLQR_SF_dict['r_lqr']['values'][0], iLQR_SF_dict['r_lqr']['values'][1], log=is_log_scale(iLQR_SF_dict['r_lqr'])) + for i in range(action_dim) + ] + + # safety filter + tau = trial.suggest_float('tau', iLQR_SF_dict['tau']['values'][0], iLQR_SF_dict['tau']['values'][1], log=is_log_scale(iLQR_SF_dict['tau'])) + sf_state_weight = [ + trial.suggest_float(f'q_lin_{i}', iLQR_SF_dict['q_lin']['values'][0], iLQR_SF_dict['q_lin']['values'][1], log=is_log_scale(iLQR_SF_dict['q_lin'])) + for i in range(state_dim) + ] + sf_action_weight = [ + trial.suggest_float(f'r_lin_{i}', iLQR_SF_dict['r_lin']['values'][0], iLQR_SF_dict['r_lin']['values'][1], log=is_log_scale(iLQR_SF_dict['r_lin'])) + for i in range(action_dim) + ] + hps_suggestion = { + 'max_iterations': max_iterations, + 'lamb_factor': lamb_factor, + 'lamb_max': lamb_max, + 'epsilon': epsilon, + 'horizon': horizon, + 'n_samples': n_samples, + 'q_lqr': state_weight, + 'r_lqr': action_weight, + 'tau': tau, + 'q_lin': sf_state_weight, + 'r_lin': sf_action_weight, + } + + return hps_suggestion + + +HYPERPARAMS_SAMPLER = { + 'ppo': ppo_sampler, + 'sac': sac_sampler, + 'gp_mpc': gpmpc_sampler, + 'gpmpc_acados': gpmpc_sampler, + 'ilqr': ilqr_sampler, + 'ilqr_sf': ilqr_sf_sampler, +} diff --git a/safe_control_gym/hyperparameters/vizier/hpo_vizier.py b/safe_control_gym/hyperparameters/vizier/hpo_vizier.py new file mode 100644 index 000000000..22af6e3d0 --- /dev/null +++ b/safe_control_gym/hyperparameters/vizier/hpo_vizier.py @@ -0,0 +1,266 @@ +""" The implementation of HPO class using Vizier + +Reference: + * https://oss-vizier.readthedocs.io/en/latest/ + * https://arxiv.org/pdf/0912.3995 + +""" + +import csv +import os +import time + +import matplotlib.pyplot as plt +import numpy as np +import yaml +from vizier.service import clients +from vizier.service import pyvizier as vz +from vizier.service import servers + +from safe_control_gym.hyperparameters.base_hpo import BaseHPO +from safe_control_gym.hyperparameters.hpo_search_space import HYPERPARAMS_DICT + + +class HPO_Vizier(BaseHPO): + + def __init__(self, + hpo_config, + task_config, + algo_config, + algo='ilqr', + task='stabilization', + output_dir='./results', + safety_filter=None, + sf_config=None, + load_study=False): + """ + Hyperparameter Optimization (HPO) class using package Vizier. + + Args: + hpo_config: Configuration specific to hyperparameter optimization. + task_config: Configuration for the task. + algo_config: Algorithm configuration. + algo (str): Algorithm name. 
+ task (str): The task/environment the agent will interact with. + output_dir (str): Directory where results and models will be saved. + safety_filter (str): Safety filter to be applied (optional). + sf_config: Safety filter configuration (optional). + load_study (bool): Load existing study if True. + """ + super().__init__(hpo_config, task_config, algo_config, algo, task, output_dir, safety_filter, sf_config, load_study) + + self.client_id = f'client_{os.getpid()}' # use process id as client id + self.setup_problem() + + def setup_problem(self): + """ Setup hyperparameter optimization, e.g., search space, study, algorithm, etc. """ + + # define the problem statement + self.problem = vz.ProblemStatement() + + # define the search space + self.search_space = HYPERPARAMS_DICT[self.search_space_key] + + for hp_name, hp_info in self.search_space.items(): + hp_values = hp_info['values'] + scale = hp_info['scale'] + is_list = hp_info['type'] == list + cat = hp_info['cat'] + + if cat == 'float': + if scale == 'uniform': + if is_list: + for i in range(len(self.hps_config[hp_name])): + self.problem.search_space.root.add_float_param(f'{hp_name}_{i}', hp_values[0], hp_values[1]) + else: + self.problem.search_space.root.add_float_param(hp_name, hp_values[0], hp_values[1]) + elif scale == 'log': + if is_list: + for i in range(len(self.hps_config[hp_name])): + self.problem.search_space.root.add_float_param(f'{hp_name}_{i}', hp_values[0], hp_values[1], scale_type=vz.ScaleType.LOG) + else: + self.problem.search_space.root.add_float_param(hp_name, hp_values[0], hp_values[1], scale_type=vz.ScaleType.LOG) + else: + raise ValueError('Invalid scale') + + elif cat == 'discrete': + if scale == 'uniform': + self.problem.search_space.root.add_discrete_param(hp_name, hp_values) + elif scale == 'log': + self.problem.search_space.root.add_discrete_param(hp_name, hp_values, scale_type=vz.ScaleType.LOG) + else: + raise ValueError('Invalid scale') + + elif cat == 'categorical': + self.problem.search_space.root.add_categorical_param(hp_name, hp_values) + else: + raise ValueError('Invalid hyperparameter category') + + # Set optimization direction based on objective and direction from the HPO config + if self.hpo_config.direction[0] == 'maximize': + self.problem.metric_information.append( + vz.MetricInformation(name=self.hpo_config.objective[0], goal=vz.ObjectiveMetricGoal.MAXIMIZE)) + elif self.hpo_config.direction[0] == 'minimize': + self.problem.metric_information.append( + vz.MetricInformation(name=self.hpo_config.objective[0], goal=vz.ObjectiveMetricGoal.MINIMIZE)) + + def hyperparameter_optimization(self) -> None: + """ Hyperparameter optimization. 
+ """ + if self.load_study: + with open(f'{self.study_name}_endpoint.yaml', 'r') as config_file: + endpoint = yaml.safe_load(config_file)['endpoint'] + clients.environment_variables.server_endpoint = endpoint + study_config = vz.StudyConfig.from_problem(self.problem) + self.study_client = clients.Study.from_study_config(study_config, owner='owner', study_id=self.study_name) + self.study_client = clients.Study.from_resource_name(self.study_client.resource_name) + else: + server = servers.DefaultVizierServer(database_url=f'sqlite:///{self.study_name}_vizier.db') + clients.environment_variables.server_endpoint = server.endpoint + endpoint = server.endpoint + with open(f'{self.study_name}_endpoint.yaml', 'w') as config_file: + yaml.dump({'endpoint': endpoint}, config_file, default_flow_style=False) + + study_config = vz.StudyConfig.from_problem(self.problem) + self.study_client = clients.Study.from_study_config(study_config, owner='owner', study_id=self.study_name) + self.warm_start(self.config_to_param(self.hps_config)) + + existing_trials = 0 + while existing_trials < self.hpo_config.trials: + # get suggested hyperparameters + suggestions = self.study_client.suggest(count=1, client_id=self.client_id) + # suggestions = self.study_client.suggest(count=1) + + for suggestion in suggestions: + if suggestion.id > self.hpo_config.trials: + self.logger.info(f'Trial {suggestion.id} is deleted as it exceeds the maximum number of trials.') + suggestion.delete() + existing_trials = suggestion.id + break + self.logger.info(f'Hyperparameter optimization trial {suggestion.id}/{self.hpo_config.trials}') + existing_trials = suggestion.id + # evaluate the suggested hyperparameters + materialized_suggestion = suggestion.materialize() + suggested_params = {key: val.value for key, val in materialized_suggestion.parameters._items.items()} + res = self.evaluate(suggested_params) + # a failed evaluation (None) gets the worst admissible value instead of being averaged + objective_value = 0.0 if res is None else np.mean(res) + self.logger.info(f'Returns: {objective_value}') + final_measurement = vz.Measurement({f'{self.hpo_config.objective[0]}': objective_value}) + self.objective_value = objective_value + # wandb.log({f'{self.hpo_config.objective[0]}': objective_value}) + suggestion.complete(final_measurement) + + if existing_trials > 0: + self.checkpoint() + + if self.load_study is False: + + completed_trial_filter = vz.TrialFilter(status=[vz.TrialStatus.COMPLETED]) + finished_trials = len(list(self.study_client.trials(trial_filter=completed_trial_filter).get())) + # wait for other clients to finish + while finished_trials < self.hpo_config.trials: + self.logger.info(f'Waiting for other clients to finish remaining trials: {self.hpo_config.trials - finished_trials}') + finished_trials = len(list(self.study_client.trials(trial_filter=completed_trial_filter).get())) + # sleep for 10 seconds + time.sleep(10) + + self.logger.info(f'Have finished trials: {finished_trials}/{self.hpo_config.trials}') + + self.checkpoint() + + self.logger.info('Deleting server.') + del server + + self.logger.close() + + def warm_start(self, params): + """ + Warm start the study. + + Args: + params (dict): Specified hyperparameters to be evaluated. 
+ """ + if hasattr(self, 'study_client'): + res = self.evaluate(params) + objective_value = np.mean(res) + trial = vz.Trial(parameters=params, final_measurement=vz.Measurement({f'{self.hpo_config.objective[0]}': objective_value})) + self.study_client._add_trial(trial) + + def checkpoint(self): + """ + Save checkpoints, results, and logs during optimization. + """ + output_dir = os.path.join(self.output_dir, 'hpo') + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + completed_trial_filter = vz.TrialFilter(status=[vz.TrialStatus.COMPLETED]) + all_trials = [tc.materialize() for tc in self.study_client.trials(trial_filter=completed_trial_filter)] + + try: + for optimal_trial in self.study_client.optimal_trials(): + optimal_trial = optimal_trial.materialize() + params = {key: val.value for key, val in optimal_trial.parameters._items.items()} + params = self.post_process_best_hyperparams(params) + with open(f'{output_dir}/hyperparameters_trial{len(all_trials)}_{optimal_trial.final_measurement.metrics[self.hpo_config.objective[0]].value:.4f}.yaml', 'w')as f: + yaml.dump(params, f, default_flow_style=False) + except Exception as e: + print(e) + print('Saving hyperparameters failed') + + try: + # Visualize all trials so-far. + trial_i = [t for t in range(len(all_trials))] + trial_ys = [t.final_measurement.metrics[self.hpo_config.objective[0]].value for t in all_trials] + plt.scatter(trial_i, trial_ys, label='trials', marker='o', color='blue') + + # Mark optimal trial so far. + optimal_trial_i = [optimal_trial.id - 1] + optimal_trial_ys = [optimal_trial.final_measurement.metrics[self.hpo_config.objective[0]].value] + plt.scatter(optimal_trial_i, optimal_trial_ys, label='optimal', marker='x', color='green', s=100) + + # Plot. + plt.legend() + plt.title('Optimization History Plot') + plt.xlabel('Trial') + plt.ylabel('Objective Value') + plt.tight_layout() + plt.savefig(output_dir + '/optimization_history.png') + plt.close() + + trial_data = [] + + # Collect all the parameter keys across trials for use in the CSV header + parameter_keys = set() + + for t in all_trials: + trial_number = t.id - 1 + trial_value = t.final_measurement.metrics[self.hpo_config.objective[0]].value + + # Extract parameters for each trial + trial_params = {key: val.value for key, val in t.parameters._items.items()} + trial_params = self.post_process_best_hyperparams(trial_params) + parameter_keys.update(trial_params.keys()) # Collect parameter keys dynamically + trial_data.append((trial_number, trial_value, trial_params)) + + # Convert set to list for a consistent order in the CSV header + parameter_keys = sorted(list(parameter_keys)) + + # Save to CSV file + csv_file = 'trials.csv' + with open(output_dir + '/' + csv_file, mode='w', newline='') as file: + writer = csv.writer(file) + + # Write the header with 'number', 'value', followed by all parameter keys + writer.writerow(['number', 'value'] + parameter_keys) + + # Write the trial data + for trial_number, trial_value, trial_params in trial_data: + # Ensure that parameters are written in the same order as the header + param_values = [trial_params.get(key, '') for key in parameter_keys] + writer.writerow([trial_number, trial_value] + param_values) + except Exception as e: + print(e) + print('Saving optimization history failed') diff --git a/safe_control_gym/utils/logging.py b/safe_control_gym/utils/logging.py index 5b43d41df..241d18426 100644 --- a/safe_control_gym/utils/logging.py +++ b/safe_control_gym/utils/logging.py @@ -27,6 +27,7 @@ def __init__(self, 
logger_name, log_dir, level=logging.INFO): logger.addHandler(stream_handler) self.logger = logger self.file_handler = file_handler + self.stream_handler = stream_handler def info(self, msg): '''Chain print message to logger.''' @@ -35,6 +36,11 @@ def info(self, msg): def close(self): '''Free log file.''' self.file_handler.close() + self.stream_handler.close() + + # Remove handlers from logger + self.logger.removeHandler(self.file_handler) + self.logger.removeHandler(self.stream_handler) class FileLogger: diff --git a/setup.py b/setup.py index 41dc8fbff..093d20cfe 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ 'pre-commit', 'optuna', 'optuna-dashboard', - 'mysql-connector-python', - 'pymysql', + 'pandas', + 'google-vizier[jax]', ], ) diff --git a/tests/test_hpo/test_hpo.py b/tests/test_hpo/test_hpo.py index 8d3432dc0..1c025ae7c 100644 --- a/tests/test_hpo/test_hpo.py +++ b/tests/test_hpo/test_hpo.py @@ -1,210 +1,75 @@ import os import sys -import time -import munch import pytest from examples.hpo.hpo_experiment import hpo -from safe_control_gym.hyperparameters.database import create, drop from safe_control_gym.utils.configuration import ConfigFactory @pytest.mark.parametrize('SYS', ['cartpole']) @pytest.mark.parametrize('TASK', ['stab']) -@pytest.mark.parametrize('ALGO', ['ppo', 'sac', 'gp_mpc']) -@pytest.mark.parametrize('SAMPLER', ['TPESampler', 'RandomSampler']) -def test_hpo(SYS, TASK, ALGO, SAMPLER): - '''Test HPO for one single trial using MySQL database. +@pytest.mark.parametrize('ALGO', ['ilqr', 'gp_mpc', 'ppo']) +@pytest.mark.parametrize('PRIOR', ['']) +@pytest.mark.parametrize('SAFETY_FILTER', ['', 'linear_mpsc']) +@pytest.mark.parametrize('SAMPLER', ['optuna', 'vizier']) +def test_hpo_cartpole(SYS, TASK, ALGO, PRIOR, SAFETY_FILTER, SAMPLER): + '''Test HPO for one single trial. 
(create a study from scratch) ''' - pytest.skip('Takes too long.') # output_dir - output_dir = './examples/hpo/results' + output_dir = f'./examples/hpo/results/{ALGO}' # delete output_dir if os.path.exists(output_dir): os.system(f'rm -rf {output_dir}') - # drop the database if exists - drop(munch.Munch({'tag': f'{ALGO}_hpo'})) - # create database - create(munch.Munch({'tag': f'{ALGO}_hpo'})) - - if ALGO == 'gp_mpc': - PRIOR = '150' - sys.argv[1:] = ['--algo', ALGO, - '--task', SYS, - '--overrides', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{ALGO}_{SYS}_{PRIOR}.yaml', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{ALGO}_{SYS}_hpo.yaml', - '--output_dir', output_dir, - '--seed', '1', - ] - else: + # delete database + if os.path.exists(f'{ALGO}_hpo_{SAMPLER}.db'): + os.system(f'rm {ALGO}_hpo_{SAMPLER}.db') + if os.path.exists(f'{ALGO}_hpo_{SAMPLER}.db-journal'): + os.system(f'rm {ALGO}_hpo_{SAMPLER}.db-journal') + if os.path.exists(f'{ALGO}_hpo_endpoint.yaml'): + os.system(f'rm {ALGO}_hpo_endpoint.yaml') + + if ALGO == 'ilqr': + PRIOR = '100' + elif ALGO == 'gp_mpc' or ALGO == 'gpmpc_acados': + PRIOR = '200' + + # check if the config file exists + TASK_CONFIG_PATH = f'./examples/hpo/{SYS}/config_overrides/{SYS}_{TASK}.yaml' + ALGO_CONFIG_PATH = f'./examples/hpo/{SYS}/config_overrides/{ALGO}_{SYS}_{TASK}_{PRIOR}.yaml' + HPO_CONFIG_PATH = f'./examples/hpo/{SYS}/config_overrides/{ALGO}_{SYS}_hpo.yaml' + assert os.path.exists(TASK_CONFIG_PATH), f'{TASK_CONFIG_PATH} does not exist' + assert os.path.exists(ALGO_CONFIG_PATH), f'{ALGO_CONFIG_PATH} does not exist' + assert os.path.exists(HPO_CONFIG_PATH), f'{HPO_CONFIG_PATH} does not exist' + + if SAFETY_FILTER == 'linear_mpsc': + if ALGO != 'ilqr': + pytest.skip('SAFETY_FILTER is only supported for ilqr') + raise ValueError('SAFETY_FILTER is only supported for ilqr') + SAFETY_FILTER_CONFIG_PATH = f'./examples/hpo/{SYS}/config_overrides/{SAFETY_FILTER}_{SYS}_{TASK}_{PRIOR}.yaml' + assert os.path.exists(SAFETY_FILTER_CONFIG_PATH), f'{SAFETY_FILTER_CONFIG_PATH} does not exist' + MPSC_COST = 'one_step_cost' sys.argv[1:] = ['--algo', ALGO, '--task', SYS, '--overrides', - f'./examples/hpo/rl/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/rl/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}.yaml', - f'./examples/hpo/rl/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}_hpo.yaml', + TASK_CONFIG_PATH, + ALGO_CONFIG_PATH, + HPO_CONFIG_PATH, + SAFETY_FILTER_CONFIG_PATH, + '--kv_overrides', f'sf_config.cost_function={MPSC_COST}', '--output_dir', output_dir, '--seed', '7', '--use_gpu', 'True' ] - - fac = ConfigFactory() - fac.add_argument('--load_study', type=bool, default=False, help='whether to load study from a previous HPO.') - fac.add_argument('--sampler', type=str, default='TPESampler', help='which sampler to use in HPO.') - config = fac.merge() - config.hpo_config.trials = 1 - config.hpo_config.repetitions = 1 - config.hpo_config.use_database = True - config.sampler = SAMPLER - - hpo(config) - - # delete output_dir - if os.path.exists(output_dir): - os.system(f'rm -rf {output_dir}') - - # drop database - drop(munch.Munch({'tag': f'{ALGO}_hpo'})) - - -@pytest.mark.parametrize('SYS', ['cartpole']) -@pytest.mark.parametrize('TASK', ['stab']) -@pytest.mark.parametrize('ALGO', ['ppo', 'sac', 'gp_mpc']) -@pytest.mark.parametrize('LOAD', [False, True]) -@pytest.mark.parametrize('SAMPLER', ['TPESampler', 'RandomSampler']) -def test_hpo_parallelism(SYS, TASK, ALGO, LOAD, SAMPLER): - 
'''Test HPO for in parallel.''' - - pytest.skip('Takes too long.') - - # if LOAD is False, create a study from scratch - if not LOAD: - # drop the database if exists - drop(munch.Munch({'tag': f'{ALGO}_hpo'})) - # create database - create(munch.Munch({'tag': f'{ALGO}_hpo'})) - # output_dir - output_dir = './examples/hpo/results' - - if ALGO == 'gp_mpc': - PRIOR = '150' - sys.argv[1:] = ['--algo', ALGO, - '--task', SYS, - '--overrides', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{ALGO}_{SYS}_{PRIOR}.yaml', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{ALGO}_{SYS}_hpo.yaml', - '--output_dir', output_dir, - '--seed', '1', - ] - else: - sys.argv[1:] = ['--algo', ALGO, - '--task', SYS, - '--overrides', - f'./examples/hpo/rl/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/rl/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}.yaml', - f'./examples/hpo/rl/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}_hpo.yaml', - '--output_dir', output_dir, - '--seed', '7', - '--use_gpu', 'True' - ] - - fac = ConfigFactory() - fac.add_argument('--load_study', type=bool, default=False, help='whether to load study from a previous HPO.') - fac.add_argument('--sampler', type=str, default='TPESampler', help='which sampler to use in HPO.') - config = fac.merge() - config.hpo_config.trials = 1 - config.hpo_config.repetitions = 1 - config.hpo_config.use_database = True - config.sampler = SAMPLER - - hpo(config) - # if LOAD is True, load a study from a previous HPO study - else: - # first, wait a bit untill the HPO study is created - time.sleep(3) - # output_dir - output_dir = './examples/hpo/results' - if ALGO == 'gp_mpc': - PRIOR = '150' - sys.argv[1:] = ['--algo', ALGO, - '--task', SYS, - '--overrides', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{ALGO}_{SYS}_{PRIOR}.yaml', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{ALGO}_{SYS}_hpo.yaml', - '--output_dir', output_dir, - '--seed', '1', - ] - else: - sys.argv[1:] = ['--algo', ALGO, - '--task', SYS, - '--overrides', - f'./examples/hpo/rl/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/rl/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}.yaml', - f'./examples/hpo/rl/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}_hpo.yaml', - '--output_dir', output_dir, - '--seed', '8', - '--use_gpu', 'True' - ] - - fac = ConfigFactory() - fac.add_argument('--load_study', type=bool, default=True, help='whether to load study from a previous HPO.') - fac.add_argument('--sampler', type=str, default='TPESampler', help='which sampler to use in HPO.') - config = fac.merge() - config.hpo_config.trials = 1 - config.hpo_config.repetitions = 1 - config.sampler = SAMPLER - - hpo(config) - - # delete output_dir - if os.path.exists(output_dir): - os.system(f'rm -rf {output_dir}') - - # drop database - drop(munch.Munch({'tag': f'{ALGO}_hpo'})) - - -@pytest.mark.parametrize('SYS', ['cartpole']) -@pytest.mark.parametrize('TASK', ['stab']) -@pytest.mark.parametrize('ALGO', ['ppo', 'sac', 'gp_mpc']) -@pytest.mark.parametrize('SAMPLER', ['TPESampler', 'RandomSampler']) -def test_hpo_without_database(SYS, TASK, ALGO, SAMPLER): - '''Test HPO for one single trial without using MySQL database. 
- (create a study from scratch) - ''' - pytest.skip('Takes too long.') - - # output_dir - output_dir = './examples/hpo/results' - # delete output_dir - if os.path.exists(output_dir): - os.system(f'rm -rf {output_dir}') - - if ALGO == 'gp_mpc': - PRIOR = '150' - sys.argv[1:] = ['--algo', ALGO, - '--task', SYS, - '--overrides', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{ALGO}_{SYS}_{PRIOR}.yaml', - f'./examples/hpo/gp_mpc/config_overrides/{SYS}/{ALGO}_{SYS}_hpo.yaml', - '--output_dir', output_dir, - '--seed', '1', - ] else: sys.argv[1:] = ['--algo', ALGO, '--task', SYS, '--overrides', - f'./examples/hpo/rl/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/rl/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}.yaml', - f'./examples/hpo/rl/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}_hpo.yaml', + TASK_CONFIG_PATH, + ALGO_CONFIG_PATH, + HPO_CONFIG_PATH, '--output_dir', output_dir, '--seed', '7', '--use_gpu', 'True' @@ -212,15 +77,31 @@ def test_hpo_without_database(SYS, TASK, ALGO, SAMPLER): fac = ConfigFactory() fac.add_argument('--load_study', type=bool, default=False, help='whether to load study from a previous HPO.') - fac.add_argument('--sampler', type=str, default='TPESampler', help='which sampler to use in HPO.') + fac.add_argument('--sampler', type=str, default='optuna', help='which sampler to use in HPO.') config = fac.merge() config.hpo_config.trials = 1 config.hpo_config.repetitions = 1 - config.hpo_config.use_database = False config.sampler = SAMPLER + # hyperparameter configurations (to make tests faster) + # should belong to the distributions defined in the search space + if 'optimization_iterations' in config.hpo_config.hps_config: + d = len(config.hpo_config.hps_config.optimization_iterations) + config.hpo_config.hps_config.optimization_iterations = [2400] * d + if 'num_epochs' in config.hpo_config.hps_config: + config.hpo_config.hps_config.num_epochs = 2 + if 'max_env_steps' in config.hpo_config.hps_config: + config.hpo_config.hps_config.max_env_steps = 30000 + hpo(config) # delete output_dir if os.path.exists(output_dir): os.system(f'rm -rf {output_dir}') + # delete database + if os.path.exists(f'{ALGO}_hpo_{SAMPLER}.db'): + os.system(f'rm {ALGO}_hpo_{SAMPLER}.db') + if os.path.exists(f'{ALGO}_hpo_{SAMPLER}.db-journal'): + os.system(f'rm {ALGO}_hpo_{SAMPLER}.db-journal') + if os.path.exists(f'{ALGO}_hpo_endpoint.yaml'): + os.system(f'rm {ALGO}_hpo_endpoint.yaml') diff --git a/tests/test_hpo/test_hpo_database.py b/tests/test_hpo/test_hpo_database.py deleted file mode 100644 index ea7ee5d6c..000000000 --- a/tests/test_hpo/test_hpo_database.py +++ /dev/null @@ -1,15 +0,0 @@ -import munch -import pytest - -from safe_control_gym.hyperparameters.database import create, drop - - -@pytest.mark.parametrize('ALGO', ['ppo', 'sac', 'gp_mpc']) -def test_hpo_database(ALGO): - pytest.skip('Requires MySQL Database to be running.') - - # create database - create(munch.Munch({'tag': f'{ALGO}_hpo'})) - - # drop database - drop(munch.Munch({'tag': f'{ALGO}_hpo'})) diff --git a/tests/test_hpo/test_train.py b/tests/test_hpo/test_train.py deleted file mode 100644 index 58e803df3..000000000 --- a/tests/test_hpo/test_train.py +++ /dev/null @@ -1,88 +0,0 @@ -import os -import sys - -import munch -import pytest - -from examples.hpo.hpo_experiment import train -from safe_control_gym.hyperparameters.database import create, drop -from safe_control_gym.utils.configuration import ConfigFactory - - -@pytest.mark.parametrize('SYS', 
['cartpole']) -@pytest.mark.parametrize('TASK', ['stab']) -@pytest.mark.parametrize('ALGO', ['ppo', 'sac', 'gp_mpc']) -@pytest.mark.parametrize('HYPERPARAMETER', ['default', 'optimimum']) -def test_train(SYS, TASK, ALGO, HYPERPARAMETER): - '''Test training rl/lbc given a set of hyperparameters.''' - pytest.skip('Takes too long.') - - # output_dir - output_dir = './examples/hpo/results' - # delete output_dir if exists - if os.path.exists(output_dir): - os.system(f'rm -rf {output_dir}') - # drop the database if exists - drop(munch.Munch({'tag': f'{ALGO}_hpo'})) - # create database - create(munch.Munch({'tag': f'{ALGO}_hpo'})) - - # optimized hp path - if ALGO == 'ppo': - if HYPERPARAMETER == 'default': - opt_hp_path = '' - elif HYPERPARAMETER == 'optimimum': - opt_hp_path = 'examples/hpo/rl/ppo/config_overrides/cartpole/optimized_hyperparameters.yaml' - else: - raise ValueError('HYPERPARAMETER must be either default or optimimum') - elif ALGO == 'sac': - if HYPERPARAMETER == 'default': - opt_hp_path = '' - elif HYPERPARAMETER == 'optimimum': - opt_hp_path = 'examples/hpo/rl/sac/config_overrides/cartpole/optimized_hyperparameters.yaml' - else: - raise ValueError('HYPERPARAMETER must be either default or optimimum') - elif ALGO == 'gp_mpc': - if HYPERPARAMETER == 'default': - opt_hp_path = '' - elif HYPERPARAMETER == 'optimimum': - opt_hp_path = 'examples/hpo/gp_mpc/config_overrides/cartpole/optimized_hyperparameters.yaml' - else: - raise ValueError('HYPERPARAMETER must be either default or optimimum') - PRIOR = '150' - - if ALGO == 'gp_mpc': - sys.argv[1:] = ['--algo', ALGO, - '--task', SYS, - '--overrides', - f'./examples/hpo/{ALGO}/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}_{PRIOR}.yaml', - '--output_dir', output_dir, - '--opt_hps', opt_hp_path, - '--seed', '2' - ] - else: - sys.argv[1:] = ['--algo', ALGO, - '--task', SYS, - '--overrides', - f'./examples/hpo/rl/config_overrides/{SYS}/{SYS}_{TASK}.yaml', - f'./examples/hpo/rl/{ALGO}/config_overrides/{SYS}/{ALGO}_{SYS}.yaml', - '--output_dir', output_dir, - '--tag', 's1', - '--opt_hps', opt_hp_path, - '--seed', '6', - '--use_gpu', 'True' - ] - - fac = ConfigFactory() - fac.add_argument('--opt_hps', type=str, default='', help='yaml file as a result of HPO.') - config = fac.merge() - - train(config) - - # delete output_dir - if os.path.exists(output_dir): - os.system(f'rm -rf {output_dir}') - - # drop database - drop(munch.Munch({'tag': f'{ALGO}_hpo'}))
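For reference, the sampler registry added in this patch can also be exercised outside the full examples/hpo/hpo_experiment.py pipeline. The snippet below is a minimal sketch and is not part of the patch: it drives HYPERPARAMS_SAMPLER directly with Optuna for the cartpole dimensions (4 states, 1 input); dummy_objective and the tiny trial budget are illustrative placeholders for the controller training and evaluation that BaseHPO.evaluate performs in the real pipeline.

# sample_gpmpc_hps.py -- illustrative only; assumes cartpole dimensions (4 states, 1 input)
import optuna

from safe_control_gym.hyperparameters.optuna.hpo_optuna_utils import HYPERPARAMS_SAMPLER


def dummy_objective(trial: optuna.Trial) -> float:
    # Draw a complete GP-MPC hyperparameter set from the search space defined in hpo_search_space.py.
    hps = HYPERPARAMS_SAMPLER['gp_mpc'](trial, state_dim=4, action_dim=1)
    # Placeholder score; the real pipeline would train and evaluate the controller
    # (see BaseHPO.evaluate) and return e.g. the exponentiated RMSE.
    return float(sum(hps['q_mpc']) - sum(hps['r_mpc']))


if __name__ == '__main__':
    study = optuna.create_study(direction='maximize',
                                sampler=optuna.samplers.TPESampler(seed=24))
    study.optimize(dummy_objective, n_trials=3)
    print(study.best_trial.params)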