diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index b17bc1f2b..092aaf1ce 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -106,15 +106,25 @@ def _resource(tag=""): return _url() + str(tag) +def get_wf_list(): + """Get the list of all workflows.""" + try: + conn = _wfm_conn() + resp = conn.get(_url(), timeout=60) + except requests.exceptions.ConnectionError: + error_exit('Could not reach WF Manager.') + + if resp.status_code != requests.codes.okay: # pylint: disable=no-member + error_exit('WF Manager did not return workflow list') + + logging.info('List Jobs: {resp.text}') + return jsonpickle.decode(resp.json()['workflow_list']) + + def check_short_id_collision(): """Check short workflow IDs for colliions; increase short ID length if detected.""" global short_id_len #noqa: Not a constant - conn = _wfm_conn() - resp = conn.get(_url(), timeout=60) - if resp.status_code != requests.codes.okay: # pylint: disable=no-member - error_exit(f"Checking for ID collision failed: {resp.status_code}") - - workflow_list = jsonpickle.decode(resp.json()['workflow_list']) + workflow_list = get_wf_list() if workflow_list: while short_id_len < MAX_ID_LEN: id_list = [_short_id(job[1]) for job in workflow_list] @@ -133,18 +143,7 @@ def check_short_id_collision(): def match_short_id(wf_id): """Match user-provided short workflow ID to full workflow IDs.""" matched_ids = [] - - try: - conn = _wfm_conn() - resp = conn.get(_url(), timeout=60) - except requests.exceptions.ConnectionError: - error_exit('Could not reach WF Manager.') - - if resp.status_code != requests.codes.okay: # pylint: disable=no-member - error_exit(f'Could not match ID: {wf_id}. Code {resp.status_code}') - # raise ApiError("GET /jobs".format(resp.status_code)) - - workflow_list = jsonpickle.decode(resp.json()['workflow_list']) + workflow_list = get_wf_list() if workflow_list: for job in workflow_list: if job[1].startswith(wf_id): @@ -370,12 +369,12 @@ def package(wf_path: pathlib.Path = typer.Argument(..., @app.command() def remove(wf_id: str = typer.Argument(..., callback=match_short_id)): - """Remove a cancelled or archived workflow with a workflow ID.""" + """Remove cancelled, paused, or archived workflow with a workflow ID.""" long_wf_id = wf_id wf_status = get_wf_status(wf_id) print(f"Workflow Status is {wf_status}") - if wf_status in ('Cancelled', 'Archived'): + if wf_status in ('Cancelled', 'Archived', 'Paused'): verify = f"All stored information for workflow {_short_id(wf_id)} will be removed." verify += "\nContinue to remove? yes(y)/no(n): """ response = input(verify) @@ -421,17 +420,7 @@ def unpackage(package_path, dest_path): @app.command('list') def list_workflows(): """List all workflows.""" - try: - conn = _wfm_conn() - resp = conn.get(_url(), timeout=60) - except requests.exceptions.ConnectionError: - error_exit('Could not reach WF Manager.') - - if resp.status_code != requests.codes.okay: # pylint: disable=no-member - error_exit('WF Manager did not return workflow list') - - logging.info('List Jobs: {resp.text}') - workflow_list = jsonpickle.decode(resp.json()['workflow_list']) + workflow_list = get_wf_list() if workflow_list: typer.secho("Name\tID\tStatus", fg=typer.colors.GREEN) @@ -521,10 +510,10 @@ def resume(wf_id: str = typer.Argument(..., callback=match_short_id)): @app.command() def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)): - """Cancel a workflow.""" + """Cancel a paused or running workflow.""" long_wf_id = wf_id wf_status = get_wf_status(wf_id) - if wf_status == "Running": + if wf_status in ('Running', 'Paused'): try: conn = _wfm_conn() resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60) diff --git a/beeflow/client/core.py b/beeflow/client/core.py index 0e0420e8e..ea69345a9 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -14,16 +14,24 @@ import socket import sys import shutil +import datetime import time import daemon import typer +from beeflow.client import bee_client from beeflow.common.config_driver import BeeConfig as bc from beeflow.common import cli_connection from beeflow.common import paths from beeflow.wf_manager.resources import wf_utils +from beeflow.common.db import wfm_db +from beeflow.common.db.bdb import connect_db +from beeflow.wf_manager.common import dep_manager + +db_path = wf_utils.get_db_path() + class ComponentManager: """Component manager class.""" @@ -431,12 +439,16 @@ def status(): @app.command() -def stop(): +def stop(query='yes'): """Stop the current running beeflow daemon.""" - stop_msg = ("\n** Please ensure all workflows are complete before stopping beeflow. **" - + "\n** Check the status of workflows by running 'beeflow list'. **" - + "\nAre you sure you want to kill beeflow components? [y/n] ") - ans = input(stop_msg) + if query == 'yes': + ans = input(""" + ** Please ensure all workflows are complete before stopping beeflow. ** + ** Check the status of workflows by running 'beeflow list'. ** + + Are you sure you want to kill beeflow components? [y/n] """) + else: + ans = 'y' if ans.lower() != 'y': return resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'}) @@ -448,74 +460,112 @@ def stop(): sys.exit(1) # As long as it returned something, we should be good beeflow_log = paths.log_fname('beeflow') - print(f'Beeflow has stopped. Check the log at "{beeflow_log}".') + if query == "no": + print(f'Beeflow has stopped. Check the log at "{beeflow_log}".') + + +def kill_active_workflows(active_states, workflow_list): + """Kill workflows with active states.""" + db = connect_db(wfm_db, db_path) + success = True + for name, wf_id, state in workflow_list: + if state in active_states: + pid = db.workflows.get_gdb_pid(wf_id) + if pid > 0: + dep_manager.kill_gdb(pid) + else: + # Failure most likely caused by an Initializing workflow. + print(f"No process for {name}, {wf_id}, {state}.") + success = False + return success + + +def archive_dir(dir_to_archive): + """Archive directories for archive flag in reset.""" + archive_dirs = ['logs', 'container_archive', 'archives', 'workflows'] + date_str = f"{datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')}" + backup_dir = f"{dir_to_archive}.{date_str}" + for a_dir in archive_dirs: + try: + shutil.copytree(f"{dir_to_archive}/{a_dir}", + f"{backup_dir}/{a_dir}") + except FileNotFoundError: + pass + print("Archive flag enabled.", + "Existing logs, containers, and workflows backed up in:\n" + f"{backup_dir}") + + +def handle_rm_error(err, dir_to_check, wf_list): + """Handle IO error caused by either initializing workflows or nfs files.""" + # Check if only nfs mounts are causing the problem and ignore + dir_list = os.listdir(dir_to_check) + nfs_list = [x for x in dir_list if x.startswith('.nfs')] + if dir_list and (dir_list != nfs_list): + print(f"Unable to remove {dir_to_check} \n {err.strerror}") + # Often initializing workflows cause a problem + if any('Initializing' in sublist for sublist in wf_list): + warn('Initializing workflows may have prevented removal.\n') + print(f"Try removing {dir_to_check} manually, to complete reset.") @app.command() def reset(archive: bool = typer.Option(False, '--archive', '-a', help='Archive bee_workdir before removal')): - """Delete the bee_workdir directory.""" - # Check to see if the user is absolutely sure. Warning Message. + """Stop all components and delete the bee_workdir directory.""" + # Check workflow states; warn if there are active states. + workflow_list = bee_client.get_wf_list() + active_states = {'Running', 'Paused', 'Initializing', 'Waiting'} + if {item for row in workflow_list for item in row}.intersection(active_states): + caution = """ + ************************************************************** + Caution: There are active workflows! They will be removed! + Try 'beeflow list' to view them. + ************************************************************** + """ absolutely_sure = "" + dir_to_delete = os.path.expanduser(wf_utils.get_bee_workdir()) + warn(f"\n A reset will remove this directory: {dir_to_delete}\n") + if archive: + print(" Archive flag is set: logs, workflows and containers will be backed up.") + print(""" + A reset will: + Shutdown beeflow and all BEE components. + Delete the bee_workdir directory which results in: + Removing the archive of all workflows. + Removing the archive of workflow containers + (unless container_archive is configured elsewhere). + Reset all databases associated with the beeflow app. + Removing all beeflow logs. + Beeflow configuration files from bee_cfg will not be deleted. + """) + warn(f"{caution}\nAre you sure you want to reset?") while absolutely_sure != "y" or absolutely_sure != "n": - # Get the user's bee_workdir directory - directory_to_delete = os.path.expanduser(wf_utils.get_bee_workdir()) - print(f"A reset will remove this directory: {directory_to_delete}") - - absolutely_sure = input( - """ -Are you sure you want to reset? - -Please ensure all workflows are complete before running a reset -Check the status of workflows by running 'beeflow list' - -A reset will shutdown beeflow and its components. - -A reset will delete the bee_workdir directory which results in: -Removing the archive of workflows executed. -Removing the archive of workflow containers. -Reset all databases associated with the beeflow app. -Removing all beeflow logs. - -Beeflow configuration files from bee_cfg will remain. - -Respond with yes(y)/no(n): """) + absolutely_sure = input("Respond with yes(y)/no(n): ") if absolutely_sure in ("n", "no"): # Exit out if the user didn't really mean to do a reset sys.exit() - if absolutely_sure in ("y", "yes"): + elif absolutely_sure in ("y", "yes"): + # First stop all active workflow processes + workflow_list = bee_client.get_wf_list() + kill_active_workflows(active_states, workflow_list) # Stop all of the beeflow processes - resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'}) - if resp is not None: - print("Beeflow has been shutdown.") - print("Waiting for components to cleanly stop.") - # This wait is essential. It takes a minute to shut down. - time.sleep(5) - - if os.path.exists(directory_to_delete): - # Save the bee_workdir directory if the archive option was set - if archive: - if os.path.exists(directory_to_delete + "/logs"): - shutil.copytree(directory_to_delete + "/logs", - directory_to_delete + ".backup/logs") - if os.path.exists(directory_to_delete + "/container_archive"): - shutil.copytree(directory_to_delete + "/container_archive", - directory_to_delete + ".backup/container_archive") - if os.path.exists(directory_to_delete + "/archives"): - shutil.copytree(directory_to_delete + "/archives", - directory_to_delete + ".backup/archives") - if os.path.exists(directory_to_delete + "/workflows"): - shutil.copytree(directory_to_delete + "/workflows", - directory_to_delete + ".backup/workflows") - print("Archive flag enabled,") - print("Existing logs, containers, and workflows backed up in:") - print(f"{directory_to_delete}.backup") - shutil.rmtree(directory_to_delete) - print(f"{directory_to_delete} has been removed.") - sys.exit() + stop("quiet") + print("Beeflow is shutting down.") + print("Waiting for components to cleanly stop.") + # This wait is essential. It takes a minute to shut down. + time.sleep(5) + + # Save the bee_workdir directory if the archive option was set + if archive: + archive_dir(dir_to_delete) + try: + shutil.rmtree(dir_to_delete) + except OSError as err: + handle_rm_error(err, dir_to_delete, workflow_list) else: - print(f"{directory_to_delete} does not exist. Exiting.") - sys.exit() + print(f"{dir_to_delete} has been removed.") + sys.exit() print("Please respond with either the letter (y) or (n).")