WIP: Add option to archive the workdir
arhall0 committed Dec 16, 2024
1 parent 4296dac commit b9dddc2
Showing 6 changed files with 71 additions and 7 deletions.
50 changes: 49 additions & 1 deletion beeflow/client/bee_client.py
@@ -242,7 +242,9 @@ def submit(wf_name: str = typer.Argument(..., help='the workflow name'), # pyli
workdir: pathlib.Path = typer.Argument(...,
help='working directory for workflow containing input + output files',),
no_start: bool = typer.Option(False, '--no-start', '-n',
help='do not start the workflow')):
help='do not start the workflow'),
archive_workdir: bool = typer.Option(False, '--archive-workdir', '-a',
help='archive a copy of the workdir')):
"""Submit a new workflow."""
def is_parent(parent, path):
"""Return true if the path is a child of the other path."""
@@ -312,6 +314,51 @@ def is_parent(parent, path):
if os.path.commonpath([os.path.realpath('/var/tmp'), workdir]) == os.path.realpath('/var/tmp'):
error_exit("Workflow working directory cannot be in \"/var/tmp\"")

# TODO: this should be in a function somewhere
total_size = 0
size_limit_mb = 1000
total_size_limit = size_limit_mb * 1024 ** 2
n_files = 0
n_files_limit = 10_000
n_dirs = 0
n_dirs_limit = 1000
reason = ''
# check the workdir isn't too large/contains too many things
for dirpath, dirnames, filenames in os.walk(workdir):
n_files += len(filenames)
if n_files > n_files_limit:
reason = f'Directory contains more than {n_files_limit} files'
break
n_dirs += len(dirnames)
if n_dirs > n_dirs_limit:
reason = f'Directory contains more than {n_dirs_limit} subdirectories'
break
for fname in filenames:
fp = os.path.join(dirpath, fname)
# skip if it is symbolic link
if not os.path.islink(fp):
total_size += os.path.getsize(fp)
if total_size > total_size_limit:
reason = f'Total file size of directory is greater than {size_limit_mb}MB'
break
else:
continue
break

if len(reason) > 0 and archive_workdir:
archive_workdir = False
ans = input(f"""
******************
** WARNING **
******************
Are you sure you want to archive the workdir at:
{workdir}
{reason} [y/n]? """)
if ans.lower() in ("y", "yes"):
archive_workdir = True

# TODO: Can all of this information be sent as a file?
data = {
'wf_name': wf_name.encode(),
@@ -320,6 +367,7 @@ def is_parent(parent, path):
'workflow': jsonpickle.encode(workflow),
'tasks': jsonpickle.encode(tasks, warn=True),
'no_start': no_start,
'archive_workdir': archive_workdir,
}
files = {
'workflow_archive': wf_tarball
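The size/count guard above carries a TODO asking for it to be moved into a function. A minimal sketch of such a helper (hypothetical name and defaults copied from the values in the diff; not part of this commit):

import os


def workdir_archive_blocker(workdir, size_limit_mb=1000, n_files_limit=10_000, n_dirs_limit=1000):
    """Return a reason string if the workdir exceeds the archiving limits, otherwise ''."""
    total_size_limit = size_limit_mb * 1024 ** 2
    total_size = 0
    n_files = 0
    n_dirs = 0
    for dirpath, dirnames, filenames in os.walk(workdir):
        n_files += len(filenames)
        if n_files > n_files_limit:
            return f'Directory contains more than {n_files_limit} files'
        n_dirs += len(dirnames)
        if n_dirs > n_dirs_limit:
            return f'Directory contains more than {n_dirs_limit} subdirectories'
        for fname in filenames:
            fpath = os.path.join(dirpath, fname)
            # Skip symbolic links so their targets are not counted.
            if not os.path.islink(fpath):
                total_size += os.path.getsize(fpath)
                if total_size > total_size_limit:
                    return f'Total file size of directory is greater than {size_limit_mb}MB'
    return ''

Returning early removes the need for the for/else/break pattern used inline above; the caller would then prompt the user only when a non-empty reason comes back.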
3 changes: 2 additions & 1 deletion beeflow/common/integration/utils.py
@@ -77,7 +77,8 @@ def run(self):
bee_client.package(Path(self.path), Path(tarball_dir))
print('Submitting and starting workflow')
self.wf_id = bee_client.submit(self.name, self.tarball, self.main_cwl,
self.job_file, self.workdir, no_start=False)
self.job_file, self.workdir, no_start=False,
archive_workdir=False)
except bee_client.ClientError as error:
raise CIError(*error.args) from error

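A caller outside the integration harness would pass the new keyword the same way; a rough usage sketch with placeholder names and paths (argument order mirrors the call above):

from pathlib import Path

from beeflow.client import bee_client

# Placeholder workflow name, tarball, CWL/job files, and working directory.
wf_id = bee_client.submit('example-wf', Path('example-wf.tgz'), 'workflow.cwl',
                          'job.yml', Path('/scratch/example-wf-workdir'),
                          no_start=False, archive_workdir=True)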
3 changes: 2 additions & 1 deletion beeflow/remote/remote.py
@@ -82,7 +82,8 @@ def submit_new_wf_long(wf_name: str, tarball_name: str, main_cwl_file: str, job_
main_cwl_file,
job_file,
workdir_path,
no_start=False)
no_start=False,
archive_workdir=False)
output["result"] = "Submitted new workflow" + str(wf_name)
return output
except bee_client.ClientError as error:
9 changes: 6 additions & 3 deletions beeflow/wf_manager/resources/wf_list.py
@@ -51,12 +51,12 @@ def extract_wf(wf_id, filename, workflow_archive):

@shared_task(ignore_result=True)
def init_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
tasks=None):
tasks=None, archive_workdir=False):
"""Initialize the workflow in a separate process."""
db = connect_db(wfm_db, db_path)
wf_utils.connect_neo4j_driver(db.info.get_port('bolt'))
wf_utils.setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start,
workflow, tasks)
workflow, tasks, archive_workdir)


db_path = wf_utils.get_db_path()
@@ -93,6 +93,7 @@ def post(self):
reqparser.add_argument('tasks', type=str, required=True,
location='form')
reqparser.add_argument('no_start', type=str, required=True, location='form')
reqparser.add_argument('archive_workdir', type=str, required=True, location='form')
reqparser.add_argument('workflow_archive', type=FileStorage, required=False,
location='files')
data = reqparser.parse_args()
@@ -102,6 +103,7 @@ def post(self):
wf_workdir = data['workdir']
# Note we have to check for the 'true' string value
no_start = data['no_start'].lower() == 'true'
archive_workdir = data['archive_workdir'].lower() == 'true'
workflow = jsonpickle.decode(data['workflow'])
# May have to decode the list and task objects separately
tasks = [jsonpickle.decode(task) if isinstance(task, str) else task
@@ -113,7 +115,8 @@ def post(self):
db.workflows.init_workflow(wf_id, wf_name, wf_dir)

init_workflow.delay(wf_id, wf_name, wf_dir, wf_workdir,
no_start, workflow=workflow, tasks=tasks)
no_start, workflow=workflow, tasks=tasks,
archive_workdir=archive_workdir)

return make_response(jsonify(msg='Workflow uploaded', status='ok',
wf_id=wf_id), 201)
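Because the flag crosses the REST boundary as a form field, it arrives as a string and is recovered by comparing against 'true' after lower-casing, the same pattern already used for no_start; a tiny illustration:

# Form values are strings, so a boolean has to be recovered by string comparison.
for raw in ('True', 'true', 'False'):
    print(raw, '->', raw.lower() == 'true')
# prints: True -> True, true -> True, False -> False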
10 changes: 10 additions & 0 deletions beeflow/wf_manager/resources/wf_update.py
@@ -33,6 +33,16 @@ def archive_workflow(db, wf_id, final_state=None):
os.makedirs(dags_dir, exist_ok=True)
wf_utils.export_dag(wf_id, dags_dir, graphmls_dir, no_dag_dir=True, copy_dag_in_archive=False)

# Archive workdir
wfi = wf_utils.get_workflow_interface(wf_id)
_, tasks = wfi.get_workflow()
# metadata related to the workdir is redundantly stored on each task
archive_workdir = wfi.get_task_metadata(tasks[0])['archive_workdir']
if archive_workdir:
workdir = wfi.get_task_metadata(tasks[0])['workdir']
workdir_dir = workflow_dir + "/workdir"
shutil.copytree(workdir, workdir_dir, dirs_exist_ok=True)

wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
db.workflows.update_workflow_state(wf_id, wf_state)
wf_utils.update_wf_status(wf_id, wf_state)
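The copy itself is a plain shutil.copytree with dirs_exist_ok=True (available since Python 3.8), so repeating the archive step will not fail if the target directory already exists. A standalone sketch with placeholder paths (in archive_workflow the real paths come from task metadata and the workflow archive directory):

import os
import shutil

src_workdir = '/tmp/example_workdir'            # placeholder for the task's workdir
archived_workdir = '/tmp/example_archive/workdir'  # placeholder for workflow_dir + "/workdir"
os.makedirs(src_workdir, exist_ok=True)         # give the sketch something to copy
shutil.copytree(src_workdir, archived_workdir, dirs_exist_ok=True)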
3 changes: 2 additions & 1 deletion beeflow/wf_manager/resources/wf_utils.py
@@ -266,7 +266,7 @@ def connect_neo4j_driver(bolt_port):


def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
tasks=None):
tasks=None, archive_workdir=False):
"""Initialize Workflow in Separate Process."""
wfi = get_workflow_interface(wf_id)
wfi.initialize_workflow(workflow)
@@ -279,6 +279,7 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
wfi.add_task(task, task_state)
metadata = wfi.get_task_metadata(task)
metadata['workdir'] = wf_workdir
metadata['archive_workdir'] = archive_workdir
wfi.set_task_metadata(task, metadata)
db.workflows.add_task(task.id, wf_id, task.name, task_state)

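After setup_workflow runs, every task carries both keys in its metadata, which is what archive_workflow in wf_update.py later reads back from tasks[0]; roughly the following shape (values illustrative):

# Approximate per-task metadata after setup_workflow.
metadata = {
    'workdir': '/path/to/workflow/workdir',
    'archive_workdir': True,
}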
