diff --git a/emtools/__init__.py b/emtools/__init__.py index d4692f3..011eba0 100644 --- a/emtools/__init__.py +++ b/emtools/__init__.py @@ -24,5 +24,5 @@ # * # ************************************************************************** -__version__ = '0.1.1' +__version__ = '0.1.2' diff --git a/emtools/jobs/__init__.py b/emtools/jobs/__init__.py index d09e3e8..672d307 100644 --- a/emtools/jobs/__init__.py +++ b/emtools/jobs/__init__.py @@ -14,7 +14,7 @@ # * # ************************************************************************** -from .pipeline import Pipeline +from .pipeline import Pipeline, ProcessingPipeline from .batch_manager import BatchManager -__all__ = ["Pipeline", "BatchManager"] \ No newline at end of file +__all__ = ["Pipeline", "BatchManager", "ProcessingPipeline"] \ No newline at end of file diff --git a/emtools/jobs/batch_manager.py b/emtools/jobs/batch_manager.py index 706777d..302ab4a 100644 --- a/emtools/jobs/batch_manager.py +++ b/emtools/jobs/batch_manager.py @@ -52,6 +52,13 @@ def _createBatchId(self): uuidSuffix = str(uuid4()).split('-')[0] return f"{nowPrefix}_{countStr}_{uuidSuffix}" + @staticmethod + def createBatchLink(batch_path, fn): + """ Create a symbolic link to fn inside the batch dir. """ + baseName = os.path.basename(fn) + os.symlink(os.path.abspath(fn), + os.path.join(batch_path, baseName)) + def _createBatch(self, items): batch_id = self._createBatchId() batch_path = os.path.join(self._workingPath, batch_id) @@ -60,10 +67,8 @@ def _createBatch(self, items): Process.system(f"mkdir '{batch_path}'") for item in items: - fn = self._itemFileNameFunc(item) - baseName = os.path.basename(fn) - os.symlink(os.path.abspath(fn), - os.path.join(batch_path, baseName)) + self.createBatchLink(batch_path, self._itemFileNameFunc(item)) + self._batchCount += 1 return { 'items': items, @@ -85,3 +90,4 @@ def generate(self): if items: yield self._createBatch(items) + diff --git a/emtools/jobs/pipeline.py b/emtools/jobs/pipeline.py index 62700b1..5f40420 100644 --- a/emtools/jobs/pipeline.py +++ b/emtools/jobs/pipeline.py @@ -14,8 +14,12 @@ # * # ************************************************************************** +import os +import sys from collections import OrderedDict import threading +import signal +import traceback class Pipeline: @@ -182,3 +186,71 @@ def _process(self): self._print("Got task: None") + +class ProcessingPipeline(Pipeline): + """ Subclass of Pipeline that is commonly used to run programs. + + This class will define a workingDir (usually os.getcwd) + and an output dir where all output should be generated. + It will also add some helper functions to manipulate file + paths relative to the working dir. + """ + def __init__(self, workingDir, outputDir, **kwargs): + Pipeline.__init__(self, **kwargs) + self.workingDir = self.__validate(workingDir, 'working') + self.outputDir = self.__validate(outputDir, 'output') + + def __validate(self, path, key): + if not path: + raise Exception(f'Invalid {key} directory: {path}') + if not os.path.exists(path): + raise Exception(f'Non-existing {key} directory: {path}') + + return path + + def get_arg(self, argDict, key, envKey, default=None): + """ Get an argument from the argDict or from the environment. + + Args: + argDict: arguments dict from where to get the 'key' value + key: string key of the argument name in argDict + envKey: string key of the environment variable + default: default value if not found in argDict or environ + """ + return argDict.get(key, os.environ.get(envKey, default)) + + def join(self, *p): + return os.path.join(self.outputDir, *p) + + def relpath(self, p): + return os.path.relpath(p, self.workingDir) + + def prerun(self): + """ This method will be called before the run. """ + pass + + def postrun(self): + """ This method will be called after the run. """ + pass + + def __file(self, suffix): + with open(self.join(f'RELION_JOB_EXIT_{suffix}'), 'w'): + pass + + def __abort(self, signum, frame): + self.__file('ABORTED') + sys.exit(0) + + def run(self): + try: + signal.signal(signal.SIGINT, self.__abort) + signal.signal(signal.SIGTERM, self.__abort) + self.prerun() + Pipeline.run(self) + self.postrun() + self.__file('SUCCESS') + except Exception as e: + self.__file('FAILURE') + traceback.print_exc() + + diff --git a/emtools/metadata/__init__.py b/emtools/metadata/__init__.py index 0445682..ece86c5 100644 --- a/emtools/metadata/__init__.py +++ b/emtools/metadata/__init__.py @@ -17,9 +17,10 @@ from .table import Column, ColumnList, Table from .starfile import StarFile, StarMonitor from .epu import EPU -from .misc import Bins, TsBins, DataFiles, MovieFiles +from .misc import Bins, TsBins, DataFiles, MovieFiles, Mdoc, TextFile from .sqlite import SqliteFile __all__ = ["Column", "ColumnList", "Table", "StarFile", "StarMonitor", "EPU", - "Bins", "TsBins", "SqliteFile", "DataFiles", "MovieFiles"] + "Bins", "TsBins", "SqliteFile", "DataFiles", "MovieFiles", + "Mdoc", "TextFile"] diff --git a/emtools/metadata/misc.py b/emtools/metadata/misc.py index e570fc1..85286bd 100644 --- a/emtools/metadata/misc.py +++ b/emtools/metadata/misc.py @@ -248,3 +248,51 @@ def total_movies(self): def print(self, sort=None): DataFiles.print(self, sort=sort) self.counters[1].print('movie') + + +class Mdoc(dict): + """ Helper class to manipulate IMOD .mdoc files. """ + + @staticmethod + def parse(mdocFn): + mdoc = Mdoc() + mdoc['global'] = section = {} + + with open(mdocFn, 'r') as f: + for line in f: + line = line.strip() + + if not line or line.startswith(';'): # Empty or comment lines + continue + + if line.startswith('['): # Section header + name = line[1:-1] + mdoc[name] = section = {} + else: # Key-value pair + key, value = line.split('=', 1) + section[key.strip()] = value.strip() + + return mdoc + + @property + def zvalues(self): + return [(k, v) for k, v in self.items() if k.startswith('ZValue')] + + def write(self, path): + with open(path, 'w') as f: + for key, section in self.items(): + if key != 'global': + f.write(f"\n[{key}]\n") + for k, v in section.items(): + f.write(f"{k} = {v}\n") + + +class TextFile: + @staticmethod + def stripLines(fn, **kwargs): + with open(fn) as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + yield line + diff --git a/emtools/metadata/starfile.py b/emtools/metadata/starfile.py index baeb930..a2f2a96 100644 --- a/emtools/metadata/starfile.py +++ b/emtools/metadata/starfile.py @@ -310,6 +310,10 @@ def close(self): self._file.close() self._file = None + def flush(self): + if getattr(self, '_file', None): + self._file.flush() + # ---------------------- Writer functions -------------------------------- def writeLine(self, line): """ Write a line to the opened file. """ @@ -321,9 +325,11 @@ def _writeTableName(self, tableName): def writeSingleRow(self, tableName, row): """ Write a Row as a single row Table of label/value pairs. """ self._writeTableName(tableName) - m = max([len(c) for c in row._fields]) + 5 + d = row if isinstance(row, dict) else row._asdict() + m = max(len(c) for c in d) + 5 format = "_{:<%d} {:>10}\n" % m - for col, value in row._asdict().items(): + d = row if isinstance(row, dict) else row._asdict() + for col, value in d.items(): self._file.write(format.format(col, _escapeStrValue(value))) self._file.write('\n\n') diff --git a/emtools/scripts/emt-scipion-otf.py b/emtools/scripts/emt-scipion-otf.py index 6155cdf..b947d39 100755 --- a/emtools/scripts/emt-scipion-otf.py +++ b/emtools/scripts/emt-scipion-otf.py @@ -25,8 +25,9 @@ import datetime as dt import re -from emtools.utils import Process, Color, System, Pipeline +from emtools.utils import Process, Color, System from emtools.metadata import EPU, SqliteFile, StarFile, Table +from emtools.jobs import Pipeline import pyworkflow as pw from pyworkflow.project import Project @@ -476,7 +477,7 @@ def print_protocol(workingDir, protId): if protId == 'all': for prot in project.getRuns(iterate=True): clsName = prot.getClassName() - print(f"- {prot.getObjId():>8} {prot.getStatus():<10} {clsName}") + print(f"- {prot.getObjId():>8} {prot.getStatus():<10} {clsName:<40} {prot.getRunName()}") else: prot = project.getProtocol(int(protId)) if prot is None: