diff --git a/src/toil/batchSystems/singleMachine.py b/src/toil/batchSystems/singleMachine.py
index 082f57d716..b2d92759b9 100644
--- a/src/toil/batchSystems/singleMachine.py
+++ b/src/toil/batchSystems/singleMachine.py
@@ -405,8 +405,14 @@ def _runDebugJob(self, jobCommand, jobID, environment):
             # We can actually run in this thread
             jobName, jobStoreLocator, jobStoreID = jobCommand.split()[1:4] # Parse command
             jobStore = Toil.resumeJobStore(jobStoreLocator)
-            toil_worker.workerScript(jobStore, jobStore.config, jobName, jobStoreID, None,
-                                     redirectOutputToLogFile=not self.debugWorker) # Call the worker
+            toil_worker.workerScript(
+                jobStore,
+                jobStore.config,
+                jobName,
+                jobStoreID,
+                None,
+                redirectOutputToLogFile=not self.debugWorker,
+            )  # Call the worker
         else:
             # Run synchronously. If starting or running the command fails, let the exception stop us.
             subprocess.check_call(jobCommand,
diff --git a/src/toil/leader.py b/src/toil/leader.py
index e236e49233..dac198ebca 100644
--- a/src/toil/leader.py
+++ b/src/toil/leader.py
@@ -664,8 +664,8 @@ def innerLoop(self):
 
         # Consistency check the toil state
         assert self.toilState.updatedJobs == {}
-        #assert self.toilState.successorCounts == {}
-        #assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
+        # assert self.toilState.successorCounts == {}
+        # assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
         assert self.toilState.serviceJobStoreIDToPredecessorJob == {}
         assert self.toilState.servicesIssued == {}
         # assert self.toilState.jobsToBeScheduledWithMultiplePredecessors # These are not properly emptied yet
@@ -749,17 +749,22 @@ def issueJob(self, jobNode):
         for context in self.batchSystem.getWorkerContexts():
             # For each context manager hook the batch system wants to run in
             # the worker, serialize and send it.
-            workerCommand.append('--context')
-            workerCommand.append(base64.b64encode(pickle.dumps(context)).decode('utf-8'))
+            workerCommand.append("--context")
+            workerCommand.append(
+                base64.b64encode(pickle.dumps(context)).decode("utf-8")
+            )
 
         # add the toilState as a pickle
-        workerCommand.append('--toilState')
-        workerCommand.append(base64.b64encode(pickle.dumps(self.toilState)).decode('utf-8'))
-
-        jobNode.command = ' '.join(workerCommand)
+        workerCommand.append("--toilState")
+        workerCommand.append(
+            base64.b64encode(pickle.dumps(self.toilState)).decode("utf-8")
+        )
 
-        omp_threads = os.environ.get('OMP_NUM_THREADS') \
-            or str(max(1, int(jobNode.cores)))  # make sure OMP_NUM_THREADS is a positive integer
+        jobNode.command = " ".join(workerCommand)
+
+        omp_threads = os.environ.get("OMP_NUM_THREADS") or str(
+            max(1, int(jobNode.cores))
+        )  # make sure OMP_NUM_THREADS is a positive integer
 
         job_environment = {
             # Set the number of cores used by OpenMP applications
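For reference, the leader-side half of the scheme above reduces to a few lines. This is a minimal, self-contained sketch rather than Toil's actual issueJob code: the dict stands in for the leader's toil.toilState.ToilState object, and the command prefix is illustrative.

import base64
import pickle

# Stand-in for the leader's toil.toilState.ToilState object.
state = {"updatedJobs": {}}

# Pickle the state, then base64-encode it so it travels as one
# shell-safe command-line token.
workerCommand = ["_toil_worker", "--toilState"]
workerCommand.append(base64.b64encode(pickle.dumps(state)).decode("utf-8"))

# The leader stores this single line as the job's command.
command = " ".join(workerCommand)
print(command)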
diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py
index 5be716bdf5..d76804da88 100644
--- a/src/toil/test/cwl/cwlTest.py
+++ b/src/toil/test/cwl/cwlTest.py
@@ -814,11 +814,11 @@ def test_download_structure(self) -> None:
 class CWLToilOptimizeTests(ToilTest):
     def setUp(self):
         """Runs anew before each test to create farm fresh temp dirs."""
-        self.outDir = f'/tmp/toil-cwl-test-{str(uuid.uuid4())}'
+        self.outDir = f"/tmp/toil-cwl-test-{str(uuid.uuid4())}"
         os.makedirs(self.outDir)
         self.rootDir = self._projectRootPath()
-        self.jobDir = os.path.join(self.outDir, 'jobStore')
-        self.statDir = os.path.join(self.jobDir, 'stats')
+        self.jobDir = os.path.join(self.outDir, "jobStore")
+        self.statDir = os.path.join(self.jobDir, "stats")
 
     def tearDown(self):
         """Clean up outputs."""
@@ -828,36 +828,53 @@ def tearDown(self):
     def _tester(self, cwlfile, jobfile, expect, main_args=[]):
         from toil.cwl import cwltoil
+
         st = StringIO()
         main_args = main_args[:]
-        main_args.extend(['--logDebug','--stats','--outdir', self.outDir, '--jobStore', self.jobDir,
-                          os.path.join(self.rootDir, cwlfile), os.path.join(self.rootDir, jobfile)])
+        main_args.extend(
+            [
+                "--logDebug",
+                "--stats",
+                "--outdir",
+                self.outDir,
+                "--jobStore",
+                self.jobDir,
+                os.path.join(self.rootDir, cwlfile),
+                os.path.join(self.rootDir, jobfile),
+            ]
+        )
         cwltoil.main(main_args, stdout=st)
         out = self._extract_job_lists()
         self.assertEqual(out, expect)
 
     def _match_extract_string(self, stringin):
         import re
-        search_pattern = re.compile('^.* (\w*) kind-CWLJob/instance-.*$')
+
+        search_pattern = re.compile(r"^.* (\w*) kind-CWLJob/instance-.*$")
         if search_pattern.match(stringin):
-            return(search_pattern.sub(r'\1',stringin))
+            return search_pattern.sub(r"\1", stringin)
         else:
-            return(None)
+            return None
 
     def _extract_job_lists(self):
         worker_list = []
         for filename in os.listdir(self.statDir):
-            with open(os.path.join(self.statDir,filename)) as f:
+            with open(os.path.join(self.statDir, filename)) as f:
                 test_json = json.load(f)
-            if 'workers' in test_json.keys() and len(test_json['jobs']) > 0:
-                job_list = [self._match_extract_string(x) for x in test_json['logs']['names']]
+            if "workers" in test_json and len(test_json["jobs"]) > 0:
+                job_list = [
+                    self._match_extract_string(x)
+                    for x in test_json["logs"]["names"]
+                ]
                 if not all(x == None for x in job_list):
                     worker_list.append(job_list)
         worker_list.sort()
-        return(worker_list)
+        return worker_list
 
     def test_biobb_fail(self):
-        self._tester('src/toil/test/cwl/md_list_reduced.cwl',
-                     'src/toil/test/cwl/md_list_reduced.json',
-                     [['genion', 'grompp', 'pdb2gmx', 'editconf', 'solvate']],
-                     main_args=[])
+        self._tester(
+            "src/toil/test/cwl/md_list_reduced.cwl",
+            "src/toil/test/cwl/md_list_reduced.json",
+            [["genion", "grompp", "pdb2gmx", "editconf", "solvate"]],
+            main_args=[],
+        )
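The regex in _match_extract_string above keys on the job-name token that precedes the "kind-CWLJob/instance-" marker in the stats log names. A standalone sketch of that behavior, using a made-up log line rather than real Toil output:

import re

search_pattern = re.compile(r"^.* (\w*) kind-CWLJob/instance-.*$")

line = "INFO 12:00:00 grompp kind-CWLJob/instance-deadbeef"  # made-up example
if search_pattern.match(line):
    # The anchored pattern matches the whole line, so sub() keeps only group 1.
    print(search_pattern.sub(r"\1", line))  # prints: grompp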
diff --git a/src/toil/utils/toilDebugJob.py b/src/toil/utils/toilDebugJob.py
index e42cd3baf1..a96de61730 100644
--- a/src/toil/utils/toilDebugJob.py
+++ b/src/toil/utils/toilDebugJob.py
@@ -46,5 +46,5 @@ def main() -> None:
     jobID = options.jobID[0]
     logger.debug(f"Running the following job locally: {jobID}")
-    workerScript(jobStore, config, jobID, jobID, redirectOutputToLogFile=False)
+    workerScript(jobStore, config, jobID, jobID, None, redirectOutputToLogFile=False)
     logger.debug(f"Finished running: {jobID}")
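The worker.py changes below type-annotate checkSuccessorReadyToRunMultiplePredecessors; at its core it is set bookkeeping. A minimal sketch of the idea, with invented names — the real function also caches JobDescriptions through the shared ToilState:

class Successor:
    def __init__(self, predecessorNumber):
        self.predecessorNumber = predecessorNumber
        self.predecessorsFinished = set()  # job store IDs of finished predecessors

def ready_to_run(successor, predecessor_id):
    # Record this predecessor as finished, then check whether all are done.
    successor.predecessorsFinished.add(predecessor_id)
    return len(successor.predecessorsFinished) == successor.predecessorNumber

s = Successor(predecessorNumber=2)
assert not ready_to_run(s, "job-A")  # one of two predecessors done: not ready
assert ready_to_run(s, "job-B")      # both done: the worker may chain to it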
diff --git a/src/toil/worker.py b/src/toil/worker.py
index 9ee3bc6da5..426dd7a4b9 100644
--- a/src/toil/worker.py
+++ b/src/toil/worker.py
@@ -56,35 +56,52 @@
 
 logger = logging.getLogger(__name__)
 
-def checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStore, toilState):
+def checkSuccessorReadyToRunMultiplePredecessors(
+    successor: JobDescription,
+    predecessor: JobDescription,
+    jobStore: AbstractJobStore,
+    toilState: ToilState,
+) -> bool:
     """
     Handle the special cases of checking if a successor job is ready to run
     when there are multiple predecessors.
 
-    :param toil.job.JobDescription successor: The successor which has failed.
-    :param toil.job.JobDescription predecessor: The job which the successor comes after.
+    :param successor: The successor to check.
+    :param predecessor: The job which the successor comes after.
     """
     # See implementation note at the top of this file for discussion of multiple predecessors
-    logger.debug("Successor job: %s of job: %s has multiple "
-                 "predecessors", successor, predecessor)
-    logger.debug("Already finished predecessors are: %s", successor.predecessorsFinished)
+    logger.debug(
+        "Successor job: %s of job: %s has multiple predecessors",
+        successor,
+        predecessor,
+    )
+    logger.debug(
+        "Already finished predecessors are: %s", successor.predecessorsFinished
+    )
 
     # Get the successor JobDescription, which is cached
     if successor.jobStoreID not in toilState.jobsToBeScheduledWithMultiplePredecessors:
         # TODO: We're loading from the job store in an ad-hoc way!
         loaded = jobStore.load(successor.jobStoreID)
-        toilState.jobsToBeScheduledWithMultiplePredecessors[successor.jobStoreID] = loaded
+        toilState.jobsToBeScheduledWithMultiplePredecessors[
+            successor.jobStoreID
+        ] = loaded
 
     # TODO: we're clobbering a JobDescription we're passing around by value.
-    successor = toilState.jobsToBeScheduledWithMultiplePredecessors[successor.jobStoreID]
+    successor = toilState.jobsToBeScheduledWithMultiplePredecessors[
+        successor.jobStoreID
+    ]
 
-    logger.debug("Already finished predecessors are (2) : %s", successor.predecessorsFinished)
+    logger.debug(
+        "Already finished predecessors are (2): %s", successor.predecessorsFinished
+    )
 
     # Add the predecessor as a finished predecessor to the successor
     successor.predecessorsFinished.add(predecessor.jobStoreID)
-
-    logger.debug("Already finished predecessors are (3) : %s", successor.predecessorsFinished)
+    logger.debug(
+        "Already finished predecessors are (3): %s", successor.predecessorsFinished
+    )
 
     # If the successor job's predecessors have all not all completed then
     # ignore the successor as is not yet ready to run
@@ -97,8 +114,12 @@ def checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStor
 
     return True
 
-
-def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilState, config: Config) -> Optional[JobDescription]:
+def nextChainable(
+    predecessor: JobDescription,
+    jobStore: AbstractJobStore,
+    toilState: ToilState,
+    config: Config,
+) -> Optional[JobDescription]:
     """
     Returns the next chainable job's JobDescription after the given predecessor
     JobDescription, if one exists, or None if the chain must terminate.
@@ -106,7 +127,7 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
     :param predecessor: The job to chain from
     :param jobStore: The JobStore to fetch JobDescriptions from.
     :param config: The configuration for the current run.
-    :param toil.toilState.ToilState toilState: A local toilState, for providing a mutatable stack
+    :param toilState: A local toilState, for providing a mutable stack.
     """
     #If no more jobs to run or services not finished, quit
     if len(predecessor.stack) == 0 or len(predecessor.services) > 0 or (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None):
@@ -114,16 +135,22 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
         logger.debug("Stopping running chain of jobs: length of stack: %s, services: %s, checkpoint: %s",
                      len(predecessor.stack), len(predecessor.services), (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None))
         return None
-#    logger.debug("Length of stack: %s",len(predecessor.stack))
-#    logger.debug("Number of : %s",len(predecessor.stack))
-    if len(predecessor.stack) > 1 and len(predecessor.stack[-1]) > 0 and len(predecessor.stack[-2]) > 0:
+    # logger.debug("Length of stack: %s",len(predecessor.stack))
+    # logger.debug("Number of : %s",len(predecessor.stack))
+    if (
+        len(predecessor.stack) > 1
+        and len(predecessor.stack[-1]) > 0
+        and len(predecessor.stack[-2]) > 0
+    ):
        # TODO: Without a real stack list we can freely mutate, we can't chain
        # to a child, which may branch, and then go back and do the follow-ons
        # of the original job.
        # TODO: Go back to a free-form stack list and require some kind of
        # stack build phase?
-        #logger.debug("Job has both children and follow-ons - let's see if this breaks")
-        logger.debug("Stopping running chain of jobs because job has both children and follow-ons")
+        # logger.debug("Job has both children and follow-ons - let's see if this breaks")
+        logger.debug(
+            "Stopping running chain of jobs because job has both children and follow-ons"
+        )
         return None
 
     #Get the next set of jobs to run
@@ -135,8 +162,10 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
 
     #If there are 2 or more jobs to run in parallel we quit
     if len(jobs) >= 2:
-        logger.debug("No more jobs can run in series by this worker,"
-                     " it's got %i children", len(jobs))
+        logger.debug(
+            "No more jobs can run in series by this worker, it's got %i children",
+            len(jobs),
+        )
         return None
 
     # Grab the only job that should be there.
@@ -144,8 +173,6 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
 
     # Load the successor JobDescription
     successor = jobStore.load(successorID)
-
-    #testresult = checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStore, toilState)
 
     #We check the requirements of the successor to see if we can run it
     #within the current worker
@@ -161,14 +188,19 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
     if successor.preemptable != predecessor.preemptable:
         logger.debug("Preemptability is different for the next job, returning to the leader")
         return None
-#    if (successor.predecessorNumber - len(successor.predecessorsFinished)) > 1:
-    if not checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStore, toilState):
-        logger.debug("The next job has %i predecessors that are not yet "
-                     "recorded as finished; we must return to the leader.", successor.predecessorNumber)
+    # if (successor.predecessorNumber - len(successor.predecessorsFinished)) > 1:
+    if not checkSuccessorReadyToRunMultiplePredecessors(
+        successor, predecessor, jobStore, toilState
+    ):
+        logger.debug(
+            "The next job has %i predecessors that are not yet "
+            "recorded as finished; we must return to the leader.",
+            successor.predecessorNumber,
+        )
         logger.debug(successor.predecessorsFinished)
         return None
     else:
-        logger.debug('all predecessors are finished, we can chain to the successor')
+        logger.debug("All predecessors are finished; we can chain to the successor")
 
     if len(successor.services) > 0:
         logger.debug("The next job requires services that will not yet be started; we must return to the leader.")
@@ -182,7 +214,15 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
     # Made it through! This job is chainable.
     return successor
 
-def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobStoreID: str, parentToilState, redirectOutputToLogFile: bool = True) -> int:
+
+def workerScript(
+    jobStore: AbstractJobStore,
+    config: Config,
+    jobName: str,
+    jobStoreID: str,
+    parentToilState: Optional[str],
+    redirectOutputToLogFile: bool = True,
+) -> int:
     """
     Worker process script, runs a job.
@@ -191,9 +231,9 @@ def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobSt
     :param jobName: The "job name" (a user friendly name) of the job to be run
 
     :param jobStoreID: The job store ID of the job to be run
 
-    :param str parentToilState: Pickle containing the parent toilState
+    :param parentToilState: Base64-encoded pickle of the leader's toilState, or None
 
-    :return int: 1 if a job failed, or 0 if all jobs succeeded
+    :return: 1 if a job failed, or 0 if all jobs succeeded
     """
     configure_root_logger()
@@ -357,7 +397,7 @@ def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobSt
     if parentToilState:
         logger.debug(parentToilState)
     else:
-        logger.debug('parentToilState empty')
+        logger.debug("parentToilState empty")
 
     ##########################################
     #Connect to the deferred function system
@@ -424,16 +464,14 @@ def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobSt
     startTime = time.time()
-
-    logger.debug(jobStore)
-    # Get a snap shot of the current state of the jobs in the jobStore
+    logger.debug(jobStore)
+    # Get a snapshot of the current state of the jobs in the jobStore
     # - creating a local version of the leader's ToilState
     if parentToilState:
-        toilState = pickle.loads(base64.b64decode(parentToilState.encode('utf-8')))
+        toilState = pickle.loads(base64.b64decode(parentToilState.encode("utf-8")))
     else:
         toilState = ToilState(jobStore, jobDesc, jobCache=None)
-
-
+
     while True:
         ##########################################
         #Run the job body, if there is one
@@ -725,9 +763,12 @@ def parse_args(args: List[str]) -> argparse.Namespace:
                         that the worker can then run before/after the job on the batch
                         system's behalf.""")
 
-    parser.add_argument("--toilState", default=None, type=str,
-                        help="""Pickled, base64-encoded copy of the Toul leader's toilState.""")
-
+    parser.add_argument(
+        "--toilState",
+        default=None,
+        type=str,
+        help="""Pickled, base64-encoded copy of the Toil leader's toilState.""",
+    )
     return parser.parse_args(args)
@@ -779,10 +820,11 @@ def main(argv: Optional[List[str]] = None) -> None:
     jobStore = Toil.resumeJobStore(options.jobStoreLocator)
     config = jobStore.config
 
-    with in_contexts(options.context):
-        # Call the worker
-        exit_code = workerScript(jobStore, config, options.jobName, options.jobStoreID, options.toilState)
+    # Call the worker
+    exit_code = workerScript(
+        jobStore, config, options.jobName, options.jobStoreID, options.toilState
+    )
 
     # Exit with its return value
     sys.exit(exit_code)
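Taken together, the worker-side half of the scheme is a decode-or-rebuild fallback: reuse the leader's pickled snapshot when --toilState was passed, otherwise build state locally (as toil debug-job does by passing None). A minimal sketch, with a stand-in class in place of toil.toilState.ToilState:

import base64
import pickle

class State:  # stand-in for toil.toilState.ToilState
    def __init__(self, origin):
        self.origin = origin

def load_state(parentToilState):
    if parentToilState:
        # The leader passed --toilState: reuse its pickled snapshot.
        return pickle.loads(base64.b64decode(parentToilState.encode("utf-8")))
    # No pickle supplied: build a fresh local state instead.
    return State("local")

encoded = base64.b64encode(pickle.dumps(State("leader"))).decode("utf-8")
assert load_state(encoded).origin == "leader"
assert load_state(None).origin == "local"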