From b2d807fdd915f2ba8a091eb09d642b063a3d704b Mon Sep 17 00:00:00 2001 From: Otto Brinkhaus Date: Mon, 26 Sep 2022 11:26:54 +0200 Subject: [PATCH] manual handling of process control --- .../randepict_batch_run_tfrecord_output.py | 177 +++++++++--------- 1 file changed, 90 insertions(+), 87 deletions(-) diff --git a/examples/randepict_batch_run_tfrecord_output.py b/examples/randepict_batch_run_tfrecord_output.py index 73a40fb..6405510 100644 --- a/examples/randepict_batch_run_tfrecord_output.py +++ b/examples/randepict_batch_run_tfrecord_output.py @@ -3,18 +3,24 @@ from typing import Tuple, List import argparse import numpy as np -from multiprocessing import get_context +from multiprocessing import Process +import time from PIL import Image import random import tensorflow as tf from RanDepict import RandomDepictor +os.environ["CUDA_VISIBLE_DEVICES"] = "-1" class RandomDepictor(RandomDepictor): - """This function is a child of RanDepict.random_depictor and contains some - additional functions to write the images into tfrecord files.""" - - def __init__(self, seed=42) -> None: + """ + This class is a child of RanDepict.RandomDepictor and contains some + additional functions to write the images into tfrecord files. + """ + def __init__( + self, + seed=(random.randint(0, 100)) + ) -> None: super().__init__(seed=seed) def batch_depict_save( @@ -26,29 +32,32 @@ def batch_depict_save( ID_list: List[str], chunksize: int, shape: Tuple[int, int] = (299, 299), - processes: int = 20, - seed: int = 42, + num_processes: int = 20, + seed: int = (random.randint(0, 100)), + timeout_limit: int = 1800 ) -> None: - """This function takes a list of SMILES str, the amount of images to - create per SMILES str and the path of an output directory. + """ + This function takes a list of SMILES str, the amount of images + to create per SMILES str and the path of an output directory. It then creates images_per_structure depictions of each chemical structure that is represented by the SMILES strings and saves them - in tfrecord files in output_dir. - If an ID list (list with names of same length as smiles_list that - contains unique IDs), the IDs will be used to name the files.""" + in tfrecord files in output_dir. If an ID list (list with names of + same length as smiles_list that contains unique IDs), the IDs will + be used to name the files. + """ - counter = (n for n in range(int(len(smiles_list) / chunksize))) + counter = (n for n in range(int(len(smiles_list)/chunksize))) def chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): - yield lst[i : i + n] + yield lst[i:i + n] smiles_list = chunks(smiles_list, chunksize) tokens = chunks(tokens, chunksize) IDs = chunks(ID_list, chunksize) - starmap_tuple_generator = ( + async_proc_args = ( ( next(smiles_list), n_non_augmented, @@ -60,9 +69,29 @@ def chunks(lst, n): ) for n in counter ) - - with get_context("spawn").Pool(processes) as p: - p.starmap(self.depict_save, starmap_tuple_generator) + process_list = [] + while True: + for proc, init_time in process_list: + # Remove finished processes + if not proc.is_alive(): + process_list.remove((proc, init_time)) + # Remove timed out processes + elif time.time() - init_time >= timeout_limit: + process_list.remove((proc, init_time)) + proc.terminate() + proc.join() + if len(process_list) < num_processes: + # Start new processes + for _ in range(num_processes-len(process_list)): + try: + p = Process(target=self.depict_save, + args=next(async_proc_args)) + process_list.append((p, time.time())) + p.start() + except StopIteration: + break + if len(process_list) == 0: + break def _bytes_feature(self, value): """Returns a bytes_list from a string / byte.""" @@ -80,17 +109,19 @@ def depict_save( tokens: List[np.array], shape: Tuple[int, int], ID: List[int], - seed: int = 0, + seed: int = (random.randint(0, 100)), ): - """This function takes a list of SMILES str, the amount of images to - create per SMILES str and the path of an output directory. - It then creates images_per_structure depictions of the chemical - structure that is represented by the SMILES strings and saves it - in a tfrecord file in output_dir. - The given ID is used as the base filename.""" + """ + This function takes a list of SMILES str, the amount of + images to create per SMILES str and the path of an output + directory. It then creates images_per_structure depictions + of the chemical structure that is represented by the SMILES + strings and saves it in a tfrecord file in output_dir. + The given ID is used as the base filename. + """ depictor = RandomDepictor(seed + 13) - - tfrecord_name = "tfrecord_ds/dataset_DS-{}.tfrecord".format(ID[0]) + tfrecord_name = f"tfrecord_ds/DECIMER_PubChem_DS-{ID[0]}.tfrecord" + print(f"Beginning process that writes {tfrecord_name}") writer = tf.io.TFRecordWriter(tfrecord_name) for smiles_index in range(len(smiles)): smi = smiles[smiles_index] @@ -101,52 +132,40 @@ def depict_save( self.save_in_tfrecord(token_list, depictor(smi, shape), writer) break except Exception as e: - print( - "An exception occured:\n{}\n".format(e), - "It is ignored for now and we", - "simply try to depict this structure again...", - ) + print(f'An exception occured:\n{e}', + 'It is ignored for now and we simply try to depict this structure again...') else: - print( - "FAILED depicting structure: {}.".format(smi), - "Tried five times.", - ) - + print('FAILED depicting structure: {}. Tried five times.'.format(smi)) for _ in range(n_non_augmented): for _ in range(5): try: - self.save_in_tfrecord( - token_list, depictor.random_depiction(smi, shape), writer - ) + self.save_in_tfrecord(token_list, depictor.random_depiction(smi, shape), writer) break except Exception as e: - print( - "An exception occured:\n{}\n".format(e), - "It is ignored for now and we", - "simply try to depict this structure again...", - ) + print(f'An exception occured:\n{e}', + 'It is ignored for now and we simply try to depict this structure again...') else: - print( - "FAILED depicting structure: {}.".format(smi), - "Tried five times.", - ) + print('FAILED depicting structure: {}. Tried five times.'.format(smi)) + print(f"{tfrecord_name} has been created regularly") return def save_in_tfrecord( - self, train_caption: np.array, image: np.array, writer: tf.io.TFRecordWriter + self, + train_caption: np.array, + image: np.array, + writer: tf.io.TFRecordWriter ) -> None: """ - This function takes a list of captions (token list, np.array) - and a list of images (np.array) as well as a file index. - It saves the images and captions in a tfrecord file. + This function takes a list of captions (token list, np.array) and a list of images + (np.array) as well as a file index. It saves the images and captions in a tfrecord file. Args: train_captions (List[np.array]): np.array([0,1,2,3,1,1,2,0,...]) - images (List[np.array]): array representing the structure depiction + images (List[np.array]): array that represents the structure depiction file_index (int): index for naming tfrecord files """ - if not os.path.exists("tfrecord_ds"): - os.mkdir("tfrecord_ds") + if not os.path.exists('tfrecord_ds'): + os.mkdir('tfrecord_ds') image = Image.fromarray(image) with io.BytesIO() as output: image.save(output, format="PNG") @@ -161,35 +180,23 @@ def save_in_tfrecord( writer.write(serialized) return None - -def parse_arguments() -> argparse.Namespace: - """ - Parse arguments with argparse. - - Returns: - Returns an namespace object that holds the given arguments - """ - parser = argparse.ArgumentParser() - parser.add_argument("file", nargs="+") - return parser.parse_args() - - def main() -> None: - """ + ''' This script reads a SMILES, the annotationa and IDs from a file and - generates three augmented and three non-augmented depictions per chemical - structure. The images are saved in tfrecord files. + generates three augmented and three non-augmented depictions per chemical structure. + The images are saved in tfrecord files. ___ Structure of input file: ID1,SMILES1,annotation\n ID2,SMILES2,annotation\n (...) - Here, the annotation is an array saved as a str ([1 2 3 4 .. .. X]) + The annotation is an array saved as a str ([1 2 3 4 .. .. X]) ___ - """ - + ''' # Parse arguments - args = parse_arguments() + parser = argparse.ArgumentParser() + parser.add_argument("file", nargs="+") + args = parser.parse_args() # Read input data from file for file_in in args.file: @@ -198,24 +205,20 @@ def main() -> None: tokens_list = [] with open(file_in, "r") as fp: for line in fp.readlines(): - if line[-1] == "\n": + if line[-1] == '\n': line = line[:-1] - line = line.replace(" ", " ").replace(" ", ",") + line = line.replace(";[ ", ";[").replace(" ", " ").replace(" ", ",") ID, smiles, tokens = line.split(";") ID_list.append(ID) smiles_list.append(smiles) tokens_list.append(np.array(eval(tokens))) - # Set image shape and number of depictions per SMILES and output paths + # Set desired image shape and number of depictions per SMILES and output paths im_per_SMILES_noaug = 1 im_per_SMILES_aug = 3 depiction_img_shape = (299, 299) - if not os.path.exists("tfrecord_ds"): - os.mkdir("tfrecord_ds") - - # If SMILES_chunksize is 100, - # then 100*im_per_SMILES_noaug*im_per_SMILES_aug are + # If SMILES_chunksize is 100, then 100*im_per_SMILES_noaug*im_per_SMILES_aug are # saved in one tfrecord file SMILES_chunksize = 1500 with RandomDepictor() as depictor: @@ -227,10 +230,10 @@ def main() -> None: ID_list, SMILES_chunksize, depiction_img_shape, - None, + 20, random.randint(0, 100), - ) + 1800) -if __name__ == "__main__": +if __name__ == '__main__': main()