Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export dag #909

Merged
merged 32 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e3069ed
added changes made to visualize-dag from 08/19/24
Aug 28, 2024
5681127
added changes from e2f35ac95c694bc0fae75977f20e0cf82d1151ee in visual…
Aug 28, 2024
accafb3
added changes from 12c3725f405245b91539c961df2630dc8abc8d8d in visual…
Aug 28, 2024
2e284d1
added changes from 6a244f8fa49c2b78463ba6fd727b9e69091d15ef on visual…
Aug 28, 2024
2854c94
decreased verbosity of beeflow dag command
Aug 28, 2024
090a17e
fixed some linting errors
Aug 29, 2024
e65a6a8
linting error
Aug 29, 2024
eb2ca01
fixed indent
Aug 29, 2024
16e71fa
liniting
Aug 29, 2024
811a449
white space fix
Aug 29, 2024
27f1ad3
white space fix
Aug 29, 2024
8d0672b
changed names of functions that export the graphml to export_graphml
Sep 10, 2024
39939ad
integrated generate_graph into the bee code
Sep 10, 2024
ebc7e35
don't export intermediate .dot file
Sep 10, 2024
19613a4
fixed linting error
Sep 11, 2024
bb3f845
added command to documentation
Sep 11, 2024
69441bc
updated visualization page
Sep 11, 2024
b933512
added reference to visualization page
Sep 11, 2024
99bbe69
updated neo4j path instructions
Sep 11, 2024
2462781
hopefully fixed merge conflict
Sep 11, 2024
58d876d
attempt to fix conflict
Sep 11, 2024
eb0e1cb
added graphviz to the poetry.lock in develop
Sep 12, 2024
b9ce644
added graphml_key_updater to add any missing keys
Sep 12, 2024
c8a9762
added hint and requirement nodes to generate_graph
Sep 12, 2024
e506c69
fixed graphml_key_updater linting errors
Sep 12, 2024
647bf66
refactored generate_graph to fix linting error C901
Sep 12, 2024
c2f081a
Merge branch 'develop' into export-dag
Sep 12, 2024
d65a90f
fixed orientation of hint and requirement nodes in generate_graph
Sep 12, 2024
c2fe32d
Updated coverage.svg
github-actions[bot] Sep 12, 2024
f09f3e1
fixed generate_graph and graphml_key_updater linting errors
Sep 12, 2024
85d4edb
Merge branch 'export-dag' of github.com:lanl/BEE into export-dag
Sep 12, 2024
fbd05a4
shortened id in secho statement and made text green
Sep 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@


logging.basicConfig(level=logging.WARNING)
logging.getLogger("neo4j").setLevel(logging.WARNING)
WORKFLOW_MANAGER = 'bee_wfm/v1/jobs/'


Expand Down Expand Up @@ -593,6 +594,13 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),
return wf_id


@app.command()
def dag(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Export a DAG of the workflow to a GraphML file."""
wf_utils.export_dag(wf_id)
typer.echo(f"DAG for workflow {wf_id} has been exported successfully.")


@app.callback(invoke_without_command=True)
def version_callback(version: bool = False):
"""Beeflow."""
Expand Down
24 changes: 24 additions & 0 deletions beeflow/common/deps/neo4j_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
run_dir = mount_dir + '/run'
certs_dir = mount_dir + '/certificates'
confs_dir = mount_dir + "/conf"
dags_dir = os.path.join(bee_workdir, 'dags')
container_path = container_manager.get_container_dir('neo4j')
log = bee_logging.setup('neo4j')

Expand Down Expand Up @@ -59,6 +60,7 @@ def setup_mounts():
os.makedirs(mount_dir, exist_ok=True)
os.makedirs(certs_dir, exist_ok=True)
os.makedirs(run_dir, exist_ok=True)
os.makedirs(dags_dir, exist_ok=True)


def setup_configs(bolt_port, http_port, https_port):
Expand Down Expand Up @@ -86,6 +88,24 @@ def setup_configs(bolt_port, http_port, https_port):
with open(gdb_configfile, "wt", encoding="utf8") as cfile:
cfile.write(data)

apoc_configfile = os.path.join(confs_dir, "apoc.conf")
if not os.path.exists(apoc_configfile):
with open(apoc_configfile, "wt", encoding="utf8") as afile:
afile.write("apoc.export.file.enabled=true\n")
log.info(f"Created {apoc_configfile} with apoc.export.file.enabled=true")
else:
with open(apoc_configfile, "rt", encoding="utf8") as afile:
apoc_data = afile.read()

apoc_config = r'#?(apoc.export.file.enabled=)[^\n]*'
if re.search(apoc_config, apoc_data):
apoc_data = re.sub(apoc_config, r'apoc.export.file.enabled=true', apoc_data)
else:
apoc_data += "\napoc.export.file.enabled=true\n"

with open(apoc_configfile, "wt", encoding="utf8") as afile:
afile.write(apoc_data)


def create_credentials():
"""Create the password and set the logfiles in environment."""
Expand All @@ -95,10 +115,12 @@ def create_credentials():
subprocess.run([
"ch-run",
"--set-env=" + container_path + "/ch/environment",
"--set-env=apoc.export.file.enabled=true",
"-b", confs_dir + ":/var/lib/neo4j/conf",
"-b", data_dir + ":/data",
"-b", logs_dir + ":/logs",
"-b", run_dir + ":/var/lib/neo4j/run", container_path,
"-W", "-b", dags_dir + ":/var/lib/neo4j/import",
"--", *command
], check=True)
except subprocess.CalledProcessError:
Expand All @@ -112,11 +134,13 @@ def create_database():
proc = subprocess.Popen([ #noqa can't use with because returning
"ch-run",
"--set-env=" + container_path + "/ch/environment",
"--set-env=apoc.export.file.enabled=true",
"-b", confs_dir + ":/var/lib/neo4j/conf",
"-b", data_dir + ":/data",
"-b", logs_dir + ":/logs",
"-b", run_dir + ":/var/lib/neo4j/run",
"-b", certs_dir + ":/var/lib/neo4j/certificates",
"-W", "-b", dags_dir + ":/var/lib/neo4j/import",
container_path, "--", *command
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
wait_gdb()
Expand Down
4 changes: 4 additions & 0 deletions beeflow/common/gdb/gdb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,7 @@ def workflow_completed(self):
@abstractmethod
def close(self):
"""Close the connection to the graph database."""

@abstractmethod
def export_dag(self):
"""Export a BEE workflow as a graphml."""
16 changes: 16 additions & 0 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,3 +725,19 @@ def cleanup(tx):
cleanup_query = "MATCH (n) DETACH DELETE n"

tx.run(cleanup_query)


def export_dag(tx, wf_id):
"""Export BEE workflow as graphml."""
short_id = wf_id[:6]
export_query = (
"WITH \"MATCH (n1)-[r]->(n2) "
f"WHERE n1.workflow_id = '{wf_id}' OR n2.workflow_id = '{wf_id}' "
"RETURN r, n1, n2\" AS query "
f"CALL apoc.export.graphml.query(query, '{short_id}.graphml', {{useTypes: true}}) "
"YIELD file, source, format, nodes, relationships, properties, time, rows, batchSize, "
"batches, done, data "
"RETURN file, source, format, nodes, relationships, properties, time, rows, batchSize, "
"batches, done, data"
)
tx.run(export_query)
5 changes: 5 additions & 0 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,11 @@ def _write_transaction(self, tx_fun, **kwargs):
with self._driver.session() as session:
session.write_transaction(tx_fun, **kwargs)

def export_dag(self, workflow_id):
"""Export a BEE workflow as a graphml."""
with self._driver.session() as session:
session.write_transaction(tx.export_dag, wf_id=workflow_id)


def _reconstruct_requirements(req_records):
"""Reconstruct requirements by their records retrieved from Neo4j.
Expand Down
4 changes: 4 additions & 0 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,7 @@ def workflow_completed(self):
:rtype: bool
"""
return self._gdb_driver.workflow_completed(self._workflow_id)

def export_dag(self):
"""Export a BEE workflow as a graphml."""
self._gdb_driver.export_dag(self._workflow_id)
6 changes: 6 additions & 0 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,12 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
start_workflow(wf_id)


def export_dag(wf_id):
"""Export the DAG of the workflow."""
wfi = get_workflow_interface(wf_id)
wfi.export_dag()


def start_workflow(wf_id):
"""Attempt to start the workflow, returning True if successful."""
db = connect_db(wfm_db, get_db_path())
Expand Down
56 changes: 56 additions & 0 deletions generate_graph.py
pagrubel marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import networkx as nx
import graphviz

def graphml_to_graphviz(graphml_path, output_path):
# Load the GraphML file using NetworkX
G = nx.read_graphml(graphml_path)

# Create a new directed graph for Graphviz
dot = graphviz.Digraph(comment='Hierarchical Graph')

# Add nodes to the Graphviz graph
for node in G.nodes(data=True):
node_id = node[0]
label = node[1].get('labels', node_id) # Use label if available, otherwise node_id
if label == ":Workflow":
node_label = "Workflow"
color ='steelblue'
if label == ":Output":
node_label = node[1].get('glob', node_id)
color = 'mediumseagreen'
if label == ":Metadata":
node_label = node[1].get('state', node_id)
color = 'skyblue'
if label == ":Task":
node_label = node[1].get('name', node_id)
color ='lightcoral'
if label == ":Input":
node_label = node[1].get('source', node_id)
color ='sandybrown'

dot.node(node_id, label=node_label, style='filled', fillcolor=color)

# Add edges to the Graphviz graph
for edge in G.edges(data=True):
source = edge[0]
target = edge[1]
edge_label = edge[2].get('label', '') # Use edge label if available
if edge_label == "INPUT_OF" or edge_label == "DESCRIBES":
dot.edge(source, target, label=edge_label, fontsize="10")
else:
dot.edge(target, source, label=edge_label, fontsize="10")

# Set the output format to PNG and render the graph
dot.format = 'png'
dot.render(output_path, view=False)

if __name__ == "__main__":
# Path to your GraphML file
graphml_file = '/vast/home/leahh/.beeflow/dags/bebd6f.graphml'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this user specific path, and replace it with a generic path for this PR.

output_file = 'hierarchical_graph_clamr'

# Generate the hierarchical graph
graphml_to_graphviz(graphml_file, output_file)

print(f"Graph has been saved as {output_file}.png")

18 changes: 17 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ docutils = "0.18.1"
google-api-python-client = { version = "^2.66.0", optional = true }
python-openstackclient = { version = "^6.0.0", optional = true }
python-heatclient = { version = "^3.1.0", optional = true }
graphviz = "^0.20.3"

[tool.poetry.extras]
cloud_extras = ["google-api-python-client", "python-openstackclient", "python-heatclient"]
Expand Down