Skip to content

Commit

Permalink
Merge branch 'develop' into beeflow-connect
Browse files Browse the repository at this point in the history
  • Loading branch information
aquan9 authored Sep 10, 2024
2 parents 3be6e30 + 8651ef1 commit e6bffec
Show file tree
Hide file tree
Showing 34 changed files with 1,602 additions and 1,310 deletions.
61 changes: 47 additions & 14 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import pathlib
import shutil
import subprocess
import tarfile
import tempfile
import textwrap
import time
import importlib.metadata
import jsonpickle
import requests
import typer
import yaml

from beeflow.common import config_driver
from beeflow.common.cli import NaturalOrderGroup
Expand Down Expand Up @@ -192,7 +194,7 @@ def submit(wf_name: str = typer.Argument(..., help='the workflow name'), # pyli
main_cwl: str = typer.Argument(...,
help='filename of main CWL (if using CWL tarball), '
+ 'path of main CWL (if using CWL directory)'),
yaml: str = typer.Argument(...,
yaml_file: str = typer.Argument(...,
help='filename of yaml file (if using CWL tarball), '
+ 'path of yaml file (if using CWL directory)'),
workdir: pathlib.Path = typer.Argument(...,
Expand All @@ -215,32 +217,34 @@ def is_parent(parent, path):
if os.path.isdir(wf_path):
print("Detected directory instead of packaged workflow. Packaging Directory...")
main_cwl_path = pathlib.Path(main_cwl).resolve()
yaml_path = pathlib.Path(yaml).resolve()
yaml_path = pathlib.Path(yaml_file).resolve()

if not main_cwl_path.exists():
error_exit(f'Main CWL file {main_cwl} does not exist')
if not yaml_path.exists():
error_exit(f'YAML file {yaml} does not exist')
error_exit(f'YAML file {yaml_file} does not exist')

# Packaging in temp dir, after copying alternate cwl_main or yaml file
cwl_indir = is_parent(wf_path, main_cwl_path)
yaml_indir = is_parent(wf_path, yaml_path)

# Always create temp dir for the workflow
tempdir_path = pathlib.Path(tempfile.mkdtemp())
tempdir_wf_path = pathlib.Path(tempdir_path / wf_name)
shutil.copytree(wf_path, tempdir_wf_path, dirs_exist_ok=False)
if not cwl_indir:
shutil.copy2(main_cwl, tempdir_wf_path)
if not yaml_indir:
shutil.copy2(yaml, tempdir_wf_path)
shutil.copy2(yaml_file, tempdir_wf_path)
package_path = package(tempdir_wf_path, tempdir_path)
else:
package_path = wf_path

# Untar and parse workflow
untar_path = pathlib.Path(tempfile.mkdtemp())
untar_wf_path = unpackage(package_path, untar_path)
main_cwl_path = untar_wf_path / pathlib.Path(main_cwl).name
yaml_path = untar_wf_path / pathlib.Path(yaml).name
yaml_path = untar_wf_path / pathlib.Path(yaml_file).name
parser = CwlParser()
workflow_id = generate_workflow_id()
workflow, tasks = parser.parse_workflow(workflow_id, str(main_cwl_path),
Expand All @@ -260,6 +264,12 @@ def is_parent(parent, path):
if not os.path.exists(workdir):
error_exit(f"Workflow working directory \"{workdir}\" doesn't exist")

# Make sure the workdir is not in /var or /var/tmp
if os.path.commonpath([os.path.realpath('/tmp'), workdir]) == os.path.realpath('/tmp'):
error_exit("Workflow working directory cannot be in \"/tmp\"")
if os.path.commonpath([os.path.realpath('/var/tmp'), workdir]) == os.path.realpath('/var/tmp'):
error_exit("Workflow working directory cannot be in \"/var/tmp\"")

# TODO: Can all of this information be sent as a file?
data = {
'wf_name': wf_name.encode(),
Expand Down Expand Up @@ -298,16 +308,19 @@ def is_parent(parent, path):

# Store provided arguments in text file for future reference
wf_dir = wf_utils.get_workflow_dir(wf_id)
sub_wf_dir = wf_dir + "/submit_command_args.txt"
sub_wf_dir = wf_dir + "/submit_command_args.yaml"

cmd = {
'wf_name': wf_name,
'wf_path': str(wf_path),
'main_cwl': main_cwl,
'yaml': yaml_file,
'workdir': workdir,
'wf_id': wf_id
}

f_name = open(sub_wf_dir, "w", encoding="utf-8")
f_name.write(f"wf_name: {wf_name}\n")
f_name.write(f"wf_path: {wf_path}\n")
f_name.write(f"main_cwl: {main_cwl}\n")
f_name.write(f"yaml: {yaml}\n")
f_name.write(f"workdir: {workdir}\n")
f_name.write(f"wf_id: {wf_id}")
f_name.close()
with open(sub_wf_dir, "w", encoding='utf-8') as command_file:
yaml.dump(cmd, command_file)

return wf_id

Expand Down Expand Up @@ -564,6 +577,26 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),
workdir = os.path.abspath(workdir)
if not os.path.exists(workdir):
error_exit(f"Workflow working directory \"{workdir}\" doesn't exist")
cwl_path = pathlib.Path(tempfile.mkdtemp())
archive_id = str(wf_path.stem)
with tarfile.open(wf_path) as archive:
archive_cmd = yaml.load(archive.extractfile(
str(pathlib.Path(archive_id) / 'submit_command_args.yaml')).read(),
Loader=yaml.Loader)

cwl_files = [
tarinfo for tarinfo in archive.getmembers()
if tarinfo.name.startswith(archive_id + '/cwl_files/')
and tarinfo.isreg()
]
for path in cwl_files:
path.name = os.path.basename(path.name)
archive.extractall(path=cwl_path, members=cwl_files)

main_cwl = cwl_path / pathlib.Path(archive_cmd['main_cwl']).name
yaml_file = cwl_path / pathlib.Path(archive_cmd['yaml']).name

return submit(wf_name, pathlib.Path(cwl_path), main_cwl, yaml_file, pathlib.Path(workdir))

data = {
'wf_filename': os.path.basename(wf_path).encode(),
Expand Down
90 changes: 32 additions & 58 deletions beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
from beeflow.common import paths
from beeflow.wf_manager.resources import wf_utils

from beeflow.common.db import wfm_db
from beeflow.common.db.bdb import connect_db
from beeflow.wf_manager.common import dep_manager
from beeflow.common.deps import container_manager
from beeflow.common.deps import neo4j_manager
from beeflow.common.deps import redis_manager


class ComponentManager:
Expand Down Expand Up @@ -205,49 +205,34 @@ def celery():
"""Start the celery task queue."""
log = open_log('celery')
# Setting --pool=solo to avoid preforking multiple processes
return subprocess.Popen(['celery', '-A', 'beeflow.common.celery', 'worker', '--pool=solo'],
stdout=log, stderr=log)
return subprocess.Popen(['celery', '-A', 'beeflow.common.deps.celery_manager',
'worker', '--pool=solo'], stdout=log, stderr=log)

# Run this before daemonizing in order to avoid slow background start
container_path = paths.redis_container()
# container_path = paths.redis_container()
# If it exists, we assume that it actually has a valid container
if not os.path.exists(container_path):
# if not os.path.exists(container_path):
# print('Unpacking Redis image...')
# subprocess.check_call(['ch-convert', '-i', 'tar', '-o', 'dir',
# bc.get('DEFAULT', 'redis_image'), container_path])
if not container_manager.check_container_dir('redis'):
print('Unpacking Redis image...')
subprocess.check_call(['ch-convert', '-i', 'tar', '-o', 'dir',
bc.get('DEFAULT', 'redis_image'), container_path])
container_manager.create_image('redis')

if not container_manager.check_container_dir('neo4j'):
print('Unpacking Neo4j image...')
container_manager.create_image('neo4j')

@mgr.component('neo4j-database', ('wf_manager',))
def start_neo4j():
"""Start the neo4j graph database."""
return neo4j_manager.start()

@mgr.component('redis', ())
def redis():
def start_redis():
"""Start redis."""
data_dir = 'data'
os.makedirs(os.path.join(paths.redis_root(), data_dir), exist_ok=True)
conf_name = 'redis.conf'
container_path = paths.redis_container()
# Dump the config
conf_path = os.path.join(paths.redis_root(), conf_name)
if not os.path.exists(conf_path):
with open(conf_path, 'w', encoding='utf-8') as fp:
# Don't listen on TCP
print('port 0', file=fp)
print('dir', os.path.join('/mnt', data_dir), file=fp)
print('maxmemory 2mb', file=fp)
print('unixsocket', os.path.join('/mnt', paths.redis_sock_fname()), file=fp)
print('unixsocketperm 700', file=fp)
cmd = [
'ch-run',
f'--bind={paths.redis_root()}:/mnt',
container_path,
'redis-server',
'/mnt/redis.conf',
]
log = open_log('redis')
# Ran into a strange "Failed to configure LOCALE for invalid locale name."
# from Redis, so setting LANG=C. This could have consequences for UTF-8
# strings.
env = dict(os.environ)
env['LANG'] = 'C'
env['LC_ALL'] = 'C'
return subprocess.Popen(cmd, env=env, stdout=log, stderr=log)
return redis_manager.start(log)

# Workflow manager and task manager need to be opened with PIPE for their stdout/stderr
if need_slurmrestd():
Expand Down Expand Up @@ -452,6 +437,15 @@ def status():
print(f'{comp} ... {stat}')


@app.command()
def info():
    """Get information about beeflow's installation."""
    # Version comes from the installed package metadata, not a hard-coded constant.
    report = [
        f"Beeflow version: {importlib.metadata.version('hpc-beeflow')}",
        f"bee_workflow directory: {paths.workdir()}",
        f"Log path: {paths.log_path()}",
    ]
    print("\n".join(report))

@app.command()
def stop(query='yes'):
"""Stop the current running beeflow daemon."""
Expand Down Expand Up @@ -488,23 +482,6 @@ def stop(query='yes'):
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')


def kill_active_workflows(active_states, workflow_list):
    """Kill workflows with active states.

    Iterates over ``workflow_list`` tuples of (name, wf_id, state) and kills
    the graph-database process of every workflow whose state is in
    ``active_states``. Returns True only if every active workflow had a
    killable process.
    """
    db = connect_db(wfm_db, wf_utils.get_db_path())
    success = True
    for name, wf_id, state in workflow_list:
        # Skip workflows that are not in an active state.
        if state not in active_states:
            continue
        pid = db.workflows.get_gdb_pid(wf_id)
        if pid <= 0:
            # Failure most likely caused by an Initializing workflow.
            print(f"No process for {name}, {wf_id}, {state}.")
            success = False
            continue
        dep_manager.kill_gdb(pid)
    return success


def archive_dir(dir_to_archive):
"""Archive directories for archive flag in reset."""
archive_dirs = ['logs', 'container_archive', 'archives', 'workflows']
Expand Down Expand Up @@ -572,9 +549,6 @@ def reset(archive: bool = typer.Option(False, '--archive', '-a',
# Exit out if the user didn't really mean to do a reset
sys.exit()
elif absolutely_sure in ("y", "yes"):
# First stop all active workflow processes
workflow_list = bee_client.get_wf_list()
kill_active_workflows(active_states, workflow_list)
# Stop all of the beeflow processes
stop("quiet")
print("Beeflow is shutting down.")
Expand Down
2 changes: 1 addition & 1 deletion beeflow/common/config_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def validate_chrun_opts(opts):
info='HTTPS port used for the graph database')
VALIDATOR.option('graphdb', 'gdb_image_mntdir', default=join_path('/tmp', USER),
info='graph database image mount directory', validator=validation.make_dir)
VALIDATOR.option('graphdb', 'sleep_time', validator=int, default=10,
VALIDATOR.option('graphdb', 'sleep_time', validator=int, default=1,
info='how long to wait for the graph database to come up (this can take a while, '
'depending on the system)')
# Builder
Expand Down
Loading

0 comments on commit e6bffec

Please sign in to comment.