Skip to content

Commit

Permalink
Change SchedulerRequirement to SlurmRequirement, add qos and reservat…
Browse files Browse the repository at this point in the history
…ion, fix dependencies upon SUBMIT_FAIL
  • Loading branch information
pagrubel committed Jan 22, 2025
1 parent 8123777 commit 64a30bc
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 9 deletions.
4 changes: 4 additions & 0 deletions beeflow/common/config_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ def filepath_completion_input(*pargs, **kwargs):
info='default account time limit (leave blank if none)')
VALIDATOR.option('job', 'default_partition', validator=lambda val: val.strip(),
info='default partition to run jobs on (leave blank if none)')
VALIDATOR.option('job', 'default_qos', validator=lambda val: val.strip(),
info='default qos to run jobs on (leave blank if none)')
VALIDATOR.option('job', 'default_reservation', validator=lambda val: val.strip(),
info='default reservation to run jobs on (leave blank if none)')


def validate_chrun_opts(opts):
Expand Down
24 changes: 17 additions & 7 deletions beeflow/common/worker/slurm_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
class BaseSlurmWorker(Worker):
"""Base slurm worker code."""

def __init__(self, default_account='', default_time_limit='', default_partition='', **kwargs):
def __init__(self, default_account='', default_time_limit='', default_partition='',
default_qos='', default_reservation='', **kwargs):
"""Initialize the base slurm worker."""
super().__init__(**kwargs)
self.default_account = default_account
self.default_time_limit = default_time_limit
self.default_partition = default_partition
self.default_qos = default_qos
self.default_reservation = default_reservation

def build_text(self, task):
"""Build text for task script."""
Expand All @@ -50,14 +53,17 @@ def build_text(self, task):
ntasks = task.get_requirement('beeflow:MPIRequirement', 'ntasks', default=nodes)
# Need to rethink the MPI version parameter
mpi_version = task.get_requirement('beeflow:MPIRequirement', 'mpiVersion', default='')
time_limit = task.get_requirement('beeflow:SchedulerRequirement', 'timeLimit',
time_limit = task.get_requirement('beeflow:SlurmRequirement', 'timeLimit',
default=self.default_time_limit)
time_limit = validation.time_limit(time_limit)
account = task.get_requirement('beeflow:SchedulerRequirement', 'account',
account = task.get_requirement('beeflow:SlurmRequirement', 'account',
default=self.default_account)
partition = task.get_requirement('beeflow:SchedulerRequirement',
'partition',
partition = task.get_requirement('beeflow:SlurmRequirement', 'partition',
default=self.default_partition)
qos = task.get_requirement('beeflow:SlurmRequirement', 'qos',
default=self.default_qos)
reservation = task.get_requirement('beeflow:SlurmRequirement', 'reservation',
default=self.default_reservation)

shell = task.get_requirement('beeflow:ScriptRequirement', 'shell', default="/bin/bash")
scripts_enabled = task.get_requirement('beeflow:ScriptRequirement', 'enabled',
Expand All @@ -81,9 +87,13 @@ def build_text(self, task):
if time_limit:
script.append(f'#SBATCH --time={time_limit}')
if account:
script.append(f'#SBATCH -A {account}')
script.append(f'#SBATCH --account {account}')
if partition:
script.append(f'#SBATCH -p {partition}')
script.append(f'#SBATCH --partition {partition}')
if qos:
script.append(f'#SBATCH --qos {qos}')
if reservation:
script.append(f'#SBATCH --reservation {reservation}')

# Return immediately on error
if shell == "/bin/bash":
Expand Down
3 changes: 2 additions & 1 deletion beeflow/task_manager/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def worker_interface():
'runner_opts': bc.get('task_manager', 'runner_opts'),
}
# Job defaults
for default_key in ['default_account', 'default_time_limit', 'default_partition']:
for default_key in ['default_account', 'default_time_limit', 'default_partition',
'default_qos', 'default_reservation']:
worker_kwargs[default_key] = bc.get('job', default_key)
# Special slurm arguments
if wls == 'Slurm':
Expand Down
2 changes: 1 addition & 1 deletion beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def handle_state_change(self, state_update, task, wfi, db):

# If the job failed and it doesn't include a checkpoint-restart hint,
# then fail the entire workflow
if state_update.job_state == 'FAILED':
if state_update.job_state in ['FAILED', 'SUBMIT_FAIL']:
set_dependent_tasks_dep_fail(db, wfi, state_update.wf_id, task)
log.info("Workflow failed")
wf_id = wfi.workflow_id
Expand Down

0 comments on commit 64a30bc

Please sign in to comment.