diff --git a/mala/__init__.py b/mala/__init__.py index 6646077b5..ae3a35108 100644 --- a/mala/__init__.py +++ b/mala/__init__.py @@ -40,7 +40,7 @@ HyperparameterOAT, HyperparameterNASWOT, HyperparameterOptuna, - HyperparameterACSD, + HyperparameterDescriptorScoring, ACSDAnalyzer, Runner, ) diff --git a/mala/common/parameters.py b/mala/common/parameters.py index 30ce695ba..3e4c531c5 100644 --- a/mala/common/parameters.py +++ b/mala/common/parameters.py @@ -1101,6 +1101,7 @@ def __init__(self): # For accelerated hyperparameter optimization. self.acsd_points = 100 + self.mutual_information_points = 20000 @property def rdb_storage_heartbeat(self): diff --git a/mala/network/__init__.py b/mala/network/__init__.py index eaa50c125..af9235498 100644 --- a/mala/network/__init__.py +++ b/mala/network/__init__.py @@ -11,6 +11,6 @@ from .hyperparameter_oat import HyperparameterOAT from .hyperparameter_naswot import HyperparameterNASWOT from .hyperparameter_optuna import HyperparameterOptuna -from .hyperparameter_acsd import HyperparameterACSD +from .hyperparameter_descriptor_scoring import HyperparameterDescriptorScoring from .acsd_analyzer import ACSDAnalyzer from .runner import Runner diff --git a/mala/network/acsd_analyzer.py b/mala/network/acsd_analyzer.py index cd4602cfb..c02a7f416 100644 --- a/mala/network/acsd_analyzer.py +++ b/mala/network/acsd_analyzer.py @@ -1,28 +1,17 @@ """Class for performing a full ACSD analysis.""" -import itertools -import os - import numpy as np from mala.datahandling.data_converter import ( descriptor_input_types, target_input_types, ) -from mala.descriptors.descriptor import Descriptor -from mala.targets.target import Target -from mala.network.hyperparameter import Hyperparameter -from mala.network.hyper_opt import HyperOpt -from mala.common.parallelizer import get_rank, printout -from mala.descriptors.bispectrum import Bispectrum -from mala.descriptors.atomic_density import AtomicDensity -from mala.descriptors.minterpy_descriptors import MinterpyDescriptors - -descriptor_input_types_acsd = descriptor_input_types + ["numpy", "openpmd"] -target_input_types_acsd = target_input_types + ["numpy", "openpmd"] +from mala.network.descriptor_scoring_optimizer import ( + DescriptorScoringOptimizer, +) -class ACSDAnalyzer(HyperOpt): +class ACSDAnalyzer(DescriptorScoringOptimizer): """ Analyzer based on the ACSD analysis. @@ -47,673 +36,23 @@ class ACSDAnalyzer(HyperOpt): def __init__( self, params, target_calculator=None, descriptor_calculator=None ): - super(ACSDAnalyzer, self).__init__(params) - # Calculators used to parse data from compatible files. - self._target_calculator = target_calculator - if self._target_calculator is None: - self._target_calculator = Target(params) - self._descriptor_calculator = descriptor_calculator - if self._descriptor_calculator is None: - self._descriptor_calculator = Descriptor(params) - if ( - not isinstance(self._descriptor_calculator, Bispectrum) - and not isinstance(self._descriptor_calculator, AtomicDensity) - and not isinstance( - self._descriptor_calculator, MinterpyDescriptors - ) - ): - raise Exception( - "Cannot calculate ACSD for the selected descriptors." - ) - - # Internal variables. - self.__snapshots = [] - self.__snapshot_description = [] - self.__snapshot_units = [] - - # Filled after the analysis. - self._labels = [] - self._study = [] - self._reduced_study = None - self._internal_hyperparam_list = None - - def add_snapshot( - self, - descriptor_input_type=None, - descriptor_input_path=None, - target_input_type=None, - target_input_path=None, - descriptor_units=None, - target_units=None, - ): - """ - Add a snapshot to be processed. - - Parameters - ---------- - descriptor_input_type : string - Type of descriptor data to be processed. - See mala.datahandling.data_converter.descriptor_input_types - for options. - - descriptor_input_path : string - Path of descriptor data to be processed. - - target_input_type : string - Type of target data to be processed. - See mala.datahandling.data_converter.target_input_types - for options. - - target_input_path : string - Path of target data to be processed. - - descriptor_units : string - Units for descriptor data processing. - - target_units : string - Units for target data processing. - """ - # Check the input. - if descriptor_input_type is not None: - if descriptor_input_path is None: - raise Exception( - "Cannot process descriptor data with no path given." - ) - if descriptor_input_type not in descriptor_input_types_acsd: - raise Exception("Cannot process this type of descriptor data.") - else: - raise Exception("Cannot calculate ACSD without descriptor data.") - - if target_input_type is not None: - if target_input_path is None: - raise Exception( - "Cannot process target data with no path given." - ) - if target_input_type not in target_input_types_acsd: - raise Exception("Cannot process this type of target data.") - else: - raise Exception("Cannot calculate ACSD without target data.") - - # Assign info. - self.__snapshots.append( - {"input": descriptor_input_path, "output": target_input_path} - ) - self.__snapshot_description.append( - {"input": descriptor_input_type, "output": target_input_type} - ) - self.__snapshot_units.append( - {"input": descriptor_units, "output": target_units} + super(ACSDAnalyzer, self).__init__( + params, + target_calculator=target_calculator, + descriptor_calculator=descriptor_calculator, ) - def add_hyperparameter(self, name, choices): - """ - Add a hyperparameter to the current investigation. + def _update_logging(self, score, index): + if self.best_score is None: + self.best_score = score + self.best_trial = index + elif score < self.best_score: + self.best_score = score + self.best_trial = index - Parameters - ---------- - name : string - Name of the hyperparameter. Please note that these names always - have to be the same as the parameter names in - ParametersDescriptors. - - choices : - List of possible choices. - """ - if name not in [ - "bispectrum_twojmax", - "bispectrum_cutoff", - "atomic_density_sigma", - "atomic_density_cutoff", - "minterpy_cutoff_cube_size", - "minterpy_polynomial_degree", - "minterpy_lp_norm", - ]: - raise Exception("Unkown hyperparameter for ACSD analysis entered.") - - self.params.hyperparameters.hlist.append( - Hyperparameter( - hotype="acsd", - name=name, - choices=choices, - opttype="categorical", - ) - ) - - def perform_study( - self, file_based_communication=False, return_plotting=False - ): - """ - Perform the study, i.e. the optimization. - - This is done by sampling different descriptors, calculated with - different hyperparameters and then calculating the ACSD. - """ - # Prepare the hyperparameter lists. - self._construct_hyperparam_list() - hyperparameter_tuples = list( - itertools.product(*self._internal_hyperparam_list) - ) - - # Perform the ACSD analysis separately for each snapshot. - best_acsd = None - best_trial = None - for i in range(0, len(self.__snapshots)): - printout( - "Starting ACSD analysis of snapshot", str(i), min_verbosity=1 - ) - current_list = [] - target = self._load_target( - self.__snapshots[i], - self.__snapshot_description[i], - self.__snapshot_units[i], - file_based_communication, - ) - - for idx, hyperparameter_tuple in enumerate(hyperparameter_tuples): - if isinstance(self._descriptor_calculator, Bispectrum): - self.params.descriptors.bispectrum_cutoff = ( - hyperparameter_tuple[0] - ) - self.params.descriptors.bispectrum_twojmax = ( - hyperparameter_tuple[1] - ) - elif isinstance(self._descriptor_calculator, AtomicDensity): - self.params.descriptors.atomic_density_cutoff = ( - hyperparameter_tuple[0] - ) - self.params.descriptors.atomic_density_sigma = ( - hyperparameter_tuple[1] - ) - elif isinstance( - self._descriptor_calculator, MinterpyDescriptors - ): - self.params.descriptors.atomic_density_cutoff = ( - hyperparameter_tuple[0] - ) - self.params.descriptors.atomic_density_sigma = ( - hyperparameter_tuple[1] - ) - self.params.descriptors.minterpy_cutoff_cube_size = ( - hyperparameter_tuple[2] - ) - self.params.descriptors.minterpy_polynomial_degree = ( - hyperparameter_tuple[3] - ) - self.params.descriptors.minterpy_lp_norm = ( - hyperparameter_tuple[4] - ) - - descriptor = self._calculate_descriptors( - self.__snapshots[i], - self.__snapshot_description[i], - self.__snapshot_units[i], - ) - if get_rank() == 0: - acsd = self._calculate_acsd( - descriptor, - target, - self.params.hyperparameters.acsd_points, - descriptor_vectors_contain_xyz=self.params.descriptors.descriptors_contain_xyz, - ) - if not np.isnan(acsd): - if best_acsd is None: - best_acsd = acsd - best_trial = idx - elif acsd < best_acsd: - best_acsd = acsd - best_trial = idx - current_list.append( - list(hyperparameter_tuple) + [acsd] - ) - else: - current_list.append( - list(hyperparameter_tuple) + [np.inf] - ) - - outstring = "[" - for label_id, label in enumerate(self._labels): - outstring += ( - label + ": " + str(hyperparameter_tuple[label_id]) - ) - if label_id < len(self._labels) - 1: - outstring += ", " - outstring += "]" - best_trial_string = ". No suitable trial found yet." - if best_acsd is not None: - best_trial_string = ( - ". Best trial is " - + str(best_trial) - + " with " - + str(best_acsd) - ) - - printout( - "Trial", - idx, - "finished with ACSD=" + str(acsd), - "and parameters:", - outstring + best_trial_string, - min_verbosity=1, - ) - - if get_rank() == 0: - self._study.append(current_list) - - if get_rank() == 0: - self._study = np.mean(self._study, axis=0) - - # TODO: Does this even make sense for the minterpy descriptors? - if return_plotting: - results_to_plot = [] - if len(self._internal_hyperparam_list) == 2: - len_first_dim = len(self._internal_hyperparam_list[0]) - len_second_dim = len(self._internal_hyperparam_list[1]) - for i in range(0, len_first_dim): - results_to_plot.append( - self._study[ - i * len_second_dim : (i + 1) * len_second_dim, - 2:, - ] - ) - - if isinstance(self._descriptor_calculator, Bispectrum): - return results_to_plot, { - "twojmax": self._internal_hyperparam_list[1], - "cutoff": self._internal_hyperparam_list[0], - } - if isinstance(self._descriptor_calculator, AtomicDensity): - return results_to_plot, { - "sigma": self._internal_hyperparam_list[1], - "cutoff": self._internal_hyperparam_list[0], - } - - def set_optimal_parameters(self): - """ - Set the optimal parameters found in the present study. - - The parameters will be written to the parameter object with which the - hyperparameter optimizer was created. - """ - if get_rank() == 0: - minimum_acsd = self._study[np.argmin(self._study[:, -1])] - if len(self._internal_hyperparam_list) == 2: - if isinstance(self._descriptor_calculator, Bispectrum): - self.params.descriptors.bispectrum_cutoff = minimum_acsd[0] - self.params.descriptors.bispectrum_twojmax = int( - minimum_acsd[1] - ) - printout( - "ACSD analysis finished, optimal parameters: ", - ) - printout( - "Bispectrum twojmax: ", - self.params.descriptors.bispectrum_twojmax, - ) - printout( - "Bispectrum cutoff: ", - self.params.descriptors.bispectrum_cutoff, - ) - if isinstance(self._descriptor_calculator, AtomicDensity): - self.params.descriptors.atomic_density_cutoff = ( - minimum_acsd[0] - ) - self.params.descriptors.atomic_density_sigma = ( - minimum_acsd[1] - ) - printout( - "ACSD analysis finished, optimal parameters: ", - ) - printout( - "Atomic density sigma: ", - self.params.descriptors.atomic_density_sigma, - ) - printout( - "Atomic density cutoff: ", - self.params.descriptors.atomic_density_cutoff, - ) - elif len(self._internal_hyperparam_list) == 5: - if isinstance( - self._descriptor_calculator, MinterpyDescriptors - ): - self.params.descriptors.atomic_density_cutoff = ( - minimum_acsd[0] - ) - self.params.descriptors.atomic_density_sigma = ( - minimum_acsd[1] - ) - self.params.descriptors.minterpy_cutoff_cube_size = ( - minimum_acsd[2] - ) - self.params.descriptors.minterpy_polynomial_degree = int( - minimum_acsd[3] - ) - self.params.descriptors.minterpy_lp_norm = int( - minimum_acsd[4] - ) - printout( - "ACSD analysis finished, optimal parameters: ", - ) - printout( - "Atomic density sigma: ", - self.params.descriptors.atomic_density_sigma, - ) - printout( - "Atomic density cutoff: ", - self.params.descriptors.atomic_density_cutoff, - ) - printout( - "Minterpy cube cutoff: ", - self.params.descriptors.minterpy_cutoff_cube_size, - ) - printout( - "Minterpy polynomial degree: ", - self.params.descriptors.minterpy_polynomial_degree, - ) - printout( - "Minterpy LP norm degree: ", - self.params.descriptors.minterpy_lp_norm, - ) - - def _construct_hyperparam_list(self): - if isinstance(self._descriptor_calculator, Bispectrum): - if ( - list( - map( - lambda p: "bispectrum_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - first_dim_list = [self.params.descriptors.bispectrum_cutoff] - else: - first_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "bispectrum_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "bispectrum_twojmax" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - second_dim_list = [self.params.descriptors.bispectrum_twojmax] - else: - second_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "bispectrum_twojmax" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - self._internal_hyperparam_list = [first_dim_list, second_dim_list] - self._labels = ["cutoff", "twojmax"] - - elif isinstance(self._descriptor_calculator, AtomicDensity): - if ( - list( - map( - lambda p: "atomic_density_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - first_dim_list = [ - self.params.descriptors.atomic_density_cutoff - ] - else: - first_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "atomic_density_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "atomic_density_sigma" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - second_dim_list = [ - self.params.descriptors.atomic_density_sigma - ] - else: - second_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "atomic_density_sigma" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - self._internal_hyperparam_list = [first_dim_list, second_dim_list] - self._labels = ["cutoff", "sigma"] - - elif isinstance(self._descriptor_calculator, MinterpyDescriptors): - if ( - list( - map( - lambda p: "atomic_density_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - first_dim_list = [ - self.params.descriptors.atomic_density_cutoff - ] - else: - first_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "atomic_density_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "atomic_density_sigma" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - second_dim_list = [ - self.params.descriptors.atomic_density_sigma - ] - else: - second_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "atomic_density_sigma" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "minterpy_cutoff_cube_size" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - third_dim_list = [ - self.params.descriptors.minterpy_cutoff_cube_size - ] - else: - third_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "minterpy_cutoff_cube_size" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "minterpy_polynomial_degree" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - fourth_dim_list = [ - self.params.descriptors.minterpy_polynomial_degree - ] - else: - fourth_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "minterpy_polynomial_degree" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "minterpy_lp_norm" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - fifth_dim_list = [self.params.descriptors.minterpy_lp_norm] - else: - fifth_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "minterpy_lp_norm" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - self._internal_hyperparam_list = [ - first_dim_list, - second_dim_list, - third_dim_list, - fourth_dim_list, - fifth_dim_list, - ] - self._labels = [ - "cutoff", - "sigma", - "minterpy_cutoff", - "minterpy_polynomial_degree", - "minterpy_lp_norm", - ] - - else: - raise Exception( - "Unkown descriptor calculator selected. Cannot " - "calculate ACSD." - ) - - def _calculate_descriptors(self, snapshot, description, original_units): - descriptor_calculation_kwargs = {} - tmp_input = None - if description["input"] == "espresso-out": - descriptor_calculation_kwargs["units"] = original_units["input"] - tmp_input, local_size = ( - self._descriptor_calculator.calculate_from_qe_out( - snapshot["input"], **descriptor_calculation_kwargs - ) - ) - - elif description["input"] is None: - # In this case, only the output is processed. - pass - - else: - raise Exception( - "Unknown file extension, cannot convert descriptor" - ) - if self.params.descriptors._configuration["mpi"]: - tmp_input = self._descriptor_calculator.gather_descriptors( - tmp_input - ) - - return tmp_input - - def _load_target( - self, snapshot, description, original_units, file_based_communication - ): - memmap = None - if ( - self.params.descriptors._configuration["mpi"] - and file_based_communication - ): - memmap = "acsd.out.npy_temp" - - target_calculator_kwargs = {} - - # Read the output data - tmp_output = None - if description["output"] == ".cube": - target_calculator_kwargs["units"] = original_units["output"] - target_calculator_kwargs["use_memmap"] = memmap - # If no units are provided we just assume standard units. - tmp_output = self._target_calculator.read_from_cube( - snapshot["output"], **target_calculator_kwargs - ) - - elif description["output"] == ".xsf": - target_calculator_kwargs["units"] = original_units["output"] - target_calculator_kwargs["use_memmap"] = memmap - # If no units are provided we just assume standard units. - tmp_output = self._target_calculator.read_from_xsf( - snapshot["output"], **target_calculator_kwargs - ) - - elif description["output"] == "numpy": - if get_rank() == 0: - tmp_output = self._target_calculator.read_from_numpy_file( - snapshot["output"], units=original_units["output"] - ) - - elif description["output"] == "openpmd": - if get_rank() == 0: - tmp_output = self._target_calculator.read_from_numpy_file( - snapshot["output"], units=original_units["output"] - ) - else: - raise Exception("Unknown file extension, cannot convert target") - - if get_rank() == 0: - if ( - self.params.targets._configuration["mpi"] - and file_based_communication - ): - os.remove(memmap) - - return tmp_output + def get_best_trial(self): + """Different from best_trial because of parallelization.""" + return self._study[np.argmin(self._study[:, -1])] @staticmethod def _calculate_cosine_similarities( @@ -800,6 +139,14 @@ def _calculate_cosine_similarities( return np.array(similarity_array) + def _calculate_score(self, descriptor, target): + return self._calculate_acsd( + descriptor, + target, + self.params.hyperparameters.acsd_points, + descriptor_vectors_contain_xyz=self.params.descriptors.descriptors_contain_xyz, + ) + @staticmethod def _calculate_acsd( descriptor_data, diff --git a/mala/network/descriptor_scoring_optimizer.py b/mala/network/descriptor_scoring_optimizer.py new file mode 100644 index 000000000..71839c0c3 --- /dev/null +++ b/mala/network/descriptor_scoring_optimizer.py @@ -0,0 +1,540 @@ +"""Base class for ACSD, mutual information and related methods.""" + +from abc import abstractmethod, ABC +import itertools +import os + +import numpy as np + +from mala.datahandling.data_converter import ( + descriptor_input_types, + target_input_types, +) +from mala.descriptors.descriptor import Descriptor +from mala.targets.target import Target +from mala.network.hyperparameter import Hyperparameter +from mala.network.hyper_opt import HyperOpt +from mala.common.parallelizer import get_rank, printout +from mala.descriptors.bispectrum import Bispectrum +from mala.descriptors.atomic_density import AtomicDensity +from mala.descriptors.minterpy_descriptors import MinterpyDescriptors + +descriptor_input_types_descriptor_scoring = descriptor_input_types + [ + "numpy", + "openpmd", +] +target_input_types_descriptor_scoring = target_input_types + [ + "numpy", + "openpmd", +] + + +class DescriptorScoringOptimizer(HyperOpt, ABC): + """ + Base class for all training-free descriptor hyperparameter optimizers. + + These optimizer use alternative metrics (ACSD, mutual information, etc. + to tune descriptor hyperparameters. + + Parameters + ---------- + params : mala.common.parametes.Parameters + Parameters used to create this hyperparameter optimizer. + + descriptor_calculator : mala.descriptors.descriptor.Descriptor + The descriptor calculator used for parsing/converting fingerprint + data. If None, the descriptor calculator will be created by this + object using the parameters provided. Default: None + + target_calculator : mala.targets.target.Target + Target calculator used for parsing/converting target data. If None, + the target calculator will be created by this object using the + parameters provided. Default: None + """ + + def __init__( + self, params, target_calculator=None, descriptor_calculator=None + ): + super(DescriptorScoringOptimizer, self).__init__(params) + # Calculators used to parse data from compatible files. + self._target_calculator = target_calculator + if self._target_calculator is None: + self._target_calculator = Target(params) + self._descriptor_calculator = descriptor_calculator + if self._descriptor_calculator is None: + self._descriptor_calculator = Descriptor(params) + if not isinstance( + self._descriptor_calculator, Bispectrum + ) and not isinstance(self._descriptor_calculator, AtomicDensity): + raise Exception("Unsupported descriptor type selected.") + + # Internal variables. + self._snapshots = [] + self._snapshot_description = [] + self._snapshot_units = [] + + # Filled after the analysis. + self._labels = [] + self._study = [] + self._reduced_study = None + self._internal_hyperparam_list = None + + # Logging metrics. + self.best_score = None + self.best_trial_index = None + + def add_snapshot( + self, + descriptor_input_type=None, + descriptor_input_path=None, + target_input_type=None, + target_input_path=None, + descriptor_units=None, + target_units=None, + ): + """ + Add a snapshot to be processed. + + Parameters + ---------- + descriptor_input_type : string + Type of descriptor data to be processed. + See mala.datahandling.data_converter.descriptor_input_types + for options. + + descriptor_input_path : string + Path of descriptor data to be processed. + + target_input_type : string + Type of target data to be processed. + See mala.datahandling.data_converter.target_input_types + for options. + + target_input_path : string + Path of target data to be processed. + + descriptor_units : string + Units for descriptor data processing. + + target_units : string + Units for target data processing. + """ + # Check the input. + if descriptor_input_type is not None: + if descriptor_input_path is None: + raise Exception( + "Cannot process descriptor data with no path given." + ) + if ( + descriptor_input_type + not in descriptor_input_types_descriptor_scoring + ): + raise Exception("Cannot process this type of descriptor data.") + else: + raise Exception( + "Cannot calculate scoring metrics without descriptor data." + ) + + if target_input_type is not None: + if target_input_path is None: + raise Exception( + "Cannot process target data with no path given." + ) + if target_input_type not in target_input_types_descriptor_scoring: + raise Exception("Cannot process this type of target data.") + else: + raise Exception( + "Cannot calculate scoring metrics without target data." + ) + + # Assign info. + self._snapshots.append( + {"input": descriptor_input_path, "output": target_input_path} + ) + self._snapshot_description.append( + {"input": descriptor_input_type, "output": target_input_type} + ) + self._snapshot_units.append( + {"input": descriptor_units, "output": target_units} + ) + + def add_hyperparameter(self, name, choices): + """ + Add a hyperparameter to the current investigation. + + Parameters + ---------- + name : string + Name of the hyperparameter. Please note that these names always + have to be the same as the parameter names in + ParametersDescriptors. + + choices : + List of possible choices. + """ + if name not in [ + "bispectrum_twojmax", + "bispectrum_cutoff", + "atomic_density_sigma", + "atomic_density_cutoff", + ]: + raise Exception( + "Unkown hyperparameter for training free descriptor" + "hyperparameter optimization entered." + ) + + self.params.hyperparameters.hlist.append( + Hyperparameter( + hotype="descriptor_scoring", + name=name, + choices=choices, + opttype="categorical", + ) + ) + + def perform_study( + self, file_based_communication=False, return_plotting=False + ): + """ + Perform the study, i.e. the optimization. + + This is done by sampling different descriptors, calculated with + different hyperparameters and then calculating the ACSD. + """ + # Prepare the hyperparameter lists. + self._construct_hyperparam_list() + hyperparameter_tuples = list( + itertools.product(*self._internal_hyperparam_list) + ) + + # Perform the descriptor scoring analysis separately for each snapshot. + self.best_trial_index = None + self.best_score = None + for i in range(0, len(self._snapshots)): + printout( + "Starting descriptor scoring analysis of snapshot", + str(i), + min_verbosity=1, + ) + current_list = [] + target = self._load_target( + self._snapshots[i], + self._snapshot_description[i], + self._snapshot_units[i], + file_based_communication, + ) + + for idx, hyperparameter_tuple in enumerate(hyperparameter_tuples): + if isinstance(self._descriptor_calculator, Bispectrum): + self.params.descriptors.bispectrum_cutoff = ( + hyperparameter_tuple[0] + ) + self.params.descriptors.bispectrum_twojmax = ( + hyperparameter_tuple[1] + ) + elif isinstance(self._descriptor_calculator, AtomicDensity): + self.params.descriptors.atomic_density_cutoff = ( + hyperparameter_tuple[0] + ) + self.params.descriptors.atomic_density_sigma = ( + hyperparameter_tuple[1] + ) + + descriptor = self._calculate_descriptors( + self._snapshots[i], + self._snapshot_description[i], + self._snapshot_units[i], + ) + if get_rank() == 0: + score = self._calculate_score( + descriptor, + target, + ) + if not np.isnan(score): + self._update_logging(score, idx) + current_list.append( + list(hyperparameter_tuple) + [score] + ) + else: + current_list.append( + list(hyperparameter_tuple) + [np.inf] + ) + + outstring = "[" + for label_id, label in enumerate(self._labels): + outstring += ( + label + ": " + str(hyperparameter_tuple[label_id]) + ) + if label_id < len(self._labels) - 1: + outstring += ", " + outstring += "]" + best_trial_string = ". No suitable trial found yet." + if self.best_score is not None: + best_trial_string = ( + ". Best trial is " + + str(self.best_trial_index) + + " with " + + str(self.best_score) + ) + + printout( + "Trial", + idx, + "finished with score=" + str(score), + "and parameters:", + outstring + best_trial_string, + min_verbosity=1, + ) + + if get_rank() == 0: + self._study.append(current_list) + + if get_rank() == 0: + self._study = np.mean(self._study, axis=0) + + if return_plotting: + results_to_plot = [] + if len(self._internal_hyperparam_list) == 2: + len_first_dim = len(self._internal_hyperparam_list[0]) + len_second_dim = len(self._internal_hyperparam_list[1]) + for i in range(0, len_first_dim): + results_to_plot.append( + self._study[ + i * len_second_dim : (i + 1) * len_second_dim, + 2:, + ] + ) + + if isinstance(self._descriptor_calculator, Bispectrum): + return results_to_plot, { + "twojmax": self._internal_hyperparam_list[1], + "cutoff": self._internal_hyperparam_list[0], + } + if isinstance(self._descriptor_calculator, AtomicDensity): + return results_to_plot, { + "sigma": self._internal_hyperparam_list[1], + "cutoff": self._internal_hyperparam_list[0], + } + + def set_optimal_parameters(self): + if get_rank() == 0: + best_trial = self.get_best_trial() + minimum_score = self._study[np.argmin(self._study[:, -1])] + if isinstance(self._descriptor_calculator, Bispectrum): + self.params.descriptors.bispectrum_cutoff = best_trial[0] + self.params.descriptors.bispectrum_twojmax = int(best_trial[1]) + printout( + "ACSD analysis finished, optimal parameters: ", + ) + printout( + "Bispectrum twojmax: ", + self.params.descriptors.bispectrum_twojmax, + ) + printout( + "Bispectrum cutoff: ", + self.params.descriptors.bispectrum_cutoff, + ) + if isinstance(self._descriptor_calculator, AtomicDensity): + self.params.descriptors.atomic_density_cutoff = best_trial[0] + self.params.descriptors.atomic_density_sigma = best_trial[1] + printout( + "ACSD analysis finished, optimal parameters: ", + ) + printout( + "Atomic density sigma: ", + self.params.descriptors.atomic_density_sigma, + ) + printout( + "Atomic density cutoff: ", + self.params.descriptors.atomic_density_cutoff, + ) + + @abstractmethod + def get_best_trial(self): + """Different from best_trial because of parallelization.""" + pass + + def _construct_hyperparam_list(self): + if isinstance(self._descriptor_calculator, Bispectrum): + if ( + list( + map( + lambda p: "bispectrum_cutoff" in p.name, + self.params.hyperparameters.hlist, + ) + ).count(True) + == 0 + ): + first_dim_list = [self.params.descriptors.bispectrum_cutoff] + else: + first_dim_list = self.params.hyperparameters.hlist[ + list( + map( + lambda p: "bispectrum_cutoff" in p.name, + self.params.hyperparameters.hlist, + ) + ).index(True) + ].choices + + if ( + list( + map( + lambda p: "bispectrum_twojmax" in p.name, + self.params.hyperparameters.hlist, + ) + ).count(True) + == 0 + ): + second_dim_list = [self.params.descriptors.bispectrum_twojmax] + else: + second_dim_list = self.params.hyperparameters.hlist[ + list( + map( + lambda p: "bispectrum_twojmax" in p.name, + self.params.hyperparameters.hlist, + ) + ).index(True) + ].choices + + self._internal_hyperparam_list = [first_dim_list, second_dim_list] + self._labels = ["cutoff", "twojmax"] + + elif isinstance(self._descriptor_calculator, AtomicDensity): + if ( + list( + map( + lambda p: "atomic_density_cutoff" in p.name, + self.params.hyperparameters.hlist, + ) + ).count(True) + == 0 + ): + first_dim_list = [ + self.params.descriptors.atomic_density_cutoff + ] + else: + first_dim_list = self.params.hyperparameters.hlist[ + list( + map( + lambda p: "atomic_density_cutoff" in p.name, + self.params.hyperparameters.hlist, + ) + ).index(True) + ].choices + + if ( + list( + map( + lambda p: "atomic_density_sigma" in p.name, + self.params.hyperparameters.hlist, + ) + ).count(True) + == 0 + ): + second_dim_list = [ + self.params.descriptors.atomic_density_sigma + ] + else: + second_dim_list = self.params.hyperparameters.hlist[ + list( + map( + lambda p: "atomic_density_sigma" in p.name, + self.params.hyperparameters.hlist, + ) + ).index(True) + ].choices + self._internal_hyperparam_list = [first_dim_list, second_dim_list] + self._labels = ["cutoff", "sigma"] + + else: + raise Exception( + "Unkown descriptor calculator selected. Cannot " + "perform descriptor scoring optimization." + ) + + def _calculate_descriptors(self, snapshot, description, original_units): + descriptor_calculation_kwargs = {} + tmp_input = None + if description["input"] == "espresso-out": + descriptor_calculation_kwargs["units"] = original_units["input"] + tmp_input, local_size = ( + self._descriptor_calculator.calculate_from_qe_out( + snapshot["input"], **descriptor_calculation_kwargs + ) + ) + + elif description["input"] is None: + # In this case, only the output is processed. + pass + + else: + raise Exception( + "Unknown file extension, cannot convert descriptor" + ) + if self.params.descriptors._configuration["mpi"]: + tmp_input = self._descriptor_calculator.gather_descriptors( + tmp_input + ) + + return tmp_input + + def _load_target( + self, snapshot, description, original_units, file_based_communication + ): + memmap = None + if ( + self.params.descriptors._configuration["mpi"] + and file_based_communication + ): + memmap = "acsd.out.npy_temp" + + target_calculator_kwargs = {} + + # Read the output data + tmp_output = None + if description["output"] == ".cube": + target_calculator_kwargs["units"] = original_units["output"] + target_calculator_kwargs["use_memmap"] = memmap + # If no units are provided we just assume standard units. + tmp_output = self._target_calculator.read_from_cube( + snapshot["output"], **target_calculator_kwargs + ) + + elif description["output"] == ".xsf": + target_calculator_kwargs["units"] = original_units["output"] + target_calculator_kwargs["use_memmap"] = memmap + # If no units are provided we just assume standard units. + tmp_output = self._target_calculator.read_from_xsf( + snapshot["output"], **target_calculator_kwargs + ) + + elif description["output"] == "numpy": + if get_rank() == 0: + tmp_output = self._target_calculator.read_from_numpy_file( + snapshot["output"], units=original_units["output"] + ) + + elif description["output"] == "openpmd": + if get_rank() == 0: + tmp_output = self._target_calculator.read_from_numpy_file( + snapshot["output"], units=original_units["output"] + ) + else: + raise Exception("Unknown file extension, cannot convert target") + + if get_rank() == 0: + if ( + self.params.targets._configuration["mpi"] + and file_based_communication + ): + os.remove(memmap) + + return tmp_output + + @abstractmethod + def _update_logging(self, score, index): + pass + + @abstractmethod + def _calculate_score(self, descriptor, target): + pass diff --git a/mala/network/hyperparameter.py b/mala/network/hyperparameter.py index 4294d6a4f..17f0111ab 100644 --- a/mala/network/hyperparameter.py +++ b/mala/network/hyperparameter.py @@ -158,12 +158,12 @@ def __new__( hparam = HyperparameterOAT( hotype=hotype, opttype=opttype, name=name, choices=choices ) - if hotype == "acsd": - from mala.network.hyperparameter_acsd import ( - HyperparameterACSD, + if hotype == "descriptor_scoring": + from mala.network.hyperparameter_descriptor_scoring import ( + HyperparameterDescriptorScoring, ) - hparam = HyperparameterACSD( + hparam = HyperparameterDescriptorScoring( hotype=hotype, opttype=opttype, name=name, diff --git a/mala/network/hyperparameter_acsd.py b/mala/network/hyperparameter_descriptor_scoring.py similarity index 92% rename from mala/network/hyperparameter_acsd.py rename to mala/network/hyperparameter_descriptor_scoring.py index 6ecee0e76..34428f1d6 100644 --- a/mala/network/hyperparameter_acsd.py +++ b/mala/network/hyperparameter_descriptor_scoring.py @@ -3,7 +3,7 @@ from mala.network.hyperparameter import Hyperparameter -class HyperparameterACSD(Hyperparameter): +class HyperparameterDescriptorScoring(Hyperparameter): """Represents an optuna parameter. Parameters @@ -44,7 +44,7 @@ def __init__( high=0, choices=None, ): - super(HyperparameterACSD, self).__init__( + super(HyperparameterDescriptorScoring, self).__init__( opttype=opttype, name=name, low=low, high=high, choices=choices ) diff --git a/mala/network/mutual_information_analyzer.py b/mala/network/mutual_information_analyzer.py index 8dd7566e2..073210ec3 100644 --- a/mala/network/mutual_information_analyzer.py +++ b/mala/network/mutual_information_analyzer.py @@ -1,26 +1,17 @@ """Class for performing a full mutual information analysis.""" -import itertools -import os - import numpy as np from mala.datahandling.data_converter import ( descriptor_input_types, target_input_types, ) -from mala.network.acsd_analyzer import ACSDAnalyzer +from mala.network.descriptor_scoring_optimizer import ( + DescriptorScoringOptimizer, +) import sklearn.mixture import sklearn.covariance import matplotlib.pyplot as plt -from sklearn.preprocessing import Normalizer -from mala.common.parallelizer import get_rank, printout -from mala.descriptors.bispectrum import Bispectrum -from mala.descriptors.atomic_density import AtomicDensity -from mala.descriptors.minterpy_descriptors import MinterpyDescriptors - -descriptor_input_types_acsd = descriptor_input_types + ["numpy", "openpmd"] -target_input_types_acsd = target_input_types + ["numpy", "openpmd"] def normalize(data): @@ -102,7 +93,7 @@ def mutual_information( return mi -class MutualInformationAnalyzer(ACSDAnalyzer): +class MutualInformationAnalyzer(DescriptorScoringOptimizer): """ Analyzer based on mutual information analysis. @@ -131,93 +122,28 @@ def __init__( descriptor_calculator=descriptor_calculator, ) - def set_optimal_parameters(self): - """ - Set the optimal parameters found in the present study. - - The parameters will be written to the parameter object with which the - hyperparameter optimizer was created. - """ - if get_rank() == 0: - minimum_acsd = self.study[np.argmax(self.study[:, -1])] - if len(self.internal_hyperparam_list) == 2: - if isinstance(self.descriptor_calculator, Bispectrum): - self.params.descriptors.bispectrum_cutoff = minimum_acsd[0] - self.params.descriptors.bispectrum_twojmax = int( - minimum_acsd[1] - ) - printout( - "ACSD analysis finished, optimal parameters: ", - ) - printout( - "Bispectrum twojmax: ", - self.params.descriptors.bispectrum_twojmax, - ) - printout( - "Bispectrum cutoff: ", - self.params.descriptors.bispectrum_cutoff, - ) - if isinstance(self.descriptor_calculator, AtomicDensity): - self.params.descriptors.atomic_density_cutoff = ( - minimum_acsd[0] - ) - self.params.descriptors.atomic_density_sigma = ( - minimum_acsd[1] - ) - printout( - "ACSD analysis finished, optimal parameters: ", - ) - printout( - "Atomic density sigma: ", - self.params.descriptors.atomic_density_sigma, - ) - printout( - "Atomic density cutoff: ", - self.params.descriptors.atomic_density_cutoff, - ) - elif len(self.internal_hyperparam_list) == 5: - if isinstance(self.descriptor_calculator, MinterpyDescriptors): - self.params.descriptors.atomic_density_cutoff = ( - minimum_acsd[0] - ) - self.params.descriptors.atomic_density_sigma = ( - minimum_acsd[1] - ) - self.params.descriptors.minterpy_cutoff_cube_size = ( - minimum_acsd[2] - ) - self.params.descriptors.minterpy_polynomial_degree = int( - minimum_acsd[3] - ) - self.params.descriptors.minterpy_lp_norm = int( - minimum_acsd[4] - ) - printout( - "ACSD analysis finished, optimal parameters: ", - ) - printout( - "Atomic density sigma: ", - self.params.descriptors.atomic_density_sigma, - ) - printout( - "Atomic density cutoff: ", - self.params.descriptors.atomic_density_cutoff, - ) - printout( - "Minterpy cube cutoff: ", - self.params.descriptors.minterpy_cutoff_cube_size, - ) - printout( - "Minterpy polynomial degree: ", - self.params.descriptors.minterpy_polynomial_degree, - ) - printout( - "Minterpy LP norm degree: ", - self.params.descriptors.minterpy_lp_norm, - ) + def get_best_trial(self): + """Different from best_trial because of parallelization.""" + return self._study[np.argmax(self._study[:, -1])] + + def _update_logging(self, score, index): + if self.best_score is None: + self.best_score = score + self.best_trial = index + elif score > self.best_score: + self.best_score = score + self.best_trial = index + + def _calculate_score(self, descriptor, target): + return self._calculate_mutual_information( + descriptor, + target, + self.params.hyperparameters.mutual_information_points, + descriptor_vectors_contain_xyz=self.params.descriptors.descriptors_contain_xyz, + ) @staticmethod - def _calculate_acsd( + def _calculate_mutual_information( descriptor_data, ldos_data, n_samples,