Skip to content

Commit

Permalink
manual handling of process control
Browse files Browse the repository at this point in the history
  • Loading branch information
OBrink committed Sep 26, 2022
1 parent 28f7e95 commit b2d807f
Showing 1 changed file with 90 additions and 87 deletions.
177 changes: 90 additions & 87 deletions examples/randepict_batch_run_tfrecord_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@
from typing import Tuple, List
import argparse
import numpy as np
from multiprocessing import get_context
from multiprocessing import Process
import time
from PIL import Image
import random
import tensorflow as tf
from RanDepict import RandomDepictor

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

class RandomDepictor(RandomDepictor):
"""This function is a child of RanDepict.random_depictor and contains some
additional functions to write the images into tfrecord files."""

def __init__(self, seed=42) -> None:
"""
This class is a child of RanDepict.RandomDepictor and contains some
additional functions to write the images into tfrecord files.
"""
def __init__(
self,
seed=(random.randint(0, 100))
) -> None:
super().__init__(seed=seed)

def batch_depict_save(
Expand All @@ -26,29 +32,32 @@ def batch_depict_save(
ID_list: List[str],
chunksize: int,
shape: Tuple[int, int] = (299, 299),
processes: int = 20,
seed: int = 42,
num_processes: int = 20,
seed: int = (random.randint(0, 100)),
timeout_limit: int = 1800
) -> None:
"""This function takes a list of SMILES str, the amount of images to
create per SMILES str and the path of an output directory.
"""
This function takes a list of SMILES str, the amount of images
to create per SMILES str and the path of an output directory.
It then creates images_per_structure depictions of each chemical
structure that is represented by the SMILES strings and saves them
in tfrecord files in output_dir.
If an ID list (list with names of same length as smiles_list that
contains unique IDs), the IDs will be used to name the files."""
in tfrecord files in output_dir. If an ID list (list with names of
same length as smiles_list that contains unique IDs), the IDs will
be used to name the files.
"""

counter = (n for n in range(int(len(smiles_list) / chunksize)))
counter = (n for n in range(int(len(smiles_list)/chunksize)))

def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]
yield lst[i:i + n]

smiles_list = chunks(smiles_list, chunksize)
tokens = chunks(tokens, chunksize)
IDs = chunks(ID_list, chunksize)

starmap_tuple_generator = (
async_proc_args = (
(
next(smiles_list),
n_non_augmented,
Expand All @@ -60,9 +69,29 @@ def chunks(lst, n):
)
for n in counter
)

with get_context("spawn").Pool(processes) as p:
p.starmap(self.depict_save, starmap_tuple_generator)
process_list = []
while True:
for proc, init_time in process_list:
# Remove finished processes
if not proc.is_alive():
process_list.remove((proc, init_time))
# Remove timed out processes
elif time.time() - init_time >= timeout_limit:
process_list.remove((proc, init_time))
proc.terminate()
proc.join()
if len(process_list) < num_processes:
# Start new processes
for _ in range(num_processes-len(process_list)):
try:
p = Process(target=self.depict_save,
args=next(async_proc_args))
process_list.append((p, time.time()))
p.start()
except StopIteration:
break
if len(process_list) == 0:
break

def _bytes_feature(self, value):
"""Returns a bytes_list from a string / byte."""
Expand All @@ -80,17 +109,19 @@ def depict_save(
tokens: List[np.array],
shape: Tuple[int, int],
ID: List[int],
seed: int = 0,
seed: int = (random.randint(0, 100)),
):
"""This function takes a list of SMILES str, the amount of images to
create per SMILES str and the path of an output directory.
It then creates images_per_structure depictions of the chemical
structure that is represented by the SMILES strings and saves it
in a tfrecord file in output_dir.
The given ID is used as the base filename."""
"""
This function takes a list of SMILES str, the amount of
images to create per SMILES str and the path of an output
directory. It then creates images_per_structure depictions
of the chemical structure that is represented by the SMILES
strings and saves it in a tfrecord file in output_dir.
The given ID is used as the base filename.
"""
depictor = RandomDepictor(seed + 13)

tfrecord_name = "tfrecord_ds/dataset_DS-{}.tfrecord".format(ID[0])
tfrecord_name = f"tfrecord_ds/DECIMER_PubChem_DS-{ID[0]}.tfrecord"
print(f"Beginning process that writes {tfrecord_name}")
writer = tf.io.TFRecordWriter(tfrecord_name)
for smiles_index in range(len(smiles)):
smi = smiles[smiles_index]
Expand All @@ -101,52 +132,40 @@ def depict_save(
self.save_in_tfrecord(token_list, depictor(smi, shape), writer)
break
except Exception as e:
print(
"An exception occured:\n{}\n".format(e),
"It is ignored for now and we",
"simply try to depict this structure again...",
)
print(f'An exception occured:\n{e}',
'It is ignored for now and we simply try to depict this structure again...')
else:
print(
"FAILED depicting structure: {}.".format(smi),
"Tried five times.",
)

print('FAILED depicting structure: {}. Tried five times.'.format(smi))
for _ in range(n_non_augmented):
for _ in range(5):
try:
self.save_in_tfrecord(
token_list, depictor.random_depiction(smi, shape), writer
)
self.save_in_tfrecord(token_list, depictor.random_depiction(smi, shape), writer)
break
except Exception as e:
print(
"An exception occured:\n{}\n".format(e),
"It is ignored for now and we",
"simply try to depict this structure again...",
)
print(f'An exception occured:\n{e}',
'It is ignored for now and we simply try to depict this structure again...')
else:
print(
"FAILED depicting structure: {}.".format(smi),
"Tried five times.",
)
print('FAILED depicting structure: {}. Tried five times.'.format(smi))
print(f"{tfrecord_name} has been created regularly")
return

def save_in_tfrecord(
self, train_caption: np.array, image: np.array, writer: tf.io.TFRecordWriter
self,
train_caption: np.array,
image: np.array,
writer: tf.io.TFRecordWriter
) -> None:
"""
This function takes a list of captions (token list, np.array)
and a list of images (np.array) as well as a file index.
It saves the images and captions in a tfrecord file.
This function takes a list of captions (token list, np.array) and a list of images
(np.array) as well as a file index. It saves the images and captions in a tfrecord file.
Args:
train_captions (List[np.array]): np.array([0,1,2,3,1,1,2,0,...])
images (List[np.array]): array representing the structure depiction
images (List[np.array]): array that represents the structure depiction
file_index (int): index for naming tfrecord files
"""
if not os.path.exists("tfrecord_ds"):
os.mkdir("tfrecord_ds")
if not os.path.exists('tfrecord_ds'):
os.mkdir('tfrecord_ds')
image = Image.fromarray(image)
with io.BytesIO() as output:
image.save(output, format="PNG")
Expand All @@ -161,35 +180,23 @@ def save_in_tfrecord(
writer.write(serialized)
return None


def parse_arguments() -> argparse.Namespace:
"""
Parse arguments with argparse.
Returns:
Returns an namespace object that holds the given arguments
"""
parser = argparse.ArgumentParser()
parser.add_argument("file", nargs="+")
return parser.parse_args()


def main() -> None:
"""
'''
This script reads a SMILES, the annotationa and IDs from a file and
generates three augmented and three non-augmented depictions per chemical
structure. The images are saved in tfrecord files.
generates three augmented and three non-augmented depictions per chemical structure.
The images are saved in tfrecord files.
___
Structure of input file:
ID1,SMILES1,annotation\n
ID2,SMILES2,annotation\n
(...)
Here, the annotation is an array saved as a str ([1 2 3 4 .. .. X])
The annotation is an array saved as a str ([1 2 3 4 .. .. X])
___
"""

'''
# Parse arguments
args = parse_arguments()
parser = argparse.ArgumentParser()
parser.add_argument("file", nargs="+")
args = parser.parse_args()

# Read input data from file
for file_in in args.file:
Expand All @@ -198,24 +205,20 @@ def main() -> None:
tokens_list = []
with open(file_in, "r") as fp:
for line in fp.readlines():
if line[-1] == "\n":
if line[-1] == '\n':
line = line[:-1]
line = line.replace(" ", " ").replace(" ", ",")
line = line.replace(";[ ", ";[").replace(" ", " ").replace(" ", ",")
ID, smiles, tokens = line.split(";")
ID_list.append(ID)
smiles_list.append(smiles)
tokens_list.append(np.array(eval(tokens)))

# Set image shape and number of depictions per SMILES and output paths
# Set desired image shape and number of depictions per SMILES and output paths
im_per_SMILES_noaug = 1
im_per_SMILES_aug = 3
depiction_img_shape = (299, 299)

if not os.path.exists("tfrecord_ds"):
os.mkdir("tfrecord_ds")

# If SMILES_chunksize is 100,
# then 100*im_per_SMILES_noaug*im_per_SMILES_aug are
# If SMILES_chunksize is 100, then 100*im_per_SMILES_noaug*im_per_SMILES_aug are
# saved in one tfrecord file
SMILES_chunksize = 1500
with RandomDepictor() as depictor:
Expand All @@ -227,10 +230,10 @@ def main() -> None:
ID_list,
SMILES_chunksize,
depiction_img_shape,
None,
20,
random.randint(0, 100),
)
1800)


if __name__ == "__main__":
if __name__ == '__main__':
main()

0 comments on commit b2d807f

Please sign in to comment.