Skip to content

Commit

Permalink
Merge branch 'develop' into issue883/add-check-for-different-front-end
Browse files Browse the repository at this point in the history
  • Loading branch information
kchilleri committed Sep 30, 2024
2 parents e40d8e6 + cd51dc2 commit 377f070
Show file tree
Hide file tree
Showing 17 changed files with 2,099 additions and 1,991 deletions.
9 changes: 9 additions & 0 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@


logging.basicConfig(level=logging.WARNING)
logging.getLogger("neo4j").setLevel(logging.WARNING)
WORKFLOW_MANAGER = 'bee_wfm/v1/jobs/'


Expand Down Expand Up @@ -655,6 +656,14 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),
return wf_id


@app.command()
def dag(wf_id: str = typer.Argument(..., callback=match_short_id)):
    """Export a DAG of the workflow to a GraphML file."""
    wf_utils.export_dag(wf_id)
    message = f"DAG for workflow {_short_id(wf_id)} has been exported successfully."
    typer.secho(message, fg=typer.colors.GREEN)


@app.callback(invoke_without_command=True)
def version_callback(version: bool = False):
"""Beeflow."""
Expand Down
26 changes: 26 additions & 0 deletions beeflow/common/deps/neo4j_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
run_dir = mount_dir + '/run'
certs_dir = mount_dir + '/certificates'
confs_dir = mount_dir + "/conf"
dags_dir = os.path.join(bee_workdir, 'dags')
graphmls_dir = dags_dir + "/graphmls"
container_path = container_manager.get_container_dir('neo4j')
log = bee_logging.setup('neo4j')

Expand Down Expand Up @@ -59,6 +61,8 @@ def setup_mounts():
os.makedirs(mount_dir, exist_ok=True)
os.makedirs(certs_dir, exist_ok=True)
os.makedirs(run_dir, exist_ok=True)
os.makedirs(dags_dir, exist_ok=True)
os.makedirs(graphmls_dir, exist_ok=True)


def setup_configs(bolt_port, http_port, https_port):
Expand Down Expand Up @@ -86,6 +90,24 @@ def setup_configs(bolt_port, http_port, https_port):
with open(gdb_configfile, "wt", encoding="utf8") as cfile:
cfile.write(data)

apoc_configfile = os.path.join(confs_dir, "apoc.conf")
if not os.path.exists(apoc_configfile):
with open(apoc_configfile, "wt", encoding="utf8") as afile:
afile.write("apoc.export.file.enabled=true\n")
log.info(f"Created {apoc_configfile} with apoc.export.file.enabled=true")
else:
with open(apoc_configfile, "rt", encoding="utf8") as afile:
apoc_data = afile.read()

apoc_config = r'#?(apoc.export.file.enabled=)[^\n]*'
if re.search(apoc_config, apoc_data):
apoc_data = re.sub(apoc_config, r'apoc.export.file.enabled=true', apoc_data)
else:
apoc_data += "\napoc.export.file.enabled=true\n"

with open(apoc_configfile, "wt", encoding="utf8") as afile:
afile.write(apoc_data)


def create_credentials():
"""Create the password and set the logfiles in environment."""
Expand All @@ -95,10 +117,12 @@ def create_credentials():
subprocess.run([
"ch-run",
"--set-env=" + container_path + "/ch/environment",
"--set-env=apoc.export.file.enabled=true",
"-b", confs_dir + ":/var/lib/neo4j/conf",
"-b", data_dir + ":/data",
"-b", logs_dir + ":/logs",
"-b", run_dir + ":/var/lib/neo4j/run", container_path,
"-W", "-b", graphmls_dir + ":/var/lib/neo4j/import",
"--", *command
], check=True)
except subprocess.CalledProcessError:
Expand All @@ -112,11 +136,13 @@ def create_database():
proc = subprocess.Popen([ #noqa can't use with because returning
"ch-run",
"--set-env=" + container_path + "/ch/environment",
"--set-env=apoc.export.file.enabled=true",
"-b", confs_dir + ":/var/lib/neo4j/conf",
"-b", data_dir + ":/data",
"-b", logs_dir + ":/logs",
"-b", run_dir + ":/var/lib/neo4j/run",
"-b", certs_dir + ":/var/lib/neo4j/certificates",
"-W", "-b", graphmls_dir + ":/var/lib/neo4j/import",
container_path, "--", *command
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
wait_gdb()
Expand Down
4 changes: 4 additions & 0 deletions beeflow/common/gdb/gdb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,7 @@ def workflow_completed(self):
@abstractmethod
def close(self):
"""Close the connection to the graph database."""

@abstractmethod
def export_graphml(self, workflow_id):
    """Export a BEE workflow as a graphml.

    Signature matches the concrete driver implementations, which take
    the workflow's ID to select what to export.

    :param workflow_id: the workflow's ID
    :type workflow_id: str
    """
84 changes: 84 additions & 0 deletions beeflow/common/gdb/generate_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Module to make a png of a graph from a graphml file."""

import os
import networkx as nx
import graphviz

from beeflow.common import paths

bee_workdir = paths.workdir()
dags_dir = os.path.join(bee_workdir, 'dags')
graphmls_dir = dags_dir + "/graphmls"


def generate_viz(wf_id):
    """Generate a PNG of a workflow graph from a GraphML file.

    Reads ``<short_id>.graphml`` from the graphmls directory and writes
    ``<short_id>.png`` to the dags directory, where ``short_id`` is the
    first six characters of the workflow ID.

    :param wf_id: the workflow's ID
    :type wf_id: str
    """
    short_id = wf_id[:6]
    # Build paths with os.path.join for consistency with dags_dir above.
    graphml_path = os.path.join(graphmls_dir, short_id + ".graphml")
    output_path = os.path.join(dags_dir, short_id)

    # Load the GraphML file using NetworkX
    graph = nx.read_graphml(graphml_path)

    # Initialize Graphviz graph
    dot = graphviz.Digraph(comment='Hierarchical Graph')

    # Add nodes and edges using helper functions
    add_nodes_to_dot(graph, dot)
    add_edges_to_dot(graph, dot)

    # Render the graph and save as PNG
    png_data = dot.pipe(format='png')
    with open(output_path + ".png", "wb") as png_file:
        png_file.write(png_data)


def add_nodes_to_dot(graph, dot):
    """Add nodes from the graph to the Graphviz object with labels and colors."""
    label_to_color = {
        ":Workflow": 'steelblue',
        ":Output": 'mediumseagreen',
        ":Metadata": 'skyblue',
        ":Task": 'lightcoral',
        ":Input": 'sandybrown',
        ":Hint": 'plum',
        ":Requirement": 'lightpink1'
    }

    for node_id, attributes in graph.nodes(data=True):
        # Fall back to the node ID when the node carries no 'labels' attribute.
        raw_label = attributes.get('labels', node_id)
        display_label, fill_color = get_node_label_and_color(
            raw_label, attributes, label_to_color)
        dot.node(node_id, label=display_label, style='filled', fillcolor=fill_color)


def get_node_label_and_color(label, attributes, label_to_color):
    """Return the appropriate node label and color based on node type.

    Known node types map to a display label taken from one of the node's
    attributes (or the fixed text "Workflow"); unknown types fall back to
    the raw label. Colors come from label_to_color, defaulting to gray.
    """
    # Which node attribute supplies the display label; None means a fixed label.
    attribute_for_label = {
        ":Workflow": None,
        ":Output": 'glob',
        ":Metadata": 'state',
        ":Task": 'name',
        ":Input": 'source',
        ":Hint": 'class',
        ":Requirement": 'class',
    }

    if label not in attribute_for_label:
        # Default case if no match
        return label, 'gray'

    attr_key = attribute_for_label[label]
    if attr_key is None:
        node_label = "Workflow"
    else:
        node_label = attributes.get(attr_key, label)
    return node_label, label_to_color.get(label, 'gray')


def add_edges_to_dot(graph, dot):
    """Add edges from the graph to the Graphviz object with appropriate labels.

    Data-flow edges are drawn source->target; dependency-style edges are
    drawn reversed (target->source), with dependency edges emphasized in bold.
    """
    forward_labels = ('INPUT_OF', 'DESCRIBES', 'HINT_OF', 'REQUIREMENT_OF')
    emphasized_labels = ('DEPENDS_ON', 'RESTARTED_FROM')

    for src, dst, attrs in graph.edges(data=True):
        label = attrs.get('label', '')
        if label in forward_labels:
            dot.edge(src, dst, label=label, fontsize="10")
        elif label in emphasized_labels:
            dot.edge(dst, src, label=label, penwidth="3",
                     fontsize="10", fontname="times-bold")
        else:
            dot.edge(dst, src, label=label, fontsize="10")
60 changes: 60 additions & 0 deletions beeflow/common/gdb/graphml_key_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Module to make sure all required keys are present."""

import xml.etree.ElementTree as ET
import os

from beeflow.common import paths

bee_workdir = paths.workdir()
dags_dir = os.path.join(bee_workdir, 'dags')
graphmls_dir = dags_dir + "/graphmls"

expected_keys = {"id", "name", "state", "class", "type", "value", "source",
"workflow_id", "base_command", "stdout", "stderr", "default",
"prefix", "position", "value_from", "glob"}

default_key_definitions = {
"id": {"for": "node", "attr.name": "id", "attr.type": "string"},
"name": {"for": "node", "attr.name": "name", "attr.type": "string"},
"state": {"for": "node", "attr.name": "state", "attr.type": "string"},
"class": {"for": "node", "attr.name": "class", "attr.type": "string"},
"type": {"for": "node", "attr.name": "type", "attr.type": "string"},
"value": {"for": "node", "attr.name": "value", "attr.type": "string"},
"source": {"for": "node", "attr.name": "source", "attr.type": "string"},
"workflow_id": {"for": "node", "attr.name": "workflow_id", "attr.type": "string"},
"base_command": {"for": "node", "attr.name": "base_command", "attr.type": "string"},
"stdout": {"for": "node", "attr.name": "stdout", "attr.type": "string"},
"stderr": {"for": "node", "attr.name": "stderr", "attr.type": "string"},
"default": {"for": "node", "attr.name": "default", "attr.type": "string"},
"prefix": {"for": "node", "attr.name": "prefix", "attr.type": "string"},
"position": {"for": "node", "attr.name": "position", "attr.type": "long"},
"value_from": {"for": "node", "attr.name": "value_from", "attr.type": "string"},
"glob": {"for": "node", "attr.name": "glob", "attr.type": "string"},
}


def update_graphml(wf_id):
    """Update GraphML file by ensuring required keys are present and updating its structure.

    APOC's export can emit <data key="..."> elements without the matching
    <key> definitions, which makes the file invalid GraphML. This inserts a
    definition for every key that is used but not defined.

    :param wf_id: the workflow's ID; only its first six characters name the file
    :type wf_id: str
    """
    short_id = wf_id[:6]
    graphml_path = graphmls_dir + "/" + short_id + ".graphml"
    # Parse the GraphML file and preserve namespaces
    tree = ET.parse(graphml_path)
    root = tree.getroot()

    name_space = {'graphml': 'http://graphml.graphdrawing.org/xmlns'}
    defined_keys = {key.attrib['id'] for key in root.findall('graphml:key', name_space)}
    used_keys = {data.attrib['key'] for data in root.findall('.//graphml:data', name_space)}

    missing_keys = used_keys - defined_keys

    # Insert default key definitions for missing keys. Keys we did not
    # anticipate get a generic string definition so the file stays valid
    # GraphML rather than being silently left undefined.
    for missing_key in missing_keys:
        default_def = default_key_definitions.get(
            missing_key,
            {"for": "node", "attr.name": missing_key, "attr.type": "string"})
        key_element = ET.Element(f'{{{name_space["graphml"]}}}key',
                                 id=missing_key,
                                 **default_def)
        root.insert(0, key_element)

    # Save the updated GraphML file by overwriting the original one
    tree.write(graphml_path, encoding='UTF-8', xml_declaration=True)
16 changes: 16 additions & 0 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,3 +725,19 @@ def cleanup(tx):
cleanup_query = "MATCH (n) DETACH DELETE n"

tx.run(cleanup_query)


def export_graphml(tx, wf_id):
    """Export BEE workflow as graphml.

    The workflow ID is passed to the inner query as a bound Cypher
    parameter (via APOC's `params` config entry) rather than interpolated
    into the query string, avoiding quoting/injection problems.

    :param tx: the Neo4j transaction to run the export in
    :param wf_id: the workflow's ID; the output file is named after its
        first six characters
    :type wf_id: str
    """
    short_id = wf_id[:6]
    # The inner MATCH query is handed to APOC as a string; its $wf_id
    # placeholder is filled from the params map in the APOC config.
    export_query = (
        "WITH \"MATCH (n1)-[r]->(n2) "
        "WHERE n1.workflow_id = $wf_id OR n2.workflow_id = $wf_id "
        "RETURN r, n1, n2\" AS query "
        f"CALL apoc.export.graphml.query(query, '{short_id}.graphml', "
        "{useTypes: true, params: {wf_id: $wf_id}}) "
        "YIELD file, source, format, nodes, relationships, properties, time, rows, batchSize, "
        "batches, done, data "
        "RETURN file, source, format, nodes, relationships, properties, time, rows, batchSize, "
        "batches, done, data"
    )
    tx.run(export_query, wf_id=wf_id)
8 changes: 7 additions & 1 deletion beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ def restart_task(self, old_task, new_task):
session.write_transaction(tx.create_task_requirement_nodes, task=new_task)
session.write_transaction(tx.create_task_input_nodes, task=new_task)
session.write_transaction(tx.create_task_output_nodes, task=new_task)
session.write_transaction(tx.create_task_metadata_node, task=new_task)
session.write_transaction(tx.create_task_metadata_node, task=new_task,
task_state="WAITING")
session.write_transaction(tx.add_dependencies, task=new_task, old_task=old_task,
restarted_task=True)

Expand Down Expand Up @@ -473,6 +474,11 @@ def _write_transaction(self, tx_fun, **kwargs):
with self._driver.session() as session:
session.write_transaction(tx_fun, **kwargs)

def export_graphml(self, workflow_id):
    """Export a BEE workflow as a graphml.

    :param workflow_id: the workflow's ID
    :type workflow_id: str
    """
    # Use the shared write-transaction helper for consistency with the
    # driver's other write operations.
    self._write_transaction(tx.export_graphml, wf_id=workflow_id)


def _reconstruct_requirements(req_records):
"""Reconstruct requirements by their records retrieved from Neo4j.
Expand Down
4 changes: 4 additions & 0 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,7 @@ def workflow_completed(self):
:rtype: bool
"""
return self._gdb_driver.workflow_completed(self._workflow_id)

def export_graphml(self):
    """Export a BEE workflow as a graphml.

    Delegates to the graph database driver, passing this interface's
    workflow ID to select which workflow to export.
    """
    self._gdb_driver.export_graphml(self._workflow_id)
10 changes: 10 additions & 0 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from beeflow.common import log as bee_logging
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common.gdb import neo4j_driver
from beeflow.common.gdb.generate_graph import generate_viz
from beeflow.common.gdb.graphml_key_updater import update_graphml
from beeflow.common.wf_interface import WorkflowInterface
from beeflow.common.connection import Connection
from beeflow.common import paths
Expand Down Expand Up @@ -292,6 +294,14 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
start_workflow(wf_id)


def export_dag(wf_id):
    """Export the DAG of the workflow.

    Exports the workflow's graph from the graph database as GraphML,
    patches any key definitions missing from the exported file, then
    renders the graph as a PNG visualization.

    :param wf_id: the workflow's ID
    :type wf_id: str
    """
    wfi = get_workflow_interface(wf_id)
    wfi.export_graphml()
    update_graphml(wf_id)
    generate_viz(wf_id)


def start_workflow(wf_id):
"""Attempt to start the workflow, returning True if successful."""
db = connect_db(wfm_db, get_db_path())
Expand Down
4 changes: 2 additions & 2 deletions coverage.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 5 additions & 0 deletions docs/sphinx/commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ Arguments:

``beeflow reexecute``: Reexecute an archived workflow.

Arguments:
WF_ID [required]

``beeflow dag``: Export a directed acyclic graph (DAG) of a submitted workflow. This command can be run at any point during the workflow's lifetime. To see the DAG of a workflow before it runs, submit the workflow with the ``--no-start`` flag and then use the dag command. The DAGs are exported to ~/.beeflow/dags. See :ref:`workflow-visualization` for more information.

Arguments:
WF_ID [required]

Expand Down
Binary file added docs/sphinx/images/cat-dag.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 3 additions & 2 deletions docs/sphinx/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
error_logs
contribute
development
wf_api
rest_api
visualization

.. Commented out
wf_api
rest_api
Indices and tables
==================
Expand Down
2 changes: 1 addition & 1 deletion docs/sphinx/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Requirements:
* **Containers**:
Two Charliecloud dependency containers are currently required for BEE: one for the Neo4j graph database and another for Redis. The paths to these containers will need to be set in the BEE configuration later, using the ``neo4j_image`` and the ``redis_image`` options respectively. BEE only supports Neo4j 5.x. We are currently using the latest version of Redis supplied on Docker Hub (as of 2023).

For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j-5-17.tar.gz**, **/usr/projects/BEE/redis.tar.gz**.
For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j.tar.gz**, **/usr/projects/BEE/redis.tar.gz**.

For other users, these containers can be pulled from Docker Hub (after following `Installation:`_ below) using ``beeflow core pull-deps``, which will download and report the container paths to be set in the config later.

Expand Down
Loading

0 comments on commit 377f070

Please sign in to comment.