From 464952938bb503f4bdd6018da08b264177d7117c Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 6 Sep 2023 15:05:11 -0600 Subject: [PATCH 01/15] Add initial code for celery and redis --- beeflow/client/core.py | 22 ++++++++++++++++++++- beeflow/common/celery.py | 5 +++++ beeflow/common/paths.py | 33 ++++++++++++++++++++++++++++++++ beeflow/common/tasks.py | 0 beeflow/wf_manager/wf_manager.py | 15 +++++++++++++-- pyproject.toml | 1 + 6 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 beeflow/common/celery.py create mode 100644 beeflow/common/tasks.py diff --git a/beeflow/client/core.py b/beeflow/client/core.py index 7805eaf38..22a066984 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -125,6 +125,7 @@ def kill(self): proc.terminate() + def warn(*pargs): """Print a red warning message.""" typer.secho(' '.join(pargs), fg=typer.colors.RED, file=sys.stderr) @@ -156,7 +157,7 @@ def init_components(): # Slurmrestd will be started only if we're running with Slurm and # slurm::use_commands is not True - @mgr.component('wf_manager', ('scheduler',)) + @mgr.component('wf_manager', ('scheduler', 'celery')) def start_wfm(): """Start the WFM.""" fp = open_log('wf_manager') @@ -182,6 +183,25 @@ def start_scheduler(): return launch_with_gunicorn('beeflow.scheduler.scheduler:create_app()', paths.sched_socket(), stdout=fp, stderr=fp) + @mgr.component('celery', ('redis',)) + def celery(): + """Start the celery task queue.""" + log = open_log('celery') + return subprocess.Popen(['celery', '-A', 'beeflow.common.celery', 'worker'], + stdout=log, stderr=log) + + @mgr.component('redis', ()) + def redis(): + """Start redis.""" + # Dump the config + with open(paths.redis_config(), 'w', encoding='utf-8') as fp: + print('maxmemory 2mb', file=fp) + print('unixsocket', paths.redis_socket(), file=fp) + print('unixsocketperm 700', file=fp) + cmd = ['redis-server', paths.redis_config()] + log = open_log('redis') + return subprocess.Popen(cmd, stdout=log, stderr=log) + # Workflow manager and task manager need to be opened with PIPE for their stdout/stderr if need_slurmrestd(): @mgr.component('slurmrestd') diff --git a/beeflow/common/celery.py b/beeflow/common/celery.py new file mode 100644 index 000000000..ab2c32e89 --- /dev/null +++ b/beeflow/common/celery.py @@ -0,0 +1,5 @@ +from beeflow.wf_manager.wf_manager import create_app + + +flask_app = create_app() +celery_app = flask_app.extensions['celery'] diff --git a/beeflow/common/paths.py b/beeflow/common/paths.py index 8a5c39e9d..652b7f03d 100644 --- a/beeflow/common/paths.py +++ b/beeflow/common/paths.py @@ -49,3 +49,36 @@ def log_path(): def log_fname(component): """Determine the log file name for the given component.""" return os.path.join(log_path(), f'{component}.log') + + +def _redis_root(): + """Get the redis root directory (create it if it doesn't exist).""" + path = os.path.join(_workdir(), 'redis') + os.makedirs(path, exist_ok=True) + return path + +def redis_config(): + """Get the Redis config path.""" + return os.path.join(_redis_root(), 'redis.conf') + + +def redis_socket(): + """Get the Redis socket path.""" + return os.path.join(_redis_root(), 'redis.sock') + + +def _celery_root(): + """Get the celery root directory (create it if it doesn't exist).""" + path = os.path.join(_workdir(), 'celery') + os.makedirs(path, exist_ok=True) + return path + + +def celery_config(): + """Return the celery config path.""" + return os.path.join(_celery_root(), 'celery.py') + + +def celery_db(): + """Return the celery db path.""" + 
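+    # create_app() in wf_manager.py joins this path into the Celery result
+    # backend URL: f'db+sqlite://{paths.celery_db()}'.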
return os.path.join(_celery_root(), 'celery.db') diff --git a/beeflow/common/tasks.py b/beeflow/common/tasks.py new file mode 100644 index 000000000..e69de29bb diff --git a/beeflow/wf_manager/wf_manager.py b/beeflow/wf_manager/wf_manager.py index 7a3ce74a2..6a43e32e5 100644 --- a/beeflow/wf_manager/wf_manager.py +++ b/beeflow/wf_manager/wf_manager.py @@ -1,16 +1,17 @@ """Start up the workflow manager connecting all of the endpoints.""" from flask import Flask +from celery import Celery from beeflow.common.api import BeeApi - +from beeflow.common import paths from beeflow.wf_manager.resources.wf_list import WFList from beeflow.wf_manager.resources.wf_actions import WFActions from beeflow.wf_manager.resources.wf_metadata import WFMetadata from beeflow.wf_manager.resources.wf_update import WFUpdate - from beeflow.wf_manager.resources import wf_utils + def create_app(): """Create flask app object and add REST endpoints.""" app = Flask(__name__) @@ -21,6 +22,16 @@ def create_app(): api.add_resource(WFActions, '/bee_wfm/v1/jobs/') api.add_resource(WFMetadata, '/bee_wfm/v1/jobs//metadata') api.add_resource(WFUpdate, '/bee_wfm/v1/jobs/update/') + + # Initialize celery app + celery_app = Celery(app.name) + celery_app.config_from_object({ + 'broker_url': f'redis+socket://{paths.redis_socket()}', + 'result_backend': f'db+sqlite://{paths.celery_db()}', + 'imports': ('beeflow.common.tasks',), + }) + celery_app.set_default() + app.extensions['celery'] = celery_app return app diff --git a/pyproject.toml b/pyproject.toml index 9e88b5273..7056c66dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,7 @@ gunicorn = "^20.1.0" typer = "^0.5.0" # Seems to be required for Flux cffi = "^1.15.1" +celery = { version = "^5.3.4", extras = ["redis", "sqlalchemy"] } # Cloud optional dependencies google-api-python-client = { version = "^2.66.0", optional = true } From cf05ce92341c413f0f1ca2037a51bcf01a13eb18 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Mon, 18 Sep 2023 12:25:47 -0600 Subject: [PATCH 02/15] Add initial GDB start and workflow init task --- beeflow/client/bee_client.py | 7 +- beeflow/client/core.py | 2 +- beeflow/common/celery.py | 1 + beeflow/common/db/wfm_db.py | 11 +-- beeflow/common/paths.py | 8 +++ beeflow/common/states.py | 23 +++++++ beeflow/common/tasks.py | 0 beeflow/wf_manager/common/wf_db.py | 29 +++++++- beeflow/wf_manager/resources/wf_actions.py | 19 ++---- beeflow/wf_manager/resources/wf_list.py | 79 ++++++++++++---------- beeflow/wf_manager/resources/wf_utils.py | 23 +++++++ beeflow/wf_manager/wf_manager.py | 5 +- 12 files changed, 146 insertions(+), 61 deletions(-) create mode 100644 beeflow/common/states.py delete mode 100644 beeflow/common/tasks.py diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index e24a74b2b..8a4bf9526 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -254,7 +254,8 @@ def is_parent(parent, path): 'wf_filename': os.path.basename(wf_path).encode(), 'workdir': workdir, 'workflow': jsonpickle.encode(workflow), - 'tasks': jsonpickle.encode(tasks, warn=True) + 'tasks': jsonpickle.encode(tasks, warn=True), + 'no_start': no_start, } files = { 'workflow_archive': wf_tarball @@ -283,10 +284,6 @@ def is_parent(parent, path): if tarball_path: os.remove(tarball_path) - # Start the workflow - if not no_start: - start(wf_id) - return wf_id diff --git a/beeflow/client/core.py b/beeflow/client/core.py index 22a066984..5c11409bd 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -125,7 +125,6 @@ def 
kill(self): proc.terminate() - def warn(*pargs): """Print a red warning message.""" typer.secho(' '.join(pargs), fg=typer.colors.RED, file=sys.stderr) @@ -195,6 +194,7 @@ def redis(): """Start redis.""" # Dump the config with open(paths.redis_config(), 'w', encoding='utf-8') as fp: + print('dir', paths.redis_data(), file=fp) print('maxmemory 2mb', file=fp) print('unixsocket', paths.redis_socket(), file=fp) print('unixsocketperm 700', file=fp) diff --git a/beeflow/common/celery.py b/beeflow/common/celery.py index ab2c32e89..006fc89f3 100644 --- a/beeflow/common/celery.py +++ b/beeflow/common/celery.py @@ -1,3 +1,4 @@ +"""Module for celery configuration.""" from beeflow.wf_manager.wf_manager import create_app diff --git a/beeflow/common/db/wfm_db.py b/beeflow/common/db/wfm_db.py index 9e87a741c..2e75d1c1d 100644 --- a/beeflow/common/db/wfm_db.py +++ b/beeflow/common/db/wfm_db.py @@ -69,12 +69,12 @@ def get_workflows(self): workflows = [self.Workflow(*workflow) for workflow in result] return workflows - def add_workflow(self, workflow_id, name, state, run_dir, bolt_port, gdb_pid): + def init_workflow(self, workflow_id, name, run_dir, bolt_port, http_port, https_port, init_task_id): """Insert a new workflow into the database.""" - stmt = ("INSERT INTO workflows (workflow_id, name, state, run_dir, bolt_port, gdb_pid) " + stmt = ("INSERT INTO workflows (workflow_id, name, state, run_dir, bolt_port, http_port, https_port, gdb_pid, init_task_id) " "VALUES(?, ?, ?, ?, ?, ?);" ) - bdb.run(self.db_file, stmt, [workflow_id, name, state, run_dir, bolt_port, gdb_pid]) + bdb.run(self.db_file, stmt, [workflow_id, name, 'Initializing', run_dir, bolt_port, http_port, https_port, -1, init_task_id]) def delete_workflow(self, workflow_id): """Delete a workflow from the database.""" @@ -162,7 +162,10 @@ def _init_tables(self): state TEST NOT NULL, run_dir STR, bolt_port INTEGER, - gdb_pid INTEGER);""" + http_port INTEGER, + https_port INTEGER, + gdb_pid INTEGER, + init_task_id INTEGER);""" tasks_stmt = """CREATE TABLE IF NOT EXISTS tasks ( id INTEGER PRIMARY KEY, diff --git a/beeflow/common/paths.py b/beeflow/common/paths.py index 652b7f03d..caac57391 100644 --- a/beeflow/common/paths.py +++ b/beeflow/common/paths.py @@ -57,11 +57,19 @@ def _redis_root(): os.makedirs(path, exist_ok=True) return path + def redis_config(): """Get the Redis config path.""" return os.path.join(_redis_root(), 'redis.conf') +def redis_data(): + """Return the Redis data dir.""" + path = os.path.join(_redis_root(), 'data') + os.makedirs(path, exist_ok=True) + return path + + def redis_socket(): """Get the Redis socket path.""" return os.path.join(_redis_root(), 'redis.sock') diff --git a/beeflow/common/states.py b/beeflow/common/states.py new file mode 100644 index 000000000..f03a41d5a --- /dev/null +++ b/beeflow/common/states.py @@ -0,0 +1,23 @@ +"""Workflow and Task state values.""" + + +class WorkflowStates: + """Workflow status values.""" + + INITIALIZING = 'INITIALIZING' + PENDING = 'PENDING' + RUNNING = 'RUNNING' + PAUSED = 'PAUSED' + RESUME = 'RESUME' + CANCELLED = 'CANCELLED' + + +class TaskStates: + """Task status values.""" + + PENDING = 'PENDING' + RUNNING = 'RUNNING' + COMPLETED = 'COMPLETED' + FAILED = 'FAILED' + PAUSED = 'PAUSED' + CANCELLED = 'CANCELLED' diff --git a/beeflow/common/tasks.py b/beeflow/common/tasks.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/beeflow/wf_manager/common/wf_db.py b/beeflow/wf_manager/common/wf_db.py index 80c39bdae..2f0d8c652 100644 --- 
a/beeflow/wf_manager/common/wf_db.py +++ b/beeflow/wf_manager/common/wf_db.py @@ -92,7 +92,8 @@ def init_tables(): status TEST NOT NULL, run_dir STR, bolt_port INTEGER, - gdb_pid INTEGER);""" + gdb_pid INTEGER, + init_task_id INTEGER);""" tasks_stmt = """CREATE TABLE IF NOT EXISTS tasks ( id INTEGER PRIMARY KEY, @@ -213,6 +214,25 @@ def add_workflow(workflow_id, name, status, run_dir, bolt_port, gdb_pid): run(stmt, [workflow_id, name, status, run_dir, bolt_port, gdb_pid]) +def complete_gdb_init(workflow_id, gdb_pid): + """Complete the GDB init process for a workflow.""" + stmt = "UPDATE workflows SET gdb_pid=?, status=? WHERE workflow_id = ?" + run(stmt, [gdb_port, 'Pending', workflow_id]) + + +def init_workflow(workflow_id, name, run_dir, bolt_port, http_port, https_port, init_task_id): + """Insert a new workflow into the database.""" + if not table_exists('workflows'): + # Initialize the database + init_tables() + + stmt = ("INSERT INTO workflows " + "(workflow_id, name, status, run_dir, bolt_port, http_port, https_port, gdb_pid, init_task_id) " + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?);" + ) + run(stmt, [workflow_id, name, 'Initializing', run_dir, bolt_port, http_port, https_port, -1, init_task_id]) + + def update_workflow_state(workflow_id, status): """Update the status in a workflow in the database.""" stmt = "UPDATE workflows SET status=? WHERE workflow_id=?" @@ -247,6 +267,13 @@ def get_gdb_pid(workflow_id): return gdb_pid +def get_init_task_id(workflow_id): + """Return the task ID for the GDB initialization.""" + stmt = "SELECT init_task_id FROM workflows WHERE workflow_id=?" + result = get(stmt, [workflow_id])[0] + return result[0] + + def get_run_dir(workflow_id): """Return the bolt port associated with a workflow.""" stmt = "SELECT run_dir FROM workflows WHERE workflow_id=?" diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 6910599a3..5acaf3436 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -22,20 +22,11 @@ def __init__(self): def post(self, wf_id): """Start workflow. 
Send ready tasks to the task manager.""" db = connect_db(wfm_db, db_path) - wfi = wf_utils.get_workflow_interface(wf_id) - state = wfi.get_workflow_state() - if state in ('RUNNING', 'PAUSED', 'COMPLETED'): - resp = make_response(jsonify(msg='Cannot start workflow it is ' - f'{state.lower()}.', - status='ok'), 200) - return resp - wfi.execute_workflow() - tasks = wfi.get_ready_tasks() - wf_utils.schedule_submit_tasks(wf_id, tasks) - wf_id = wfi.workflow_id - wf_utils.update_wf_status(wf_id, 'Running') - db.workflows.update_workflow_state(wf_id, 'Running') - resp = make_response(jsonify(msg='Started workflow!', status='ok'), 200) + if wf_utils.start_workflow(wf_id): + db.workflows.update_workflow_state(wf_id, 'Running') + resp = make_response(jsonify(msg='Started workflow!', status='ok'), 200) + else: + resp = make_response(jsonify(msg='Cannot start workflow it is {state.lower()}.', status='ok'), 200) return resp @staticmethod diff --git a/beeflow/wf_manager/resources/wf_list.py b/beeflow/wf_manager/resources/wf_list.py index 5d71484a0..3f3ad3464 100644 --- a/beeflow/wf_manager/resources/wf_list.py +++ b/beeflow/wf_manager/resources/wf_list.py @@ -10,6 +10,7 @@ from flask import make_response, jsonify from werkzeug.datastructures import FileStorage from flask_restful import Resource, reqparse +from celery import shared_task from beeflow.common import log as bee_logging # from beeflow.common.wf_profiler import WorkflowProfiler @@ -51,6 +52,40 @@ def extract_wf(wf_id, filename, workflow_archive, reexecute=False): return archive_dir +@shared_task(ignore_result=True) +def init_workflow(wf_dir, tasks, bolt_port, http_port, https_port, no_start): + """Initialize the workflow in a separate process.""" + print('Initializing GDB and workflow...') + try: + dep_manager.create_image() + except dep_manager.NoContainerRuntime: + crt_message = "Charliecloud not installed in current environment." + log.error(crt_message) + return -1, 'Failure' + + gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port, https_port) + dep_manager.wait_gdb(log) + + wfi = wf_utils.get_workflow_interface(wf_id) + wfi.initialize_workflow(workflow) + + # initialize_wf_profiler(wf_name) + + wf_utils.create_wf_metadata(wf_id, wf_name) + for task in tasks: + wfi.add_task(task) + metadata = wfi.get_task_metadata(task) + metadata['workdir'] = wf_workdir + wfi.set_task_metadata(task, metadata) + + if no_start: + # TODO: Is this the right status/state? 
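+        # With --no-start, the workflow is left waiting here; a later start
+        # request from the client can still launch the ready tasks.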
+ return gdb_pid, 'Waiting' + + wf_utils.start_workflow(wf_id) + return gdb_pid, 'Running' + + db_path = wf_utils.get_db_path() @@ -71,7 +106,7 @@ def get(self): return resp def post(self): - """Receive a workflow, parse it, and start up a neo4j instance for it.""" + """Upload a workflow, initialize database, and start if required.""" db = connect_db(wfm_db, db_path) reqparser = reqparse.RequestParser() reqparser.add_argument('wf_name', type=str, required=True, @@ -84,6 +119,7 @@ def post(self): location='form') reqparser.add_argument('tasks', type=str, required=True, location='form') + reqparser.add_argument('no_start', type=bool, required=True, location='form') reqparser.add_argument('workflow_archive', type=FileStorage, required=False, location='files') data = reqparser.parse_args() @@ -91,39 +127,22 @@ def post(self): wf_filename = data['wf_filename'] wf_name = data['wf_name'] wf_workdir = data['workdir'] + no_start = data['no_start'] workflow = jsonpickle.decode(data['workflow']) # May have to decode the list and task objects separately tasks = [jsonpickle.decode(task) if isinstance(task, str) else task for task in jsonpickle.decode(data['tasks'])] - try: - dep_manager.create_image() - except dep_manager.NoContainerRuntime: - crt_message = "Charliecloud not installed in current environment." - log.error(crt_message) - resp = make_response(jsonify(msg=crt_message, status='error'), 418) - return resp - wf_id = workflow.id wf_dir = extract_wf(wf_id, wf_filename, wf_tarball) bolt_port = wf_utils.get_open_port() http_port = wf_utils.get_open_port() https_port = wf_utils.get_open_port() - gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port, https_port) - db.workflows.add_workflow(wf_id, wf_name, 'Pending', wf_dir, bolt_port, gdb_pid) - dep_manager.wait_gdb(log) - wfi = wf_utils.get_workflow_interface(wf_id) - wfi.initialize_workflow(workflow) + result = init_workflow.delay(wf_dir, tasks, bolt_port, http_port, https_port, no_start) + db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port, init_task_id=result.id) - # initialize_wf_profiler(wf_name) - - wf_utils.create_wf_metadata(wf_id, wf_name) for task in tasks: - wfi.add_task(task) - metadata = wfi.get_task_metadata(task) - metadata['workdir'] = wf_workdir - wfi.set_task_metadata(task, metadata) db.workflows.add_task(task.id, wf_id, task.name, "WAITING") resp = make_response(jsonify(msg='Workflow uploaded', status='ok', wf_id=wf_id), 201) @@ -160,19 +179,11 @@ def put(self): bolt_port = wf_utils.get_open_port() http_port = wf_utils.get_open_port() https_port = wf_utils.get_open_port() - gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port, - https_port, reexecute=True) - db.workflows.add_workflow(wf_id, wf_name, 'Pending', wf_dir, bolt_port, gdb_pid) - dep_manager.wait_gdb(log) - wfi = wf_utils.get_workflow_interface(wf_id) - wfi.reset_workflow(wf_id) - wf_utils.create_wf_metadata(wf_id, wf_name) - - _, tasks = wfi.get_workflow() - for task in tasks: - metadata = wfi.get_task_metadata(task) - metadata['workdir'] = wf_workdir - wfi.set_task_metadata(task, metadata) + + + result = init_workflow.delay(wf_dir, tasks, bolt_port, http_port, https_port, no_start=False) + db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port, init_task_id=result.id) + # Return the wf_id and created resp = make_response(jsonify(msg='Workflow uploaded', status='ok', wf_id=wf_id), 201) diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index 
edcf21899..bb66a9918 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -5,6 +5,7 @@ import socket import requests import jsonpickle +from celery.result import AsyncResult from beeflow.common import log as bee_logging from beeflow.common.config_driver import BeeConfig as bc @@ -123,6 +124,14 @@ def create_wf_namefile(wf_name, wf_id): def get_workflow_interface(wf_id): """Instantiate and return workflow interface object.""" db = connect_db(wfm_db, get_db_path()) + # Wait for the GDB + if db.workflows.get_workflow_state(wf_id) == 'Initializing': + task_id = db.workflows.get_init_task_id(wf_id) + result = AsyncResult(task_id) + # Wait until the workflow is initialized + gdb_pid, state = result.wait() + db.workflows.set_workflow_state(state) + db.workflows.set_gdb_pid(gdb_pid) bolt_port = db.workflows.get_bolt_port(wf_id) try: driver = Neo4jDriver(user="neo4j", bolt_port=bolt_port, @@ -253,3 +262,17 @@ def schedule_submit_tasks(wf_id, tasks): allocation = submit_tasks_scheduler(tasks) #NOQA # Submit tasks to TM submit_tasks_tm(wf_id, tasks, allocation) + + +def start_workflow(wf_id): + """Attempt to start the workflow, returning True if successful.""" + wfi = get_workflow_interface(wf_id) + state = wfi.get_workflow_state() + if state in ('RUNNING', 'PAUSED', 'COMPLETED'): + return False + wfi.execute_workflow() + tasks = wfi.get_ready_tasks() + wf_utils.schedule_submit_tasks(wf_id, tasks) + wf_id = wfi.workflow_id + wf_utils.update_wf_status(wf_id, 'Running') + return True diff --git a/beeflow/wf_manager/wf_manager.py b/beeflow/wf_manager/wf_manager.py index 6a43e32e5..f4f5c1ea2 100644 --- a/beeflow/wf_manager/wf_manager.py +++ b/beeflow/wf_manager/wf_manager.py @@ -11,7 +11,6 @@ from beeflow.wf_manager.resources import wf_utils - def create_app(): """Create flask app object and add REST endpoints.""" app = Flask(__name__) @@ -28,7 +27,9 @@ def create_app(): celery_app.config_from_object({ 'broker_url': f'redis+socket://{paths.redis_socket()}', 'result_backend': f'db+sqlite://{paths.celery_db()}', - 'imports': ('beeflow.common.tasks',), + 'task_serializer': 'pickle', + 'accept_content': ['application/json', 'application/x-python-serialize'], + # 'imports': ('beeflow.common.tasks',), }) celery_app.set_default() app.extensions['celery'] = celery_app From b641898aa93dc88ab3c663a93ab4b18e04874079 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Tue, 19 Sep 2023 09:26:52 -0600 Subject: [PATCH 03/15] Fix GDB start task --- beeflow/wf_manager/common/dep_manager.py | 5 ++--- beeflow/wf_manager/resources/wf_actions.py | 2 +- beeflow/wf_manager/resources/wf_list.py | 9 +++++---- beeflow/wf_manager/resources/wf_utils.py | 5 +++++ 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/beeflow/wf_manager/common/dep_manager.py b/beeflow/wf_manager/common/dep_manager.py index afe7bdb9a..4774b6a53 100755 --- a/beeflow/wf_manager/common/dep_manager.py +++ b/beeflow/wf_manager/common/dep_manager.py @@ -127,8 +127,7 @@ def create_image(): # Build new dependency container try: subprocess.run(["ch-convert", "-i", "tar", "-o", "dir", - str(dep_img), str(container_dir)], - stdout=sys.stdout, stderr=sys.stderr, check=True) + str(dep_img), str(container_dir)], check=True) except subprocess.CalledProcessError as error: dep_log.error(f"ch-convert failed: {error}") shutil.rmtree(container_dir) @@ -166,7 +165,7 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False): "-b", logs_dir + ":/logs", "-b", run_dir + ":/var/lib/neo4j/run", 
container_path, "--", *command - ], stdout=sys.stdout, stderr=sys.stderr, check=True) + ], check=True) except subprocess.CalledProcessError: dep_log.error("neo4j-admin set-initial-password failed") return -1 diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 5acaf3436..24c2d0155 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -45,7 +45,7 @@ def get(wf_id): for task in tasks: tasks_status.append(f"{task.name}--{task.state}") tasks_status = '\n'.join(tasks_status) - wf_status = wf_utils.read_wf_status(wf_id) + wf_status = db.workflows.get_workflow_state(wf_id) resp = make_response(jsonify(tasks_status=tasks_status, wf_status=wf_status, status='ok'), 200) diff --git a/beeflow/wf_manager/resources/wf_list.py b/beeflow/wf_manager/resources/wf_list.py index 3f3ad3464..c2a185b7f 100644 --- a/beeflow/wf_manager/resources/wf_list.py +++ b/beeflow/wf_manager/resources/wf_list.py @@ -53,7 +53,7 @@ def extract_wf(wf_id, filename, workflow_archive, reexecute=False): @shared_task(ignore_result=True) -def init_workflow(wf_dir, tasks, bolt_port, http_port, https_port, no_start): +def init_workflow(workflow, wf_id, wf_name, wf_dir, wf_workdir, tasks, bolt_port, http_port, https_port, no_start): """Initialize the workflow in a separate process.""" print('Initializing GDB and workflow...') try: @@ -66,7 +66,8 @@ def init_workflow(wf_dir, tasks, bolt_port, http_port, https_port, no_start): gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port, https_port) dep_manager.wait_gdb(log) - wfi = wf_utils.get_workflow_interface(wf_id) + # wfi = wf_utils.get_workflow_interface(wf_id) + wfi = wf_utils.get_workflow_interface_by_bolt_port(bolt_port) wfi.initialize_workflow(workflow) # initialize_wf_profiler(wf_name) @@ -139,7 +140,7 @@ def post(self): http_port = wf_utils.get_open_port() https_port = wf_utils.get_open_port() - result = init_workflow.delay(wf_dir, tasks, bolt_port, http_port, https_port, no_start) + result = init_workflow.delay(workflow, wf_id, wf_name, wf_dir, wf_workdir, tasks, bolt_port, http_port, https_port, no_start) db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port, init_task_id=result.id) for task in tasks: @@ -181,7 +182,7 @@ def put(self): https_port = wf_utils.get_open_port() - result = init_workflow.delay(wf_dir, tasks, bolt_port, http_port, https_port, no_start=False) + result = init_workflow.delay(workflow, wf_id, wf_name, wf_dir, wf_workdir, tasks, bolt_port, http_port, https_port, no_start=False) db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port, init_task_id=result.id) # Return the wf_id and created diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index bb66a9918..935f7c9f5 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -133,6 +133,11 @@ def get_workflow_interface(wf_id): db.workflows.set_workflow_state(state) db.workflows.set_gdb_pid(gdb_pid) bolt_port = db.workflows.get_bolt_port(wf_id) + return get_workflow_interface_by_bolt_port(bolt_port) + + +def get_workflow_interface_by_bolt_port(bolt_port): + """Return a workflow interface connection using just the bolt port.""" try: driver = Neo4jDriver(user="neo4j", bolt_port=bolt_port, db_hostname=bc.get("graphdb", "hostname"), From 4e00a0f5f13a3453560d389f1cb4bc34c490bc82 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Tue, 26 Sep 2023 10:23:34 -0600 
Subject: [PATCH 04/15] Launch GDB and start workflow from celery backend

The GDB will now be started in a background celery task, and the first
workflow steps will be launched from that same task. If --no-start is given
on submit, tasks can still be launched later by a separate start call from
the client.
---
 beeflow/client/core.py                     |  2 +
 beeflow/common/db/wfm_db.py                | 21 +++++---
 beeflow/wf_manager/common/dep_manager.py   |  1 -
 beeflow/wf_manager/common/wf_db.py         | 12 ++---
 beeflow/wf_manager/resources/wf_actions.py |  3 +-
 beeflow/wf_manager/resources/wf_list.py    | 57 ++++++++++++++--------
 beeflow/wf_manager/resources/wf_utils.py   | 16 +++---
 beeflow/wf_manager/wf_manager.py           |  2 +-
 8 files changed, 67 insertions(+), 47 deletions(-)

diff --git a/beeflow/client/core.py b/beeflow/client/core.py
index 5c11409bd..7d2d089ff 100644
--- a/beeflow/client/core.py
+++ b/beeflow/client/core.py
@@ -194,6 +194,8 @@ def redis():
         """Start redis."""
         # Dump the config
         with open(paths.redis_config(), 'w', encoding='utf-8') as fp:
+            # Don't listen on TCP
+            print('port 0', file=fp)
             print('dir', paths.redis_data(), file=fp)
             print('maxmemory 2mb', file=fp)
             print('unixsocket', paths.redis_socket(), file=fp)
diff --git a/beeflow/common/db/wfm_db.py b/beeflow/common/db/wfm_db.py
index 2e75d1c1d..d1f7dd8d6 100644
--- a/beeflow/common/db/wfm_db.py
+++ b/beeflow/common/db/wfm_db.py
@@ -53,7 +53,7 @@ def __init__(self, db_file):
         """Initialize Task, db_file, and Workflow object."""
         self.Task = namedtuple("Task", "id task_id workflow_id name resource state slurm_id") #noqa
         self.db_file = db_file
-        self.Workflow = namedtuple("Workflow", "id workflow_id name state run_dir bolt_port gdb_pid") #noqa
+        self.Workflow = namedtuple("Workflow", "id workflow_id name state run_dir bolt_port http_port https_port gdb_pid") #noqa

     def get_workflow(self, workflow_id):
         """Return a workflow object."""
@@ -69,12 +69,13 @@ def get_workflows(self):
         workflows = [self.Workflow(*workflow) for workflow in result]
         return workflows

-    def init_workflow(self, workflow_id, name, run_dir, bolt_port, http_port, https_port, init_task_id):
+    def init_workflow(self, workflow_id, name, run_dir, bolt_port, http_port, https_port):
         """Insert a new workflow into the database."""
-        stmt = ("INSERT INTO workflows (workflow_id, name, state, run_dir, bolt_port, http_port, https_port, gdb_pid, init_task_id) "
-                "VALUES(?, ?, ?, ?, ?, ?);"
-                )
-        bdb.run(self.db_file, stmt, [workflow_id, name, 'Initializing', run_dir, bolt_port, http_port, https_port, -1, init_task_id])
+        stmt = """INSERT INTO workflows (workflow_id, name, state, run_dir,
+                                         bolt_port, http_port, https_port, gdb_pid)
+                  VALUES(?, ?, ?, ?, ?, ?, ?, ?);"""
+        bdb.run(self.db_file, stmt, [workflow_id, name, 'Initializing', run_dir,
+                                     bolt_port, http_port, https_port, -1])

     def delete_workflow(self, workflow_id):
         """Delete a workflow from the database."""
@@ -136,6 +137,11 @@ def get_gdb_pid(self, workflow_id):
         gdb_pid = result
         return gdb_pid

+    def update_gdb_pid(self, workflow_id, gdb_pid):
+        """Update the gdb PID associated with a workflow."""
+        stmt = "UPDATE workflows SET gdb_pid=? WHERE workflow_id=?"
+        bdb.run(self.db_file, stmt, [gdb_pid, workflow_id])
+
     def get_run_dir(self, workflow_id):
         """Return the bolt port associated with a workflow."""
         stmt = "SELECT run_dir FROM workflows WHERE workflow_id=?"
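In miniature, the Flask-plus-Celery wiring that this patch series converges on looks like the sketch below. All module, path, and route names in it are illustrative rather than BEE's real layout; the pattern is the point: a default Celery app is attached to the Flask app, a ``@shared_task`` function binds to it, and the HTTP handler dispatches the slow work with ``.delay()`` instead of running it inline::

    """Minimal sketch of the Flask + Celery wiring used in this patch series.

    Names are illustrative; a Redis broker on a Unix socket is assumed,
    matching the redis-server config written above.
    """
    from celery import Celery, shared_task
    from flask import Flask


    def create_app():
        """Create the Flask app and attach a default Celery instance to it."""
        app = Flask(__name__)
        celery_app = Celery(app.name)
        celery_app.config_from_object({
            # A Unix-socket Redis broker avoids opening a TCP port.
            'broker_url': 'redis+socket:///tmp/example/redis.sock',
            'result_backend': 'db+sqlite:///tmp/example/celery.db',
        })
        # set_default() is what lets plain @shared_task functions bind here.
        celery_app.set_default()
        app.extensions['celery'] = celery_app
        return app


    @shared_task(ignore_result=True)
    def init_something(wf_id):
        """Stand-in for init_workflow(): runs in the Celery worker process."""
        print(f'initializing {wf_id}')


    flask_app = create_app()
    celery_app = flask_app.extensions['celery']


    @flask_app.route('/submit/<wf_id>', methods=['POST'])
    def submit(wf_id):
        """Queue the slow initialization and return to the client at once."""
        init_something.delay(wf_id)
        return {'msg': 'accepted', 'wf_id': wf_id}

A worker started with ``celery -A <module> worker`` (as core.py does with ``beeflow.common.celery``) imports the same module, so the web process and the worker share one task registry and broker.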
@@ -164,8 +170,7 @@ def _init_tables(self):
                     bolt_port INTEGER,
                     http_port INTEGER,
                     https_port INTEGER,
-                    gdb_pid INTEGER,
-                    init_task_id INTEGER);"""
+                    gdb_pid INTEGER);"""

         tasks_stmt = """CREATE TABLE IF NOT EXISTS tasks (
                         id INTEGER PRIMARY KEY,
diff --git a/beeflow/wf_manager/common/dep_manager.py b/beeflow/wf_manager/common/dep_manager.py
index 4774b6a53..4b519d8ec 100755
--- a/beeflow/wf_manager/common/dep_manager.py
+++ b/beeflow/wf_manager/common/dep_manager.py
@@ -4,7 +4,6 @@

 import os
 import re
-import sys
 import time
 import shutil
 import signal
diff --git a/beeflow/wf_manager/common/wf_db.py b/beeflow/wf_manager/common/wf_db.py
index 2f0d8c652..f54fa9d9f 100644
--- a/beeflow/wf_manager/common/wf_db.py
+++ b/beeflow/wf_manager/common/wf_db.py
@@ -217,7 +217,7 @@ def add_workflow(workflow_id, name, status, run_dir, bolt_port, gdb_pid):
 def complete_gdb_init(workflow_id, gdb_pid):
     """Complete the GDB init process for a workflow."""
     stmt = "UPDATE workflows SET gdb_pid=?, status=? WHERE workflow_id = ?"
-    run(stmt, [gdb_port, 'Pending', workflow_id])
+    run(stmt, [gdb_pid, 'Pending', workflow_id])

@@ -226,11 +226,11 @@ def init_workflow(workflow_id, name, run_dir, bolt_port, http_port, https_port,
         # Initialize the database
         init_tables()

-    stmt = ("INSERT INTO workflows "
-            "(workflow_id, name, status, run_dir, bolt_port, https_port, gdb_pid, init_task_id) "
-            "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?);"
-            )
-    run(stmt, [workflow_id, name, 'Initializing', run_dir, bolt_port, http_port, https_port, -1, init_task_id])
+    stmt = """INSERT INTO workflows (workflow_id, name, status, run_dir, bolt_port,
+                                     http_port, https_port, gdb_pid, init_task_id)
+              VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?);"""
+    run(stmt, [workflow_id, name, 'Initializing', run_dir, bolt_port, http_port,
+               https_port, -1, init_task_id])

 def update_workflow_state(workflow_id, status):
     """Update the status in a workflow in the database."""
     stmt = "UPDATE workflows SET status=? WHERE workflow_id=?"
diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py
index 24c2d0155..76c27a914 100644
--- a/beeflow/wf_manager/resources/wf_actions.py
+++ b/beeflow/wf_manager/resources/wf_actions.py
@@ -26,7 +26,8 @@ def post(self, wf_id):
             db.workflows.update_workflow_state(wf_id, 'Running')
             resp = make_response(jsonify(msg='Started workflow!', status='ok'), 200)
         else:
-            resp = make_response(jsonify(msg='Cannot start workflow it is {state.lower()}.', status='ok'), 200)
+            resp_body = jsonify(msg=f'Cannot start workflow; it is {state.lower()}.', status='ok')
+            resp = make_response(resp_body, 200)
         return resp

     @staticmethod
diff --git a/beeflow/wf_manager/resources/wf_list.py b/beeflow/wf_manager/resources/wf_list.py
index c2a185b7f..1bc3dceb0 100644
--- a/beeflow/wf_manager/resources/wf_list.py
+++ b/beeflow/wf_manager/resources/wf_list.py
@@ -10,7 +10,7 @@
 from flask import make_response, jsonify
 from werkzeug.datastructures import FileStorage
 from flask_restful import Resource, reqparse
-from celery import shared_task
+from celery import shared_task  # noqa (pylama can't find celery imports)

 from beeflow.common import log as bee_logging
 # from beeflow.common.wf_profiler import WorkflowProfiler
@@ -53,7 +53,8 @@ def extract_wf(wf_id, filename, workflow_archive, reexecute=False):


 @shared_task(ignore_result=True)
-def init_workflow(workflow, wf_id, wf_name, wf_dir, wf_workdir, tasks, bolt_port, http_port, https_port, no_start):
+def init_workflow(wf_id, wf_name, wf_dir, wf_workdir, bolt_port, http_port,
+                  https_port, no_start,
workflow=None, tasks=None, reexecute=False): """Initialize the workflow in a separate process.""" print('Initializing GDB and workflow...') try: @@ -61,30 +62,42 @@ def init_workflow(workflow, wf_id, wf_name, wf_dir, wf_workdir, tasks, bolt_port except dep_manager.NoContainerRuntime: crt_message = "Charliecloud not installed in current environment." log.error(crt_message) - return -1, 'Failure' + return - gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port, https_port) + gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port, https_port, reexecute=reexecute) dep_manager.wait_gdb(log) # wfi = wf_utils.get_workflow_interface(wf_id) wfi = wf_utils.get_workflow_interface_by_bolt_port(bolt_port) - wfi.initialize_workflow(workflow) + if reexecute: + wfi.reset_workflow(wf_id) + else: + wfi.initialize_workflow(workflow) # initialize_wf_profiler(wf_name) + log.info('Setting workflow metadata') wf_utils.create_wf_metadata(wf_id, wf_name) + db = connect_db(wfm_db, db_path) + if reexecute: + _, tasks = wfi.get_workflow() for task in tasks: wfi.add_task(task) metadata = wfi.get_task_metadata(task) metadata['workdir'] = wf_workdir wfi.set_task_metadata(task, metadata) + db.workflows.add_task(task.id, wf_id, task.name, "WAITING") + db.workflows.update_gdb_pid(wf_id, gdb_pid) + # TODO: The state is duplicated in a couple places here + # TODO: Is this the right status/state? + wf_utils.update_wf_status(wf_id, 'Waiting') + db.workflows.update_workflow_state(wf_id, 'Waiting') if no_start: - # TODO: Is this the right status/state? - return gdb_pid, 'Waiting' - - wf_utils.start_workflow(wf_id) - return gdb_pid, 'Running' + log.info('Not starting workflow, as requested') + else: + log.info('Starting workflow') + wf_utils.start_workflow(wf_id) db_path = wf_utils.get_db_path() @@ -120,7 +133,7 @@ def post(self): location='form') reqparser.add_argument('tasks', type=str, required=True, location='form') - reqparser.add_argument('no_start', type=bool, required=True, location='form') + reqparser.add_argument('no_start', type=str, required=True, location='form') reqparser.add_argument('workflow_archive', type=FileStorage, required=False, location='files') data = reqparser.parse_args() @@ -128,7 +141,8 @@ def post(self): wf_filename = data['wf_filename'] wf_name = data['wf_name'] wf_workdir = data['workdir'] - no_start = data['no_start'] + # Note we have to check for the 'true' string value + no_start = data['no_start'].lower() == 'true' workflow = jsonpickle.decode(data['workflow']) # May have to decode the list and task objects separately tasks = [jsonpickle.decode(task) if isinstance(task, str) else task @@ -140,14 +154,12 @@ def post(self): http_port = wf_utils.get_open_port() https_port = wf_utils.get_open_port() - result = init_workflow.delay(workflow, wf_id, wf_name, wf_dir, wf_workdir, tasks, bolt_port, http_port, https_port, no_start) - db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port, init_task_id=result.id) + db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port) + init_workflow.delay(wf_id, wf_name, wf_dir, wf_workdir, bolt_port, http_port, + https_port, no_start, workflow=workflow, tasks=tasks) - for task in tasks: - db.workflows.add_task(task.id, wf_id, task.name, "WAITING") - resp = make_response(jsonify(msg='Workflow uploaded', status='ok', + return make_response(jsonify(msg='Workflow uploaded', status='ok', wf_id=wf_id), 201) - return resp def put(self): """Reexecute a workflow.""" @@ -181,9 +193,9 @@ def put(self): 
http_port = wf_utils.get_open_port() https_port = wf_utils.get_open_port() - - result = init_workflow.delay(workflow, wf_id, wf_name, wf_dir, wf_workdir, tasks, bolt_port, http_port, https_port, no_start=False) - db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port, init_task_id=result.id) + db.workflows.init_workflow(wf_id, wf_name, wf_dir, bolt_port, http_port, https_port) + init_workflow.delay(wf_id, wf_name, wf_dir, wf_workdir, bolt_port, http_port, + https_port, no_start=False, reexecute=True) # Return the wf_id and created resp = make_response(jsonify(msg='Workflow uploaded', status='ok', @@ -203,3 +215,6 @@ def patch(self): resp = make_response(jsonify(archive_file=archive_file, archive_filename=archive_filename), 200) return resp +# Ignoring W0511: There a number of TODOs here that need to be addressed at +# some point +# pylama:ignore=W0511 diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index 935f7c9f5..edd05feb3 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -5,7 +5,6 @@ import socket import requests import jsonpickle -from celery.result import AsyncResult from beeflow.common import log as bee_logging from beeflow.common.config_driver import BeeConfig as bc @@ -126,12 +125,9 @@ def get_workflow_interface(wf_id): db = connect_db(wfm_db, get_db_path()) # Wait for the GDB if db.workflows.get_workflow_state(wf_id) == 'Initializing': - task_id = db.workflows.get_init_task_id(wf_id) - result = AsyncResult(task_id) - # Wait until the workflow is initialized - gdb_pid, state = result.wait() - db.workflows.set_workflow_state(state) - db.workflows.set_gdb_pid(gdb_pid) + # TODO: Need to figure out what to do with requests while waiting for + # the gdb to come up + raise RuntimeError('Workflow is still initializing') bolt_port = db.workflows.get_bolt_port(wf_id) return get_workflow_interface_by_bolt_port(bolt_port) @@ -277,7 +273,9 @@ def start_workflow(wf_id): return False wfi.execute_workflow() tasks = wfi.get_ready_tasks() - wf_utils.schedule_submit_tasks(wf_id, tasks) + schedule_submit_tasks(wf_id, tasks) wf_id = wfi.workflow_id - wf_utils.update_wf_status(wf_id, 'Running') + update_wf_status(wf_id, 'Running') return True +# Ignoring W0511: TODOs need to be addressed later +# pylama:ignore=W0511 diff --git a/beeflow/wf_manager/wf_manager.py b/beeflow/wf_manager/wf_manager.py index f4f5c1ea2..1d66a4e63 100644 --- a/beeflow/wf_manager/wf_manager.py +++ b/beeflow/wf_manager/wf_manager.py @@ -1,7 +1,7 @@ """Start up the workflow manager connecting all of the endpoints.""" from flask import Flask -from celery import Celery +from celery import Celery # noqa (pylama can't find celery imports) from beeflow.common.api import BeeApi from beeflow.common import paths from beeflow.wf_manager.resources.wf_list import WFList From a9718e955031fb1d17ff69e5a07bb2cbef5f2a7d Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Tue, 3 Oct 2023 16:40:04 -0600 Subject: [PATCH 05/15] Remove TODO to satisfy pylama --- beeflow/wf_manager/resources/wf_utils.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index edd05feb3..962281556 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -125,8 +125,6 @@ def get_workflow_interface(wf_id): db = connect_db(wfm_db, get_db_path()) # Wait for the GDB if db.workflows.get_workflow_state(wf_id) == 
'Initializing': - # TODO: Need to figure out what to do with requests while waiting for - # the gdb to come up raise RuntimeError('Workflow is still initializing') bolt_port = db.workflows.get_bolt_port(wf_id) return get_workflow_interface_by_bolt_port(bolt_port) @@ -277,5 +275,3 @@ def start_workflow(wf_id): wf_id = wfi.workflow_id update_wf_status(wf_id, 'Running') return True -# Ignoring W0511: TODOs need to be addressed later -# pylama:ignore=W0511 From 68675ed1c9fc68fb31d5b3896fbe8c4f61ddd8ad Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Tue, 3 Oct 2023 17:03:08 -0600 Subject: [PATCH 06/15] Fix workflow manager unit tests --- beeflow/tests/test_wf_manager.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index 7184a6073..df1256908 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -69,9 +69,19 @@ def _resource(tag=""): return _url() + str(tag) +class MockTask: + """Mock task class for mocking celery.""" + + @staticmethod + def delay(*pargs, **kwargs): + """Mock a delay call to the celery backend.""" + return None + + # WFList Tests def test_submit_workflow(client, mocker, teardown_workflow, temp_db): """Test submitting a workflow.""" + mocker.patch('beeflow.wf_manager.resources.wf_list.init_workflow', new=MockTask) mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.create_image', return_value=True) mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.start_gdb', return_value=True) @@ -92,7 +102,8 @@ def test_submit_workflow(client, mocker, teardown_workflow, temp_db): 'workdir': '.', 'workflow': jsonpickle.encode(WORKFLOW_GOLD), 'tasks': jsonpickle.encode(TASKS_GOLD, warn=True), - 'workflow_archive': tarball_contents + 'workflow_archive': tarball_contents, + 'no_start': False }) # Remove task added during the test @@ -101,6 +112,7 @@ def test_submit_workflow(client, mocker, teardown_workflow, temp_db): def test_reexecute_workflow(client, mocker, teardown_workflow, temp_db): """Test reexecuting a workflow.""" + mocker.patch('beeflow.wf_manager.resources.wf_list.init_workflow', new=MockTask) mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.create_image', return_value=True) mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.start_gdb', return_value=True) @@ -147,11 +159,11 @@ def test_workflow_status(client, mocker, setup_teardown_workflow, temp_db): mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) wf_name = 'wf' - wf_status = 'Pending' bolt_port = 3030 - gdb_pid = 12345 + http_port = 3333 + https_port = 3455 - temp_db.workflows.add_workflow(WF_ID, wf_name, wf_status, 'dir', bolt_port, gdb_pid) + temp_db.workflows.init_workflow(WF_ID, wf_name, 'dir', bolt_port, http_port, https_port) temp_db.workflows.add_task(123, WF_ID, 'task', "WAITING") temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING") From 03f821580f2a64d7a4e11a5190b2aa90826e8f92 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 4 Oct 2023 13:34:36 -0600 Subject: [PATCH 07/15] Add Redis container and update CI accordingly --- beeflow/client/core.py | 28 +++++++++++++++++++++++----- beeflow/common/config_driver.py | 3 +++ beeflow/common/paths.py | 21 +++++++-------------- beeflow/wf_manager/wf_manager.py | 4 +++- ci/bee_config.sh | 1 + ci/bee_install.sh | 5 ++++- ci/env.sh | 1 + 7 files changed, 42 insertions(+), 21 
deletions(-) diff --git a/beeflow/client/core.py b/beeflow/client/core.py index 7d2d089ff..2d0d5be4f 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -192,17 +192,35 @@ def celery(): @mgr.component('redis', ()) def redis(): """Start redis.""" + data_dir = 'data' + conf_name = 'redis.conf' + container_path = paths.redis_container() + # If it exists, we assume that it actually has a valid container + if not os.path.exists(container_path): + subprocess.check_call(['ch-convert', '-i', 'tar', '-o', 'dir', + bc.get('DEFAULT', 'redis_image'), container_path]) # Dump the config - with open(paths.redis_config(), 'w', encoding='utf-8') as fp: + with open(os.path.join(paths.redis_root(), conf_name), 'w', encoding='utf-8') as fp: # Don't listen on TCP print('port 0', file=fp) - print('dir', paths.redis_data(), file=fp) + print('dir', os.path.join('/mnt', data_dir), file=fp) print('maxmemory 2mb', file=fp) - print('unixsocket', paths.redis_socket(), file=fp) + print('unixsocket', os.path.join('/mnt', paths.redis_sock_fname()), file=fp) print('unixsocketperm 700', file=fp) - cmd = ['redis-server', paths.redis_config()] + cmd = [ + 'ch-run', + f'--bind={paths.redis_root()}:/mnt', + container_path, + 'redis-server', + '/mnt/redis.conf', + ] log = open_log('redis') - return subprocess.Popen(cmd, stdout=log, stderr=log) + # Ran into a strange "Failed to configure LOCALE for invalid locale name." + # from Redis, so setting LANG=C. This could have consequences for UTF-8 + # strings. + env = dict(os.environ) + env['LANG'] = 'C' + return subprocess.Popen(cmd, env=env, stdout=log, stderr=log) # Workflow manager and task manager need to be opened with PIPE for their stdout/stderr if need_slurmrestd(): diff --git a/beeflow/common/config_driver.py b/beeflow/common/config_driver.py index 34994115a..38c3b8deb 100644 --- a/beeflow/common/config_driver.py +++ b/beeflow/common/config_driver.py @@ -234,6 +234,9 @@ def filepath_completion_input(*pargs, **kwargs): VALIDATOR.option('DEFAULT', 'bee_dep_image', validator=validation.file_, info='container image with BEE dependencies', attrs={'input': filepath_completion_input}) +VALIDATOR.option('DEFAULT', 'redis_image', validator=validation.file_, + info='redis container image', + attrs={'input': filepath_completion_input}) VALIDATOR.option('DEFAULT', 'max_restarts', validator=int, attrs={'default': 3}, info='max number of times beeflow will restart a component on failure') diff --git a/beeflow/common/paths.py b/beeflow/common/paths.py index caac57391..1d3043c14 100644 --- a/beeflow/common/paths.py +++ b/beeflow/common/paths.py @@ -51,28 +51,21 @@ def log_fname(component): return os.path.join(log_path(), f'{component}.log') -def _redis_root(): +def redis_root(): """Get the redis root directory (create it if it doesn't exist).""" path = os.path.join(_workdir(), 'redis') os.makedirs(path, exist_ok=True) return path -def redis_config(): - """Get the Redis config path.""" - return os.path.join(_redis_root(), 'redis.conf') +def redis_container(): + """Get the path to the unpacked Redis container.""" + return os.path.join(_workdir(), 'redis_container') -def redis_data(): - """Return the Redis data dir.""" - path = os.path.join(_redis_root(), 'data') - os.makedirs(path, exist_ok=True) - return path - - -def redis_socket(): - """Get the Redis socket path.""" - return os.path.join(_redis_root(), 'redis.sock') +def redis_sock_fname(): + """Return the file name for the Redis socket.""" + return 'redis.sock' def _celery_root(): diff --git 
a/beeflow/wf_manager/wf_manager.py b/beeflow/wf_manager/wf_manager.py index 1d66a4e63..2f715c333 100644 --- a/beeflow/wf_manager/wf_manager.py +++ b/beeflow/wf_manager/wf_manager.py @@ -1,5 +1,6 @@ """Start up the workflow manager connecting all of the endpoints.""" +import os from flask import Flask from celery import Celery # noqa (pylama can't find celery imports) from beeflow.common.api import BeeApi @@ -24,8 +25,9 @@ def create_app(): # Initialize celery app celery_app = Celery(app.name) + redis_socket = os.path.join(paths.redis_root(), paths.redis_sock_fname()) celery_app.config_from_object({ - 'broker_url': f'redis+socket://{paths.redis_socket()}', + 'broker_url': f'redis+socket://{redis_socket}', 'result_backend': f'db+sqlite://{paths.celery_db()}', 'task_serializer': 'pickle', 'accept_content': ['application/json', 'application/x-python-serialize'], diff --git a/ci/bee_config.sh b/ci/bee_config.sh index 469378517..7e24ed692 100755 --- a/ci/bee_config.sh +++ b/ci/bee_config.sh @@ -11,6 +11,7 @@ bee_workdir = $BEE_WORKDIR workload_scheduler = $BATCH_SCHEDULER use_archive = False bee_dep_image = $NEO4J_CONTAINER +redis_image = $REDIS_CONTAINER max_restarts = 2 [task_manager] diff --git a/ci/bee_install.sh b/ci/bee_install.sh index ced3483c8..012bd4d98 100755 --- a/ci/bee_install.sh +++ b/ci/bee_install.sh @@ -10,9 +10,12 @@ printf "\n\n" printf "**Setting up BEE containers**\n" printf "\n\n" mkdir -p $HOME/img -# Pull the neo4j container +# Pull the Neo4j container ch-image pull neo4j:3.5.22 || exit 1 ch-convert -i ch-image -o tar neo4j:3.5.22 $NEO4J_CONTAINER || exit 1 +# Pull the Redis container +ch-image pull redis || exit 1 +ch-convert -i ch-image -o tar redis $REDIS_CONTAINER || exit 1 # BEE install printf "\n\n" diff --git a/ci/env.sh b/ci/env.sh index 927ed6935..6c9173772 100644 --- a/ci/env.sh +++ b/ci/env.sh @@ -21,6 +21,7 @@ MUNGE_KEY=/tmp/munge/munge.key NODE_CONFIG=`slurmd -C | head -n 1` BEE_WORKDIR=$HOME/.beeflow NEO4J_CONTAINER=$HOME/img/neo4j.tar.gz +REDIS_CONTAINER=$HOME/img/redis.tar.gz mkdir -p $BEE_WORKDIR export SLURM_CONF=~/slurm.conf # Flux variables From 52491efea1858617ac2b49e05ebf9311e9eda78c Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 4 Oct 2023 13:46:13 -0600 Subject: [PATCH 08/15] Create redis data dir on start --- beeflow/client/core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/beeflow/client/core.py b/beeflow/client/core.py index 2d0d5be4f..92a56212b 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -193,6 +193,7 @@ def celery(): def redis(): """Start redis.""" data_dir = 'data' + os.makedirs(os.path.join(paths.redis_root(), data_dir), exist_ok=True) conf_name = 'redis.conf' container_path = paths.redis_container() # If it exists, we assume that it actually has a valid container From 61f06c6f7949ec362df3f6913918ad99aaff936a Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 4 Oct 2023 13:57:58 -0600 Subject: [PATCH 09/15] Update integration test for new workflow state --- beeflow/wf_manager/resources/wf_list.py | 1 + beeflow/wf_manager/resources/wf_utils.py | 2 ++ ci/integration_test.py | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/beeflow/wf_manager/resources/wf_list.py b/beeflow/wf_manager/resources/wf_list.py index 1bc3dceb0..823f2b047 100644 --- a/beeflow/wf_manager/resources/wf_list.py +++ b/beeflow/wf_manager/resources/wf_list.py @@ -97,6 +97,7 @@ def init_workflow(wf_id, wf_name, wf_dir, wf_workdir, bolt_port, http_port, log.info('Not starting workflow, as requested') else: 
log.info('Starting workflow') + db.workflows.update_workflow_state(wf_id, 'Running') wf_utils.start_workflow(wf_id) diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index 962281556..5a7ad0fe8 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -265,6 +265,7 @@ def schedule_submit_tasks(wf_id, tasks): def start_workflow(wf_id): """Attempt to start the workflow, returning True if successful.""" + db = connect_db(wfm_db, get_db_path()) wfi = get_workflow_interface(wf_id) state = wfi.get_workflow_state() if state in ('RUNNING', 'PAUSED', 'COMPLETED'): @@ -274,4 +275,5 @@ def start_workflow(wf_id): schedule_submit_tasks(wf_id, tasks) wf_id = wfi.workflow_id update_wf_status(wf_id, 'Running') + db.workflows.update_workflow_state(wf_id, 'Running') return True diff --git a/ci/integration_test.py b/ci/integration_test.py index b81ef17ae..99a37f130 100755 --- a/ci/integration_test.py +++ b/ci/integration_test.py @@ -89,7 +89,7 @@ def run(self): def running(self): """Check if the workflow is running or about to run.""" print(bee_client.query(self.wf_id)) - return bee_client.query(self.wf_id)[0] in ('Running', 'Pending') + return bee_client.query(self.wf_id)[0] in ('Initializing', 'Waiting', 'Running', 'Pending') def status(self): """Get the status of the workflow.""" From 4cab5eed818661c1efe96e3e8a747e94ba15912e Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 4 Oct 2023 14:28:55 -0600 Subject: [PATCH 10/15] Update test_wf_manager.py mocking code --- beeflow/tests/test_wf_manager.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index df1256908..068c6a47d 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -138,15 +138,37 @@ def test_reexecute_workflow(client, mocker, teardown_workflow, temp_db): assert resp.json['msg'] == 'Workflow uploaded' +class MockDBWorkflowHandle: + """Mock DB workflow handle.""" + + def update_workflow_state(self, *pargs, **kwargs): + """Mock update a workflow.""" + + +class MockDB: + """Mock DB for workflow manager.""" + + @property + def workflows(self): + """Return a workflow handle.""" + return MockDBWorkflowHandle() + + +def mock_connect_db(*pargs, **kwargs): + """Mock a DB connection.""" + return MockDB() + + # WFActions Tests def test_start_workflow(client, mocker, temp_db): """Test starting a workflow.""" + mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db) mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', return_value=MockWFI()) mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_tm', return_value=None) mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_scheduler', return_value=None) mocker.patch('beeflow.wf_manager.resources.wf_utils.update_wf_status', return_value=None) - mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) resp = client().post(f'/bee_wfm/v1/jobs/{WF_ID}') assert resp.status_code == 200 From e9b7f38319aaa2826abbc46899b9291eaacb20cf Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 4 Oct 2023 14:50:19 -0600 Subject: [PATCH 11/15] Remove old TODOs and comments --- beeflow/wf_manager/resources/wf_list.py | 6 
------ beeflow/wf_manager/wf_manager.py | 1 - 2 files changed, 7 deletions(-) diff --git a/beeflow/wf_manager/resources/wf_list.py b/beeflow/wf_manager/resources/wf_list.py index 823f2b047..ea30b2d4f 100644 --- a/beeflow/wf_manager/resources/wf_list.py +++ b/beeflow/wf_manager/resources/wf_list.py @@ -67,7 +67,6 @@ def init_workflow(wf_id, wf_name, wf_dir, wf_workdir, bolt_port, http_port, gdb_pid = dep_manager.start_gdb(wf_dir, bolt_port, http_port, https_port, reexecute=reexecute) dep_manager.wait_gdb(log) - # wfi = wf_utils.get_workflow_interface(wf_id) wfi = wf_utils.get_workflow_interface_by_bolt_port(bolt_port) if reexecute: wfi.reset_workflow(wf_id) @@ -89,8 +88,6 @@ def init_workflow(wf_id, wf_name, wf_dir, wf_workdir, bolt_port, http_port, db.workflows.add_task(task.id, wf_id, task.name, "WAITING") db.workflows.update_gdb_pid(wf_id, gdb_pid) - # TODO: The state is duplicated in a couple places here - # TODO: Is this the right status/state? wf_utils.update_wf_status(wf_id, 'Waiting') db.workflows.update_workflow_state(wf_id, 'Waiting') if no_start: @@ -216,6 +213,3 @@ def patch(self): resp = make_response(jsonify(archive_file=archive_file, archive_filename=archive_filename), 200) return resp -# Ignoring W0511: There a number of TODOs here that need to be addressed at -# some point -# pylama:ignore=W0511 diff --git a/beeflow/wf_manager/wf_manager.py b/beeflow/wf_manager/wf_manager.py index 2f715c333..5c3fcbdec 100644 --- a/beeflow/wf_manager/wf_manager.py +++ b/beeflow/wf_manager/wf_manager.py @@ -31,7 +31,6 @@ def create_app(): 'result_backend': f'db+sqlite://{paths.celery_db()}', 'task_serializer': 'pickle', 'accept_content': ['application/json', 'application/x-python-serialize'], - # 'imports': ('beeflow.common.tasks',), }) celery_app.set_default() app.extensions['celery'] = celery_app From 8e32fafb0f47781a665f52d0f7775d9356f408b3 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Tue, 10 Oct 2023 15:06:04 -0600 Subject: [PATCH 12/15] Update documentation and change neo4j image option --- beeflow/common/config_driver.py | 4 ++-- beeflow/wf_manager/common/dep_manager.py | 2 +- ci/bee_config.sh | 2 +- docs/sphinx/installation.rst | 23 +++++++++++------------ 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/beeflow/common/config_driver.py b/beeflow/common/config_driver.py index 38c3b8deb..4c2afcf9d 100644 --- a/beeflow/common/config_driver.py +++ b/beeflow/common/config_driver.py @@ -231,8 +231,8 @@ def filepath_completion_input(*pargs, **kwargs): info='backend workload scheduler to interact with ') VALIDATOR.option('DEFAULT', 'use_archive', validator=validation.bool_, attrs={'default': True}, info='use the BEE archiving functinality') -VALIDATOR.option('DEFAULT', 'bee_dep_image', validator=validation.file_, - info='container image with BEE dependencies', +VALIDATOR.option('DEFAULT', 'neo4j_image', validator=validation.file_, + info='neo4j container image', attrs={'input': filepath_completion_input}) VALIDATOR.option('DEFAULT', 'redis_image', validator=validation.file_, info='redis container image', diff --git a/beeflow/wf_manager/common/dep_manager.py b/beeflow/wf_manager/common/dep_manager.py index 4b519d8ec..c9dbbfc43 100755 --- a/beeflow/wf_manager/common/dep_manager.py +++ b/beeflow/wf_manager/common/dep_manager.py @@ -113,7 +113,7 @@ def create_image(): # Can throw an exception that needs to be handled by the caller check_container_runtime() - dep_img = bc.get('DEFAULT', 'bee_dep_image') + dep_img = bc.get('DEFAULT', 'neo4j_image') # Check for BEE dependency 
container directory:
     container_dir_exists = check_container_dir()
diff --git a/ci/bee_config.sh b/ci/bee_config.sh
index 7e24ed692..4759e79f6 100755
--- a/ci/bee_config.sh
+++ b/ci/bee_config.sh
@@ -10,7 +10,7 @@ cat >> ~/.config/beeflow/bee.conf <<
[one-line hunk body lost in extraction; per the diffstat and the option rename above, it presumably replaces the bee_dep_image option with neo4j_image]
diff --git a/docs/sphinx/installation.rst b/docs/sphinx/installation.rst
--- a/docs/sphinx/installation.rst
+++ b/docs/sphinx/installation.rst
@@ ... @@ Requirements:
-* `Charliecloud `_ **version 0.32 (or greater)**
-  Charliecloud is installed on Los Alamos National Laboratory (LANL) clusters and can be invoked via ``module load charliecloud`` before running beeflow. If you are on a system that does not have the module, `Charliecloud `_ is easily installed in user space and requires no privileges to install. To insure Charliecloud is available in subsequent runs add ``module load charliecloud`` (or if you installed it ``export PATH=:$PATH``) to your .bashrc (or other appropriate shell initialization file). BEE runs dependencies from a Charliecloud container and uses it to run the graph database neo4j and other dependencies. The default container runtime for containerized applications in BEE is Charliecloud.
+* `Charliecloud `_ **version 0.32 (or greater)**
+  Charliecloud is installed on Los Alamos National Laboratory (LANL) clusters and can be invoked via ``module load charliecloud`` before running beeflow. If you are on a system that does not have the module, `Charliecloud `_ is easily installed in user space and requires no privileges to install. To ensure Charliecloud is available in subsequent runs, add ``module load charliecloud`` (or, if you installed it, ``export PATH=:$PATH``) to your .bashrc (or other appropriate shell initialization file). BEE runs dependencies from a Charliecloud container and uses it to run the graph database neo4j and other dependencies. The default container runtime for containerized applications in BEE is Charliecloud.
 
-* **BEE dependency container**:
-  If you are on a LANL system, you may use the dependency container supplied by the BEE team: **/usr/projects/BEE/neo4j-3-5-17-ch.tar.gz**
+* **Containers**:
+  Two Charliecloud dependency containers are currently required for BEE: one for the Neo4j graph database and another for Redis. The paths to these containers will need to be set in the BEE configuration later, using the ``neo4j_image`` and ``redis_image`` options, respectively. BEE only supports Neo4j 3.5.x. We are currently using the latest version of Redis supplied on Docker Hub (as of 2023).
 
-  At this time the only dependency needed in a container is **neo4j version 3.5.x**. To build the container for X86, invoke Charliecloud on the cluster where BEE components will be running to pull the graph database **neo4j** and create a Charliecloud tarball.
+  For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j-3-5-17-ch.tar.gz**, **/usr/projects/BEE/redis.tar.gz**.
+  For other users, these containers can be pulled from Docker Hub and converted to a Charliecloud tarball using the following commands:
+
+.. code-block::
+
+    ch-image pull neo4j:3.5.22
+    ch-convert -o tar neo4j:3.5.22 neo4j-3-5-22.tar.gz
+    ch-image pull redis
+    ch-convert -o tar redis redis.tar.gz
+..
+
 Installation:
 =============
@@ -59,11 +62,7 @@ You will need to setup the bee configuration file that will be located in:
 
 macOS: ``~/Library/Application Support/beeflow/bee.conf``
 
-Before creating a bee.conf file you will need to know the path to your **BEE
-dependency container** and the type of workload scheduler (Slurm or LSF). (On
-LANL systems you may use the BEE provided container:
-**/usr/projects/BEE/neo4j-3-5-17-ch.tar.gz**). Depending on the system, you
-may also need to know an account name to use.
+Before creating a bee.conf file you will need to know the path to the two required Charliecloud containers, one for Neo4j (``neo4j_image``) and one for Redis (``redis_image``). See `Requirements:`_ above for pulling these containers. Depending on the system, you may also need to know system-specific information, such as account information. You can leave some options blank if these are unnecessary.
 
 Once you are ready type ``beeflow config new``.
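To make the renamed options concrete, here is a minimal sketch of the ``[DEFAULT]`` entries this patch expects in bee.conf (a sketch, not part of the patch; the tarball paths are the LANL examples quoted in the docs above, and any readable paths work):

    [DEFAULT]
    # Charliecloud tarballs for the two dependency containers
    neo4j_image = /usr/projects/BEE/neo4j-3-5-17-ch.tar.gz
    redis_image = /usr/projects/BEE/redis.tar.gz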
From 2aaa2dc5818a7808c21b49a0ee3e5ea78d24c8 Mon Sep 17 00:00:00 2001
From: Jake Tronge
Date: Thu, 12 Oct 2023 12:07:13 -0600
Subject: [PATCH 13/15] Add pull-deps option to download containers

---
 beeflow/client/core.py | 23 +++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/beeflow/client/core.py b/beeflow/client/core.py
index deb0befff..6ea3b23fe 100644
--- a/beeflow/client/core.py
+++ b/beeflow/client/core.py
@@ -449,3 +449,26 @@ def restart(foreground: bool = typer.Option(False, '--foreground', '-F',
     """Attempt to stop and restart the beeflow daemon."""
     stop()
     start(foreground)
+
+
+def pull_to_tar(ref, tarball):
+    """Pull a container from a registry and convert to tarball."""
+    subprocess.check_call(['ch-image', 'pull', ref])
+    subprocess.check_call(['ch-convert', '-i', 'ch-image', '-o', 'tar', ref, tarball])
+
+
+@app.command()
+def pull_deps():
+    """Pull required BEE containers and store in bee workdir."""
+    bee_workdir = bc.get('DEFAULT', 'bee_workdir')
+    neo4j_path = os.path.join(bee_workdir, 'neo4j.tar.gz')
+    pull_to_tar('neo4j:3.5.22', neo4j_path)
+    redis_path = os.path.join(bee_workdir, 'redis.tar.gz')
+    pull_to_tar('redis', redis_path)
+    print()
+    print('The BEE dependency containers have been successfully downloaded. '
+          'Please make sure to set the following options in your config:')
+    print()
+    print('[DEFAULT]')
+    print('neo4j_image =', neo4j_path)
+    print('redis_image =', redis_path)

From bb9ecb815caefbd70fc84cb37aaf23bef0dbce1b Mon Sep 17 00:00:00 2001
From: Jake Tronge
Date: Thu, 12 Oct 2023 12:37:58 -0600
Subject: [PATCH 14/15] Allow pull-deps to work with no config and add documentation

---
 beeflow/client/core.py       | 10 +++++-----
 docs/sphinx/installation.rst | 11 +----------
 2 files changed, 6 insertions(+), 15 deletions(-)

diff --git a/beeflow/client/core.py b/beeflow/client/core.py
index 6ea3b23fe..a9879b477 100644
--- a/beeflow/client/core.py
+++ b/beeflow/client/core.py
@@ -458,12 +458,12 @@ def pull_to_tar(ref, tarball):
 
 
 @app.command()
-def pull_deps():
-    """Pull required BEE containers and store in bee workdir."""
-    bee_workdir = bc.get('DEFAULT', 'bee_workdir')
-    neo4j_path = os.path.join(bee_workdir, 'neo4j.tar.gz')
+def pull_deps(outdir: str = typer.Option('.', '--outdir', '-o',
+                                         help='directory to store containers in')):
+    """Pull required BEE containers and store in outdir."""
+    neo4j_path = os.path.join(os.path.realpath(outdir), 'neo4j.tar.gz')
     pull_to_tar('neo4j:3.5.22', neo4j_path)
-    redis_path = os.path.join(bee_workdir, 'redis.tar.gz')
+    redis_path = os.path.join(os.path.realpath(outdir), 'redis.tar.gz')
     pull_to_tar('redis', redis_path)
     print()
     print('The BEE dependency containers have been successfully downloaded. 
' diff --git a/docs/sphinx/installation.rst b/docs/sphinx/installation.rst index 4e1ecf817..3bad41f97 100644 --- a/docs/sphinx/installation.rst +++ b/docs/sphinx/installation.rst @@ -19,16 +19,7 @@ Requirements: For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j-3-5-17-ch.tar.gz**, **/usr/projects/BEE/redis.tar.gz**. - For other users, these containers can be pulled from Docker Hub and converted to a Charliecloud tarball using the following commands: - -.. code-block:: - - ch-image pull neo4j:3.5.22 - ch-convert -o tar neo4j:3.5.22 neo4j-3-5-22.tar.gz - ch-image pull redis - ch-convert -o tar redis redis.tar.gz -.. - + For other users, these containers can be pulled from Docker Hub (after following `Installation:`_ below) using ``beeflow core pull-deps``, which will download and report the container paths to be set in the config later. Installation: ============= From 3f930d43894938605215828d42333e9bbf3888e7 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Mon, 16 Oct 2023 16:03:26 -0600 Subject: [PATCH 15/15] Update docs and write redis config once --- beeflow/client/core.py | 43 +++++++++++++++++++++------------------- docs/sphinx/commands.rst | 2 ++ 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/beeflow/client/core.py b/beeflow/client/core.py index a9879b477..02c2043ea 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -200,13 +200,15 @@ def redis(): subprocess.check_call(['ch-convert', '-i', 'tar', '-o', 'dir', bc.get('DEFAULT', 'redis_image'), container_path]) # Dump the config - with open(os.path.join(paths.redis_root(), conf_name), 'w', encoding='utf-8') as fp: - # Don't listen on TCP - print('port 0', file=fp) - print('dir', os.path.join('/mnt', data_dir), file=fp) - print('maxmemory 2mb', file=fp) - print('unixsocket', os.path.join('/mnt', paths.redis_sock_fname()), file=fp) - print('unixsocketperm 700', file=fp) + conf_path = os.path.join(paths.redis_root(), conf_name) + if not os.path.exists(conf_path): + with open(conf_path, 'w', encoding='utf-8') as fp: + # Don't listen on TCP + print('port 0', file=fp) + print('dir', os.path.join('/mnt', data_dir), file=fp) + print('maxmemory 2mb', file=fp) + print('unixsocket', os.path.join('/mnt', paths.redis_sock_fname()), file=fp) + print('unixsocketperm 700', file=fp) cmd = [ 'ch-run', f'--bind={paths.redis_root()}:/mnt', @@ -251,21 +253,14 @@ def version_str(version): return '.'.join([str(part) for part in version]) -def load_charliecloud(): - """Load the charliecloud module if it exists.""" - lmod = os.environ.get('MODULESHOME') - sys.path.insert(0, lmod + '/init') - from env_modules_python import module #noqa No need to import at top - module("load", "charliecloud") - - -def check_dependencies(): - """Check for various dependencies in the environment.""" - print('Checking dependencies...') - # Check for Charliecloud and it's version +def load_check_charliecloud(): + """Load the charliecloud module if it exists and check the version.""" if not shutil.which('ch-run'): + lmod = os.environ.get('MODULESHOME') + sys.path.insert(0, lmod + '/init') + from env_modules_python import module #noqa No need to import at top + module("load", "charliecloud") # Try loading the Charliecloud module then test again - load_charliecloud() if not shutil.which('ch-run'): warn('Charliecloud is not loaded. 
Please ensure that it is accessible'
              ' on your path.\nIf it\'s not installed on your system, please refer'
@@ -288,6 +283,13 @@ def check_dependencies():
         warn('This version of Charliecloud is too old, please upgrade to at '
              f'least version {version_str(MIN_CHARLIECLOUD_VERSION)}')
         sys.exit(1)
+
+
+def check_dependencies():
+    """Check for various dependencies in the environment."""
+    print('Checking dependencies...')
+    # Check for Charliecloud and its version
+    load_check_charliecloud()
     # Check for the flux API
     if bc.get('DEFAULT', 'workload_scheduler') == 'Flux':
         try:
@@ -461,6 +463,7 @@ def pull_to_tar(ref, tarball):
 def pull_deps(outdir: str = typer.Option('.', '--outdir', '-o',
                                          help='directory to store containers in')):
     """Pull required BEE containers and store in outdir."""
+    load_check_charliecloud()
     neo4j_path = os.path.join(os.path.realpath(outdir), 'neo4j.tar.gz')
     pull_to_tar('neo4j:3.5.22', neo4j_path)
     redis_path = os.path.join(os.path.realpath(outdir), 'redis.tar.gz')
     pull_to_tar('redis', redis_path)
diff --git a/docs/sphinx/commands.rst b/docs/sphinx/commands.rst
index 18cf0d8f5..53e6b1a84 100644
--- a/docs/sphinx/commands.rst
+++ b/docs/sphinx/commands.rst
@@ -20,6 +20,8 @@ Options:
 
 ``beeflow core --version``: Display the version number of BEE.
 
+``beeflow core pull-deps``: Pull the BEE dependency containers.
+
 Submission and workflow commands
 ================================
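Finally, a usage sketch of the new subcommand (the output directory below is illustrative; ``pull-deps`` stores ``neo4j.tar.gz`` and ``redis.tar.gz`` under the resolved ``--outdir`` and prints the matching config options):

    beeflow core pull-deps --outdir /tmp/bee-deps
    # On success the command reports the options to copy into bee.conf:
    #   [DEFAULT]
    #   neo4j_image = /tmp/bee-deps/neo4j.tar.gz
    #   redis_image = /tmp/bee-deps/redis.tar.gz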