diff --git a/src/lpcjobqueue/cluster.py b/src/lpcjobqueue/cluster.py index 8318332..7072e92 100644 --- a/src/lpcjobqueue/cluster.py +++ b/src/lpcjobqueue/cluster.py @@ -54,6 +54,7 @@ def __init__( **base_class_kwargs, ): image = self.container_prefix + image + image = os.path.realpath(image) if ship_env: base_class_kwargs["python"] = f"{self.env_name}/bin/python" base_class_kwargs.setdefault( @@ -93,20 +94,10 @@ async def start(self): job = htcondor.Submit(job) def sub(): - try: - classads = [] - with SCHEDD().transaction() as txn: - cluster_id = job.queue(txn, ad_results=classads) - - logger.debug(f"ClassAds for job {cluster_id}: {classads}") - SCHEDD().spool(classads) - return cluster_id - except htcondor.HTCondorInternalError as ex: - logger.error(str(ex)) - return None - except htcondor.HTCondorIOError as ex: - logger.error(str(ex)) - return None + result = SCHEDD().submit(job, spool=True) + cluster_id = result.cluster() + SCHEDD().spool(list(job.jobs(clusterid=cluster_id))) + return cluster_id self.job_id = await asyncio.get_event_loop().run_in_executor(SCHEDD_POOL, sub) if self.job_id: @@ -147,7 +138,7 @@ def check_gone(): SCHEDD_POOL, check_gone ) except RuntimeError as ex: - if str(ex) == "cannot schedule new futures after interpreter shutdown": + if str(ex) == "cannot schedule new futures after shutdown": logger.info(f"Thread pool lost while checking worker {self.name} job {self.job_id}") # We're not going to be able to do anything async now self.status = Status.undefined