Skip to content

Commit

Permalink
types and cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-c committed Aug 25, 2021
1 parent c932df0 commit fd59405
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 72 deletions.
10 changes: 8 additions & 2 deletions src/toil/batchSystems/singleMachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,14 @@ def _runDebugJob(self, jobCommand, jobID, environment):
# We can actually run in this thread
jobName, jobStoreLocator, jobStoreID = jobCommand.split()[1:4] # Parse command
jobStore = Toil.resumeJobStore(jobStoreLocator)
toil_worker.workerScript(jobStore, jobStore.config, jobName, jobStoreID, None,
redirectOutputToLogFile=not self.debugWorker) # Call the worker
toil_worker.workerScript(
jobStore,
jobStore.config,
jobName,
jobStoreID,
None,
redirectOutputToLogFile=not self.debugWorker,
) # Call the worker
else:
# Run synchronously. If starting or running the command fails, let the exception stop us.
subprocess.check_call(jobCommand,
Expand Down
25 changes: 15 additions & 10 deletions src/toil/leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ def innerLoop(self):

# Consistency check the toil state
assert self.toilState.updatedJobs == {}
#assert self.toilState.successorCounts == {}
#assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
# assert self.toilState.successorCounts == {}
# assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
assert self.toilState.serviceJobStoreIDToPredecessorJob == {}
assert self.toilState.servicesIssued == {}
# assert self.toilState.jobsToBeScheduledWithMultiplePredecessors # These are not properly emptied yet
Expand Down Expand Up @@ -749,17 +749,22 @@ def issueJob(self, jobNode):
for context in self.batchSystem.getWorkerContexts():
# For each context manager hook the batch system wants to run in
# the worker, serialize and send it.
workerCommand.append('--context')
workerCommand.append(base64.b64encode(pickle.dumps(context)).decode('utf-8'))
workerCommand.append("--context")
workerCommand.append(
base64.b64encode(pickle.dumps(context)).decode("utf-8")
)

# add the toilState as a pickle
workerCommand.append('--toilState')
workerCommand.append(base64.b64encode(pickle.dumps(self.toilState)).decode('utf-8'))

jobNode.command = ' '.join(workerCommand)
workerCommand.append("--toilState")
workerCommand.append(
base64.b64encode(pickle.dumps(self.toilState)).decode("utf-8")
)

omp_threads = os.environ.get('OMP_NUM_THREADS') \
or str(max(1, int(jobNode.cores))) # make sure OMP_NUM_THREADS is a positive integer
jobNode.command = " ".join(workerCommand)

omp_threads = os.environ.get("OMP_NUM_THREADS") or str(
max(1, int(jobNode.cores))
) # make sure OMP_NUM_THREADS is a positive integer

job_environment = {
# Set the number of cores used by OpenMP applications
Expand Down
49 changes: 33 additions & 16 deletions src/toil/test/cwl/cwlTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,11 +814,11 @@ def test_download_structure(self) -> None:
class CWLToilOptimizeTests(ToilTest):
def setUp(self):
    """Create a fresh, uniquely named output directory before each test.

    Sets ``outDir`` (created on disk), ``rootDir`` (the project root as
    reported by ``_projectRootPath``), ``jobDir`` (``<outDir>/jobStore``)
    and ``statDir`` (``<jobDir>/stats``). Only ``outDir`` is created here.
    """
    # uuid4 keeps concurrent or repeated test runs from colliding; the
    # f-string already stringifies the UUID, so no explicit str() is needed.
    self.outDir = f"/tmp/toil-cwl-test-{uuid.uuid4()}"
    os.makedirs(self.outDir)
    self.rootDir = self._projectRootPath()
    self.jobDir = os.path.join(self.outDir, "jobStore")
    self.statDir = os.path.join(self.jobDir, "stats")

def tearDown(self):
"""Clean up outputs."""
Expand All @@ -828,36 +828,53 @@ def tearDown(self):

def _tester(self, cwlfile, jobfile, expect, main_args=None):
    """Run a CWL workflow through ``cwltoil.main`` and check the job lists.

    :param cwlfile: Workflow path, relative to ``self.rootDir``.
    :param jobfile: Job/input file path, relative to ``self.rootDir``.
    :param expect: Expected value of ``self._extract_job_lists()`` after the run.
    :param main_args: Optional extra command-line arguments placed before the
        ones added here. The caller's sequence is copied, never mutated.
    """
    from toil.cwl import cwltoil

    st = StringIO()
    # Copy so that neither the caller's list nor a shared default is mutated.
    args = list(main_args) if main_args is not None else []
    args.extend(
        [
            "--logDebug",
            "--stats",
            "--outdir",
            self.outDir,
            "--jobStore",
            self.jobDir,
            os.path.join(self.rootDir, cwlfile),
            os.path.join(self.rootDir, jobfile),
        ]
    )
    cwltoil.main(args, stdout=st)
    out = self._extract_job_lists()
    self.assertEqual(out, expect)

def _match_extract_string(self, stringin):
import re
search_pattern = re.compile('^.* (\w*) kind-CWLJob/instance-.*$')

search_pattern = re.compile("^.* (\w*) kind-CWLJob/instance-.*$")
if search_pattern.match(stringin):
return(search_pattern.sub(r'\1',stringin))
return search_pattern.sub(r"\1", stringin)
else:
return(None)
return None

def _extract_job_lists(self):
worker_list = []
for filename in os.listdir(self.statDir):
with open(os.path.join(self.statDir,filename)) as f:
with open(os.path.join(self.statDir, filename)) as f:
test_json = json.load(f)
if 'workers' in test_json.keys() and len(test_json['jobs']) > 0:
job_list = [self._match_extract_string(x) for x in test_json['logs']['names']]
if "workers" in test_json.keys() and len(test_json["jobs"]) > 0:
job_list = [
self._match_extract_string(x)
for x in test_json["logs"]["names"]
]
if not all(x == None for x in job_list):
worker_list.append(job_list)
worker_list.sort()
return(worker_list)
return worker_list

def test_biobb_fail(self):
    """Run the reduced ``md_list`` workflow and check the recorded job names.

    Expects exactly one job list, containing genion, grompp, pdb2gmx,
    editconf and solvate in that order.
    """
    # NOTE(review): a single list presumably means all steps were chained
    # into one worker - confirm against the stats file format.
    self._tester(
        "src/toil/test/cwl/md_list_reduced.cwl",
        "src/toil/test/cwl/md_list_reduced.json",
        [["genion", "grompp", "pdb2gmx", "editconf", "solvate"]],
        main_args=[],
    )
2 changes: 1 addition & 1 deletion src/toil/utils/toilDebugJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ def main() -> None:

jobID = options.jobID[0]
logger.debug(f"Running the following job locally: {jobID}")
workerScript(jobStore, config, jobID, jobID, redirectOutputToLogFile=False)
workerScript(jobStore, config, jobID, jobID, None, redirectOutputToLogFile=False)
logger.debug(f"Finished running: {jobID}")
128 changes: 85 additions & 43 deletions src/toil/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,35 +56,52 @@
logger = logging.getLogger(__name__)


def checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStore, toilState):
def checkSuccessorReadyToRunMultiplePredecessors(
successor: JobDescription,
predecessor: JobDescription,
jobStore: AbstractJobStore,
toilState: ToilState,
) -> bool:
"""
Handle the special cases of checking if a successor job is
ready to run when there are multiple predecessors.
:param toil.job.JobDescription successor: The successor which has failed.
:param toil.job.JobDescription predecessor: The job which the successor comes after.
:param successor: The successor which has failed.
:param predecessor: The job which the successor comes after.
"""
# See implementation note at the top of this file for discussion of multiple predecessors
logger.debug("Successor job: %s of job: %s has multiple "
"predecessors", successor, predecessor)
logger.debug("Already finished predecessors are: %s", successor.predecessorsFinished)
logger.debug(
"Successor job: %s of job: %s has multiple " "predecessors",
successor,
predecessor,
)
logger.debug(
"Already finished predecessors are: %s", successor.predecessorsFinished
)

# Get the successor JobDescription, which is cached
if successor.jobStoreID not in toilState.jobsToBeScheduledWithMultiplePredecessors:
# TODO: We're loading from the job store in an ad-hoc way!
loaded = jobStore.load(successor.jobStoreID)
toilState.jobsToBeScheduledWithMultiplePredecessors[successor.jobStoreID] = loaded
toilState.jobsToBeScheduledWithMultiplePredecessors[
successor.jobStoreID
] = loaded
# TODO: we're clobbering a JobDescription we're passing around by value.
successor = toilState.jobsToBeScheduledWithMultiplePredecessors[successor.jobStoreID]
successor = toilState.jobsToBeScheduledWithMultiplePredecessors[
successor.jobStoreID
]

logger.debug("Already finished predecessors are (2) : %s", successor.predecessorsFinished)
logger.debug(
"Already finished predecessors are (2) : %s", successor.predecessorsFinished
)

# Add the predecessor as a finished predecessor to the successor
successor.predecessorsFinished.add(predecessor.jobStoreID)


logger.debug("Already finished predecessors are (3) : %s", successor.predecessorsFinished)
logger.debug(
"Already finished predecessors are (3) : %s", successor.predecessorsFinished
)

# If the successor job's predecessors have not all completed then
# ignore the successor, as it is not yet ready to run
Expand All @@ -97,33 +114,43 @@ def checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStor
return True



def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilState, config: Config) -> Optional[JobDescription]:
def nextChainable(
predecessor: JobDescription,
jobStore: AbstractJobStore,
toilState: ToilState,
config: Config,
) -> Optional[JobDescription]:
"""
Returns the next chainable job's JobDescription after the given predecessor
JobDescription, if one exists, or None if the chain must terminate.
:param predecessor: The job to chain from
:param jobStore: The JobStore to fetch JobDescriptions from.
:param config: The configuration for the current run.
:param toil.toilState.ToilState toilState: A local toilState, for providing a mutable stack
:param toil.toilState.ToilState toilState: A local toilState, for providing a mutable stack
"""
#If no more jobs to run or services not finished, quit
if len(predecessor.stack) == 0 or len(predecessor.services) > 0 or (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None):
logger.debug("Stopping running chain of jobs: length of stack: %s, services: %s, checkpoint: %s",
len(predecessor.stack), len(predecessor.services), (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None))
return None

# logger.debug("Length of stack: %s",len(predecessor.stack))
# logger.debug("Number of : %s",len(predecessor.stack))
if len(predecessor.stack) > 1 and len(predecessor.stack[-1]) > 0 and len(predecessor.stack[-2]) > 0:
# logger.debug("Length of stack: %s",len(predecessor.stack))
# logger.debug("Number of : %s",len(predecessor.stack))
if (
len(predecessor.stack) > 1
and len(predecessor.stack[-1]) > 0
and len(predecessor.stack[-2]) > 0
):
# TODO: Without a real stack list we can freely mutate, we can't chain
# to a child, which may branch, and then go back and do the follow-ons
# of the original job.
# TODO: Go back to a free-form stack list and require some kind of
# stack build phase?
#logger.debug("Job has both children and follow-ons - let's see if this breaks")
logger.debug("Stopping running chain of jobs because job has both children and follow-ons")
# logger.debug("Job has both children and follow-ons - let's see if this breaks")
logger.debug(
"Stopping running chain of jobs because job has both children and follow-ons"
)
return None

#Get the next set of jobs to run
Expand All @@ -135,17 +162,17 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS

#If there are 2 or more jobs to run in parallel we quit
if len(jobs) >= 2:
logger.debug("No more jobs can run in series by this worker,"
" it's got %i children", len(jobs))
logger.debug(
"No more jobs can run in series by this worker," " it's got %i children",
len(jobs),
)
return None

# Grab the only job that should be there.
successorID = next(iter(jobs))

# Load the successor JobDescription
successor = jobStore.load(successorID)

#testresult = checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStore, toilState)

#We check the requirements of the successor to see if we can run it
#within the current worker
Expand All @@ -161,14 +188,19 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
if successor.preemptable != predecessor.preemptable:
logger.debug("Preemptability is different for the next job, returning to the leader")
return None
# if (successor.predecessorNumber - len(successor.predecessorsFinished)) > 1:
if not checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStore, toilState):
logger.debug("The next job has %i predecessors that are not yet "
"recorded as finished; we must return to the leader.", successor.predecessorNumber)
# if (successor.predecessorNumber - len(successor.predecessorsFinished)) > 1:
if not checkSuccessorReadyToRunMultiplePredecessors(
successor, predecessor, jobStore, toilState
):
logger.debug(
"The next job has %i predecessors that are not yet "
"recorded as finished; we must return to the leader.",
successor.predecessorNumber,
)
logger.debug(successor.predecessorsFinished)
return None
else:
logger.debug('all predecessors are finished, we can chain to the successor')
logger.debug("all predecessors are finished, we can chain to the successor")

if len(successor.services) > 0:
logger.debug("The next job requires services that will not yet be started; we must return to the leader.")
Expand All @@ -182,7 +214,15 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
# Made it through! This job is chainable.
return successor

def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobStoreID: str, parentToilState, redirectOutputToLogFile: bool = True) -> int:

def workerScript(
jobStore: AbstractJobStore,
config: Config,
jobName: str,
jobStoreID: str,
parentToilState: Optional[str],
redirectOutputToLogFile: bool = True,
) -> int:
"""
Worker process script, runs a job.
Expand All @@ -191,9 +231,9 @@ def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobSt
:param jobName: The "job name" (a user friendly name) of the job to be run
:param jobStoreID: The job store ID of the job to be run
:param str parentToilState: Pickle containing the parent toilState
:param parentToilState: Pickle containing the parent toilState
:return int: 1 if a job failed, or 0 if all jobs succeeded
:return: 1 if a job failed, or 0 if all jobs succeeded
"""

configure_root_logger()
Expand Down Expand Up @@ -357,7 +397,7 @@ def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobSt
if parentToilState:
logger.debug(parentToilState)
else:
logger.debug('parentToilState empty')
logger.debug("parentToilState empty")

##########################################
#Connect to the deferred function system
Expand Down Expand Up @@ -424,16 +464,14 @@ def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobSt

startTime = time.time()


logger.debug(jobStore)
# Get a snap shot of the current state of the jobs in the jobStore
logger.debug(jobStore)
# Get a snap shot of the current state of the jobs in the jobStore
# - creating a local version of the leader's ToilState
if parentToilState:
toilState = pickle.loads(base64.b64decode(parentToilState.encode('utf-8')))
toilState = pickle.loads(base64.b64decode(parentToilState.encode("utf-8")))
else:
toilState = ToilState(jobStore, jobDesc, jobCache=None)



while True:
##########################################
#Run the job body, if there is one
Expand Down Expand Up @@ -725,9 +763,12 @@ def parse_args(args: List[str]) -> argparse.Namespace:
that the worker can then run before/after the job on the batch
system's behalf.""")

parser.add_argument("--toilState", default=None, type=str,
help="""Pickled, base64-encoded copy of the Toul leader's toilState.""")

parser.add_argument(
"--toilState",
default=None,
type=str,
help="""Pickled, base64-encoded copy of the Toul leader's toilState.""",
)

return parser.parse_args(args)

Expand Down Expand Up @@ -779,10 +820,11 @@ def main(argv: Optional[List[str]] = None) -> None:
jobStore = Toil.resumeJobStore(options.jobStoreLocator)
config = jobStore.config


with in_contexts(options.context):
# Call the worker
exit_code = workerScript(jobStore, config, options.jobName, options.jobStoreID, options.toilState)
exit_code = workerScript(
jobStore, config, options.jobName, options.jobStoreID, options.toilState
)

# Exit with its return value
sys.exit(exit_code)

0 comments on commit fd59405

Please sign in to comment.