From a01b75a77be68b0f9578cbed1a5b1a491ef3ffdc Mon Sep 17 00:00:00 2001 From: Leah Howell <77810960+Leahh02@users.noreply.github.com> Date: Mon, 2 Dec 2024 17:19:42 -0500 Subject: [PATCH] Fix Cancel Workflows (#960) * Fix Cancel Workflows - allows jobs already scheduled to run but cancels the rest of the workflow * Archive cancelled workflows after scheduled/running tasks finish --------- Co-authored-by: leahh --- beeflow/common/gdb/gdb_driver.py | 10 ++++++++++ beeflow/common/gdb/neo4j_cypher.py | 19 +++++++++++++++++++ beeflow/common/gdb/neo4j_driver.py | 11 +++++++++++ beeflow/common/wf_interface.py | 7 +++++++ beeflow/wf_manager/resources/wf_actions.py | 6 ++++-- beeflow/wf_manager/resources/wf_update.py | 7 ++++++- 6 files changed, 57 insertions(+), 3 deletions(-) diff --git a/beeflow/common/gdb/gdb_driver.py b/beeflow/common/gdb/gdb_driver.py index 226b638d..1cb243f8 100644 --- a/beeflow/common/gdb/gdb_driver.py +++ b/beeflow/common/gdb/gdb_driver.py @@ -284,6 +284,16 @@ def workflow_completed(self): :rtype: bool """ + @abstractmethod + def cancelled_workflow_completed(self): + """Determine if a cancelled workflow has completed. + + A cancelled workflow has completed if each of its final tasks are not + 'PENDING', 'RUNNING' 'COMPLETING'. + + :rtype: bool + """ + @abstractmethod def close(self): """Close the connection to the graph database.""" diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index 597479c4..9e25bccb 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -709,6 +709,25 @@ def final_tasks_completed(tx, wf_id): return bool(tx.run(not_completed_query, wf_id=wf_id).single() is None) +def cancelled_final_tasks_completed(tx, wf_id): + """Return true if all a cancelled workflow's scheduled tasks have completed, else false. + + All of the workflow's scheduled tasks are completed if each of the final task nodes + are not in states 'PENDING', 'RUNNING', or 'COMPLETING'. + + :param wf_id: the workflow's id + :type wf_id: str + :rtype: bool + """ + active_states_query = ("MATCH (m:Metadata)-[:DESCRIBES]->(t:Task {workflow_id: $wf_id}) " + "WHERE NOT (t)<-[:DEPENDS_ON|:RESTARTED_FROM]-(:Task) " + "AND m.state IN ['PENDING', 'RUNNING', 'COMPLETING'] " + "RETURN t IS NOT NULL LIMIT 1") + + # False if at least one task is in 'PENDING', 'RUNNING', or 'COMPLETING' + return bool(tx.run(active_states_query, wf_id=wf_id).single() is None) + + def is_empty(tx): """Return true if the database is empty, else false. diff --git a/beeflow/common/gdb/neo4j_driver.py b/beeflow/common/gdb/neo4j_driver.py index 99d273cf..5ab01b49 100644 --- a/beeflow/common/gdb/neo4j_driver.py +++ b/beeflow/common/gdb/neo4j_driver.py @@ -422,6 +422,17 @@ def workflow_completed(self, workflow_id): """ return self._read_transaction(tx.final_tasks_completed, wf_id=workflow_id) + def cancelled_workflow_completed(self, workflow_id): + """Determine if a cancelled workflow has completed. + + A cancelled workflow has completed if each of its final tasks are not + 'PENDING', 'RUNNING' 'COMPLETING'. + :param workflow_id: the workflow id + :type workflow_id: str + :rtype: bool + """ + return self._read_transaction(tx.cancelled_final_tasks_completed, wf_id=workflow_id) + def close(self): """Close the connection to the Neo4j database.""" self._driver.close() diff --git a/beeflow/common/wf_interface.py b/beeflow/common/wf_interface.py index c6b1701c..83554653 100644 --- a/beeflow/common/wf_interface.py +++ b/beeflow/common/wf_interface.py @@ -305,6 +305,13 @@ def workflow_completed(self): """ return self._gdb_driver.workflow_completed(self._workflow_id) + def cancelled_workflow_completed(self): + """Return true if all a cancelled workflow's scheduled tasks have completed, else false. + + :rtype: bool + """ + return self._gdb_driver.cancelled_workflow_completed(self._workflow_id) + def export_graphml(self): """Export a BEE workflow as a graphml.""" self._gdb_driver.export_graphml(self._workflow_id) diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 2e922340..cc2e57fc 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -60,13 +60,15 @@ def delete(self, wf_id): db = connect_db(wfm_db, db_path) if option == "cancel": wfi = wf_utils.get_workflow_interface(wf_id) + wf_state = wfi.get_workflow_state() # Remove all tasks currently in the database wfi.set_workflow_state('Cancelled') wf_utils.update_wf_status(wf_id, 'Cancelled') db.workflows.update_workflow_state(wf_id, 'Cancelled') log.info(f"Workflow {wf_id} cancelled") - # Archive cancelled workflow - archive_workflow(db, wf_id, final_state='Cancelled') + # Archive cancelled workflow if it was originally paused + if wf_state == 'PAUSED': + archive_workflow(db, wf_id, final_state='Cancelled') resp = make_response(jsonify(status='Cancelled'), 202) elif option == "remove": log.info(f"Removing workflow {wf_id}.") diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index e2a45dd7..8343e492 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -134,7 +134,7 @@ def handle_state_change(self, state_update, task, wfi, db): wfi.set_task_output(task, output.id, "temp") tasks = wfi.finalize_task(task) wf_state = wfi.get_workflow_state() - if tasks and wf_state != 'PAUSED': + if tasks and wf_state not in ('PAUSED', 'Cancelled'): wf_utils.schedule_submit_tasks(state_update.wf_id, tasks) if wfi.workflow_completed(): @@ -142,6 +142,11 @@ def handle_state_change(self, state_update, task, wfi, db): log.info(f"Workflow {wf_id} Completed") archive_workflow(db, state_update.wf_id) log.info('Workflow Completed') + elif wf_state == 'Cancelled' and wfi.cancelled_workflow_completed(): + wf_id = wfi.workflow_id + log.info(f"Scheduled tasks for cancelled workflow {wf_id} completed") + archive_workflow(db, wf_id, final_state=wf_state) + log.info('Workflow Archived') # If the job failed and it doesn't include a checkpoint-restart hint, # then fail the entire workflow