Skip to content

Commit

Permalink
Issue700/remove archive workflows (#774)
Browse files Browse the repository at this point in the history
* Prevent cancelling workflow when in Initializing state

* Add remove workflow option to bee client

* Add method for getting workflow status from database and prevent cancel workflow when in Initializing state

* Fix cancel workflow unit test

* Add unit test for remove workflow

* Add cancel and remove options to delete  workflow action

* Exit match_short_id  in bee_client when there are no workflows
  • Loading branch information
pagrubel authored Feb 21, 2024
1 parent 2542114 commit 2ea76d9
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ poetry.lock
*.pyc
*.egg-info
*.out
*.tgz
*.tar.gz
*.log
.python-version
.DS_Store
.idea
Expand Down
71 changes: 61 additions & 10 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,25 @@ def match_short_id(wf_id):
long_wf_id = matched_ids[0]
return long_wf_id
else:
print("There are currently no workflows.")
sys.exit("There are currently no workflows.")

return None


def get_wf_status(wf_id):
    """Return the status the workflow manager reports for a workflow.

    Sends a GET request for the workflow's resource and returns the
    'wf_status' field of the JSON reply. A connection failure or a
    non-OK HTTP status is reported through error_exit().
    """
    try:
        response = _wfm_conn().get(_resource(wf_id), timeout=60)
    except requests.exceptions.ConnectionError:
        error_exit('Could not reach WF Manager.')

    query_ok = response.status_code == requests.codes.okay  # pylint: disable=no-member
    if not query_ok:
        error_exit('Could not successfully query workflow manager')

    payload = response.json()
    return payload['wf_status']


app = typer.Typer(no_args_is_help=True, add_completion=False, cls=NaturalOrderGroup)
app.add_typer(core.app, name='core')
app.add_typer(config_driver.app, name='config')
Expand Down Expand Up @@ -354,6 +368,36 @@ def package(wf_path: pathlib.Path = typer.Argument(...,
return package_path


@app.command()
def remove(wf_id: str = typer.Argument(..., callback=match_short_id)):
    """Remove a cancelled or archived workflow with a workflow ID."""
    # NOTE: the docstring above is left as-is because Typer displays it as
    # the command's help text.
    # wf_id has already been passed through the match_short_id callback.
    wf_status = get_wf_status(wf_id)
    print(f"Workflow Status is {wf_status}")

    # Only workflows that are no longer active may be removed.
    if wf_status not in ('Cancelled', 'Archived'):
        print(f"{_short_id(wf_id)} may still be running.")
        print("The workflow must be cancelled before attempting removal.")
        sys.exit()

    prompt = (f"All stored information for workflow {_short_id(wf_id)} will be removed."
              "\nContinue to remove? yes(y)/no(n): ")
    answer = input(prompt).strip().lower()
    # Removal is destructive, so anything other than an explicit yes is
    # treated as a refusal and reported, rather than exiting silently.
    if answer not in ("y", "yes"):
        sys.exit("Workflow not removed.")

    try:
        conn = _wfm_conn()
        resp = conn.delete(_resource(wf_id), json={'option': 'remove'}, timeout=60)
    except requests.exceptions.ConnectionError:
        error_exit('Could not reach WF Manager.')
    if resp.status_code != requests.codes.accepted:  # pylint: disable=no-member
        error_exit('WF Manager could not remove workflow.')
    typer.secho("Workflow removed!", fg=typer.colors.GREEN)
    logging.info('Remove workflow: %s', resp.text)

    sys.exit()


def unpackage(package_path, dest_path):
"""Unpackage a workflow tarball for parsing."""
package_str = str(package_path)
Expand Down Expand Up @@ -479,15 +523,22 @@ def resume(wf_id: str = typer.Argument(..., callback=match_short_id)):
def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Cancel a workflow."""
long_wf_id = wf_id
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if resp.status_code != requests.codes.accepted: # pylint: disable=no-member
error_exit('WF Manager could not cancel workflow.')
typer.secho("Workflow cancelled!", fg=typer.colors.GREEN)
logging.info(f'Cancel workflow: {resp.text}')
wf_status = get_wf_status(wf_id)
if wf_status == "Running":
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60)

except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if resp.status_code != requests.codes.accepted: # pylint: disable=no-member
error_exit('WF Manager could not cancel workflow.')
typer.secho("Workflow cancelled!", fg=typer.colors.GREEN)
logging.info(f'Cancel workflow: {resp.text}')
elif wf_status == "Intializing":
print(f"Workflow is {wf_status}, try cancel later.")
else:
print(f"Workflow is {wf_status} cannot cancel.")


@app.command()
Expand Down
2 changes: 1 addition & 1 deletion beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
sys.exit(1)
print('Starting beeflow...')
if not foreground:
print(f'Check "{beeflow_log}" or run `beeflow core status` for more information.')
print('Run `beeflow core status` for more information.')
# Create the log path if it doesn't exist yet
path = paths.log_path()
os.makedirs(path, exist_ok=True)
Expand Down
25 changes: 24 additions & 1 deletion beeflow/tests/test_wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,35 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db):
temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING")
mocker.patch('beeflow.wf_manager.resources.wf_actions.dep_manager.kill_gdb', return_value=None)

request = {'wf_id': WF_ID}
request = {'wf_id': WF_ID, 'option': 'cancel'}
resp = client().delete(f'/bee_wfm/v1/jobs/{WF_ID}', json=request)
assert resp.json['status'] == 'Cancelled'
assert resp.status_code == 202


def test_remove_workflow(client, mocker, setup_teardown_workflow, temp_db):
    """Check that a DELETE request with the 'remove' option reports 'Removed'."""
    # Replace the workflow interface and point the resources at the temp database.
    mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
                 return_value=MockWFI())
    mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
    mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)

    # Register an archived workflow with two tasks in the temp database.
    temp_db.workflows.init_workflow(WF_ID, 'wf', 'Archived', 'dir', 3030, 12345)
    for task_id, task_state in ((123, "WAITING"), (124, "RUNNING")):
        temp_db.workflows.add_task(task_id, WF_ID, 'task', task_state)
    mocker.patch('beeflow.wf_manager.resources.wf_actions.dep_manager.kill_gdb', return_value=None)

    payload = {'wf_id': WF_ID, 'option': 'remove'}
    resp = client().delete(f'/bee_wfm/v1/jobs/{WF_ID}', json=payload)
    assert resp.json['status'] == 'Removed'
    assert resp.status_code == 202


def test_pause_workflow(client, mocker, setup_teardown_workflow, temp_db):
"""Test pausing a workflow."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
Expand Down
42 changes: 28 additions & 14 deletions beeflow/wf_manager/resources/wf_actions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""This module contains the workflow action endpoints."""
import shutil
import os

from flask import make_response, jsonify
from flask_restful import Resource, reqparse
Expand Down Expand Up @@ -53,21 +55,33 @@ def get(wf_id):
wf_status=wf_status, status='ok'), 200)
return resp

@staticmethod
def delete(wf_id):
"""Cancel the workflow. Lets current tasks finish running."""
def delete(self, wf_id):
"""Cancel or delete the workflow. For cancel, current tasks finish running."""
self.reqparse.add_argument('option', type=str, location='json')
option = self.reqparse.parse_args()['option']
db = connect_db(wfm_db, db_path)
wfi = wf_utils.get_workflow_interface(wf_id)
# Remove all tasks currently in the database
if wfi.workflow_loaded():
wfi.finalize_workflow()
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info("Workflow cancelled")
log.info("Shutting down gdb")
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)
resp = make_response(jsonify(status='Cancelled'), 202)
if option == "cancel":
wfi = wf_utils.get_workflow_interface(wf_id)
# Remove all tasks currently in the database
if wfi.workflow_loaded():
wfi.finalize_workflow()
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info("Workflow cancelled")
log.info("Shutting down gdb")
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)
resp = make_response(jsonify(status='Cancelled'), 202)
elif option == "remove":
log.info(f"Removing workflow {wf_id}.")
db.workflows.delete_workflow(wf_id)
resp = make_response(jsonify(status='Removed'), 202)
bee_workdir = wf_utils.get_bee_workdir()
workflow_dir = f"{bee_workdir}/workflows/{wf_id}"
shutil.rmtree(workflow_dir, ignore_errors=True)
archive_path = f"{bee_workdir}/archives/{wf_id}.tgz"
if os.path.exists(archive_path):
os.remove(archive_path)
return resp

def patch(self, wf_id):
Expand Down
5 changes: 5 additions & 0 deletions docs/sphinx/commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ Arguments:

``beeflow cancel``: Cancel a workflow.

Arguments:
WF_ID [required]

``beeflow remove``: Remove a cancelled or archived workflow and its information.

Arguments:
WF_ID [required]

Expand Down
1 change: 0 additions & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
*.tgz
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ beeflow = 'beeflow.client.bee_client:main'
beecloud = 'beeflow.cloud_launcher:main'

[tool.poetry.dependencies]

# Python version (>=3.8.3, <=3.12.2)
python = ">=3.8.3,<=3.12.2"

Expand Down

0 comments on commit 2ea76d9

Please sign in to comment.