Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Visualize dag #903

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
52850ca
Connect BEE Node to new Workflow
Jul 8, 2024
35a9c51
Add SOME queries to check if wf_id matches workflow
Jul 8, 2024
65c8680
Add checks for workflow ID in dependency queries
Jul 9, 2024
cd20720
Most major checking for wf_id added within cypher / passing from driv…
Jul 9, 2024
3ccb511
Remove and skip over gdb_interface
Jul 11, 2024
f76d101
Changed gdb_driver take wf_id as parameter & wf_interface to call wf_…
Jul 11, 2024
ad4424f
Working for multiple workflows in one database (lacks pause / cancel …
Jul 17, 2024
530db47
Delete beeflow/wf_manager/resources/:
kabir-vats Jul 17, 2024
3f4376a
Add GDB to beeflow client processes, GDB now launches on beeflow core…
Jul 24, 2024
489b4ff
Merge branch 'Multiple-Workflows-One-Database' of github.com:lanl/BEE…
Jul 24, 2024
778e516
Merge branch 'develop' into Multiple-Workflows-One-Database
kabir-vats Jul 24, 2024
b489e6b
change default sleep time for gdb to one second
Jul 25, 2024
5e3b5d9
Restructure dep_manager and separate container, redis, and neo4j from…
Jul 26, 2024
59ad3c5
Merge pull request #891 from lanl/Launch-Deps-Background
kabir-vats Jul 26, 2024
d318d4c
Remove some unncessesary code from wfi/neo4j database and rewrite mos…
Jul 30, 2024
0f61ef1
Fixed linting issues
Jul 30, 2024
0b00ff7
minor linting fixes
Jul 30, 2024
298fb25
Fix failing unit tests
Aug 12, 2024
f20df89
linting
Aug 12, 2024
036bdc8
linting
Aug 12, 2024
7ce1524
linting
Aug 12, 2024
614a7bc
Add noqa to gdb_driver connection and popen
Aug 12, 2024
319ba73
check for neo4j for certs dir, remove debug prints
Aug 13, 2024
20ba911
Document Graph Database Structure
Aug 14, 2024
0b0b61a
Revise GDB Design Documentation
kabir-vats Aug 14, 2024
01e866f
added call to export_dag
Aug 19, 2024
c53de6d
added export_dag function and call to export_dag in driver
Aug 19, 2024
8cc81ad
added export_dag abstract method
Aug 19, 2024
2c16645
added export_dag function
Aug 19, 2024
adbc4a9
added export_dag function
Aug 19, 2024
a040388
subtracted indent
Aug 19, 2024
80aab29
Merge branch 'develop' into visualize-dag
Aug 20, 2024
e2f35ac
initial working graphml commit
Aug 20, 2024
b3dc890
Merge branch 'develop' into visualize-dag to add pre-release version …
Aug 22, 2024
12c3725
saving dags in a dags folder in .beeflow
Aug 22, 2024
6a244f8
export and graph generation first commit
Aug 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions beeflow/client/bee_client.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes made to bee_client.py and wf_utils.py are causing the NOT_RESPONDING state to appear -- I think. But this is my first attempt at having the "beeflow dag" command.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the steps actually complete and then show NOT_RESPONDING ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,14 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),
logging.info(f'ReExecute Workflow: {resp.text}')
return wf_id

@app.command()
def dag(wf_id: str = typer.Argument(..., callback=match_short_id)):
    """Export a DAG of the workflow to a GraphML file.

    :param wf_id: short workflow ID, expanded/validated by match_short_id
    """
    try:
        # Keep the try body minimal: only the export call can legitimately
        # fail here. Broad Exception catch is acceptable at this CLI
        # boundary since any failure should terminate with a message.
        wf_utils.export_workflow_dag(wf_id)
    except Exception as err:
        error_exit(f"Failed to export DAG: {err}")
    else:
        # Report success only after the export actually completed; the
        # echo itself is outside the try so its errors are not masked
        # as export failures.
        typer.echo(f"DAG for workflow {wf_id} has been exported successfully.")

@app.callback(invoke_without_command=True)
def version_callback(version: bool = False):
Expand Down
81 changes: 23 additions & 58 deletions beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
from beeflow.common import paths
from beeflow.wf_manager.resources import wf_utils

from beeflow.common.db import wfm_db
from beeflow.common.db.bdb import connect_db
from beeflow.wf_manager.common import dep_manager
from beeflow.common.deps import container_manager
from beeflow.common.deps import neo4j_manager
from beeflow.common.deps import redis_manager


class ComponentManager:
Expand Down Expand Up @@ -194,49 +194,34 @@ def celery():
"""Start the celery task queue."""
log = open_log('celery')
# Setting --pool=solo to avoid preforking multiple processes
return subprocess.Popen(['celery', '-A', 'beeflow.common.celery', 'worker', '--pool=solo'],
stdout=log, stderr=log)
return subprocess.Popen(['celery', '-A', 'beeflow.common.deps.celery_manager',
'worker', '--pool=solo'], stdout=log, stderr=log)

# Run this before daemonizing in order to avoid slow background start
container_path = paths.redis_container()
# container_path = paths.redis_container()
# If it exists, we assume that it actually has a valid container
if not os.path.exists(container_path):
# if not os.path.exists(container_path):
# print('Unpacking Redis image...')
# subprocess.check_call(['ch-convert', '-i', 'tar', '-o', 'dir',
# bc.get('DEFAULT', 'redis_image'), container_path])
if not container_manager.check_container_dir('redis'):
print('Unpacking Redis image...')
subprocess.check_call(['ch-convert', '-i', 'tar', '-o', 'dir',
bc.get('DEFAULT', 'redis_image'), container_path])
container_manager.create_image('redis')

if not container_manager.check_container_dir('neo4j'):
print('Unpacking Neo4j image...')
container_manager.create_image('neo4j')

@mgr.component('neo4j-database', ('wf_manager',))
def start_neo4j():
    """Start the neo4j graph database.

    Delegates process launch entirely to neo4j_manager.start() and
    returns its result (used by the component manager to track the
    process).
    """
    # NOTE(review): the ('wf_manager',) tuple presumably expresses a
    # dependency relation between this component and wf_manager —
    # confirm against ComponentManager.component's contract.
    return neo4j_manager.start()

@mgr.component('redis', ())
def redis():
def start_redis():
"""Start redis."""
data_dir = 'data'
os.makedirs(os.path.join(paths.redis_root(), data_dir), exist_ok=True)
conf_name = 'redis.conf'
container_path = paths.redis_container()
# Dump the config
conf_path = os.path.join(paths.redis_root(), conf_name)
if not os.path.exists(conf_path):
with open(conf_path, 'w', encoding='utf-8') as fp:
# Don't listen on TCP
print('port 0', file=fp)
print('dir', os.path.join('/mnt', data_dir), file=fp)
print('maxmemory 2mb', file=fp)
print('unixsocket', os.path.join('/mnt', paths.redis_sock_fname()), file=fp)
print('unixsocketperm 700', file=fp)
cmd = [
'ch-run',
f'--bind={paths.redis_root()}:/mnt',
container_path,
'redis-server',
'/mnt/redis.conf',
]
log = open_log('redis')
# Ran into a strange "Failed to configure LOCALE for invalid locale name."
# from Redis, so setting LANG=C. This could have consequences for UTF-8
# strings.
env = dict(os.environ)
env['LANG'] = 'C'
env['LC_ALL'] = 'C'
return subprocess.Popen(cmd, env=env, stdout=log, stderr=log)
return redis_manager.start(log)

# Workflow manager and task manager need to be opened with PIPE for their stdout/stderr
if need_slurmrestd():
Expand Down Expand Up @@ -477,23 +462,6 @@ def stop(query='yes'):
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')


def kill_active_workflows(active_states, workflow_list):
    """Kill the GDB process of every workflow whose state is active.

    :param active_states: collection of states considered active
    :param workflow_list: iterable of (name, wf_id, state) tuples
    :return: True if every active workflow had a killable process
    """
    db = connect_db(wfm_db, wf_utils.get_db_path())
    success = True
    for name, wf_id, state in workflow_list:
        # Skip workflows that are not in an active state.
        if state not in active_states:
            continue
        pid = db.workflows.get_gdb_pid(wf_id)
        if pid <= 0:
            # Failure most likely caused by an Initializing workflow.
            print(f"No process for {name}, {wf_id}, {state}.")
            success = False
            continue
        dep_manager.kill_gdb(pid)
    return success


def archive_dir(dir_to_archive):
"""Archive directories for archive flag in reset."""
archive_dirs = ['logs', 'container_archive', 'archives', 'workflows']
Expand Down Expand Up @@ -561,9 +529,6 @@ def reset(archive: bool = typer.Option(False, '--archive', '-a',
# Exit out if the user didn't really mean to do a reset
sys.exit()
elif absolutely_sure in ("y", "yes"):
# First stop all active workflow processes
workflow_list = bee_client.get_wf_list()
kill_active_workflows(active_states, workflow_list)
# Stop all of the beeflow processes
stop("quiet")
print("Beeflow is shutting down.")
Expand Down
2 changes: 1 addition & 1 deletion beeflow/common/config_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def validate_chrun_opts(opts):
info='HTTPS port used for the graph database')
VALIDATOR.option('graphdb', 'gdb_image_mntdir', default=join_path('/tmp', USER),
info='graph database image mount directory', validator=validation.make_dir)
VALIDATOR.option('graphdb', 'sleep_time', validator=int, default=10,
VALIDATOR.option('graphdb', 'sleep_time', validator=int, default=1,
info='how long to wait for the graph database to come up (this can take a while, '
'depending on the system)')
# Builder
Expand Down
84 changes: 31 additions & 53 deletions beeflow/common/db/wfm_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class WorkflowInfo:

def __init__(self, db_file):
"""Initialize Info and db file."""
self.Info = namedtuple("Info", "id wfm_port tm_port sched_port num_workflows") # noqa Snake Case
self.Info = namedtuple("Info", "id wfm_port tm_port sched_port num_workflows bolt_port http_port https_port gdb_pid") # noqa Snake Case
self.db_file = db_file

def set_port(self, component, new_port):
Expand All @@ -22,7 +22,7 @@ def get_port(self, component):
"""Return port for the specified component."""
# Need to add code here to make sure we chose a valid component.
stmt = f"SELECT {component}_port FROM info"
result = bdb.getone(self.db_file, stmt)
result = bdb.getone(self.db_file, stmt)[0]
port = result
return port

Expand All @@ -45,6 +45,18 @@ def get_info(self):
info = self.Info(*result)
return info

def get_gdb_pid(self):
    """Return the PID stored for the graph database process."""
    # Single row in the info table; getone returns a tuple.
    return bdb.getone(self.db_file, "SELECT gdb_pid FROM info")[0]

def update_gdb_pid(self, gdb_pid):
    """Record the graph database PID in the info table."""
    bdb.run(self.db_file, "UPDATE info SET gdb_pid=?", [gdb_pid])


class Workflows:
"""Workflow database object."""
Expand All @@ -53,7 +65,7 @@ def __init__(self, db_file):
"""Initialize Task, db_file, and Workflow object."""
self.Task = namedtuple("Task", "id task_id workflow_id name resource state slurm_id") #noqa
self.db_file = db_file
self.Workflow = namedtuple("Workflow", "id workflow_id name state run_dir bolt_port http_port https_port gdb_pid") #noqa
self.Workflow = namedtuple("Workflow", "id workflow_id name state run_dir") #noqa

def get_workflow(self, workflow_id):
"""Return a workflow object."""
Expand All @@ -69,13 +81,11 @@ def get_workflows(self):
workflows = [self.Workflow(*workflow) for workflow in result]
return workflows

def init_workflow(self, workflow_id, name, run_dir, bolt_port, http_port, https_port):
def init_workflow(self, workflow_id, name, run_dir):
"""Insert a new workflow into the database."""
stmt = """INSERT INTO workflows (workflow_id, name, state, run_dir,
bolt_port, http_port, https_port, gdb_pid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?);"""
bdb.run(self.db_file, stmt, [workflow_id, name, 'Initializing', run_dir,
bolt_port, http_port, https_port, -1])
stmt = """INSERT INTO workflows (workflow_id, name, state, run_dir)
VALUES(?, ?, ?, ?);"""
bdb.run(self.db_file, stmt, [workflow_id, name, 'Initializing', run_dir])

def delete_workflow(self, workflow_id):
"""Delete a workflow from the database."""
Expand Down Expand Up @@ -123,42 +133,9 @@ def get_task(self, task_id, workflow_id):
result = bdb.getone(self.db_file, stmt, [task_id, workflow_id])
return result

def get_bolt_port(self, workflow_id):
    """Return the bolt port associated with a workflow."""
    query = "SELECT bolt_port FROM workflows WHERE workflow_id=?"
    # getone returns a row tuple; the port is its only column.
    return bdb.getone(self.db_file, query, [workflow_id])[0]

def get_http_port(self, workflow_id):
    """Return the HTTP port associated with a workflow."""
    stmt = "SELECT http_port FROM workflows WHERE workflow_id=?"
    result = bdb.getone(self.db_file, stmt, [workflow_id])[0]
    http_port = result
    return http_port

def get_https_port(self, workflow_id):
    """Return the HTTPS port associated with a workflow."""
    stmt = "SELECT https_port FROM workflows WHERE workflow_id=?"
    result = bdb.getone(self.db_file, stmt, [workflow_id])[0]
    https_port = result
    return https_port

def get_gdb_pid(self, workflow_id):
    """Return the graph database PID associated with a workflow."""
    stmt = "SELECT gdb_pid FROM workflows WHERE workflow_id=?"
    result = bdb.getone(self.db_file, stmt, [workflow_id])[0]
    gdb_pid = result
    return gdb_pid

def update_gdb_pid(self, workflow_id, gdb_pid):
    """Update the gdb PID associated with a workflow."""
    query = "UPDATE workflows SET gdb_pid=? WHERE workflow_id=?"
    bdb.run(self.db_file, query, [gdb_pid, workflow_id])

def get_run_dir(self, workflow_id):
    """Return the run directory associatedated with a workflow.

    The run_dir column lives in the per-workflow `workflows` table
    (the single-row `info` table has neither a run_dir nor a
    workflow_id column), so the query must target `workflows`.
    """
    stmt = "SELECT run_dir FROM workflows WHERE workflow_id=?"
    result = bdb.getone(self.db_file, stmt, [workflow_id])[0]
    run_dir = result
    return run_dir
Expand All @@ -180,11 +157,8 @@ def _init_tables(self):
workflow_id INTEGER UNIQUE,
name TEXT,
state TEST NOT NULL,
run_dir STR,
bolt_port INTEGER,
http_port INTEGER,
https_port INTEGER,
gdb_pid INTEGER);"""
run_dir STR
);"""

tasks_stmt = """CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY,
Expand All @@ -205,17 +179,21 @@ def _init_tables(self):
wfm_port INTEGER,
tm_port INTEGER,
sched_port INTEGER,
num_workflows INTEGER
num_workflows INTEGER,
bolt_port INTEGER,
http_port INTEGER,
https_port INTEGER,
gdb_pid INTEGER
);"""

bdb.create_table(self.db_file, workflows_stmt)
bdb.create_table(self.db_file, tasks_stmt)
if not bdb.table_exists(self.db_file, 'info'):
bdb.create_table(self.db_file, info_stmt)
# insert a new workflow into the database
stmt = """INSERT INTO info (wfm_port, tm_port, sched_port, num_workflows)
VALUES(?, ?, ?, ?);"""
bdb.run(self.db_file, stmt, [-1, -1, -1, 0])
stmt = """INSERT INTO info (wfm_port, tm_port, sched_port, num_workflows,
bolt_port, http_port, https_port, gdb_pid) VALUES(?, ?, ?, ?, ?, ?, ?, ?);"""
bdb.run(self.db_file, stmt, [-1, -1, -1, 0, -1, -1, -1, -1])

@property
def workflows(self):
Expand Down
File renamed without changes.
88 changes: 88 additions & 0 deletions beeflow/common/deps/container_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python3

"""Functions for managing the BEE depency container and associated bind mounts."""

import os
import shutil
import subprocess

from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common import paths
from celery import shared_task #noqa pylama can't find celery


class NoContainerRuntime(Exception):
    """Raised when no container runtime (e.g. Charliecloud or Singularity) is installed."""


def check_container_runtime():
    """Verify that a container runtime is installed.

    Only Charliecloud (ch-convert / ch-run) is detected for now.

    :raises NoContainerRuntime: if either Charliecloud tool is missing
    """
    # TODO: needs to support singularity as well
    if shutil.which("ch-convert") is None or shutil.which("ch-run") is None:
        print("ch-convert or ch-run not found. Charliecloud required"
              " for neo4j container.")
        # Carry a diagnostic message instead of an empty string so the
        # exception is meaningful when logged by callers.
        raise NoContainerRuntime('ch-convert or ch-run not found in PATH')


def make_dep_dir():
    """Ensure the BEE dependency container directory exists."""
    bee_workdir = paths.workdir()
    bee_dir = f'{bee_workdir}/deps'
    # exist_ok avoids the check-then-create race (TOCTOU) of testing
    # isdir() before calling makedirs().
    os.makedirs(bee_dir, exist_ok=True)


def get_dep_dir():
    """Return the dependency directory path (with trailing slash)."""
    return f'{paths.workdir()}/deps/'


def get_container_dir(dep_name):
    """Return the dependency container path for *dep_name*."""
    # get_dep_dir() already ends with a slash, so plain concatenation
    # yields e.g. <workdir>/deps/neo4j_container.
    return get_dep_dir() + dep_name + '_container'


def check_container_dir(dep_name):
    """Return True if the container directory for *dep_name* exists."""
    return os.path.isdir(get_container_dir(dep_name))


def create_image(dep_name):
    """Create a new BEE dependency container if one does not exist.

    By default, the container is stored in /tmp/<user>/beeflow/deps.

    :param dep_name: dependency name, e.g. 'redis' or 'neo4j'; the image
        tarball path is read from the '<dep_name>_image' config option
    :raises NoContainerRuntime: (from check_container_runtime) when no
        container runtime is available

    On a failed ch-convert the partial image directory (if any) is
    removed and the function returns without raising.
    """
    # Can throw an exception that needs to be handled by the caller
    check_container_runtime()

    image = bc.get('DEFAULT', dep_name + '_image')

    # Check for BEE dependency container directory:
    if check_container_dir(dep_name):
        print(f"Already have {dep_name} container")
        return

    make_dep_dir()
    container_dir = get_container_dir(dep_name)

    # Build new dependency container
    try:
        subprocess.run(["ch-convert", "-i", "tar", "-o", "dir",
                        str(image), str(container_dir)], check=True)
    except subprocess.CalledProcessError as error:
        print(f"ch-convert failed: {error}")
        # ch-convert may fail before the directory is ever created;
        # only remove it when it exists so a FileNotFoundError from
        # rmtree does not mask the real conversion error.
        if os.path.isdir(container_dir):
            shutil.rmtree(container_dir)
            print(f"{dep_name} container mount directory {container_dir} removed")
        return

    # If neo4j, make the certificates directory
    if dep_name == 'neo4j':
        container_certs_path = os.path.join(container_dir, 'var/lib/neo4j/certificates')
        os.makedirs(container_certs_path, exist_ok=True)
Loading