From 7ad1809c4549565b960d626e5723a022ea007353 Mon Sep 17 00:00:00 2001
From: Jake Tronge
Date: Thu, 28 Mar 2024 11:01:39 -0600
Subject: [PATCH 1/2] Ensure failed tasks cause dependent tasks to fail

This also adds an integration test and workflow for this.
---
 beeflow/common/integration/utils.py        |   5 +
 beeflow/common/integration_test.py         |  21 ++++
 beeflow/common/wf_data.py                  |   5 +-
 beeflow/wf_manager/resources/wf_update.py  |  37 ++++--
 .../failure-dependent-tasks/input.yml      |   2 +
 .../failure-dependent-tasks/workflow.cwl   | 105 ++++++++++++++++++
 6 files changed, 166 insertions(+), 9 deletions(-)
 create mode 100644 ci/test_workflows/failure-dependent-tasks/input.yml
 create mode 100644 ci/test_workflows/failure-dependent-tasks/workflow.cwl

diff --git a/beeflow/common/integration/utils.py b/beeflow/common/integration/utils.py
index 981e0b97a..0a5b2cae8 100644
--- a/beeflow/common/integration/utils.py
+++ b/beeflow/common/integration/utils.py
@@ -96,6 +96,11 @@ def task_states(self):
         """Get the task states of the workflow."""
         return bee_client.query(self.wf_id)[1]
 
+    def get_task_state_by_name(self, name):
+        """Get the state of a task by name."""
+        task_states = self.task_states
+        return [task_state for _, task_name, task_state in task_states if task_name == name][0]
+
     def cleanup(self):
         """Clean up any leftover workflow data."""
         # Remove the generated tarball
diff --git a/beeflow/common/integration_test.py b/beeflow/common/integration_test.py
index 1dbc5b65b..9a8517faf 100644
--- a/beeflow/common/integration_test.py
+++ b/beeflow/common/integration_test.py
@@ -210,6 +210,27 @@ def build_failure(outer_workdir):
                     f'task was not in state BUILD_FAIL as expected: {task_state}')
 
 
+@TEST_RUNNER.add()
+def dependent_tasks_fail(outer_workdir):
+    """Test that dependent tasks don't run after a failure."""
+    workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
+    os.makedirs(workdir)
+    workflow = utils.Workflow('failure-dependent-tasks',
+                              'ci/test_workflows/failure-dependent-tasks',
+                              main_cwl='workflow.cwl', job_file='input.yml',
+                              workdir=workdir, containers=[])
+    yield [workflow]
+    utils.check_workflow_failed(workflow)
+    # Check each task state
+    fail_state = workflow.get_task_state_by_name('fail')
+    utils.ci_assert(fail_state == 'FAILED',
+                    f'task fail did not fail as expected: {fail_state}')
+    for task in ['dependent0', 'dependent1', 'dependent2']:
+        task_state = workflow.get_task_state_by_name(task)
+        utils.ci_assert(task_state == 'DEP_FAIL',
+                        f'task {task} did not get state DEP_FAIL as expected: {task_state}')
+
+
 @TEST_RUNNER.add(ignore=True)
 def checkpoint_restart(outer_workdir):
     """Test the clamr-ffmpeg checkpoint restart workflow."""
diff --git a/beeflow/common/wf_data.py b/beeflow/common/wf_data.py
index 0761d8715..7b1e0b99c 100644
--- a/beeflow/common/wf_data.py
+++ b/beeflow/common/wf_data.py
@@ -287,7 +287,10 @@ def command(self):
         nonpositional_inputs = []
         for input_ in self.inputs:
             if input_.value is None:
-                raise ValueError("trying to construct command for task with missing input value")
+                raise ValueError(
+                    ("trying to construct command for task with missing input value "
+                     f"(id: {input_.id})")
+                )
 
             if input_.position is not None:
                 positional_inputs.append(input_)
diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py
index 8e557a163..04aa97866 100644
--- a/beeflow/wf_manager/resources/wf_update.py
+++ b/beeflow/wf_manager/resources/wf_update.py
@@ -40,6 +40,18 @@ def archive_workflow(db, wf_id):
     subprocess.call(['tar', '-czf', archive_path, wf_id],
                     cwd=workflows_dir)
 
 
+def set_dependent_tasks_dep_fail(db, wfi, wf_id, task):
+    """Set the state of every task that transitively depends on this task to DEP_FAIL."""
+    # Stack of tasks whose dependents still need to be marked DEP_FAIL
+    set_tasks = [task]
+    while len(set_tasks) > 0:
+        dep_tasks = wfi.get_dependent_tasks(set_tasks.pop())
+        for dep_task in dep_tasks:
+            wfi.set_task_state(dep_task, 'DEP_FAIL')
+            db.workflows.update_task_state(dep_task.id, wf_id, 'DEP_FAIL')
+        set_tasks.extend(dep_tasks)
+
+
 class WFUpdate(Resource):
     """Class to interact with an existing workflow."""
 
@@ -109,13 +121,14 @@ def put(self):
             wf_utils.schedule_submit_tasks(wf_id, tasks)
             return make_response(jsonify(status='Task {task_id} restarted'))
 
-        if job_state in ('COMPLETED', 'FAILED'):
+        if job_state == 'COMPLETED':
             for output in task.outputs:
                 if output.glob is not None:
                     wfi.set_task_output(task, output.id, output.glob)
                 else:
                     wfi.set_task_output(task, output.id, "temp")
             tasks = wfi.finalize_task(task)
+            log.info(f'next tasks to run: {tasks}')
             wf_state = wfi.get_workflow_state()
             if tasks and wf_state != 'PAUSED':
                 wf_utils.schedule_submit_tasks(wf_id, tasks)
@@ -126,13 +139,21 @@ def put(self):
                 archive_workflow(db, wf_id)
                 pid = db.workflows.get_gdb_pid(wf_id)
                 dep_manager.kill_gdb(pid)
-            if wf_state == 'FAILED':
-                log.info("Workflow failed")
-                log.info("Shutting down GDB")
-                wf_id = wfi.workflow_id
-                archive_workflow(db, wf_id)
-                pid = db.workflows.get_gdb_pid(wf_id)
-                dep_manager.kill_gdb(pid)
+
+        # If the job failed and it doesn't include a checkpoint-restart hint,
+        # then fail the entire workflow
+        if job_state == 'FAILED':
+            wfi.set_workflow_state('FAILED')
+            wf_utils.update_wf_status(wf_id, 'Failed')
+            db.workflows.update_workflow_state(wf_id, 'Failed')
+            set_dependent_tasks_dep_fail(db, wfi, wf_id, task)
+            log.info("Workflow failed")
+            log.info("Shutting down GDB")
+            wf_id = wfi.workflow_id
+            # Should failed workflows be archived?
+            # archive_workflow(db, wf_id)
+            pid = db.workflows.get_gdb_pid(wf_id)
+            dep_manager.kill_gdb(pid)
 
         if job_state == 'BUILD_FAIL':
             log.error(f'Workflow failed due to failed container build for task {task.name}')
diff --git a/ci/test_workflows/failure-dependent-tasks/input.yml b/ci/test_workflows/failure-dependent-tasks/input.yml
new file mode 100644
index 000000000..80930a4b0
--- /dev/null
+++ b/ci/test_workflows/failure-dependent-tasks/input.yml
@@ -0,0 +1,2 @@
+fname: some_file_that_doesnt_exist
+cat_argument: -n
diff --git a/ci/test_workflows/failure-dependent-tasks/workflow.cwl b/ci/test_workflows/failure-dependent-tasks/workflow.cwl
new file mode 100644
index 000000000..80be5d864
--- /dev/null
+++ b/ci/test_workflows/failure-dependent-tasks/workflow.cwl
@@ -0,0 +1,105 @@
+cwlVersion: v1.2
+class: Workflow
+
+inputs:
+  fname: File
+  cat_argument: string
+
+outputs:
+  fail_stdout:
+    type: File
+    outputSource: fail/fail_stdout
+  dependent0_stdout:
+    type: File
+    outputSource: dependent0/dependent_stdout
+  dependent1_stdout:
+    type: File
+    outputSource: dependent1/dependent_stdout
+  dependent2_stdout:
+    type: File
+    outputSource: dependent2/dependent_stdout
+
+steps:
+  fail:
+    run:
+      class: CommandLineTool
+      baseCommand: [ls]
+      stdout: fail.txt
+      inputs:
+        fname:
+          type: File
+          inputBinding:
+            position: 1
+      outputs:
+        fail_stdout:
+          type: stdout
+    in:
+      fname: fname
+    out: [fail_stdout]
+  # dependent0 and dependent1 depend directly on the failing task above, and dependent2 depends on dependent1; none of them should run.
+  dependent0:
+    run:
+      cwlVersion: v1.2
+      class: CommandLineTool
+      baseCommand: [cat]
+      stdout: dependent.txt
+      inputs:
+        cat_argument:
+          type: string
+          inputBinding:
+            position: 1
+        file_to_cat:
+          type: File
+          inputBinding:
+            position: 2
+      outputs:
+        dependent_stdout:
+          type: stdout
+    in:
+      file_to_cat: fail/fail_stdout
+      cat_argument: cat_argument
+    out: [dependent_stdout]
+  dependent1:
+    run:
+      cwlVersion: v1.2
+      class: CommandLineTool
+      baseCommand: [cat]
+      stdout: dependent1.txt
+      inputs:
+        cat_argument:
+          type: string
+          inputBinding:
+            position: 1
+        file_to_cat:
+          type: File
+          inputBinding:
+            position: 2
+      outputs:
+        dependent_stdout:
+          type: stdout
+    in:
+      cat_argument: cat_argument
+      file_to_cat: fail/fail_stdout
+    out: [dependent_stdout]
+  dependent2:
+    run:
+      cwlVersion: v1.2
+      class: CommandLineTool
+      baseCommand: [cat]
+      stdout: dependent2.txt
+      inputs:
+        cat_argument:
+          type: string
+          inputBinding:
+            position: 1
+        file_to_cat:
+          type: File
+          inputBinding:
+            position: 2
+      outputs:
+        dependent_stdout:
+          type: stdout
+    in:
+      cat_argument: cat_argument
+      file_to_cat: dependent1/dependent_stdout
+    out: [dependent_stdout]

From 27d53b21e1790739bbaec411214637a46fe45fbc Mon Sep 17 00:00:00 2001
From: Jake Tronge
Date: Mon, 15 Apr 2024 09:02:16 -0600
Subject: [PATCH 2/2] Archive failed workflows

Failed workflows, including those that failed due to job errors and
container build errors, will now be archived with a state of
`Archived/Failed`.
---
 beeflow/common/integration/utils.py       |  2 +-
 beeflow/wf_manager/resources/wf_update.py | 19 ++++++-----------
 2 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/beeflow/common/integration/utils.py b/beeflow/common/integration/utils.py
index 0a5b2cae8..b6116724e 100644
--- a/beeflow/common/integration/utils.py
+++ b/beeflow/common/integration/utils.py
@@ -248,5 +248,5 @@ def check_completed(workflow):
 
 def check_workflow_failed(workflow):
     """Ensure that the workflow completed in a Failed state."""
-    ci_assert(workflow.status == 'Failed',
+    ci_assert(workflow.status == 'Archived/Failed',
               f'workflow did not fail as expected (final status: {workflow.status})')
diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py
index 04aa97866..cc483d854 100644
--- a/beeflow/wf_manager/resources/wf_update.py
+++ b/beeflow/wf_manager/resources/wf_update.py
@@ -21,15 +21,16 @@
 db_path = wf_utils.get_db_path()
 
 
-def archive_workflow(db, wf_id):
+def archive_workflow(db, wf_id, final_state=None):
     """Archive a workflow after completion."""
     # Archive Config
     workflow_dir = wf_utils.get_workflow_dir(wf_id)
     shutil.copyfile(os.path.expanduser("~") + '/.config/beeflow/bee.conf',
                     workflow_dir + '/' + 'bee.conf')
-    db.workflows.update_workflow_state(wf_id, 'Archived')
-    wf_utils.update_wf_status(wf_id, 'Archived')
+    wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
+    db.workflows.update_workflow_state(wf_id, wf_state)
+    wf_utils.update_wf_status(wf_id, wf_state)
     bee_workdir = wf_utils.get_bee_workdir()
     archive_dir = os.path.join(bee_workdir, 'archives')
@@ -143,23 +144,19 @@ def put(self):
         # If the job failed and it doesn't include a checkpoint-restart hint,
         # then fail the entire workflow
         if job_state == 'FAILED':
-            wfi.set_workflow_state('FAILED')
-            wf_utils.update_wf_status(wf_id, 'Failed')
-            db.workflows.update_workflow_state(wf_id, 'Failed')
             set_dependent_tasks_dep_fail(db, wfi, wf_id, task)
             log.info("Workflow failed")
             log.info("Shutting down GDB")
             wf_id = wfi.workflow_id
-            # Should failed workflows be archived?
-            # archive_workflow(db, wf_id)
+            archive_workflow(db, wf_id, final_state='Failed')
             pid = db.workflows.get_gdb_pid(wf_id)
             dep_manager.kill_gdb(pid)
 
         if job_state == 'BUILD_FAIL':
             log.error(f'Workflow failed due to failed container build for task {task.name}')
-            wfi.set_workflow_state('Failed')
-            wf_utils.update_wf_status(wf_id, 'Failed')
-            db.workflows.update_workflow_state(wf_id, 'Failed')
+            archive_workflow(db, wf_id, final_state='Failed')
+            pid = db.workflows.get_gdb_pid(wf_id)
+            dep_manager.kill_gdb(pid)
 
         resp = make_response(jsonify(status=(f'Task {task_id} belonging to WF {wf_id} set to'
                                              f'{job_state}')), 200)
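
The heart of patch 1 is set_dependent_tasks_dep_fail() in wf_update.py: starting from the
failed task, every task reachable through "dependent" edges is marked DEP_FAIL so the
scheduler never runs it. Below is a minimal standalone sketch of that walk for anyone who
wants to experiment with it outside BEE. It is an illustration only, not BEE's real API:
the dependency graph is a plain dict and mark_dep_fail() is a hypothetical helper. It also
adds a visited set so a task reachable along several paths is only marked once; the patch
simply re-marks such tasks, which is harmless.

    # Toy model of the DEP_FAIL walk used in wf_update.py
    from collections import deque

    def mark_dep_fail(dependents, failed_task):
        """Return all tasks that transitively depend on failed_task."""
        marked = set()
        queue = deque([failed_task])
        while queue:
            for dep in dependents.get(queue.popleft(), []):
                if dep not in marked:  # avoid re-marking shared dependents
                    marked.add(dep)
                    queue.append(dep)
        return marked

    # Mirrors the test workflow: fail -> dependent0, fail -> dependent1, dependent1 -> dependent2
    graph = {'fail': ['dependent0', 'dependent1'], 'dependent1': ['dependent2']}
    assert mark_dep_fail(graph, 'fail') == {'dependent0', 'dependent1', 'dependent2'}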