From 5d4ed2db34b59aad598c1c3bb0ec556cf9ba1cf8 Mon Sep 17 00:00:00 2001
From: mccrindlebrian
Date: Wed, 9 Oct 2024 23:03:04 -0400
Subject: [PATCH 1/3] first iteration of lowest to largest energy analysis to
 optimize throughput of valis

---
 folding/validators/forward.py |   9 ++-
 folding/validators/protein.py | 141 ++++++++++++++++----------------
 folding/validators/reward.py  | 148 +++++++++++++++++++---------------
 3 files changed, 161 insertions(+), 137 deletions(-)

diff --git a/folding/validators/forward.py b/folding/validators/forward.py
index 03fbc79b..8e501a3d 100644
--- a/folding/validators/forward.py
+++ b/folding/validators/forward.py
@@ -14,6 +14,7 @@
 from folding.utils.openmm_forcefields import FORCEFIELD_REGISTRY
 from folding.validators.hyperparameters import HyperParameters
+from folding.validators.reward import RewardPipeline
 from folding.utils.ops import (
     load_and_sample_random_pdb_ids,
     get_response_info,
@@ -108,12 +109,12 @@ def run_step(
         )
         return event
 
-    energies, energy_event = get_energies(
-        protein=protein, responses=responses_serving, uids=active_uids
-    )
+    RP = RewardPipeline(protein=protein, responses=responses_serving, uids=active_uids)
+    RP.process_energies()
+    RP.check_run_validities()
 
     # Log the step event.
-    event.update({"energies": energies.tolist(), **energy_event})
+    event.update({"energies": RP.energies.tolist(), **RP.event})
 
     if len(protein.md_inputs) > 0:
         event["md_inputs"] = list(protein.md_inputs.keys())
diff --git a/folding/validators/protein.py b/folding/validators/protein.py
index 1328a5de..b0ae482a 100644
--- a/folding/validators/protein.py
+++ b/folding/validators/protein.py
@@ -4,20 +4,22 @@
 import base64
 import random
 import shutil
+from pathlib import Path
+
 from collections import defaultdict
 from dataclasses import dataclass
-from pathlib import Path
-from typing import Dict, List, Literal
+from typing import Dict, List, Literal, Tuple
 
 import bittensor as bt
 import numpy as np
 import pandas as pd
 import plotly.express as px
+
 from openmm import app, unit
 from pdbfixer import PDBFixer
 
-from folding.base.simulation import OpenMMSimulation
 from folding.store import Job
+from folding.base.simulation import OpenMMSimulation
 from folding.utils.opemm_simulation_config import SimulationConfig
 from folding.utils.ops import (
     OpenMMException,
@@ -146,6 +148,14 @@ def _get_pdb_complexity(pdb_path):
             pdb_complexity[key] += 1
         return pdb_complexity
 
+    @staticmethod
+    def save_pdb(output_path: str, simulation: app.Simulation):
+        """Save the pdb file to the output path."""
+        positions = simulation.context.getState(getPositions=True).getPositions()
+        topology = simulation.topology
+        with open(output_path, "w") as f:
+            app.PDBFile.writeFile(topology, positions, f)
+
     def gather_pdb_id(self):
         if self.pdb_id is None:
             self.pdb_id = load_and_sample_random_pdb_ids(
@@ -365,17 +375,16 @@ def delete_files(self, directory: str):
         # os.rmdir(output_directory)
 
     def get_miner_data_directory(self, hotkey: str):
-        self.miner_data_directory = os.path.join(self.validator_directory, hotkey[:8])
+        return os.path.join(self.validator_directory, hotkey[:8])
 
     def process_md_output(
         self, md_output: dict, seed: int, state: str, hotkey: str
-    ) -> bool:
+    ) -> Tuple[bool, Dict]:
         MIN_LOGGING_ENTRIES = 500
         MIN_SIMULATION_STEPS = 5000
 
         required_files_extensions = ["cpt", "log"]
         hotkey_alias = hotkey[:8]
-        self.current_state = state
 
         # This is just mapper from the file extension to the name of the file stores in the dict.
         self.md_outputs_exts = {
@@ -386,69 +395,64 @@ def process_md_output(
             bt.logging.warning(
                 f"Miner {hotkey_alias} returned empty md_output... Skipping!"
             )
-            return False
+            return False, None
 
         for ext in required_files_extensions:
             if ext not in self.md_outputs_exts:
                 bt.logging.error(f"Missing file with extension {ext} in md_output")
-                return False
+                return False, None
 
-        self.get_miner_data_directory(hotkey=hotkey)
+        miner_data_directory = self.get_miner_data_directory(hotkey=hotkey)
 
         # Save files so we can check the hash later.
         self.save_files(
             files=md_output,
-            output_directory=self.miner_data_directory,
+            output_directory=miner_data_directory,
         )
+
         try:
             # NOTE: The seed written in the self.system_config is not used here
             # because the miner could have used something different and we want to
             # make sure that we are using the correct seed.
             bt.logging.info(
-                f"Recreating miner {hotkey_alias} simulation in state: {self.current_state}"
+                f"Recreating miner {hotkey_alias} simulation in state: {state}"
            )
-            self.simulation, self.system_config = self.create_simulation(
+            simulation, local_system_config = self.create_simulation(
                 pdb=self.load_pdb_file(pdb_file=self.pdb_location),
                 system_config=self.system_config.get_config(),
                 seed=seed,
                 state=state,
            )
-            checkpoint_path = os.path.join(
-                self.miner_data_directory, f"{self.current_state}.cpt"
-            )
+            checkpoint_path = os.path.join(miner_data_directory, f"{state}.cpt")
             log_file_path = os.path.join(
-                self.miner_data_directory, self.md_outputs_exts["log"]
+                miner_data_directory, self.md_outputs_exts["log"]
            )
 
-            self.simulation.loadCheckpoint(checkpoint_path)
-            self.log_file = pd.read_csv(log_file_path)
-            self.log_step = self.log_file['#"Step"'].iloc[-1]
+            simulation.loadCheckpoint(checkpoint_path)
+            log_file = pd.read_csv(log_file_path)
+            log_step = log_file['#"Step"'].iloc[-1]
 
             # Checks to see if we have enough steps in the log file to start validation
-            if len(self.log_file) < MIN_LOGGING_ENTRIES:
+            if len(log_file) < MIN_LOGGING_ENTRIES:
                 raise ValidationError(
                     f"Miner {hotkey_alias} did not run enough steps in the simulation... Skipping!"
                 )
 
             # Make sure that we are enough steps ahead in the log file compared to the checkpoint file.
             # Checks if log_file is MIN_STEPS steps ahead of checkpoint
-            if (self.log_step - self.simulation.currentStep) < MIN_SIMULATION_STEPS:
+            if (log_step - simulation.currentStep) < MIN_SIMULATION_STEPS:
                 # If the miner did not run enough steps, we will load the old checkpoint
-                checkpoint_path = os.path.join(
-                    self.miner_data_directory, f"{self.current_state}_old.cpt"
-                )
+                checkpoint_path = os.path.join(miner_data_directory, f"{state}_old.cpt")
                 if os.path.exists(checkpoint_path):
                     bt.logging.warning(
                         f"Miner {hotkey_alias} did not run enough steps since last checkpoint... Loading old checkpoint"
                     )
-                    self.simulation.loadCheckpoint(checkpoint_path)
+                    simulation.loadCheckpoint(checkpoint_path)
                     # Checking to see if the old checkpoint has enough steps to validate
-                    if (
-                        self.log_step - self.simulation.currentStep
-                    ) < MIN_SIMULATION_STEPS:
+                    if (log_step - simulation.currentStep) < MIN_SIMULATION_STEPS:
                         raise ValidationError(
                             f"Miner {hotkey_alias} did not run enough steps in the simulation... Skipping!"
                         )
@@ -457,31 +461,39 @@ def process_md_output(
                     f"Miner {hotkey_alias} did not run enough steps and no old checkpoint found... Skipping!"
                     )
 
-            self.cpt_step = self.simulation.currentStep
-            self.checkpoint_path = checkpoint_path
-
             # Save the system config to the miner data directory
             system_config_path = os.path.join(
-                self.miner_data_directory, f"miner_system_config_{seed}.pkl"
+                miner_data_directory, f"miner_system_config_{seed}.pkl"
             )
             if not os.path.exists(system_config_path):
                 write_pkl(
-                    data=self.system_config,
+                    data=local_system_config,
                     path=system_config_path,
                     write_mode="wb",
                 )
 
         except ValidationError as E:
             bt.logging.warning(f"{E}")
-            return False
+            return False, None
 
         except Exception as e:
             bt.logging.error(f"Failed to recreate simulation: {e}")
-            return False
+            return False, None
 
-        return True
+        return True, {
+            "simulation": simulation,
+            "log_file": log_file,
+            "log_step": log_step,
+        }
 
-    def is_run_valid(self):
+    def is_run_valid(
+        self,
+        simulation: app.Simulation,
+        state: str,
+        hotkey: str,
+        log_file: pd.DataFrame,
+        log_step: int,
+    ) -> Tuple[bool, list, list]:
         """
         Checks if the run is valid by comparing the potential energy values between
         the current simulation and a reference log file.
@@ -493,19 +505,18 @@ def is_run_valid(
 
         # The percentage that we allow the energy to differ from the miner to the validator.
         ANOMALY_THRESHOLD = 0.5
+        miner_data_directory = self.get_miner_data_directory(hotkey=hotkey)
 
         # Check to see if we have a logging resolution of 10 or better, if not the run is not valid
-        if (self.log_file['#"Step"'][1] - self.log_file['#"Step"'][0]) > 10:
-            return False
+        if (log_file['#"Step"'][1] - log_file['#"Step"'][0]) > 10:
+            return False, [], []
 
         # Run the simulation at most 3000 steps
-        steps_to_run = min(3000, self.log_step - self.cpt_step)
+        steps_to_run = min(3000, log_step - self.simulation.currentStep)
 
-        self.simulation.reporters.append(
+        simulation.reporters.append(
             app.StateDataReporter(
-                os.path.join(
-                    self.miner_data_directory, f"check_{self.current_state}.log"
-                ),
+                os.path.join(miner_data_directory, f"check_{state}.log"),
                 10,
                 step=True,
                 potentialEnergy=True,
             )
         )
 
         bt.logging.info(
-            f"Running {steps_to_run} steps. log_step: {self.log_step}, cpt_step: {self.cpt_step}"
+            f"Running {steps_to_run} steps. log_step: {log_step}, cpt_step: {simulation.currentStep}"
         )
 
-        self.simulation.step(steps_to_run)
+        cpt_step = simulation.currentStep  # checkpoint step, captured before advancing the simulation
+        simulation.step(steps_to_run)
 
         check_log_file = pd.read_csv(
-            os.path.join(self.miner_data_directory, f"check_{self.current_state}.log")
+            os.path.join(miner_data_directory, f"check_{state}.log")
         )
-        max_step = self.cpt_step + steps_to_run
+        max_step = cpt_step + steps_to_run
 
         check_energies: np.ndarray = check_log_file["Potential Energy (kJ/mole)"].values
-        miner_energies: np.ndarray = self.log_file[
-            (self.log_file['#"Step"'] > self.cpt_step)
-            & (self.log_file['#"Step"'] <= max_step)
+        miner_energies: np.ndarray = log_file[
+            (log_file['#"Step"'] > cpt_step)
+            & (log_file['#"Step"'] <= max_step)
         ]["Potential Energy (kJ/mole)"].values
 
         # calculating absolute percent difference per step
@@ -539,25 +550,23 @@ def is_run_valid(
 
         fig = px.scatter(
             df,
-            title=f"Energy: {self.pdb_id} for state {self.current_state} starting at checkpoint step: {self.cpt_step}",
+            title=f"Energy: {self.pdb_id} for state {state} starting at checkpoint step: {cpt_step}",
             labels={"index": "Step", "value": "Energy (kJ/mole)"},
             height=600,
             width=1400,
         )
-        filename = f"{self.pdb_id}_cpt_step_{self.cpt_step}_state_{self.current_state}"
-        fig.write_image(
-            os.path.join(self.miner_data_directory, filename + "_energy.png")
-        )
+        filename = f"{self.pdb_id}_cpt_step_{cpt_step}_state_{state}"
+        fig.write_image(os.path.join(miner_data_directory, filename + "_energy.png"))
 
         fig = px.scatter(
             percent_diff,
-            title=f"Percent Diff: {self.pdb_id} for state {self.current_state} starting at checkpoint step: {self.cpt_step}",
+            title=f"Percent Diff: {self.pdb_id} for state {state} starting at checkpoint step: {cpt_step}",
             labels={"index": "Step", "value": "Percent Diff"},
             height=600,
             width=1400,
         )
         fig.write_image(
-            os.path.join(self.miner_data_directory, filename + "_percent_diff.png")
+            os.path.join(miner_data_directory, filename + "_percent_diff.png")
         )
 
         median_percent_diff = np.median(percent_diff)
@@ -566,23 +575,15 @@ def is_run_valid(
 
         if median_percent_diff > ANOMALY_THRESHOLD:
             return False, check_energies.tolist(), miner_energies.tolist()
 
-        self.save_pdb(
-            output_path=os.path.join(
-                self.miner_data_directory, f"{self.pdb_id}_folded.pdb"
-            )
+
+        Protein.save_pdb(
+            output_path=os.path.join(miner_data_directory, f"{self.pdb_id}_folded.pdb"),
+            simulation=simulation,
         )
-        return True, check_energies.tolist(), miner_energies.tolist()
 
-    def save_pdb(self, output_path: str):
-        """Save the pdb file to the output path."""
-        positions = self.simulation.context.getState(getPositions=True).getPositions()
-        topology = self.simulation.topology
-        with open(output_path, "w") as f:
-            app.PDBFile.writeFile(topology, positions, f)
+        return True, check_energies.tolist(), miner_energies.tolist()
 
     def get_energy(self):
         state = self.simulation.context.getState(getEnergy=True)
-
         return state.getPotentialEnergy() / unit.kilojoules_per_mole
 
     def get_rmsd(self, output_path: str = None, xvg_command: str = "-xvg none"):
diff --git a/folding/validators/reward.py b/folding/validators/reward.py
index 99d8bf40..61918af5 100644
--- a/folding/validators/reward.py
+++ b/folding/validators/reward.py
@@ -1,81 +1,103 @@
 import time
-from typing import List
+from typing import List, Dict
 
-import bittensor as bt
 import numpy as np
+import bittensor as bt
 
-from folding.protocol import JobSubmissionSynapse
 from folding.validators.protein import Protein
+from folding.protocol import JobSubmissionSynapse
 
 
-def get_energies(
-    protein: Protein, responses: List[JobSubmissionSynapse], uids: List[int]
-):
-    """Takes all the data from reponse synapses, checks if the data is valid, and returns the energies.
-
-    Args:
-        protein (Protein): instance of the Protein class
-        responses (List[JobSubmissionSynapse]): list of JobSubmissionSynapse objects
-        uids (List[int]): list of uids
-
-    Returns:
-        Tuple: Tuple containing the energies and the event dictionary
-    """
-    event = {}
-    event["is_valid"] = [False] * len(uids)
-    event["checked_energy"] = [0] * len(uids)
-    event["reported_energy"] = [0] * len(uids)
-    event["miner_energy"] = [0] * len(uids)
-    event["rmsds"] = [0] * len(uids)
-    event["process_md_output_time"] = [0] * len(uids)
-    event["is_run_valid"] = [0] * len(uids)
-
-    energies = np.zeros(len(uids))
-
-    for i, (uid, resp) in enumerate(zip(uids, responses)):
-        # Ensures that the md_outputs from the miners are parsed correctly
-        try:
-            start_time = time.time()
-            can_process = protein.process_md_output(
-                md_output=resp.md_output,
-                hotkey=resp.axon.hotkey,
-                state=resp.miner_state,
-                seed=resp.miner_seed,
-            )
-            event["process_md_output_time"][i] = time.time() - start_time
+class RewardPipeline:
+    def __init__(
+        self, protein: Protein, responses: List[JobSubmissionSynapse], uids: List[int]
+    ):
+        self.protein = protein
+        self.responses = responses
+        self.uids = uids
+
+        self.energies = np.zeros(len(uids))
+
+        self.event = {}
+        self.event["is_valid"] = [False] * len(uids)
+        self.event["checked_energy"] = [0] * len(uids)
+        self.event["reported_energy"] = [0] * len(uids)
+        self.event["miner_energy"] = [0] * len(uids)
+        self.event["rmsds"] = [0] * len(uids)
+        self.event["process_md_output_time"] = [0] * len(uids)
+        self.event["is_run_valid_time"] = [0] * len(uids)
+
+        self.packages = [None] * len(uids)
+        self.miner_states = [None] * len(uids)
+
+    def process_energies(self):
+        for i, (uid, resp) in enumerate(zip(self.uids, self.responses)):
+            try:
+                start_time = time.time()
+
+                can_process, package = self.protein.process_md_output(
+                    md_output=resp.md_output,
+                    hotkey=resp.axon.hotkey,
+                    state=resp.miner_state,
+                    seed=resp.miner_seed,
+                )
 
-            if not can_process:
-                continue
+                self.packages[i] = package
+                self.miner_states[i] = resp.miner_state
 
-            if resp.dendrite.status_code != 200:
-                bt.logging.info(
-                    f"uid {uid} responded with status code {resp.dendrite.status_code}"
-                )
-                continue
+                self.event["process_md_output_time"][i] = time.time() - start_time
 
-            energy = protein.get_energy()
-            rmsd = protein.get_rmsd()
+                if not can_process:
+                    continue
 
-            if energy == 0:
+                if resp.dendrite.status_code != 200:
+                    bt.logging.info(
+                        f"uid {uid} responded with status code {resp.dendrite.status_code}"
+                    )
+                    continue
+
+                energy = self.protein.get_energy()
+
+                # Catching edge case where energy is 0
+                if energy == 0:
+                    continue
+
+                self.energies[i] = energy
+
+            except Exception as E:
+                # If any of the above methods have an error, we will catch here.
+                bt.logging.error(
+                    f"Failed to parse miner data for uid {uid} with error: {E}"
+                )
                 continue
 
-            start_time = time.time()
-            is_valid, checked_energy, miner_energy = protein.is_run_valid()
-            event["is_run_valid"][i] = time.time() - start_time
+    def check_run_validities(self):
+        start_time = time.time()
 
-            energies[i] = energy if is_valid else 0
+        # Checking the naive case where all energies are 0.
+        if sum(self.energies) == 0:
+            return False
 
-            event["checked_energy"][i] = checked_energy
-            event["miner_energy"][i] = miner_energy
-            event["is_valid"][i] = is_valid
-            event["reported_energy"][i] = float(energy)
-            event["rmsds"][i] = float(rmsd)
+        # Iterate over the energies from lowest to highest.
+        for index in np.argsort(self.energies):
+            package = self.packages[index]
 
-        except Exception as E:
-            # If any of the above methods have an error, we will catch here.
-            bt.logging.error(
-                f"Failed to parse miner data for uid {uid} with error: {E}"
+            if package is None:
+                continue
+
+            is_valid, checked_energy, miner_energy = self.protein.is_run_valid(
+                state=self.miner_states[index], **package
             )
-            continue
+            self.event["is_run_valid_time"][index] = time.time() - start_time
 
-    return energies, event
+            self.event["checked_energy"][index] = checked_energy
+            self.event["miner_energy"][index] = miner_energy
+            self.event["is_valid"][index] = is_valid
+            self.event["reported_energy"][index] = float(self.energies[index])
+
+            # If the run is valid, then we can presume that all other simulations do not need to be considered for competition.
+            if not is_valid:
+                self.energies[index] = 0
+                continue
+            else:
+                break

From d04e9c3ec15c446342f4e100077db95936c261b9 Mon Sep 17 00:00:00 2001
From: mccrindlebrian
Date: Thu, 10 Oct 2024 09:03:08 -0400
Subject: [PATCH 2/3] remove self.simulation reference in is_valid

---
 folding/validators/protein.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/folding/validators/protein.py b/folding/validators/protein.py
index b0ae482a..d1c6e25d 100644
--- a/folding/validators/protein.py
+++ b/folding/validators/protein.py
@@ -512,7 +512,7 @@ def is_run_valid(
             return False, [], []
 
         # Run the simulation at most 3000 steps
-        steps_to_run = min(3000, log_step - self.simulation.currentStep)
+        steps_to_run = min(3000, log_step - simulation.currentStep)
 
         simulation.reporters.append(
             app.StateDataReporter(

From 3f6808b6004d73fc4c7d148aa6445686a714204b Mon Sep 17 00:00:00 2001
From: mccrindlebrian
Date: Thu, 10 Oct 2024 09:11:48 -0400
Subject: [PATCH 3/3] remove get_energy from protein class, move to utils, and
 feed in simulation class

---
 folding/utils/ops.py          | 6 ++++++
 folding/validators/protein.py | 4 ----
 folding/validators/reward.py  | 3 ++-
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/folding/utils/ops.py b/folding/utils/ops.py
index a3ee529f..0f6de20e 100644
--- a/folding/utils/ops.py
+++ b/folding/utils/ops.py
@@ -14,6 +14,7 @@
 import parmed as pmd
 
 import bittensor as bt
 
+from openmm import app, unit
 from folding.protocol import JobSubmissionSynapse
 
 
@@ -298,3 +299,8 @@ def convert_cif_to_pdb(cif_file: str, pdb_file: str):
 
     except Exception as e:
         bt.logging.error(f"Failed to convert {cif_file} to PDB format. Error: {e}")
+
+
+def get_energy_from_simulation(simulation: app.Simulation):
+    state = simulation.context.getState(getEnergy=True)
+    return state.getPotentialEnergy() / unit.kilojoules_per_mole
diff --git a/folding/validators/protein.py b/folding/validators/protein.py
index d1c6e25d..c98581d8 100644
--- a/folding/validators/protein.py
+++ b/folding/validators/protein.py
@@ -582,10 +582,6 @@ def is_run_valid(
 
         return True, check_energies.tolist(), miner_energies.tolist()
 
-    def get_energy(self):
-        state = self.simulation.context.getState(getEnergy=True)
-        return state.getPotentialEnergy() / unit.kilojoules_per_mole
-
     def get_rmsd(self, output_path: str = None, xvg_command: str = "-xvg none"):
         """TODO: Implement the RMSD calculation"""
         return -1
diff --git a/folding/validators/reward.py b/folding/validators/reward.py
index 61918af5..808968b9 100644
--- a/folding/validators/reward.py
+++ b/folding/validators/reward.py
@@ -5,6 +5,7 @@
 import bittensor as bt
 
 from folding.validators.protein import Protein
+from folding.utils.ops import get_energy_from_simulation
 from folding.protocol import JobSubmissionSynapse
 
 
@@ -56,7 +57,7 @@ def process_energies(self):
                     )
                     continue
 
-                energy = self.protein.get_energy()
+                energy = get_energy_from_simulation(package["simulation"])
 
                 # Catching edge case where energy is 0
                 if energy == 0:
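
Below is a minimal, self-contained sketch (not part of the patches above) of the scheduling idea behind RewardPipeline.check_run_validities in PATCH 1/3: reported energies are sorted from lowest (best) to highest, runs are validated in that order, invalid runs are zeroed out of the competition, and the loop stops at the first valid run because no higher-energy submission can beat it. The pick_winner name and the validate callback are illustrative stand-ins for the real Protein.is_run_valid check.

import numpy as np


def pick_winner(energies: np.ndarray, validate) -> np.ndarray:
    """Zero out invalid runs, stopping at the first (lowest-energy) valid one."""
    energies = energies.copy()
    for index in np.argsort(energies):
        if energies[index] == 0:
            # No submission (or it failed to process), so nothing to validate.
            continue
        if validate(index):
            # Lowest valid energy found; pricier checks on higher energies are skipped.
            break
        energies[index] = 0  # Invalid run: removed from the competition.
    return energies


# Example: uid 2 reports the lowest energy but fails validation, so uid 0 wins.
reported = np.array([-120.0, -80.0, -150.0, 0.0])
print(pick_winner(reported, validate=lambda i: i != 2))  # [-120.  -80.    0.    0.]

The throughput gain comes from the early break: in the common case only one expensive re-simulation is run per job instead of one per responding miner.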
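The next snippet is an illustrative sketch of exercising the get_energy_from_simulation helper added in PATCH 3/3 in isolation. The single force-free argon particle is an assumption made purely so the example runs without any folding data; any app.Simulation works, which is the point of moving the helper out of the Protein class.

import openmm
from openmm import app, unit

from folding.utils.ops import get_energy_from_simulation  # added in PATCH 3/3

# Build a toy system: one argon-like particle with no forces acting on it.
system = openmm.System()
system.addParticle(39.948 * unit.amu)

topology = app.Topology()
chain = topology.addChain()
residue = topology.addResidue("AR", chain)
topology.addAtom("Ar", app.Element.getBySymbol("Ar"), residue)

integrator = openmm.VerletIntegrator(2.0 * unit.femtoseconds)
simulation = app.Simulation(topology, system, integrator)
simulation.context.setPositions([openmm.Vec3(0.0, 0.0, 0.0)] * unit.nanometers)

# Returns a plain float in kJ/mol; ~0.0 here because the system has no forces.
print(get_energy_from_simulation(simulation))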