From 64a30bc6be01f1caba3661ac85b5a3b559ce3a4c Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Wed, 22 Jan 2025 14:04:59 -0700 Subject: [PATCH] Change SchedulerRequirement to SlurmRequirement, add qos and reservation, fix dependencies upon SUBMIT_FAIL --- beeflow/common/config_driver.py | 4 ++++ beeflow/common/worker/slurm_worker.py | 24 ++++++++++++++++------- beeflow/task_manager/utils.py | 3 ++- beeflow/wf_manager/resources/wf_update.py | 2 +- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/beeflow/common/config_driver.py b/beeflow/common/config_driver.py index 5735e3c0..46d8f386 100644 --- a/beeflow/common/config_driver.py +++ b/beeflow/common/config_driver.py @@ -300,6 +300,10 @@ def filepath_completion_input(*pargs, **kwargs): info='default account time limit (leave blank if none)') VALIDATOR.option('job', 'default_partition', validator=lambda val: val.strip(), info='default partition to run jobs on (leave blank if none)') +VALIDATOR.option('job', 'default_qos', validator=lambda val: val.strip(), + info='default qos to run jobs on (leave blank if none)') +VALIDATOR.option('job', 'default_reservation', validator=lambda val: val.strip(), + info='default reservation to run jobs on (leave blank if none)') def validate_chrun_opts(opts): diff --git a/beeflow/common/worker/slurm_worker.py b/beeflow/common/worker/slurm_worker.py index 83ab7c1e..22f35038 100644 --- a/beeflow/common/worker/slurm_worker.py +++ b/beeflow/common/worker/slurm_worker.py @@ -25,12 +25,15 @@ class BaseSlurmWorker(Worker): """Base slurm worker code.""" - def __init__(self, default_account='', default_time_limit='', default_partition='', **kwargs): + def __init__(self, default_account='', default_time_limit='', default_partition='', + default_qos='', default_reservation='', **kwargs): """Initialize the base slurm worker.""" super().__init__(**kwargs) self.default_account = default_account self.default_time_limit = default_time_limit self.default_partition = default_partition + self.default_qos = default_qos + self.default_reservation = default_reservation def build_text(self, task): """Build text for task script.""" @@ -50,14 +53,17 @@ def build_text(self, task): ntasks = task.get_requirement('beeflow:MPIRequirement', 'ntasks', default=nodes) # Need to rethink the MPI version parameter mpi_version = task.get_requirement('beeflow:MPIRequirement', 'mpiVersion', default='') - time_limit = task.get_requirement('beeflow:SchedulerRequirement', 'timeLimit', + time_limit = task.get_requirement('beeflow:SlurmRequirement', 'timeLimit', default=self.default_time_limit) time_limit = validation.time_limit(time_limit) - account = task.get_requirement('beeflow:SchedulerRequirement', 'account', + account = task.get_requirement('beeflow:SlurmRequirement', 'account', default=self.default_account) - partition = task.get_requirement('beeflow:SchedulerRequirement', - 'partition', + partition = task.get_requirement('beeflow:SlurmRequirement', 'partition', default=self.default_partition) + qos = task.get_requirement('beeflow:SlurmRequirement', 'qos', + default=self.default_qos) + reservation = task.get_requirement('beeflow:SlurmRequirement', 'reservation', + default=self.default_reservation) shell = task.get_requirement('beeflow:ScriptRequirement', 'shell', default="/bin/bash") scripts_enabled = task.get_requirement('beeflow:ScriptRequirement', 'enabled', @@ -81,9 +87,13 @@ def build_text(self, task): if time_limit: script.append(f'#SBATCH --time={time_limit}') if account: - script.append(f'#SBATCH -A {account}') + script.append(f'#SBATCH --account {account}') if partition: - script.append(f'#SBATCH -p {partition}') + script.append(f'#SBATCH --partition {partition}') + if qos: + script.append(f'#SBATCH --qos {qos}') + if reservation: + script.append(f'#SBATCH --reservation {reservation}') # Return immediately on error if shell == "/bin/bash": diff --git a/beeflow/task_manager/utils.py b/beeflow/task_manager/utils.py index dd9762ae..d082fec5 100644 --- a/beeflow/task_manager/utils.py +++ b/beeflow/task_manager/utils.py @@ -38,7 +38,8 @@ def worker_interface(): 'runner_opts': bc.get('task_manager', 'runner_opts'), } # Job defaults - for default_key in ['default_account', 'default_time_limit', 'default_partition']: + for default_key in ['default_account', 'default_time_limit', 'default_partition', + 'default_qos', 'default_reservation']: worker_kwargs[default_key] = bc.get('job', default_key) # Special slurm arguments if wls == 'Slurm': diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index 534e3e3c..fd0d56ee 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -149,7 +149,7 @@ def handle_state_change(self, state_update, task, wfi, db): # If the job failed and it doesn't include a checkpoint-restart hint, # then fail the entire workflow - if state_update.job_state == 'FAILED': + if state_update.job_state in ['FAILED', 'SUBMIT_FAIL']: set_dependent_tasks_dep_fail(db, wfi, state_update.wf_id, task) log.info("Workflow failed") wf_id = wfi.workflow_id