Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed enzyme optmization with Kcat fitness function #240

Merged
merged 4 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 69 additions & 11 deletions examples/enzeptional/example_enzeptional.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,58 @@
import logging
import pandas as pd
from typing import Tuple, List, Optional
from gt4sd.frameworks.enzeptional.processing import HFandTAPEModelUtility
from gt4sd.frameworks.enzeptional.core import SequenceMutator, EnzymeOptimizer
from gt4sd.configuration import GT4SDConfiguration, sync_algorithm_with_s3


def initialize_environment():
"""Synchronize with GT4SD S3 storage and set up the environment."""
# NOTE: For those interested in optimizing kcat values, it is important to adjust the scorer path to reflect this focus, thereby selecting the appropriate model for kcat optimization: f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/kcat/model.pkl". The specification of the scaler, located within the same directory as the `scorer.pkl`, is mandatory for accurate model performance.
def initialize_environment(model = "feasibility") -> Tuple[str, Optional[str]]:
"""Synchronize with GT4SD S3 storage and set up the environment.

Args:
model (str): Type of optimization ("feasibility" or "kcat").

Returns:
Tuple[str, Optional[str]]: The path to the scorer file and scaler file (if existing).
"""
configuration = GT4SDConfiguration.get_instance()
sync_algorithm_with_s3("proteins/enzeptional/scorers", module="properties")
return f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/feasibility/model.pkl"


def load_experiment_parameters():
def load_experiment_parameters() -> Tuple[List, List, List, List]:
"""Load experiment parameters from a CSV file."""
df = pd.read_csv("data.csv").iloc[1]
return df["substrates"], df["products"], df["sequences"], eval(df["intervals"])


def setup_optimizer(
substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path
substrate_smiles: str,
product_smiles: str,
sample_sequence: str,
intervals: List[List[int]],
scorer_path: str,
scaler_path: str,
concat_order: List[str],
use_xgboost_scorer: bool
):
"""Set up and return the optimizer with all necessary components configured."""
"""Set up and return the optimizer with all necessary components configured

Args:
substrate_smiles (str): SMILES representation of
the substrate.
product_smiles (str): SMILES representation of the
product.
sample_sequence (str): The initial protein sequence.
intervals (List[List[int]]): Intervals for mutation.
scorer_path (str): File path to the scoring model.
scaler_path (str): Path to the scaller in case you are usinh the Kcat model.
concat_order (List[str]): Order of concatenating embeddings.
use_xgboost_scorer (bool): flag to specify if the fitness function is the Kcat.

Returns:
Initialized optmizer
"""
model_tokenizer_paths = "facebook/esm2_t33_650M_UR50D"
chem_paths = "seyonec/ChemBERTa-zinc-base-v1"

Expand Down Expand Up @@ -52,33 +82,61 @@ def setup_optimizer(
"selection_ratio": 0.25,
"perform_crossover": True,
"crossover_type": "single_point",
"concat_order": ["substrate", "sequence", "product"],
"concat_order": concat_order,
"scaler_filepath": scaler_path,
"use_xgboost_scorer": use_xgboost_scorer
}
return EnzymeOptimizer(**optimizer_config)


def optimize_sequences(optimizer):
"""Optimize sequences using the configured optimizer."""
"""Optimize sequences using the configured optimizer.

Args:
optimizer: Initialized optimizer

Returns:
Optimized sequences
"""
return optimizer.optimize(
num_iterations=3, num_sequences=5, num_mutations=5, time_budget=3600
)


def main():
def main_kcat():
"""Optimization using Kcat model"""
logging.basicConfig(level=logging.INFO)
scorer_path = initialize_environment()
scorer_path, scaler_path = initialize_environment(model="kcat")
concat_order, use_xgboost_scorer = ["substrate", "sequence"], True
(
substrate_smiles,
product_smiles,
sample_sequence,
intervals,
) = load_experiment_parameters()
optimizer = setup_optimizer(
substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path
substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, use_xgboost_scorer
)
optimized_sequences, iteration_info = optimize_sequences(optimizer)
logging.info("Optimization completed.")


def main_feasibility():
"""Optimization using Feasibility model"""
logging.basicConfig(level=logging.INFO)
scorer_path, scaler_path = initialize_environment()
concat_order, use_xgboost_scorer = ["substrate", "sequence", "product"], False
(
substrate_smiles,
product_smiles,
sample_sequence,
intervals,
) = load_experiment_parameters()
optimizer = setup_optimizer(
substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, use_xgboost_scorer
)
optimized_sequences, iteration_info = optimize_sequences(optimizer)
logging.info("Optimization completed.")

if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ torchmetrics>=0.7.0,<1.0.0
transformers>=4.22.0,<=4.24.0
typing_extensions>=3.7.4.3
wheel>=0.26
xgboost>=1.7.6
4 changes: 4 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ install_requires =
transformers<4.26.0
typing_extensions
wheel
xgboost
setup_requires =
setuptools
package_dir =
Expand Down Expand Up @@ -281,3 +282,6 @@ ignore_missing_imports = True

[mypy-ruamel.*]
ignore_missing_imports = True

[mypy-xgboost.*]
ignore_missing_imports = True
52 changes: 36 additions & 16 deletions src/gt4sd/frameworks/enzeptional/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from itertools import product as iter_product
import time
from joblib import load
import xgboost as xgb
from .processing import (
HFandTAPEModelUtility,
SelectionGenerator,
Expand Down Expand Up @@ -367,6 +368,8 @@ def __init__(
minimum_interval_length: int = 8,
pad_intervals: bool = False,
concat_order=["sequence", "substrate", "product"],
scaler_filepath: Optional[str] = None,
use_xgboost_scorer: Optional[bool] = False,
):
"""Initializes the optimizer with models, sequences, and
optimization parameters.
Expand All @@ -379,18 +382,22 @@ def __init__(
product_smiles (str): SMILES representation of the product.
chem_model_path (str): Path to the chemical model.
chem_tokenizer_path (str): Path to the chemical tokenizer.
scorer_filepath (str): Path to the scoring model.
mutator (SequenceMutator): The mutator for generating sequence variants.
intervals (List[Tuple[int, int]]): Intervals for mutation.
batch_size (int, optional): The number of sequences to process in one batch. Defaults to 2.
seed (int, optional): Random seed. Defaults to 123.
top_k (int, optional): Number of top mutations to consider. Defaults to 2.
selection_ratio (float, optional): Ratio of sequences to select after scoring. Defaults to 0.5.
perform_crossover (bool, optional): Flag to perform crossover operation. Defaults to False.
crossover_type (str, optional): Type of crossover operation. Defaults to "uniform".
minimum_interval_length (int, optional): Minimum length of mutation intervals. Defaults to 8.
pad_intervals (bool, optional): Flag to pad the intervals. Defaults to False.
concat_order (list, optional): Order of concatenating embeddings. Defaults to ["sequence", "substrate", "product"].
scorer_filepath (str): File path to the scoring model.
mutator (SequenceMutator): The mutator for generating
sequence variants.
intervals (List[List[int]]): Intervals for mutation.
batch_size (int): The number of sequences to process in one batch.
top_k (int): Number of top mutations to consider.
selection_ratio (float): Ratio of sequences to select
after scoring.
perform_crossover (bool): Flag to perform crossover operation.
crossover_type (str): Type of crossover operation.
minimum_interval_length (int): Minimum length of
mutation intervals.
pad_intervals (bool): Flag to pad the intervals.
concat_order (list): Order of concatenating embeddings.
scaler_filepath (str): Path to the scaller in case you are usinh the Kcat model.
use_xgboost_scorer (bool): flag to specify if the fitness function is the Kcat.
"""
self.sequence = sequence
self.protein_model = protein_model
Expand All @@ -407,7 +414,9 @@ def __init__(
self.mutator.set_top_k(top_k)
self.concat_order = concat_order
self.scorer = load(scorer_filepath)
self.seed = seed
if scaler_filepath is not None:
self.scaler = load(scaler_filepath)
self.use_xgboost_scorer = use_xgboost_scorer

self.chem_model = HFandTAPEModelUtility(chem_model_path, chem_tokenizer_path)
self.substrate_embedding = self.chem_model.embed([substrate_smiles])[0]
Expand All @@ -424,7 +433,7 @@ def __init__(
self.intervals = sanitize_intervals_with_padding(
self.intervals, minimum_interval_length, len(sequence)
)

self.seed = seed
random.seed(self.seed)

def optimize(
Expand Down Expand Up @@ -614,7 +623,13 @@ def score_sequence(self, sequence: str) -> Dict[str, Any]:
combined_embedding = np.concatenate(ordered_embeddings)
combined_embedding = combined_embedding.reshape(1, -1)

score = self.scorer.predict_proba(combined_embedding)[0][1]
if self.use_xgboost_scorer:
if self.scaler is not None:
combined_embedding = self.scaler.transform(combined_embedding)
score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0]
else:
score = self.scorer.predict_proba(combined_embedding)[0][1]

return {"sequence": sequence, "score": score}

def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]:
Expand Down Expand Up @@ -643,7 +658,12 @@ def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]:
combined_embedding = np.concatenate(ordered_embeddings)
combined_embedding = combined_embedding.reshape(1, -1)

score = self.scorer.predict_proba(combined_embedding)[0][1]
if self.use_xgboost_scorer:
if self.scaler is not None:
combined_embedding = self.scaler.transform(combined_embedding)
score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0]
else:
score = self.scorer.predict_proba(combined_embedding)[0][1]
output.append({"sequence": sequences[position], "score": score})

return output
Loading