Skip to content

Commit

Permalink
Merge pull request #728 from lanl/issue-705/remove-gdb-delay
Browse files Browse the repository at this point in the history
Remove delay time on workflow submit
  • Loading branch information
pagrubel authored Oct 17, 2023
2 parents 3a7b1df + 3f930d4 commit ff60683
Show file tree
Hide file tree
Showing 21 changed files with 359 additions and 113 deletions.
7 changes: 2 additions & 5 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ def is_parent(parent, path):
'wf_filename': os.path.basename(wf_path).encode(),
'workdir': workdir,
'workflow': jsonpickle.encode(workflow),
'tasks': jsonpickle.encode(tasks, warn=True)
'tasks': jsonpickle.encode(tasks, warn=True),
'no_start': no_start,
}
files = {
'workflow_archive': wf_tarball
Expand Down Expand Up @@ -284,10 +285,6 @@ def is_parent(parent, path):
if tarball_path:
os.remove(tarball_path)

# Start the workflow
if not no_start:
start(wf_id)

return wf_id


Expand Down
95 changes: 81 additions & 14 deletions beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def init_components():
# Slurmrestd will be started only if we're running with Slurm and
# slurm::use_commands is not True

@mgr.component('wf_manager', ('scheduler',))
@mgr.component('wf_manager', ('scheduler', 'celery'))
def start_wfm():
"""Start the WFM."""
fp = open_log('wf_manager')
Expand All @@ -181,6 +181,49 @@ def start_scheduler():
return launch_with_gunicorn('beeflow.scheduler.scheduler:create_app()',
paths.sched_socket(), stdout=fp, stderr=fp)

@mgr.component('celery', ('redis',))
def celery():
    """Launch the celery worker that handles queued background jobs."""
    celery_log = open_log('celery')
    worker_cmd = ['celery', '-A', 'beeflow.common.celery', 'worker']
    return subprocess.Popen(worker_cmd, stdout=celery_log, stderr=celery_log)

@mgr.component('redis', ())
def redis():
    """Start redis.

    On first run this unpacks the configured Redis container image and
    writes a redis.conf that disables TCP and listens only on a unix
    socket; it then launches redis-server inside a Charliecloud container
    with the redis root bind-mounted at /mnt.
    """
    data_dir = 'data'
    os.makedirs(os.path.join(paths.redis_root(), data_dir), exist_ok=True)
    conf_name = 'redis.conf'
    container_path = paths.redis_container()
    # If it exists, we assume that it actually has a valid container
    if not os.path.exists(container_path):
        subprocess.check_call(['ch-convert', '-i', 'tar', '-o', 'dir',
                               bc.get('DEFAULT', 'redis_image'), container_path])
    # Dump the config (only once; an existing conf is left untouched)
    conf_path = os.path.join(paths.redis_root(), conf_name)
    if not os.path.exists(conf_path):
        with open(conf_path, 'w', encoding='utf-8') as fp:
            # Don't listen on TCP
            print('port 0', file=fp)
            # Paths below are container-side: redis_root() is mounted at /mnt
            print('dir', os.path.join('/mnt', data_dir), file=fp)
            print('maxmemory 2mb', file=fp)
            print('unixsocket', os.path.join('/mnt', paths.redis_sock_fname()), file=fp)
            print('unixsocketperm 700', file=fp)
    cmd = [
        'ch-run',
        f'--bind={paths.redis_root()}:/mnt',
        container_path,
        'redis-server',
        '/mnt/redis.conf',
    ]
    log = open_log('redis')
    # Ran into a strange "Failed to configure LOCALE for invalid locale name."
    # from Redis, so setting LANG=C. This could have consequences for UTF-8
    # strings.
    env = dict(os.environ)
    env['LANG'] = 'C'
    return subprocess.Popen(cmd, env=env, stdout=log, stderr=log)

# Workflow manager and task manager need to be opened with PIPE for their stdout/stderr
if need_slurmrestd():
@mgr.component('slurmrestd')
Expand Down Expand Up @@ -210,21 +253,14 @@ def version_str(version):
return '.'.join([str(part) for part in version])


def load_charliecloud():
    """Load the charliecloud module if it exists.

    Uses the Lmod python interface found under $MODULESHOME to run
    ``module load charliecloud``.

    Raises:
        RuntimeError: if MODULESHOME is not set, meaning there is no
            module system to load Charliecloud from. (Previously this
            case crashed with an opaque TypeError on ``None + '/init'``.)
    """
    lmod = os.environ.get('MODULESHOME')
    if lmod is None:
        raise RuntimeError('MODULESHOME is not set; cannot load the charliecloud module')
    sys.path.insert(0, os.path.join(lmod, 'init'))
    from env_modules_python import module  # noqa No need to import at top
    module("load", "charliecloud")


def check_dependencies():
"""Check for various dependencies in the environment."""
print('Checking dependencies...')
# Check for Charliecloud and its version
def load_check_charliecloud():
"""Load the charliecloud module if it exists and check the version."""
if not shutil.which('ch-run'):
lmod = os.environ.get('MODULESHOME')
sys.path.insert(0, lmod + '/init')
from env_modules_python import module #noqa No need to import at top
module("load", "charliecloud")
# Try loading the Charliecloud module then test again
load_charliecloud()
if not shutil.which('ch-run'):
warn('Charliecloud is not loaded. Please ensure that it is accessible'
' on your path.\nIf it\'s not installed on your system, please refer'
Expand All @@ -247,6 +283,13 @@ def check_dependencies():
warn('This version of Charliecloud is too old, please upgrade to at '
f'least version {version_str(MIN_CHARLIECLOUD_VERSION)}')
sys.exit(1)


def check_dependencies():
"""Check for various dependencies in the environment."""
print('Checking dependencies...')
# Check for Charliecloud and its version
load_check_charliecloud()
# Check for the flux API
if bc.get('DEFAULT', 'workload_scheduler') == 'Flux':
try:
Expand Down Expand Up @@ -408,3 +451,27 @@ def restart(foreground: bool = typer.Option(False, '--foreground', '-F',
"""Attempt to stop and restart the beeflow daemon."""
stop()
start(foreground)


def pull_to_tar(ref, tarball):
    """Pull a container from a registry and convert to tarball."""
    for cmd in (['ch-image', 'pull', ref],
                ['ch-convert', '-i', 'ch-image', '-o', 'tar', ref, tarball]):
        subprocess.check_call(cmd)


@app.command()
def pull_deps(outdir: str = typer.Option('.', '--outdir', '-o',
                                         help='directory to store containers in')):
    """Pull required BEE containers and store in outdir.

    Downloads the neo4j and redis images as tarballs into *outdir* and
    prints the config options the user must set to point at them.
    """
    load_check_charliecloud()
    # Resolve the output directory once instead of per-container
    real_outdir = os.path.realpath(outdir)
    neo4j_path = os.path.join(real_outdir, 'neo4j.tar.gz')
    pull_to_tar('neo4j:3.5.22', neo4j_path)
    redis_path = os.path.join(real_outdir, 'redis.tar.gz')
    pull_to_tar('redis', redis_path)
    print()
    print('The BEE dependency containers have been successfully downloaded. '
          'Please make sure to set the following options in your config:')
    print()
    print('[DEFAULT]')
    print('neo4j_image =', neo4j_path)
    print('redis_image =', redis_path)
6 changes: 6 additions & 0 deletions beeflow/common/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Module for celery configuration."""
from beeflow.wf_manager.wf_manager import create_app


flask_app = create_app()
celery_app = flask_app.extensions['celery']
7 changes: 5 additions & 2 deletions beeflow/common/config_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,11 @@ def filepath_completion_input(*pargs, **kwargs):
info='backend workload scheduler to interact with ')
VALIDATOR.option('DEFAULT', 'use_archive', validator=validation.bool_, attrs={'default': True},
info='use the BEE archiving functinality')
VALIDATOR.option('DEFAULT', 'bee_dep_image', validator=validation.file_,
info='container image with BEE dependencies',
VALIDATOR.option('DEFAULT', 'neo4j_image', validator=validation.file_,
info='neo4j container image',
attrs={'input': filepath_completion_input})
VALIDATOR.option('DEFAULT', 'redis_image', validator=validation.file_,
info='redis container image',
attrs={'input': filepath_completion_input})
VALIDATOR.option('DEFAULT', 'max_restarts', validator=int,
attrs={'default': 3},
Expand Down
20 changes: 14 additions & 6 deletions beeflow/common/db/wfm_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, db_file):
"""Initialize Task, db_file, and Workflow object."""
self.Task = namedtuple("Task", "id task_id workflow_id name resource state slurm_id") #noqa
self.db_file = db_file
self.Workflow = namedtuple("Workflow", "id workflow_id name state run_dir bolt_port gdb_pid") #noqa
self.Workflow = namedtuple("Workflow", "id workflow_id name state run_dir bolt_port http_port https_port gdb_pid") #noqa

def get_workflow(self, workflow_id):
"""Return a workflow object."""
Expand All @@ -69,12 +69,13 @@ def get_workflows(self):
workflows = [self.Workflow(*workflow) for workflow in result]
return workflows

def add_workflow(self, workflow_id, name, state, run_dir, bolt_port, gdb_pid):
def init_workflow(self, workflow_id, name, run_dir, bolt_port, http_port, https_port):
"""Insert a new workflow into the database."""
stmt = ("INSERT INTO workflows (workflow_id, name, state, run_dir, bolt_port, gdb_pid) "
"VALUES(?, ?, ?, ?, ?, ?);"
)
bdb.run(self.db_file, stmt, [workflow_id, name, state, run_dir, bolt_port, gdb_pid])
stmt = """INSERT INTO workflows (workflow_id, name, state, run_dir,
bolt_port, http_port, https_port, gdb_pid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?);"""
bdb.run(self.db_file, stmt, [workflow_id, name, 'Initializing', run_dir,
bolt_port, http_port, https_port, -1])

def delete_workflow(self, workflow_id):
"""Delete a workflow from the database."""
Expand Down Expand Up @@ -136,6 +137,11 @@ def get_gdb_pid(self, workflow_id):
gdb_pid = result
return gdb_pid

def update_gdb_pid(self, workflow_id, gdb_pid):
    """Record the GDB process ID for the given workflow."""
    query = "UPDATE workflows SET gdb_pid=? WHERE workflow_id=?"
    params = [gdb_pid, workflow_id]
    bdb.run(self.db_file, query, params)

def get_run_dir(self, workflow_id):
"""Return the bolt port associated with a workflow."""
stmt = "SELECT run_dir FROM workflows WHERE workflow_id=?"
Expand All @@ -162,6 +168,8 @@ def _init_tables(self):
state TEST NOT NULL,
run_dir STR,
bolt_port INTEGER,
http_port INTEGER,
https_port INTEGER,
gdb_pid INTEGER);"""

tasks_stmt = """CREATE TABLE IF NOT EXISTS tasks (
Expand Down
34 changes: 34 additions & 0 deletions beeflow/common/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,37 @@ def log_path():
def log_fname(component):
"""Determine the log file name for the given component."""
return os.path.join(log_path(), f'{component}.log')


def redis_root():
    """Get the redis root directory (create it if it doesn't exist)."""
    redis_dir = os.path.join(_workdir(), 'redis')
    os.makedirs(redis_dir, exist_ok=True)
    return redis_dir


def redis_container():
    """Get the path to the unpacked Redis container."""
    base = _workdir()
    return os.path.join(base, 'redis_container')


def redis_sock_fname():
    """Return the file name for the Redis socket."""
    sock_name = 'redis.sock'
    return sock_name


def _celery_root():
    """Get the celery root directory (create it if it doesn't exist)."""
    celery_dir = os.path.join(_workdir(), 'celery')
    os.makedirs(celery_dir, exist_ok=True)
    return celery_dir


def celery_config():
    """Return the celery config path."""
    root = _celery_root()
    return os.path.join(root, 'celery.py')


def celery_db():
    """Return the celery db path."""
    root = _celery_root()
    return os.path.join(root, 'celery.db')
23 changes: 23 additions & 0 deletions beeflow/common/states.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Workflow and Task state values."""


class WorkflowStates:
    """Workflow status values.

    NOTE(review): plain string constants rather than enum.Enum --
    presumably so the values serialize directly over HTTP/DB; confirm.
    """

    INITIALIZING = 'INITIALIZING'
    PENDING = 'PENDING'
    RUNNING = 'RUNNING'
    PAUSED = 'PAUSED'
    RESUME = 'RESUME'
    CANCELLED = 'CANCELLED'


class TaskStates:
    """Task status values.

    NOTE(review): plain string constants rather than enum.Enum --
    presumably so the values serialize directly over HTTP/DB; confirm.
    """

    PENDING = 'PENDING'
    RUNNING = 'RUNNING'
    COMPLETED = 'COMPLETED'
    FAILED = 'FAILED'
    PAUSED = 'PAUSED'
    CANCELLED = 'CANCELLED'
44 changes: 39 additions & 5 deletions beeflow/tests/test_wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,19 @@ def _resource(tag=""):
return _url() + str(tag)


class MockTask:
    """Stand-in for a celery task used when mocking the task queue."""

    @staticmethod
    def delay(*_pargs, **_kwargs):
        """Accept any arguments and do nothing, mimicking celery's delay()."""


# WFList Tests
def test_submit_workflow(client, mocker, teardown_workflow, temp_db):
"""Test submitting a workflow."""
mocker.patch('beeflow.wf_manager.resources.wf_list.init_workflow', new=MockTask)
mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.create_image',
return_value=True)
mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.start_gdb', return_value=True)
Expand All @@ -92,7 +102,8 @@ def test_submit_workflow(client, mocker, teardown_workflow, temp_db):
'workdir': '.',
'workflow': jsonpickle.encode(WORKFLOW_GOLD),
'tasks': jsonpickle.encode(TASKS_GOLD, warn=True),
'workflow_archive': tarball_contents
'workflow_archive': tarball_contents,
'no_start': False
})

# Remove task added during the test
Expand All @@ -101,6 +112,7 @@ def test_submit_workflow(client, mocker, teardown_workflow, temp_db):

def test_reexecute_workflow(client, mocker, teardown_workflow, temp_db):
"""Test reexecuting a workflow."""
mocker.patch('beeflow.wf_manager.resources.wf_list.init_workflow', new=MockTask)
mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.create_image',
return_value=True)
mocker.patch('beeflow.wf_manager.resources.wf_list.dep_manager.start_gdb', return_value=True)
Expand All @@ -126,15 +138,37 @@ def test_reexecute_workflow(client, mocker, teardown_workflow, temp_db):
assert resp.json['msg'] == 'Workflow uploaded'


class MockDBWorkflowHandle:
    """No-op stand-in for the DB workflow handle."""

    def update_workflow_state(self, *_pargs, **_kwargs):
        """Accept any update call and do nothing."""


class MockDB:
    """Stand-in DB object for the workflow manager."""

    @property
    def workflows(self):
        """Hand back a fresh mock workflow handle on each access."""
        return MockDBWorkflowHandle()


def mock_connect_db(*_pargs, **_kwargs):
    """Ignore connection arguments and return a mock DB."""
    return MockDB()


# WFActions Tests
def test_start_workflow(client, mocker, temp_db):
"""Test starting a workflow."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db)
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_tm', return_value=None)
mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_scheduler', return_value=None)
mocker.patch('beeflow.wf_manager.resources.wf_utils.update_wf_status', return_value=None)
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)
resp = client().post(f'/bee_wfm/v1/jobs/{WF_ID}')
assert resp.status_code == 200
Expand All @@ -147,11 +181,11 @@ def test_workflow_status(client, mocker, setup_teardown_workflow, temp_db):
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)
wf_name = 'wf'
wf_status = 'Pending'
bolt_port = 3030
gdb_pid = 12345
http_port = 3333
https_port = 3455

temp_db.workflows.add_workflow(WF_ID, wf_name, wf_status, 'dir', bolt_port, gdb_pid)
temp_db.workflows.init_workflow(WF_ID, wf_name, 'dir', bolt_port, http_port, https_port)
temp_db.workflows.add_task(123, WF_ID, 'task', "WAITING")
temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING")

Expand Down
Loading

0 comments on commit ff60683

Please sign in to comment.