Merge pull request #10 from 3dem/devel
Release 0.1.2
delarosatrevin authored Oct 18, 2024
2 parents 41a115b + 8e36749 commit 526ad33
Showing 8 changed files with 147 additions and 13 deletions.
2 changes: 1 addition & 1 deletion emtools/__init__.py
@@ -24,5 +24,5 @@
 # *
 # **************************************************************************
 
-__version__ = '0.1.1'
+__version__ = '0.1.2'
 
4 changes: 2 additions & 2 deletions emtools/jobs/__init__.py
@@ -14,7 +14,7 @@
 # *
 # **************************************************************************
 
-from .pipeline import Pipeline
+from .pipeline import Pipeline, ProcessingPipeline
 from .batch_manager import BatchManager
 
-__all__ = ["Pipeline", "BatchManager"]
+__all__ = ["Pipeline", "BatchManager", "ProcessingPipeline"]
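
With this change, ProcessingPipeline becomes part of the package's public API. A minimal import sketch (only the names exported above are assumed):

    from emtools.jobs import Pipeline, ProcessingPipeline, BatchManager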
14 changes: 10 additions & 4 deletions emtools/jobs/batch_manager.py
@@ -52,6 +52,13 @@ def _createBatchId(self):
         uuidSuffix = str(uuid4()).split('-')[0]
         return f"{nowPrefix}_{countStr}_{uuidSuffix}"
 
+    @staticmethod
+    def createBatchLink(batch_path, fn):
+        """ Create a symbolic link to fn inside the batch dir. """
+        baseName = os.path.basename(fn)
+        os.symlink(os.path.abspath(fn),
+                   os.path.join(batch_path, baseName))
+
     def _createBatch(self, items):
         batch_id = self._createBatchId()
         batch_path = os.path.join(self._workingPath, batch_id)
@@ -60,10 +67,8 @@ def _createBatch(self, items):
         Process.system(f"mkdir '{batch_path}'")
 
         for item in items:
-            fn = self._itemFileNameFunc(item)
-            baseName = os.path.basename(fn)
-            os.symlink(os.path.abspath(fn),
-                       os.path.join(batch_path, baseName))
+            self.createBatchLink(batch_path, self._itemFileNameFunc(item))
+
         self._batchCount += 1
         return {
             'items': items,
@@ -85,3 +90,4 @@ def generate(self):
 
         if items:
            yield self._createBatch(items)
+
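The symlink logic is now exposed as a reusable static method. A minimal usage sketch, not part of the commit; the directory and file names are placeholders:

    import os
    from emtools.jobs import BatchManager

    os.makedirs('batch_001', exist_ok=True)
    # Link an input file into the batch folder instead of copying it;
    # the symlink is created even if the target does not exist yet.
    BatchManager.createBatchLink('batch_001', 'data/movie_0001.tiff')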
72 changes: 72 additions & 0 deletions emtools/jobs/pipeline.py
@@ -14,8 +14,12 @@
 # *
 # **************************************************************************
 
+import os
+import sys
 from collections import OrderedDict
 import threading
+import signal
+import traceback
 
 
 class Pipeline:
@@ -182,3 +186,71 @@ def _process(self):
 
         self._print("Got task: None")
 
+
+class ProcessingPipeline(Pipeline):
+    """ Subclass of Pipeline that is commonly used to run programs.
+    This class will define a workingDir (usually os.getcwd)
+    and an output dir where all output should be generated.
+    It will also add some helper functions to manipulate file
+    paths relative to the working dir.
+    """
+    def __init__(self, workingDir, outputDir, **kwargs):
+        Pipeline.__init__(self, **kwargs)
+        self.workingDir = self.__validate(workingDir, 'working')
+        self.outputDir = self.__validate(outputDir, 'output')
+
+    def __validate(self, path, key):
+        if not path:
+            raise Exception(f'Invalid {key} directory: {path}')
+        if not os.path.exists(path):
+            raise Exception(f'Non-existing {key} directory: {path}')
+
+        return path
+
+    def get_arg(self, argDict, key, envKey, default=None):
+        """ Get an argument from the argDict or from the environment.
+        Args:
+            argDict: arguments dict from where to get the 'key' value
+            key: string key of the argument name in argDict
+            envKey: string key of the environment variable
+            default: default value if not found in argDict or environ
+        """
+        return argDict.get(key, os.environ.get(envKey, default))
+
+    def join(self, *p):
+        return os.path.join(self.outputDir, *p)
+
+    def relpath(self, p):
+        return os.path.relpath(p, self.workingDir)
+
+    def prerun(self):
+        """ This method will be called before the run. """
+        pass
+
+    def postrun(self):
+        """ This method will be called after the run. """
+        pass
+
+    def __file(self, suffix):
+        with open(self.join(f'RELION_JOB_EXIT_{suffix}'), 'w'):
+            pass
+
+    def __abort(self, signum, frame):
+        self.__file('ABORTED')
+        sys.exit(0)
+
+    def run(self):
+        try:
+            signal.signal(signal.SIGINT, self.__abort)
+            signal.signal(signal.SIGTERM, self.__abort)
+            self.prerun()
+            Pipeline.run(self)
+            self.postrun()
+            self.__file('SUCCESS')
+        except Exception as e:
+            self.__file('FAILURE')
+            traceback.print_exc()

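A minimal subclassing sketch, assuming the base Pipeline can be constructed without extra keyword arguments; prerun/postrun are the intended extension points, and run() (omitted here) would additionally install the signal handlers and write the RELION_JOB_EXIT_* marker files. MY_GPU_LIST is a hypothetical environment variable used only for illustration:

    import os
    from emtools.jobs import ProcessingPipeline

    class MyPipeline(ProcessingPipeline):
        def prerun(self):
            # Read an option from an args dict or fall back to the environment
            self.gpus = self.get_arg({}, 'gpu', 'MY_GPU_LIST', default='0')

        def postrun(self):
            print('Results under:', self.join('Logs'))

    # Both directories must already exist, otherwise __init__ raises
    p = MyPipeline(workingDir=os.getcwd(), outputDir=os.getcwd())
    p.prerun()  # normally invoked for you by run()
    print(p.gpus, p.relpath(p.join('Logs', 'run.log')))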
5 changes: 3 additions & 2 deletions emtools/metadata/__init__.py
@@ -17,9 +17,10 @@
 from .table import Column, ColumnList, Table
 from .starfile import StarFile, StarMonitor
 from .epu import EPU
-from .misc import Bins, TsBins, DataFiles, MovieFiles
+from .misc import Bins, TsBins, DataFiles, MovieFiles, Mdoc, TextFile
 from .sqlite import SqliteFile
 
 
 __all__ = ["Column", "ColumnList", "Table", "StarFile", "StarMonitor", "EPU",
-           "Bins", "TsBins", "SqliteFile", "DataFiles", "MovieFiles"]
+           "Bins", "TsBins", "SqliteFile", "DataFiles", "MovieFiles",
+           "Mdoc", "TextFile"]
48 changes: 48 additions & 0 deletions emtools/metadata/misc.py
@@ -248,3 +248,51 @@ def total_movies(self):
     def print(self, sort=None):
         DataFiles.print(self, sort=sort)
         self.counters[1].print('movie')
+
+
+class Mdoc(dict):
+    """ Helper class to manipulate IMOD .mdoc files. """
+
+    @staticmethod
+    def parse(mdocFn):
+        mdoc = Mdoc()
+        mdoc['global'] = section = {}
+
+        with open(mdocFn, 'r') as f:
+            for line in f:
+                line = line.strip()
+
+                if not line or line.startswith(';'):  # Empty or comment lines
+                    continue
+
+                if line.startswith('['):  # Section header
+                    name = line[1:-1]
+                    mdoc[name] = section = {}
+                else:  # Key-value pair
+                    key, value = line.split('=', 1)
+                    section[key.strip()] = value.strip()
+
+        return mdoc
+
+    @property
+    def zvalues(self):
+        return [(k, v) for k, v in self.items() if k.startswith('ZValue')]
+
+    def write(self, path):
+        with open(path, 'w') as f:
+            for key, section in self.items():
+                if key != 'global':
+                    f.write(f"\n[{key}]\n")
+                for k, v in section.items():
+                    f.write(f"{k} = {v}\n")
+
+
+class TextFile:
+    @staticmethod
+    def stripLines(fn, **kwargs):
+        with open(fn) as f:
+            for line in f:
+                line = line.strip()
+                if line and not line.startswith('#'):
+                    yield line

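A usage sketch for the two new helpers, with placeholder file names; ZValue sections follow the [ZValue = N] convention of IMOD/SerialEM .mdoc files:

    from emtools.metadata import Mdoc, TextFile

    mdoc = Mdoc.parse('TS_01.mdoc')          # placeholder file name
    print(mdoc['global'])                    # header key/value pairs
    for name, section in mdoc.zvalues:       # e.g. ('ZValue = 0', {...})
        print(name, section.get('TiltAngle'))
    mdoc.write('TS_01_copy.mdoc')            # round-trip back to disk

    # Iterate meaningful lines, skipping blanks and '#' comments
    for line in TextFile.stripLines('params.txt'):
        print(line)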
10 changes: 8 additions & 2 deletions emtools/metadata/starfile.py
@@ -310,6 +310,10 @@ def close(self):
         self._file.close()
         self._file = None
 
+    def flush(self):
+        if getattr(self, '_file', None):
+            self._file.flush()
+
     # ---------------------- Writer functions --------------------------------
     def writeLine(self, line):
         """ Write a line to the opened file. """
@@ -321,9 +325,11 @@ def _writeTableName(self, tableName):
     def writeSingleRow(self, tableName, row):
         """ Write a Row as a single row Table of label/value pairs. """
         self._writeTableName(tableName)
-        m = max([len(c) for c in row._fields]) + 5
+        d = row if isinstance(row, dict) else row._asdict()
+        m = max(len(c) for c in d) + 5
         format = "_{:<%d} {:>10}\n" % m
-        for col, value in row._asdict().items():
+        d = row if isinstance(row, dict) else row._asdict()
+        for col, value in d.items():
             self._file.write(format.format(col, _escapeStrValue(value)))
         self._file.write('\n\n')
 
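A short sketch of the writer changes, assuming a StarFile can be opened for writing as StarFile(path, 'w') (the constructor is not shown in this diff); the file and label names are placeholders:

    from emtools.metadata import StarFile

    sf = StarFile('job_params.star', 'w')
    # writeSingleRow now also accepts a plain dict, not only a Row
    sf.writeSingleRow('optics', {'rlnVoltage': 300, 'rlnImagePixelSize': 0.85})
    sf.flush()   # new: push buffered output to disk without closing
    sf.close()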
5 changes: 3 additions & 2 deletions emtools/scripts/emt-scipion-otf.py
@@ -25,8 +25,9 @@
 import datetime as dt
 import re
 
-from emtools.utils import Process, Color, System, Pipeline
+from emtools.utils import Process, Color, System
 from emtools.metadata import EPU, SqliteFile, StarFile, Table
+from emtools.jobs import Pipeline
 
 import pyworkflow as pw
 from pyworkflow.project import Project
@@ -476,7 +477,7 @@ def print_protocol(workingDir, protId):
     if protId == 'all':
         for prot in project.getRuns(iterate=True):
             clsName = prot.getClassName()
-            print(f"- {prot.getObjId():>8} {prot.getStatus():<10} {clsName}")
+            print(f"- {prot.getObjId():>8} {prot.getStatus():<10} {clsName:<40} {prot.getRunName()}")
     else:
         prot = project.getProtocol(int(protId))
         if prot is None:
