diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 9cd2379b8..8c9556148 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -255,7 +255,8 @@ def is_parent(parent, path): 'wf_filename': os.path.basename(wf_path).encode(), 'workdir': workdir, 'workflow': jsonpickle.encode(workflow), - 'tasks': jsonpickle.encode(tasks, warn=True) + 'tasks': jsonpickle.encode(tasks, warn=True), + 'no_start': no_start, } files = { 'workflow_archive': wf_tarball @@ -284,10 +285,6 @@ def is_parent(parent, path): if tarball_path: os.remove(tarball_path) - # Start the workflow - if not no_start: - start(wf_id) - return wf_id diff --git a/beeflow/client/core.py b/beeflow/client/core.py index f3684fc09..02c2043ea 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -155,7 +155,7 @@ def init_components(): # Slurmrestd will be started only if we're running with Slurm and # slurm::use_commands is not True - @mgr.component('wf_manager', ('scheduler',)) + @mgr.component('wf_manager', ('scheduler', 'celery')) def start_wfm(): """Start the WFM.""" fp = open_log('wf_manager') @@ -181,6 +181,49 @@ def start_scheduler(): return launch_with_gunicorn('beeflow.scheduler.scheduler:create_app()', paths.sched_socket(), stdout=fp, stderr=fp) + @mgr.component('celery', ('redis',)) + def celery(): + """Start the celery task queue.""" + log = open_log('celery') + return subprocess.Popen(['celery', '-A', 'beeflow.common.celery', 'worker'], + stdout=log, stderr=log) + + @mgr.component('redis', ()) + def redis(): + """Start redis.""" + data_dir = 'data' + os.makedirs(os.path.join(paths.redis_root(), data_dir), exist_ok=True) + conf_name = 'redis.conf' + container_path = paths.redis_container() + # If it exists, we assume that it actually has a valid container + if not os.path.exists(container_path): + subprocess.check_call(['ch-convert', '-i', 'tar', '-o', 'dir', + bc.get('DEFAULT', 'redis_image'), container_path]) + # Dump the config + conf_path = os.path.join(paths.redis_root(), conf_name) + if not os.path.exists(conf_path): + with open(conf_path, 'w', encoding='utf-8') as fp: + # Don't listen on TCP + print('port 0', file=fp) + print('dir', os.path.join('/mnt', data_dir), file=fp) + print('maxmemory 2mb', file=fp) + print('unixsocket', os.path.join('/mnt', paths.redis_sock_fname()), file=fp) + print('unixsocketperm 700', file=fp) + cmd = [ + 'ch-run', + f'--bind={paths.redis_root()}:/mnt', + container_path, + 'redis-server', + '/mnt/redis.conf', + ] + log = open_log('redis') + # Ran into a strange "Failed to configure LOCALE for invalid locale name." + # from Redis, so setting LANG=C. This could have consequences for UTF-8 + # strings. 
+        env = dict(os.environ)
+        env['LANG'] = 'C'
+        return subprocess.Popen(cmd, env=env, stdout=log, stderr=log)
+
     # Workflow manager and task manager need to be opened with PIPE for their stdout/stderr
     if need_slurmrestd():
         @mgr.component('slurmrestd')
@@ -210,21 +253,14 @@ def version_str(version):
     return '.'.join([str(part) for part in version])
 
 
-def load_charliecloud():
-    """Load the charliecloud module if it exists."""
-    lmod = os.environ.get('MODULESHOME')
-    sys.path.insert(0, lmod + '/init')
-    from env_modules_python import module #noqa No need to import at top
-    module("load", "charliecloud")
-
-
-def check_dependencies():
-    """Check for various dependencies in the environment."""
-    print('Checking dependencies...')
-    # Check for Charliecloud and it's version
+def load_check_charliecloud():
+    """Load the charliecloud module if it exists and check the version."""
     if not shutil.which('ch-run'):
+        lmod = os.environ.get('MODULESHOME')
+        sys.path.insert(0, lmod + '/init')
+        from env_modules_python import module #noqa No need to import at top
+        module("load", "charliecloud")
         # Try loading the Charliecloud module then test again
-        load_charliecloud()
         if not shutil.which('ch-run'):
             warn('Charliecloud is not loaded. Please ensure that it is accessible'
                  ' on your path.\nIf it\'s not installed on your system, please refer'
@@ -247,6 +283,13 @@ def check_dependencies():
         warn('This version of Charliecloud is too old, please upgrade to at '
             f'least version {version_str(MIN_CHARLIECLOUD_VERSION)}')
         sys.exit(1)
+
+
+def check_dependencies():
+    """Check for various dependencies in the environment."""
+    print('Checking dependencies...')
+    # Check for Charliecloud and its version
+    load_check_charliecloud()
     # Check for the flux API
     if bc.get('DEFAULT', 'workload_scheduler') == 'Flux':
         try:
@@ -408,3 +451,27 @@ def restart(foreground: bool = typer.Option(False, '--foreground', '-F',
     """Attempt to stop and restart the beeflow daemon."""
     stop()
     start(foreground)
+
+
+def pull_to_tar(ref, tarball):
+    """Pull a container from a registry and convert to tarball."""
+    subprocess.check_call(['ch-image', 'pull', ref])
+    subprocess.check_call(['ch-convert', '-i', 'ch-image', '-o', 'tar', ref, tarball])
+
+
+@app.command()
+def pull_deps(outdir: str = typer.Option('.', '--outdir', '-o',
+                                         help='directory to store containers in')):
+    """Pull required BEE containers and store in outdir."""
+    load_check_charliecloud()
+    neo4j_path = os.path.join(os.path.realpath(outdir), 'neo4j.tar.gz')
+    pull_to_tar('neo4j:3.5.22', neo4j_path)
+    redis_path = os.path.join(os.path.realpath(outdir), 'redis.tar.gz')
+    pull_to_tar('redis', redis_path)
+    print()
+    print('The BEE dependency containers have been successfully downloaded. '
+          'Please make sure to set the following options in your config:')
+    print()
+    print('[DEFAULT]')
+    print('neo4j_image =', neo4j_path)
+    print('redis_image =', redis_path)
diff --git a/beeflow/common/celery.py b/beeflow/common/celery.py
new file mode 100644
index 000000000..006fc89f3
--- /dev/null
+++ b/beeflow/common/celery.py
@@ -0,0 +1,6 @@
+"""Module for celery configuration."""
+from beeflow.wf_manager.wf_manager import create_app
+
+
+flask_app = create_app()
+celery_app = flask_app.extensions['celery']
diff --git a/beeflow/common/config_driver.py b/beeflow/common/config_driver.py
index 34994115a..4c2afcf9d 100644
--- a/beeflow/common/config_driver.py
+++ b/beeflow/common/config_driver.py
@@ -231,8 +231,11 @@ def filepath_completion_input(*pargs, **kwargs):
                  info='backend workload scheduler to interact with ')
 VALIDATOR.option('DEFAULT', 'use_archive', validator=validation.bool_, attrs={'default': True},
                  info='use the BEE archiving functinality')
-VALIDATOR.option('DEFAULT', 'bee_dep_image', validator=validation.file_,
-                 info='container image with BEE dependencies',
+VALIDATOR.option('DEFAULT', 'neo4j_image', validator=validation.file_,
+                 info='neo4j container image',
+                 attrs={'input': filepath_completion_input})
+VALIDATOR.option('DEFAULT', 'redis_image', validator=validation.file_,
+                 info='redis container image',
                  attrs={'input': filepath_completion_input})
 VALIDATOR.option('DEFAULT', 'max_restarts', validator=int,
                  attrs={'default': 3},
diff --git a/beeflow/common/db/wfm_db.py b/beeflow/common/db/wfm_db.py
index 9e87a741c..d1f7dd8d6 100644
--- a/beeflow/common/db/wfm_db.py
+++ b/beeflow/common/db/wfm_db.py
@@ -53,7 +53,7 @@ def __init__(self, db_file):
         """Initialize Task, db_file, and Workflow object."""
         self.Task = namedtuple("Task", "id task_id workflow_id name resource state slurm_id")  #noqa
         self.db_file = db_file
-        self.Workflow = namedtuple("Workflow", "id workflow_id name state run_dir bolt_port gdb_pid")  #noqa
+        self.Workflow = namedtuple("Workflow", "id workflow_id name state run_dir bolt_port http_port https_port gdb_pid")  #noqa
 
     def get_workflow(self, workflow_id):
         """Return a workflow object."""
@@ -69,12 +69,13 @@ def get_workflows(self):
         workflows = [self.Workflow(*workflow) for workflow in result]
         return workflows
 
-    def add_workflow(self, workflow_id, name, state, run_dir, bolt_port, gdb_pid):
+    def init_workflow(self, workflow_id, name, run_dir, bolt_port, http_port, https_port):
         """Insert a new workflow into the database."""
-        stmt = ("INSERT INTO workflows (workflow_id, name, state, run_dir, bolt_port, gdb_pid) "
-                "VALUES(?, ?, ?, ?, ?, ?);"
-                )
-        bdb.run(self.db_file, stmt, [workflow_id, name, state, run_dir, bolt_port, gdb_pid])
+        stmt = """INSERT INTO workflows (workflow_id, name, state, run_dir,
+                                         bolt_port, http_port, https_port, gdb_pid)
+                  VALUES(?, ?, ?, ?, ?, ?, ?, ?);"""
+        bdb.run(self.db_file, stmt, [workflow_id, name, 'Initializing', run_dir,
+                                     bolt_port, http_port, https_port, -1])
 
     def delete_workflow(self, workflow_id):
         """Delete a workflow from the database."""
@@ -136,6 +137,11 @@ def get_gdb_pid(self, workflow_id):
         gdb_pid = result
         return gdb_pid
 
+    def update_gdb_pid(self, workflow_id, gdb_pid):
+        """Update the gdb PID associated with a workflow."""
+        stmt = "UPDATE workflows SET gdb_pid=? WHERE workflow_id=?"
+        bdb.run(self.db_file, stmt, [gdb_pid, workflow_id])
+
     def get_run_dir(self, workflow_id):
         """Return the bolt port associated with a workflow."""
         stmt = "SELECT run_dir FROM workflows WHERE workflow_id=?"
@@ -162,6 +168,8 @@ def _init_tables(self): state TEST NOT NULL, run_dir STR, bolt_port INTEGER, + http_port INTEGER, + https_port INTEGER, gdb_pid INTEGER);""" tasks_stmt = """CREATE TABLE IF NOT EXISTS tasks ( diff --git a/beeflow/common/paths.py b/beeflow/common/paths.py index 8a5c39e9d..1d3043c14 100644 --- a/beeflow/common/paths.py +++ b/beeflow/common/paths.py @@ -49,3 +49,37 @@ def log_path(): def log_fname(component): """Determine the log file name for the given component.""" return os.path.join(log_path(), f'{component}.log') + + +def redis_root(): + """Get the redis root directory (create it if it doesn't exist).""" + path = os.path.join(_workdir(), 'redis') + os.makedirs(path, exist_ok=True) + return path + + +def redis_container(): + """Get the path to the unpacked Redis container.""" + return os.path.join(_workdir(), 'redis_container') + + +def redis_sock_fname(): + """Return the file name for the Redis socket.""" + return 'redis.sock' + + +def _celery_root(): + """Get the celery root directory (create it if it doesn't exist).""" + path = os.path.join(_workdir(), 'celery') + os.makedirs(path, exist_ok=True) + return path + + +def celery_config(): + """Return the celery config path.""" + return os.path.join(_celery_root(), 'celery.py') + + +def celery_db(): + """Return the celery db path.""" + return os.path.join(_celery_root(), 'celery.db') diff --git a/beeflow/common/states.py b/beeflow/common/states.py new file mode 100644 index 000000000..f03a41d5a --- /dev/null +++ b/beeflow/common/states.py @@ -0,0 +1,23 @@ +"""Workflow and Task state values.""" + + +class WorkflowStates: + """Workflow status values.""" + + INITIALIZING = 'INITIALIZING' + PENDING = 'PENDING' + RUNNING = 'RUNNING' + PAUSED = 'PAUSED' + RESUME = 'RESUME' + CANCELLED = 'CANCELLED' + + +class TaskStates: + """Task status values.""" + + PENDING = 'PENDING' + RUNNING = 'RUNNING' + COMPLETED = 'COMPLETED' + FAILED = 'FAILED' + PAUSED = 'PAUSED' + CANCELLED = 'CANCELLED' diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index 7184a6073..068c6a47d 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -69,9 +69,19 @@ def _resource(tag=""): return _url() + str(tag) +class MockTask: + """Mock task class for mocking celery.""" + + @staticmethod + def delay(*pargs, **kwargs): + """Mock a delay call to the celery backend.""" + return None + + # WFList Tests def test_submit_workflow(client, mocker, teardown_workflow, temp_db): """Test submitting a workflow.""" + mocker.patch('beeflow.wf_manager.resources.wf_list.init_workflow', new=MockTask) mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.create_image', return_value=True) mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.start_gdb', return_value=True) @@ -92,7 +102,8 @@ def test_submit_workflow(client, mocker, teardown_workflow, temp_db): 'workdir': '.', 'workflow': jsonpickle.encode(WORKFLOW_GOLD), 'tasks': jsonpickle.encode(TASKS_GOLD, warn=True), - 'workflow_archive': tarball_contents + 'workflow_archive': tarball_contents, + 'no_start': False }) # Remove task added during the test @@ -101,6 +112,7 @@ def test_submit_workflow(client, mocker, teardown_workflow, temp_db): def test_reexecute_workflow(client, mocker, teardown_workflow, temp_db): """Test reexecuting a workflow.""" + mocker.patch('beeflow.wf_manager.resources.wf_list.init_workflow', new=MockTask) mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.create_image', return_value=True) 
mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.start_gdb', return_value=True) @@ -126,15 +138,37 @@ def test_reexecute_workflow(client, mocker, teardown_workflow, temp_db): assert resp.json['msg'] == 'Workflow uploaded' +class MockDBWorkflowHandle: + """Mock DB workflow handle.""" + + def update_workflow_state(self, *pargs, **kwargs): + """Mock update a workflow.""" + + +class MockDB: + """Mock DB for workflow manager.""" + + @property + def workflows(self): + """Return a workflow handle.""" + return MockDBWorkflowHandle() + + +def mock_connect_db(*pargs, **kwargs): + """Mock a DB connection.""" + return MockDB() + + # WFActions Tests def test_start_workflow(client, mocker, temp_db): """Test starting a workflow.""" + mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db) mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', return_value=MockWFI()) mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_tm', return_value=None) mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_scheduler', return_value=None) mocker.patch('beeflow.wf_manager.resources.wf_utils.update_wf_status', return_value=None) - mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) resp = client().post(f'/bee_wfm/v1/jobs/{WF_ID}') assert resp.status_code == 200 @@ -147,11 +181,11 @@ def test_workflow_status(client, mocker, setup_teardown_workflow, temp_db): mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) wf_name = 'wf' - wf_status = 'Pending' bolt_port = 3030 - gdb_pid = 12345 + http_port = 3333 + https_port = 3455 - temp_db.workflows.add_workflow(WF_ID, wf_name, wf_status, 'dir', bolt_port, gdb_pid) + temp_db.workflows.init_workflow(WF_ID, wf_name, 'dir', bolt_port, http_port, https_port) temp_db.workflows.add_task(123, WF_ID, 'task', "WAITING") temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING") diff --git a/beeflow/wf_manager/common/dep_manager.py b/beeflow/wf_manager/common/dep_manager.py index afe7bdb9a..c9dbbfc43 100755 --- a/beeflow/wf_manager/common/dep_manager.py +++ b/beeflow/wf_manager/common/dep_manager.py @@ -4,7 +4,6 @@ import os import re -import sys import time import shutil import signal @@ -114,7 +113,7 @@ def create_image(): # Can throw an exception that needs to be handled by the caller check_container_runtime() - dep_img = bc.get('DEFAULT', 'bee_dep_image') + dep_img = bc.get('DEFAULT', 'neo4j_image') # Check for BEE dependency container directory: container_dir_exists = check_container_dir() @@ -127,8 +126,7 @@ def create_image(): # Build new dependency container try: subprocess.run(["ch-convert", "-i", "tar", "-o", "dir", - str(dep_img), str(container_dir)], - stdout=sys.stdout, stderr=sys.stderr, check=True) + str(dep_img), str(container_dir)], check=True) except subprocess.CalledProcessError as error: dep_log.error(f"ch-convert failed: {error}") shutil.rmtree(container_dir) @@ -166,7 +164,7 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False): "-b", logs_dir + ":/logs", "-b", run_dir + ":/var/lib/neo4j/run", container_path, "--", *command - ], stdout=sys.stdout, stderr=sys.stderr, check=True) + ], check=True) except subprocess.CalledProcessError: 
         dep_log.error("neo4j-admin set-initial-password failed")
         return -1
diff --git a/beeflow/wf_manager/common/wf_db.py b/beeflow/wf_manager/common/wf_db.py
index 80c39bdae..f54fa9d9f 100644
--- a/beeflow/wf_manager/common/wf_db.py
+++ b/beeflow/wf_manager/common/wf_db.py
@@ -92,7 +92,10 @@ def init_tables():
                     status TEST NOT NULL,
                     run_dir STR,
                     bolt_port INTEGER,
-                    gdb_pid INTEGER);"""
+                    http_port INTEGER,
+                    https_port INTEGER,
+                    gdb_pid INTEGER,
+                    init_task_id INTEGER);"""
 
     tasks_stmt = """CREATE TABLE IF NOT EXISTS tasks (
                     id INTEGER PRIMARY KEY,
@@ -213,6 +216,25 @@ def add_workflow(workflow_id, name, status, run_dir, bolt_port, gdb_pid):
     run(stmt, [workflow_id, name, status, run_dir, bolt_port, gdb_pid])
 
 
+def complete_gdb_init(workflow_id, gdb_pid):
+    """Complete the GDB init process for a workflow."""
+    stmt = "UPDATE workflows SET gdb_pid=?, status=? WHERE workflow_id = ?"
+    run(stmt, [gdb_pid, 'Pending', workflow_id])
+
+
+def init_workflow(workflow_id, name, run_dir, bolt_port, http_port, https_port, init_task_id):
+    """Insert a new workflow into the database."""
+    if not table_exists('workflows'):
+        # Initialize the database
+        init_tables()
+
+    stmt = """INSERT INTO workflows (workflow_id, name, status, run_dir, bolt_port,
+                                     http_port, https_port, gdb_pid, init_task_id)
+              VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?);"""
+    run(stmt, [workflow_id, name, 'Initializing', run_dir, bolt_port, http_port,
+               https_port, -1, init_task_id])
+
+
 def update_workflow_state(workflow_id, status):
     """Update the status in a workflow in the database."""
     stmt = "UPDATE workflows SET status=? WHERE workflow_id=?"
@@ -247,6 +269,13 @@ def get_gdb_pid(workflow_id):
     return gdb_pid
 
 
+def get_init_task_id(workflow_id):
+    """Return the task ID for the GDB initialization."""
+    stmt = "SELECT init_task_id FROM workflows WHERE workflow_id=?"
+    result = get(stmt, [workflow_id])[0]
+    return result[0]
+
+
 def get_run_dir(workflow_id):
     """Return the bolt port associated with a workflow."""
     stmt = "SELECT run_dir FROM workflows WHERE workflow_id=?"
diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py
index 6910599a3..76c27a914 100644
--- a/beeflow/wf_manager/resources/wf_actions.py
+++ b/beeflow/wf_manager/resources/wf_actions.py
@@ -22,20 +22,13 @@ def __init__(self):
     def post(self, wf_id):
         """Start workflow. Send ready tasks to the task manager."""
         db = connect_db(wfm_db, db_path)
-        wfi = wf_utils.get_workflow_interface(wf_id)
-        state = wfi.get_workflow_state()
-        if state in ('RUNNING', 'PAUSED', 'COMPLETED'):
-            resp = make_response(jsonify(msg='Cannot start workflow it is '
-                                         f'{state.lower()}.',
-                                         status='ok'), 200)
-            return resp
-        wfi.execute_workflow()
-        tasks = wfi.get_ready_tasks()
-        wf_utils.schedule_submit_tasks(wf_id, tasks)
-        wf_id = wfi.workflow_id
-        wf_utils.update_wf_status(wf_id, 'Running')
-        db.workflows.update_workflow_state(wf_id, 'Running')
-        resp = make_response(jsonify(msg='Started workflow!', status='ok'), 200)
+        if wf_utils.start_workflow(wf_id):
+            db.workflows.update_workflow_state(wf_id, 'Running')
+            resp = make_response(jsonify(msg='Started workflow!', status='ok'), 200)
+        else:
+            resp_body = jsonify(msg='Cannot start workflow; it is already running, '
+                                    'paused, or completed.', status='ok')
+            resp = make_response(resp_body, 200)
         return resp
 
     @staticmethod
@@ -54,7 +46,7 @@ def get(wf_id):
         for task in tasks:
             tasks_status.append(f"{task.name}--{task.state}")
         tasks_status = '\n'.join(tasks_status)
-        wf_status = wf_utils.read_wf_status(wf_id)
+        wf_status = db.workflows.get_workflow_state(wf_id)
         resp = make_response(jsonify(tasks_status=tasks_status,
                                      wf_status=wf_status, status='ok'), 200)
diff --git a/beeflow/wf_manager/resources/wf_list.py b/beeflow/wf_manager/resources/wf_list.py
index 5d71484a0..ea30b2d4f 100644
--- a/beeflow/wf_manager/resources/wf_list.py
+++ b/beeflow/wf_manager/resources/wf_list.py
@@ -10,6 +10,7 @@
 from flask import make_response, jsonify
 from werkzeug.datastructures import FileStorage
 from flask_restful import Resource, reqparse
+from celery import shared_task # noqa (pylama can't find celery imports)
 
 from beeflow.common import log as bee_logging
 # from beeflow.common.wf_profiler import WorkflowProfiler
@@ -51,6 +52,52 @@ def extract_wf(wf_id, filename, workflow_archive, reexecute=False):
     return archive_dir
 
 
+@shared_task(ignore_result=True)
+def init_workflow(wf_id, wf_name, wf_dir, wf_workdir, bolt_port, http_port,
+                  https_port, no_start, workflow=None, tasks=None, reexecute=False):
+    """Initialize the workflow in a separate process."""
+    print('Initializing GDB and workflow...')
+    try:
+        dep_manager.create_image()
+    except dep_manager.NoContainerRuntime:
+        crt_message = "Charliecloud not installed in current environment."
+ log.error(crt_message) + return + + gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port, https_port, reexecute=reexecute) + dep_manager.wait_gdb(log) + + wfi = wf_utils.get_workflow_interface_by_bolt_port(bolt_port) + if reexecute: + wfi.reset_workflow(wf_id) + else: + wfi.initialize_workflow(workflow) + + # initialize_wf_profiler(wf_name) + + log.info('Setting workflow metadata') + wf_utils.create_wf_metadata(wf_id, wf_name) + db = connect_db(wfm_db, db_path) + if reexecute: + _, tasks = wfi.get_workflow() + for task in tasks: + wfi.add_task(task) + metadata = wfi.get_task_metadata(task) + metadata['workdir'] = wf_workdir + wfi.set_task_metadata(task, metadata) + db.workflows.add_task(task.id, wf_id, task.name, "WAITING") + + db.workflows.update_gdb_pid(wf_id, gdb_pid) + wf_utils.update_wf_status(wf_id, 'Waiting') + db.workflows.update_workflow_state(wf_id, 'Waiting') + if no_start: + log.info('Not starting workflow, as requested') + else: + log.info('Starting workflow') + db.workflows.update_workflow_state(wf_id, 'Running') + wf_utils.start_workflow(wf_id) + + db_path = wf_utils.get_db_path() @@ -71,7 +118,7 @@ def get(self): return resp def post(self): - """Receive a workflow, parse it, and start up a neo4j instance for it.""" + """Upload a workflow, initialize database, and start if required.""" db = connect_db(wfm_db, db_path) reqparser = reqparse.RequestParser() reqparser.add_argument('wf_name', type=str, required=True, @@ -84,6 +131,7 @@ def post(self): location='form') reqparser.add_argument('tasks', type=str, required=True, location='form') + reqparser.add_argument('no_start', type=str, required=True, location='form') reqparser.add_argument('workflow_archive', type=FileStorage, required=False, location='files') data = reqparser.parse_args() @@ -91,43 +139,25 @@ def post(self): wf_filename = data['wf_filename'] wf_name = data['wf_name'] wf_workdir = data['workdir'] + # Note we have to check for the 'true' string value + no_start = data['no_start'].lower() == 'true' workflow = jsonpickle.decode(data['workflow']) # May have to decode the list and task objects separately tasks = [jsonpickle.decode(task) if isinstance(task, str) else task for task in jsonpickle.decode(data['tasks'])] - try: - dep_manager.create_image() - except dep_manager.NoContainerRuntime: - crt_message = "Charliecloud not installed in current environment." 
-        log.error(crt_message)
-        resp = make_response(jsonify(msg=crt_message, status='error'), 418)
-        return resp
-
         wf_id = workflow.id
         wf_dir = extract_wf(wf_id, wf_filename, wf_tarball)
         bolt_port = wf_utils.get_open_port()
         http_port = wf_utils.get_open_port()
         https_port = wf_utils.get_open_port()
-        gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port, https_port)
-        db.workflows.add_workflow(wf_id, wf_name, 'Pending', wf_dir, bolt_port, gdb_pid)
-        dep_manager.wait_gdb(log)
-
-        wfi = wf_utils.get_workflow_interface(wf_id)
-        wfi.initialize_workflow(workflow)
-        # initialize_wf_profiler(wf_name)
+        db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port)
+        init_workflow.delay(wf_id, wf_name, wf_dir, wf_workdir, bolt_port, http_port,
+                            https_port, no_start, workflow=workflow, tasks=tasks)
 
-        wf_utils.create_wf_metadata(wf_id, wf_name)
-        for task in tasks:
-            wfi.add_task(task)
-            metadata = wfi.get_task_metadata(task)
-            metadata['workdir'] = wf_workdir
-            wfi.set_task_metadata(task, metadata)
-            db.workflows.add_task(task.id, wf_id, task.name, "WAITING")
-
-        resp = make_response(jsonify(msg='Workflow uploaded', status='ok',
+        return make_response(jsonify(msg='Workflow uploaded', status='ok',
                                      wf_id=wf_id), 201)
-        return resp
 
     def put(self):
         """Reexecute a workflow."""
@@ -160,19 +190,11 @@
         bolt_port = wf_utils.get_open_port()
         http_port = wf_utils.get_open_port()
         https_port = wf_utils.get_open_port()
-        gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port,
-                                        https_port, reexecute=True)
-        db.workflows.add_workflow(wf_id, wf_name, 'Pending', wf_dir, bolt_port, gdb_pid)
-        dep_manager.wait_gdb(log)
-
-        wfi = wf_utils.get_workflow_interface(wf_id)
-        wfi.reset_workflow(wf_id)
-        wf_utils.create_wf_metadata(wf_id, wf_name)
-        _, tasks = wfi.get_workflow()
-        for task in tasks:
-            metadata = wfi.get_task_metadata(task)
-            metadata['workdir'] = wf_workdir
-            wfi.set_task_metadata(task, metadata)
+        db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port)
+        init_workflow.delay(wf_id, wf_name, wf_dir, wf_workdir, bolt_port, http_port,
+                            https_port, no_start=False, reexecute=True)
+
+        # Return the wf_id and a 201 (created) response
         resp = make_response(jsonify(msg='Workflow uploaded', status='ok',
                                      wf_id=wf_id), 201)
diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py
index edcf21899..5a7ad0fe8 100644
--- a/beeflow/wf_manager/resources/wf_utils.py
+++ b/beeflow/wf_manager/resources/wf_utils.py
@@ -123,7 +123,15 @@ def create_wf_namefile(wf_name, wf_id):
 def get_workflow_interface(wf_id):
     """Instantiate and return workflow interface object."""
     db = connect_db(wfm_db, get_db_path())
+    # Wait for the GDB
+    if db.workflows.get_workflow_state(wf_id) == 'Initializing':
+        raise RuntimeError('Workflow is still initializing')
     bolt_port = db.workflows.get_bolt_port(wf_id)
+    return get_workflow_interface_by_bolt_port(bolt_port)
+
+
+def get_workflow_interface_by_bolt_port(bolt_port):
+    """Return a workflow interface connection using just the bolt port."""
     try:
         driver = Neo4jDriver(user="neo4j", bolt_port=bolt_port,
                              db_hostname=bc.get("graphdb", "hostname"),
@@ -253,3 +261,19 @@ def schedule_submit_tasks(wf_id, tasks):
     allocation = submit_tasks_scheduler(tasks)  #NOQA
     # Submit tasks to TM
     submit_tasks_tm(wf_id, tasks, allocation)
+
+
+def start_workflow(wf_id):
+    """Attempt to start the workflow, returning True if successful."""
+    db = connect_db(wfm_db, get_db_path())
+    wfi = get_workflow_interface(wf_id)
+    state = wfi.get_workflow_state()
+    if state in ('RUNNING', 'PAUSED', 'COMPLETED'):
+        return False
+    wfi.execute_workflow()
+    tasks = wfi.get_ready_tasks()
+    schedule_submit_tasks(wf_id, tasks)
+    wf_id = wfi.workflow_id
+    update_wf_status(wf_id, 'Running')
+    db.workflows.update_workflow_state(wf_id, 'Running')
+    return True
diff --git a/beeflow/wf_manager/wf_manager.py b/beeflow/wf_manager/wf_manager.py
index 7a3ce74a2..5c3fcbdec 100644
--- a/beeflow/wf_manager/wf_manager.py
+++ b/beeflow/wf_manager/wf_manager.py
@@ -1,13 +1,14 @@
 """Start up the workflow manager connecting all of the endpoints."""
+import os
 from flask import Flask
+from celery import Celery # noqa (pylama can't find celery imports)
 from beeflow.common.api import BeeApi
-
+from beeflow.common import paths
 from beeflow.wf_manager.resources.wf_list import WFList
 from beeflow.wf_manager.resources.wf_actions import WFActions
 from beeflow.wf_manager.resources.wf_metadata import WFMetadata
 from beeflow.wf_manager.resources.wf_update import WFUpdate
-
 from beeflow.wf_manager.resources import wf_utils
 
@@ -21,6 +22,18 @@ def create_app():
     api.add_resource(WFActions, '/bee_wfm/v1/jobs/<string:wf_id>')
     api.add_resource(WFMetadata, '/bee_wfm/v1/jobs/<string:wf_id>/metadata')
     api.add_resource(WFUpdate, '/bee_wfm/v1/jobs/update/')
+
+    # Initialize celery app
+    celery_app = Celery(app.name)
+    redis_socket = os.path.join(paths.redis_root(), paths.redis_sock_fname())
+    celery_app.config_from_object({
+        'broker_url': f'redis+socket://{redis_socket}',
+        'result_backend': f'db+sqlite://{paths.celery_db()}',
+        'task_serializer': 'pickle',
+        'accept_content': ['application/json', 'application/x-python-serialize'],
+    })
+    celery_app.set_default()
+    app.extensions['celery'] = celery_app
     return app
diff --git a/ci/bee_config.sh b/ci/bee_config.sh
index 469378517..4759e79f6 100755
--- a/ci/bee_config.sh
+++ b/ci/bee_config.sh
@@ -10,7 +10,8 @@
 cat >> ~/.config/beeflow/bee.conf <
-* `Charliecloud `_ **version 0.34 (or greater)**
-  Charliecloud is installed on Los Alamos National Laboratory (LANL) clusters and can be invoked via ``module load charliecloud`` before running beeflow. If you are on a system that does not have the module, `Charliecloud `_ is easily installed in user space and requires no privileges to install. To insure Charliecloud is available in subsequent runs add ``module load charliecloud`` (or if you installed it ``export PATH=:$PATH``) to your .bashrc (or other appropriate shell initialization file). BEE runs dependencies from a Charliecloud container and uses it to run the graph database neo4j and other dependencies. The default container runtime for containerized applications in BEE is Charliecloud.
+* `Charliecloud `_ **version 0.34 (or greater)**
+  Charliecloud is installed on Los Alamos National Laboratory (LANL) clusters and can be invoked via ``module load charliecloud`` before running beeflow. If you are on a system that does not have the module, `Charliecloud `_ is easily installed in user space and requires no privileges to install. To ensure Charliecloud is available in subsequent runs add ``module load charliecloud`` (or if you installed it ``export PATH=:$PATH``) to your .bashrc (or other appropriate shell initialization file). BEE runs dependencies from a Charliecloud container and uses it to run the graph database neo4j and other dependencies. The default container runtime for containerized applications in BEE is Charliecloud.
- * **BEE dependency container**:
-   If you are on a LANL system, you may use the dependency container supplied by the BEE team: **/usr/projects/BEE/neo4j-3-5-17-ch.tar.gz**
+* **Containers**:
+  Two Charliecloud dependency containers are currently required for BEE: one for the Neo4j graph database and another for Redis. The paths to these containers will need to be set in the BEE configuration later, using the ``neo4j_image`` and ``redis_image`` options, respectively. BEE only supports Neo4j 3.5.x. We are currently using the latest version of Redis supplied on Docker Hub (as of 2023).
 
-  At this time the only dependency needed in a container is **neo4j version 3.5.x**. To build the container for X86, invoke Charliecloud on the cluster where BEE components will be running to pull the graph database **neo4j** and create a Charliecloud tarball.
+  For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j-3-5-17-ch.tar.gz**, **/usr/projects/BEE/redis.tar.gz**.
 
-
-.. code-block::
-
-    ch-image pull neo4j:3.5.22
-    ch-convert -o tar neo4j:3.5.22 neo4j-3-5-22.tar.gz
-
-..
+  For other users, these containers can be pulled from Docker Hub (after following `Installation:`_ below) using ``beeflow core pull-deps``, which will download and report the container paths to be set in the config later.
 
 Installation:
 =============
@@ -59,11 +53,7 @@ You will need to setup the bee configuration file that will be located in:
 
 macOS: ``~/Library/Application Support/beeflow/bee.conf``
 
-Before creating a bee.conf file you will need to know the path to your **BEE
-dependency container** and the type of workload scheduler (Slurm or LSF). (On
-LANL systems you may use the BEE provided container:
-**/usr/projects/BEE/neo4j-3-5-17-ch.tar.gz**). Depending on the system, you
-may also need to know an account name to use.
+Before creating a bee.conf file you will need to know the path to the two required Charliecloud containers, one for Neo4j (``neo4j_image``) and one for Redis (``redis_image``). See `Requirements:`_ above for pulling these containers. Depending on the system, you may also need to know system-specific information, such as account information. You can leave some options blank if they are not needed.
 
 Once you are ready type ``beeflow config new``.
diff --git a/pyproject.toml b/pyproject.toml
index 9e88b5273..7056c66dd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -69,6 +69,7 @@ gunicorn = "^20.1.0"
 typer = "^0.5.0"
 # Seems to be required for Flux
 cffi = "^1.15.1"
+celery = { version = "^5.3.4", extras = ["redis", "sqlalchemy"] }
 # Cloud optional dependencies
 google-api-python-client = { version = "^2.66.0", optional = true }
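
Note on the Celery wiring (not part of the patch): ``create_app()`` now builds a Celery app alongside the Flask app, stores it in ``app.extensions['celery']``, and calls ``set_default()`` so that ``@shared_task`` functions such as ``init_workflow()`` in ``wf_list.py`` bind to it; ``beeflow/common/celery.py`` then re-exports that app for the ``celery -A beeflow.common.celery worker`` command started by the new ``celery`` component. The sketch below is a minimal, self-contained illustration of the same pattern. The ``make_app`` and ``add`` names, the in-memory broker, and ``task_always_eager`` are hypothetical stand-ins for the Redis-socket broker and the real task; none of them appear in the patch.

.. code-block:: python

    """Minimal sketch of the Flask + Celery wiring pattern used above."""
    from celery import Celery, shared_task
    from flask import Flask


    @shared_task(ignore_result=True)
    def add(x, y):
        """Stand-in for init_workflow(); binds to the default Celery app."""
        return x + y


    def make_app():
        """Build a Flask app with an attached Celery app, as create_app() does."""
        app = Flask(__name__)
        celery_app = Celery(app.name)
        celery_app.config_from_object({
            # The patch points broker_url at a redis+socket:// URL and
            # result_backend at a db+sqlite:// path; memory:// plus eager mode
            # keeps this demo runnable without a broker or a worker.
            'broker_url': 'memory://',
            'task_always_eager': True,
        })
        # set_default() is what lets @shared_task functions find this app
        # without importing it directly.
        celery_app.set_default()
        app.extensions['celery'] = celery_app
        return app


    if __name__ == '__main__':
        make_app()
        add.delay(1, 2)  # runs inline because of task_always_eager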
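Note on the Redis component (not part of the patch): the ``redis`` component runs ``redis-server`` inside a Charliecloud container listening only on a Unix socket (``port 0`` plus ``unixsocket`` in the generated ``redis.conf``). A hypothetical smoke test for that socket, using the redis-py client that the ``celery[redis]`` extra pulls in, might look like:

.. code-block:: python

    """Hypothetical check that the containerized Redis is reachable."""
    import os

    from redis import Redis  # installed via the celery[redis] extra

    from beeflow.common import paths

    # Same socket path that create_app() hands to the Celery broker URL
    sock = os.path.join(paths.redis_root(), paths.redis_sock_fname())
    client = Redis(unix_socket_path=sock)
    print(client.ping())  # True once the redis component is up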