diff --git a/SynTemp/SynComp/__init__.py b/SynTemp/SynComp/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/SynTemp/SynComp/rule_compose.py b/SynTemp/SynComp/rule_compose.py
new file mode 100644
index 0000000..9315252
--- /dev/null
+++ b/SynTemp/SynComp/rule_compose.py
@@ -0,0 +1,165 @@
+import os
+import glob
+import itertools
+import logging
+from typing import List
+from SynTemp.SynComp.valence_constrain import ValenceConstrain
+from SynTemp.SynUtils.graph_utils import load_gml_as_text
+from mod import RCMatch, ruleGMLString
+
+logging.basicConfig(
+    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
+
+
+class RuleCompose:
+    def __init__(self) -> None:
+        pass
+
+    @staticmethod
+    def _compose(rule_1, rule_2):
+        """
+        Compose two rules and filter the results based on chemical valence constraints.
+
+        Parameters:
+        - rule_1: First rule object to compose.
+        - rule_2: Second rule object to compose.
+
+        Returns:
+        - list: List of 'good' modifications where the resulting rules pass the
+        valence checks. An empty list is returned if the composition fails.
+        """
+        try:
+            # Attempt to match and compose the rules
+            m = RCMatch(rule_1, rule_2)
+            modRes = m.composeAll()
+            valence_check = ValenceConstrain()
+            goodMod, _ = valence_check.split(modRes)
+            return goodMod
+        except Exception as e:
+            # Best effort: a failing pair is logged and skipped, not propagated
+            logging.error(f"Rule composition failed: {e}")
+            return []  # Return an empty list in case of failure
+
+    @staticmethod
+    def _process_compose(rule_1_id, rule_2_id, rule_path, rule_path_compose):
+        """
+        Process and compose two rules based on their GML files.
+
+        Parameters:
+        - rule_1_id (str): Identifier for the first rule.
+        - rule_2_id (str): Identifier for the second rule.
+        - rule_path (str): Directory path where the original GML files are stored.
+        - rule_path_compose (str): Directory path where the composed GML files
+        will be saved. If falsy (e.g. None or ""), nothing is written to disk.
+
+        Returns:
+        - list: Composed rules from the two provided rules.
+        """
+        rule_1 = ruleGMLString(load_gml_as_text(f"{rule_path}/{rule_1_id}.gml"))
+        rule_2 = ruleGMLString(load_gml_as_text(f"{rule_path}/{rule_2_id}.gml"))
+        rules_compose = RuleCompose._compose(rule_1, rule_2)
+        if rule_path_compose:
+            for key, value in enumerate(rules_compose):
+                filepath = f"{rule_path_compose}/p_{rule_1_id}_{rule_2_id}_r{key}.gml"
+                RuleCompose.save_gml_from_text(
+                    value.getGMLString(), filepath, key, [rule_1_id, rule_2_id]
+                )
+        return rules_compose
+
+    @staticmethod
+    def _auto_compose(rule_path, rule_path_compose):
+        """
+        Automatically find all GML files in the given directory and compose them pairwise.
+
+        Parameters:
+        - rule_path (str): Directory path where the GML files are stored.
+        - rule_path_compose (str): Directory path where the composed GML files will
+        be saved.
+
+        Returns:
+        - None: Composed rules are saved directly to the filesystem.
+        """
+        # Strip the directory and the .gml extension to get the rule IDs. glob
+        # returns matches in arbitrary order, so sort them to make the pairing
+        # (and therefore the output file names) reproducible between runs.
+        gml_ids = sorted(
+            os.path.splitext(os.path.basename(f))[0]
+            for f in glob.glob(f"{rule_path}/*.gml")
+        )
+
+        # Compose each pair of rules once (i.e., (rule1, rule2) but not (rule2, rule1))
+        # Calculate the total number of compositions for progress logging
+        num_files = len(gml_ids)
+        total_compositions = num_files * (num_files - 1) // 2
+        pairs = itertools.combinations(gml_ids, 2)
+        for current_composition, (rule_1_id, rule_2_id) in enumerate(pairs, start=1):
+            RuleCompose._process_compose(
+                rule_1_id, rule_2_id, rule_path, rule_path_compose
+            )
+            if current_composition % 100 == 0:
+                logging.info(
+                    f"Progress: {current_composition}/{total_compositions} "
+                    + "compositions completed."
+                )
+
+    @staticmethod
+    def save_gml_from_text(
+        gml_content: str, gml_file_path: str, rule_id: str, parent_ids: List[str]
+    ) -> bool:
+        """
+        Save a text string to a GML file by modifying the 'ruleID' line to include parent
+        rule names. This function parses the given GML content, identifies any lines
+        starting with 'ruleID', and replaces these lines with a new ruleID that
+        incorporates identifiers from parent rules.
+
+        Parameters:
+        - gml_content (str): The content to be saved to the GML file. This should be the
+        entire textual content of a GML file.
+        - gml_file_path (str): The file path where the GML file should be saved. If the
+        path does not exist or is inaccessible, the function will return False and print
+        an error message.
+        - rule_id (str): The original rule ID from the content. This is the identifier
+        that will be modified to include parent IDs in the new ruleID.
+        - parent_ids (List[str]): List of parent rule IDs to prepend to the original rule
+        ID. These are combined into a new identifier to reflect the hierarchical
+        relationship in rule IDs.
+
+        Returns:
+        - bool: True if the file was successfully saved, False otherwise. The function
+        attempts to write the modified GML content to the specified file path.
+        """
+        try:
+            parent_ids = [str(i) for i in parent_ids]
+            rule_id = str(rule_id)
+            # Create the new ruleID by concatenating parent IDs with the original rule ID
+            new_rule_id = (
+                "p_" + "_".join(parent_ids) + "_r_" + rule_id
+                if parent_ids
+                else "r_" + rule_id
+            )
+
+            # Initialize a list to hold the modified lines
+            modified_lines = []
+
+            # Iterate through each line and replace the 'ruleID' line as needed
+            for line in gml_content.splitlines():
+                if line.strip().startswith("ruleID"):
+                    # Replace the whole line with the new ruleID
+                    modified_lines.append(f'\truleID "{new_rule_id}"')
+                else:
+                    modified_lines.append(line)
+
+            # Join all lines back into a single string
+            modified_content = "\n".join(modified_lines)
+
+            # Write the modified content to the file
+            with open(gml_file_path, "w", encoding="utf-8") as file:
+                file.write(modified_content)
+            return True
+        except FileNotFoundError:
+            print(f"Unable to access the file path: {gml_file_path}")
+            return False
+        except Exception as e:
+            print(f"An error occurred while writing to the file: {e}")
+            return False
diff --git a/SynTemp/SynComp/valence_constrain.py b/SynTemp/SynComp/valence_constrain.py
new file mode 100644
index 0000000..b8ba1fd
--- /dev/null
+++ b/SynTemp/SynComp/valence_constrain.py
@@ -0,0 +1,104 @@
+import importlib.resources
+from SynTemp.SynUtils.utils import load_database
+from mod import BondType
+import logging
+from typing import List, Tuple
+
+
+class ValenceConstrain:
+    def __init__(self):
+        """
+        Initialize the ValenceConstrain class by setting up bond type orders and loading
+        the maximum valence data.
+
+        Parameters:
+        - None
+
+        Returns:
+        - None
+        """
+        self.btToOrder = {
+            BondType.Single: 1,
+            BondType.Double: 2,
+            BondType.Triple: 3,
+            BondType.Aromatic: 0,  # aromatic bonds add nothing to the valence sum
+        }
+        maxValence_path = importlib.resources.files("SynTemp.SynComp").joinpath(
+            "MaxValence.json.gz"
+        )
+        self.maxValence = load_database(maxValence_path)[0]
+
+    def valence(self, vertex) -> int:
+        """
+        Calculate the valence of a vertex based on its incident edges.
+
+        Parameters:
+        - vertex (Vertex): The vertex for which to calculate the valence.
+
+        Returns:
+        - int: The total valence of the vertex.
+        """
+        return sum(self.btToOrder[edge.bondType] for edge in vertex.incidentEdges)
+
+    def check_rule(self, rule, verbose: bool = False, log_error: bool = False) -> bool:
+        """
+        Check if the rule is chemically valid according to valence rules.
+
+        Parameters:
+        - rule (Rule): The rule to check for chemical validity.
+        - verbose (bool): If true, logs additional information about the rule
+        checking process.
+        - log_error (bool): If true, logs additional information about the valence
+        checking issue.
+
+        Returns:
+        - bool: True if the rule is chemically valid, False otherwise. False is
+        also returned when the check itself raises an exception.
+        """
+        try:
+            for vertex_pair in rule.vertices:
+                left_valence = self.valence(vertex_pair.left)
+                right_valence = self.valence(vertex_pair.right)
+                left_label = vertex_pair.left.stringLabel
+                right_label = vertex_pair.right.stringLabel
+
+                if left_valence != right_valence:
+                    # Caught by the handler below, so a mismatch yields False
+                    raise ValueError(
+                        f"Valence mismatch: left {left_valence} vs right {right_valence}"
+                    )
+
+                # Labels missing from the table get a maximum valence of 0
+                left_max = self.maxValence.get(left_label, 0)
+                right_max = self.maxValence.get(right_label, 0)
+                if left_valence > left_max or right_valence > right_max:
+                    if verbose:
+                        logging.info(
+                            f"Bad Rule for vertex {left_label} ---> "
+                            + "Exceeds max chemical valence"
+                        )
+                    return False
+            return True
+        except Exception as e:
+            if log_error:
+                logging.error(f"Error checking rule {rule}: {e}")
+            return False
+
+    def split(self, rules: List) -> Tuple[List, List]:
+        """
+        Split rules into 'good' and 'bad' based on their chemical validity.
+
+        Parameters:
+        - rules (List[Rule]): A list of rules to be checked and split.
+
+        Returns:
+        - Tuple[List[Rule], List[Rule]]: A tuple containing two lists, one for
+        'good' rules and another for 'bad' rules.
+        """
+        good, bad = [], []
+        for rule in rules:
+            if self.check_rule(rule):
+                good.append(rule)
+            else:
+                bad.append(rule)
+        return good, bad