diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py
index e3f8cafbc..11f6cc08c 100644
--- a/beeflow/client/bee_client.py
+++ b/beeflow/client/bee_client.py
@@ -242,7 +242,9 @@ def submit(wf_name: str = typer.Argument(..., help='the workflow name'),  # pyli
            workdir: pathlib.Path = typer.Argument(...,
                help='working directory for workflow containing input + output files',),
            no_start: bool = typer.Option(False, '--no-start', '-n',
-                                         help='do not start the workflow')):
+                                         help='do not start the workflow'),
+           archive_workdir: bool = typer.Option(False, '--archive-workdir', '-a',
+                                                help='archive a copy of the workdir')):
     """Submit a new workflow."""
     def is_parent(parent, path):
         """Return true if the path is a child of the other path."""
@@ -312,6 +314,51 @@ def is_parent(parent, path):
     if os.path.commonpath([os.path.realpath('/var/tmp'), workdir]) == os.path.realpath('/var/tmp'):
         error_exit("Workflow working directory cannot be in \"/var/tmp\"")
 
+    # TODO: this should be in a function somewhere
+    total_size = 0
+    size_limit_mb = 1000
+    total_size_limit = size_limit_mb * 1024 ** 2
+    n_files = 0
+    n_files_limit = 10_000
+    n_dirs = 0
+    n_dirs_limit = 1000
+    reason = ''
+    # check the workdir isn't too large/contains too many things
+    for dirpath, dirnames, filenames in os.walk(workdir):
+        n_files += len(filenames)
+        if n_files > n_files_limit:
+            reason = f'Directory contains more than {n_files_limit} files'
+            break
+        n_dirs += len(dirnames)
+        if n_dirs > n_dirs_limit:
+            reason = f'Directory contains more than {n_dirs_limit} subdirectories'
+            break
+        for fname in filenames:
+            fp = os.path.join(dirpath, fname)
+            # skip if it is symbolic link
+            if not os.path.islink(fp):
+                total_size += os.path.getsize(fp)
+                if total_size > total_size_limit:
+                    reason = f'Total file size of directory is greater than {size_limit_mb}MB'
+                    break
+        else:
+            continue
+        break
+
+    if len(reason) > 0 and archive_workdir:
+        archive_workdir = False
+        ans = input(f"""
+        ******************
+        **    WARNING   **
+        ******************
+        Are you sure you want to archive the workdir at:
+
+        {workdir}
+
+        {reason} [y/n]? """)
+        if ans.lower() in ("y", "yes"):
+            archive_workdir = True
+
     # TODO: Can all of this information be sent as a file?
     data = {
         'wf_name': wf_name.encode(),
@@ -320,6 +367,7 @@ def is_parent(parent, path):
         'workflow': jsonpickle.encode(workflow),
         'tasks': jsonpickle.encode(tasks, warn=True),
         'no_start': no_start,
+        'archive_workdir': archive_workdir,
     }
     files = {
         'workflow_archive': wf_tarball
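The size/count check above is flagged with a TODO ("this should be in a function somewhere"). A minimal sketch of how it could be factored out, assuming a hypothetical helper name check_workdir_limits; the limits and walk logic mirror the diff, with the for/else dance replaced by early returns:

import os

def check_workdir_limits(workdir, size_limit_mb=1000, n_files_limit=10_000,
                         n_dirs_limit=1000):
    """Return a reason string if workdir exceeds a limit, else ''."""
    total_size_limit = size_limit_mb * 1024 ** 2
    total_size = n_files = n_dirs = 0
    for dirpath, dirnames, filenames in os.walk(workdir):
        n_files += len(filenames)
        if n_files > n_files_limit:
            return f'Directory contains more than {n_files_limit} files'
        n_dirs += len(dirnames)
        if n_dirs > n_dirs_limit:
            return f'Directory contains more than {n_dirs_limit} subdirectories'
        for fname in filenames:
            fp = os.path.join(dirpath, fname)
            # skip symbolic links so link targets are not counted
            if not os.path.islink(fp):
                total_size += os.path.getsize(fp)
                if total_size > total_size_limit:
                    return f'Total file size of directory is greater than {size_limit_mb}MB'
    return ''

The submit path would then reduce to reason = check_workdir_limits(workdir) followed by the confirmation prompt.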
diff --git a/beeflow/common/integration/utils.py b/beeflow/common/integration/utils.py
index a3fc45ec2..f05e42bcc 100644
--- a/beeflow/common/integration/utils.py
+++ b/beeflow/common/integration/utils.py
@@ -77,7 +77,8 @@ def run(self):
             bee_client.package(Path(self.path), Path(tarball_dir))
             print('Submitting and starting workflow')
             self.wf_id = bee_client.submit(self.name, self.tarball, self.main_cwl,
-                                           self.job_file, self.workdir, no_start=False)
+                                           self.job_file, self.workdir, no_start=False,
+                                           archive_workdir=False)
         except bee_client.ClientError as error:
             raise CIError(*error.args) from error
 
diff --git a/beeflow/remote/remote.py b/beeflow/remote/remote.py
index 70cc6cf09..25d53392e 100644
--- a/beeflow/remote/remote.py
+++ b/beeflow/remote/remote.py
@@ -82,7 +82,8 @@ def submit_new_wf_long(wf_name: str, tarball_name: str, main_cwl_file: str, job_
                                       main_cwl_file,
                                       job_file,
                                       workdir_path,
-                                      no_start=False)
+                                      no_start=False,
+                                      archive_workdir=False)
         output["result"] = "Submitted new workflow" + str(wf_name)
         return output
     except bee_client.ClientError as error:
diff --git a/beeflow/wf_manager/resources/wf_list.py b/beeflow/wf_manager/resources/wf_list.py
index 48e491167..f8e13de4b 100644
--- a/beeflow/wf_manager/resources/wf_list.py
+++ b/beeflow/wf_manager/resources/wf_list.py
@@ -51,12 +51,12 @@ def extract_wf(wf_id, filename, workflow_archive):
 
 @shared_task(ignore_result=True)
 def init_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
-                  tasks=None):
+                  tasks=None, archive_workdir=False):
     """Initialize the workflow in a separate process."""
     db = connect_db(wfm_db, db_path)
     wf_utils.connect_neo4j_driver(db.info.get_port('bolt'))
     wf_utils.setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start,
-                            workflow, tasks)
+                            workflow, tasks, archive_workdir)
 
 
 db_path = wf_utils.get_db_path()
@@ -93,6 +93,7 @@ def post(self):
         reqparser.add_argument('tasks', type=str, required=True,
                                location='form')
         reqparser.add_argument('no_start', type=str, required=True, location='form')
+        reqparser.add_argument('archive_workdir', type=str, required=True, location='form')
         reqparser.add_argument('workflow_archive', type=FileStorage,
                                required=False, location='files')
         data = reqparser.parse_args()
@@ -102,6 +103,7 @@
         wf_workdir = data['workdir']
         # Note we have to check for the 'true' string value
         no_start = data['no_start'].lower() == 'true'
+        archive_workdir = data['archive_workdir'].lower() == 'true'
         workflow = jsonpickle.decode(data['workflow'])
         # May have to decode the list and task objects separately
         tasks = [jsonpickle.decode(task) if isinstance(task, str) else task
@@ -113,7 +115,8 @@
         db.workflows.init_workflow(wf_id, wf_name, wf_dir)
 
         init_workflow.delay(wf_id, wf_name, wf_dir, wf_workdir,
-                            no_start, workflow=workflow, tasks=tasks)
+                            no_start, workflow=workflow, tasks=tasks,
+                            archive_workdir=archive_workdir)
 
         return make_response(jsonify(msg='Workflow uploaded', status='ok',
                                      wf_id=wf_id), 201)
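The .lower() == 'true' comparisons in wf_list.py are needed because multipart form fields always arrive as strings, so a plain bool() cast would be wrong. A small self-contained illustration of the pitfall (no BEE code involved):

# A boolean posted as form data is stringified on the wire...
wire_value = str(True)                          # 'True'
# ...and naive casting back is a trap: every non-empty string is truthy.
assert bool('False') is True
# Hence the explicit comparison used for both no_start and archive_workdir:
archive_workdir = wire_value.lower() == 'true'  # True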
diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py
index 534e3e3c5..1a7555978 100644
--- a/beeflow/wf_manager/resources/wf_update.py
+++ b/beeflow/wf_manager/resources/wf_update.py
@@ -33,6 +33,16 @@ def archive_workflow(db, wf_id, final_state=None):
     os.makedirs(dags_dir, exist_ok=True)
     wf_utils.export_dag(wf_id, dags_dir, graphmls_dir, no_dag_dir=True,
                         copy_dag_in_archive=False)
+    # Archive workdir
+    wfi = wf_utils.get_workflow_interface(wf_id)
+    _, tasks = wfi.get_workflow()
+    # metadata related to the workdir is redundantly stored on each task
+    archive_workdir = wfi.get_task_metadata(tasks[0])['archive_workdir']
+    if archive_workdir:
+        workdir = wfi.get_task_metadata(tasks[0])['workdir']
+        workdir_dir = workflow_dir + "/workdir"
+        shutil.copytree(workdir, workdir_dir, dirs_exist_ok=True)
+
     wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
     db.workflows.update_workflow_state(wf_id, wf_state)
     wf_utils.update_wf_status(wf_id, wf_state)
diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py
index ced30eb87..4bf7a7675 100644
--- a/beeflow/wf_manager/resources/wf_utils.py
+++ b/beeflow/wf_manager/resources/wf_utils.py
@@ -266,7 +266,7 @@ def connect_neo4j_driver(bolt_port):
 
 
 def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
-                   tasks=None):
+                   tasks=None, archive_workdir=False):
     """Initialize Workflow in Separate Process."""
     wfi = get_workflow_interface(wf_id)
     wfi.initialize_workflow(workflow)
@@ -279,6 +279,7 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
         wfi.add_task(task, task_state)
         metadata = wfi.get_task_metadata(task)
         metadata['workdir'] = wf_workdir
+        metadata['archive_workdir'] = archive_workdir
         wfi.set_task_metadata(task, metadata)
         db.workflows.add_task(task.id, wf_id, task.name, task_state)
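For reference, a caller-side sketch of the extended submit signature, mirroring the keyword arguments the integration and remote call sites above now pass; the workflow name and paths are placeholders:

from pathlib import Path

from beeflow.client import bee_client

# Opt in to workdir archiving programmatically; on the command line the
# equivalent is the new --archive-workdir/-a flag.
wf_id = bee_client.submit('my-workflow',
                          Path('my-workflow.tgz'),   # packaged tarball
                          'main.cwl',                # main CWL file
                          'job.yml',                 # job/inputs file
                          Path('/path/to/workdir'),  # working directory
                          no_start=False,
                          archive_workdir=True)

If the workdir trips one of the size limits, the client asks for interactive confirmation before archiving; answering anything other than y/yes submits the workflow with archiving disabled.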