DAG Paths #931

Merged
merged 25 commits into from
Oct 7, 2024
25 commits
fbb15e9
add support for custom DAG locations and versioning
Leahh02 Sep 20, 2024
e017107
added support for saving final dag png in workflow archive
Leahh02 Sep 20, 2024
aeea7af
updated documentation to include information about paths
Leahh02 Sep 20, 2024
65a37a9
fixed input type for export_dag
Sep 23, 2024
db228d6
fixed final dag location
Sep 23, 2024
605234e
fixed all my linting errors for my files so far
Sep 23, 2024
e72e1be
added an option to not make the dags dir, set to false for archive
Sep 23, 2024
b9cafbe
updated command documentation and visualization documentation
Sep 23, 2024
f8867ed
added export location information to command docs
Sep 23, 2024
ece517b
changed the underscore in the dags dir to a hyphen
Sep 24, 2024
1f804b2
added the apoc install recipe and updated bee_install.sh to use it
Sep 24, 2024
56479a3
modified permissions for recipe and added location of recipe
Sep 24, 2024
d95039e
add graphviz and libgraphviz-dev to ci deps_install script
Sep 24, 2024
c7ff910
search for graphviz in install
Sep 24, 2024
f8a4775
added information about the naming for multiple dag versions to docs
Sep 30, 2024
1e798e5
Merge branch 'develop' into Issue921/Dag-paths to get changes from PR…
Sep 30, 2024
105ac74
initial working commit for graphml location in workflow archive
Sep 30, 2024
ec32455
make multiple versions of graphmls without overriding older ones
Sep 30, 2024
68fb161
edited bee_client to not let people make DAGs after the workflow has …
Sep 30, 2024
56181c8
edited bee_client and wf_update and wf_utils so now multiple versions…
Sep 30, 2024
2d7707d
fixed linting error in bee_client and added to docs about saving grap…
Sep 30, 2024
7a19c9f
change context directory for ci neo4j image
Sep 30, 2024
7736bcd
change neo4j path variable in ci
Oct 7, 2024
085ddbd
add sleep to beginning of main
Oct 7, 2024
0512fa1
change neo4j path variable back to original
Oct 7, 2024
30 changes: 28 additions & 2 deletions beeflow/client/bee_client.py
@@ -625,9 +625,35 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),


 @app.command()
-def dag(wf_id: str = typer.Argument(..., callback=match_short_id)):
+def dag(wf_id: str = typer.Argument(..., callback=match_short_id),
+        output_dir: pathlib.Path = typer.Argument(...,
+            help='Path to the directory where the dag output will be written'),
+        no_dag_dir: bool = typer.Option(False, '--no-dag-dir',
+            help='do not make a subdirectory within output_dir for the dags')):
     """Export a DAG of the workflow to a GraphML file."""
-    wf_utils.export_dag(wf_id)
+    output_dir = output_dir.resolve()
+    # Make sure output_dir is an absolute path and exists
+    output_dir = os.path.expanduser(output_dir)
+    output_dir = os.path.abspath(output_dir)
+    if not os.path.exists(output_dir):
+        error_exit(f"Path for dag directory \"{output_dir}\" doesn't exist")
+
+    # output_dir must be a string
+    output_dir = str(output_dir)
+    # Check if the workflow is archived
+    wf_status = get_wf_status(wf_id)
+    if wf_status == 'Archived':
+        bee_workdir = wf_utils.get_bee_workdir()
+        mount_dir = os.path.join(bee_workdir, 'gdb_mount')
+        graphmls_dir = mount_dir + '/graphmls'
+        typer.secho("Workflow has been archived. All new DAGs will look the same as the one "
+                    "in the archive directory.",
+                    fg=typer.colors.MAGENTA)
+    else:
+        wf_dir = wf_utils.get_workflow_dir(wf_id)
+        graphmls_dir = wf_dir + '/graphmls'
+        os.makedirs(graphmls_dir, exist_ok=True)
+    wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir)
     typer.secho(f"DAG for workflow {_short_id(wf_id)} has been exported successfully.",
                 fg=typer.colors.GREEN)
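
The new `dag` command normalizes `output_dir` in three steps (`resolve`, `expanduser`, `abspath`) before checking that it exists. As a rough sketch of the same idea, the normalization could be done in one `pathlib` chain. The helper name `resolve_output_dir` is hypothetical and not part of this PR. It also checks specifically for a directory and raises an exception, whereas the PR checks `os.path.exists` and calls `error_exit`.

```python
from pathlib import Path


def resolve_output_dir(raw_path):
    """Return an absolute, user-expanded directory path as a string.

    Hypothetical helper for illustration only; it raises instead of
    calling the client's error_exit().
    """
    # expanduser() handles a leading "~"; resolve() makes the path absolute
    path = Path(raw_path).expanduser().resolve()
    if not path.is_dir():
        raise FileNotFoundError(f'Path for dag directory "{path}" doesn\'t exist')
    # Downstream code in the PR concatenates paths as strings
    return str(path)
```

Calling `resolve_output_dir(".")` returns the current working directory as an absolute string, and a missing directory raises `FileNotFoundError`.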

4 changes: 1 addition & 3 deletions beeflow/common/deps/neo4j_manager.py
@@ -23,8 +23,7 @@
 run_dir = mount_dir + '/run'
 certs_dir = mount_dir + '/certificates'
 confs_dir = mount_dir + "/conf"
-dags_dir = os.path.join(bee_workdir, 'dags')
-graphmls_dir = dags_dir + "/graphmls"
+graphmls_dir = mount_dir + '/graphmls'
 container_path = container_manager.get_container_dir('neo4j')
 log = bee_logging.setup('neo4j')

@@ -61,7 +60,6 @@ def setup_mounts():
     os.makedirs(mount_dir, exist_ok=True)
     os.makedirs(certs_dir, exist_ok=True)
     os.makedirs(run_dir, exist_ok=True)
-    os.makedirs(dags_dir, exist_ok=True)
     os.makedirs(graphmls_dir, exist_ok=True)


27 changes: 18 additions & 9 deletions beeflow/common/gdb/generate_graph.py
@@ -1,21 +1,30 @@
 """Module to make a png of a graph from a graphml file."""

 import os
+import shutil
 import networkx as nx
 import graphviz

-from beeflow.common import paths
-
-bee_workdir = paths.workdir()
-dags_dir = os.path.join(bee_workdir, 'dags')
-graphmls_dir = dags_dir + "/graphmls"
-

-def generate_viz(wf_id):
+def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir):
     """Generate a PNG of a workflow graph from a GraphML file."""
     short_id = wf_id[:6]
     graphml_path = graphmls_dir + "/" + short_id + ".graphml"
-    output_path = dags_dir + "/" + short_id
+
+    if no_dag_dir:
+        dags_dir = output_dir
+    else:
+        dags_dir = output_dir + "/" + short_id + "-dags"
+    os.makedirs(dags_dir, exist_ok=True)
+
+    output_path = dags_dir + "/" + short_id + ".png"
+    if os.path.exists(output_path):
+        i = 1
+        backup_path = f'{dags_dir}/{short_id}_v{i}.png'
+        while os.path.exists(backup_path):
+            i += 1
+            backup_path = f'{dags_dir}/{short_id}_v{i}.png'
+        shutil.copy(output_path, backup_path)

     # Load the GraphML file using NetworkX
     graph = nx.read_graphml(graphml_path)
@@ -29,7 +38,7 @@ def generate_viz(wf_id):

     # Render the graph and save as PNG
     png_data = dot.pipe(format='png')
-    with open(output_path + ".png", "wb") as png_file:
+    with open(output_path, "wb") as png_file:
         png_file.write(png_data)
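
The same "copy the current file to the next free `_vN` name" loop appears here for PNGs and again in `graphml_key_updater.py` for GraphML files. One way to picture the shared logic is a single helper that works for any extension. The name `backup_existing` is hypothetical and does not exist in the PR; this is only a sketch of the versioning scheme, not the code that was merged.

```python
import os
import shutil


def backup_existing(path):
    """Copy an existing file to the next free ``<stem>_v<N><ext>`` name.

    Returns the backup path, or None if ``path`` does not exist yet.
    Hypothetical helper, shown only to illustrate the versioning scheme.
    """
    if not os.path.exists(path):
        return None
    stem, ext = os.path.splitext(path)
    i = 1
    # Find the first version number that is not taken yet
    while os.path.exists(f"{stem}_v{i}{ext}"):
        i += 1
    backup_path = f"{stem}_v{i}{ext}"
    # The current file becomes version i; the caller then overwrites `path`
    shutil.copy(path, backup_path)
    return backup_path
```

Because the backup is taken just before the newest file is overwritten, `_v1` always holds the oldest export and the unsuffixed file holds the most recent one.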


24 changes: 17 additions & 7 deletions beeflow/common/gdb/graphml_key_updater.py
@@ -2,12 +2,13 @@

 import xml.etree.ElementTree as ET
 import os
+import shutil

 from beeflow.common import paths

 bee_workdir = paths.workdir()
-dags_dir = os.path.join(bee_workdir, 'dags')
-graphmls_dir = dags_dir + "/graphmls"
+mount_dir = os.path.join(bee_workdir, 'gdb_mount')
+gdb_graphmls_dir = mount_dir + '/graphmls'

 expected_keys = {"id", "name", "state", "class", "type", "value", "source",
                  "workflow_id", "base_command", "stdout", "stderr", "default",
@@ -33,12 +34,21 @@
 }


-def update_graphml(wf_id):
+def update_graphml(wf_id, graphmls_dir):
     """Update GraphML file by ensuring required keys are present and updating its structure."""
     short_id = wf_id[:6]
-    graphml_path = graphmls_dir + "/" + short_id + ".graphml"
+    gdb_graphml_path = gdb_graphmls_dir + "/" + short_id + ".graphml"
+    output_graphml_path = graphmls_dir + "/" + short_id + ".graphml"
+    # Handle making multiple versions of the graphmls without overriding old ones
+    if os.path.exists(output_graphml_path):
+        i = 1
+        backup_path = f'{graphmls_dir}/{short_id}_v{i}.graphml'
+        while os.path.exists(backup_path):
+            i += 1
+            backup_path = f'{graphmls_dir}/{short_id}_v{i}.graphml'
+        shutil.copy(output_graphml_path, backup_path)
     # Parse the GraphML file and preserve namespaces
-    tree = ET.parse(graphml_path)
+    tree = ET.parse(gdb_graphml_path)
     root = tree.getroot()

     name_space = {'graphml': 'http://graphml.graphdrawing.org/xmlns'}
@@ -56,5 +66,5 @@ def update_graphml(wf_id):
                                      **default_def)
             root.insert(0, key_element)

-    # Save the updated GraphML file by overwriting the original one
-    tree.write(graphml_path, encoding='UTF-8', xml_declaration=True)
+    # Save the updated GraphML file
+    tree.write(output_graphml_path, encoding='UTF-8', xml_declaration=True)
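
The middle of `update_graphml` is collapsed in this diff, so the exact key-insertion code is not visible. As a minimal sketch of the general technique, assuming the goal is to declare a `<key>` element for every expected id that the exported file lacks, it might look like the following. The function name `add_missing_keys` and the default attributes (`for="node"`, `attr.type="string"`) are assumptions for illustration, not the values used in `default_key_definitions`.

```python
import xml.etree.ElementTree as ET

GRAPHML_NS = "http://graphml.graphdrawing.org/xmlns"
# Keep the GraphML namespace as the default one when the tree is written out
ET.register_namespace("", GRAPHML_NS)


def add_missing_keys(root, expected_keys):
    """Insert a <key> declaration for each expected id that is not present.

    Returns the sorted list of ids that were added. The attribute defaults
    below are assumed for illustration only.
    """
    key_tag = f"{{{GRAPHML_NS}}}key"
    declared = {key.get("id") for key in root.findall(key_tag)}
    missing = sorted(set(expected_keys) - declared)
    for key_id in missing:
        attrib = {"id": key_id, "for": "node",
                  "attr.name": key_id, "attr.type": "string"}
        # Key declarations go at the top of the document, before <graph>
        root.insert(0, ET.Element(key_tag, attrib))
    return missing
```

Running it twice on the same tree adds nothing the second time, which is why exporting repeatedly does not accumulate duplicate key declarations.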
2 changes: 2 additions & 0 deletions beeflow/common/integration_test.py
@@ -4,6 +4,7 @@
 import os
 import shutil
 import sys
+import time
 import uuid
 import typer

@@ -293,6 +294,7 @@ def main(tests = typer.Option(None, '--tests', '-t', # noqa (conflict on '=' si
          timeout: int = typer.Option(utils.TIMEOUT, '--timeout',
                                      help='workflow timeout in seconds')):
     """Launch the integration tests."""
+    time.sleep(60)
     if show_tests:
         print('INTEGRATION TEST CASES:')
         for test_name, ignore in TEST_RUNNER.test_details():
4 changes: 4 additions & 0 deletions beeflow/wf_manager/resources/wf_update.py
@@ -26,6 +26,10 @@ def archive_workflow(db, wf_id, final_state=None):
     workflow_dir = wf_utils.get_workflow_dir(wf_id)
     shutil.copyfile(os.path.expanduser("~") + '/.config/beeflow/bee.conf',
                     workflow_dir + '/' + 'bee.conf')
+    # Archive Completed DAG
+    graphmls_dir = workflow_dir + "/graphmls"
+    os.makedirs(graphmls_dir, exist_ok=True)
+    wf_utils.export_dag(wf_id, workflow_dir, graphmls_dir, no_dag_dir=True)

     wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
     db.workflows.update_workflow_state(wf_id, wf_state)
6 changes: 3 additions & 3 deletions beeflow/wf_manager/resources/wf_utils.py
@@ -294,12 +294,12 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
         start_workflow(wf_id)


-def export_dag(wf_id):
+def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir):
     """Export the DAG of the workflow."""
     wfi = get_workflow_interface(wf_id)
     wfi.export_graphml()
-    update_graphml(wf_id)
-    generate_viz(wf_id)
+    update_graphml(wf_id, graphmls_dir)
+    generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir)


 def start_workflow(wf_id):
5 changes: 3 additions & 2 deletions ci/bee_install.sh
@@ -9,8 +9,9 @@ printf "**Setting up BEE containers**\n"
 printf "\n\n"
 mkdir -p $HOME/img
 # Pull the Neo4j container
-ch-image pull neo4j:5.17 || exit 1
-ch-convert -i ch-image -o tar neo4j:5.17 $NEO4J_CONTAINER || exit 1
+chmod +x ./ci/install_apoc_docker
+ch-image build -t apoc_neo4j -f ./ci/install_apoc_docker ./ci || exit 1
+ch-convert -i ch-image -o tar apoc_neo4j $NEO4J_CONTAINER || exit 1
 # Pull the Redis container
 ch-image pull redis || exit 1
 ch-convert -i ch-image -o tar redis $REDIS_CONTAINER || exit 1
4 changes: 3 additions & 1 deletion ci/deps_install.sh
@@ -7,7 +7,9 @@ sudo apt-get install -y slurmctld slurmd slurmrestd munge python3 python3-venv \
     curl build-essential zlib1g-dev libncurses5-dev libgdbm-dev libnss3-dev \
     libssl-dev libsqlite3-dev libreadline-dev libffi-dev libbz2-dev \
     libmunge-dev \
-    libyaml-dev # needed for PyYAML
+    libyaml-dev # needed for PyYAML
+
+sudo apt-get install -y graphviz libgraphviz-dev

 # Install most recent Charliecloud
 curl -O -L https://github.com/hpc/charliecloud/releases/download/v${CHARLIECLOUD_VERSION}/charliecloud-${CHARLIECLOUD_VERSION}.tar.gz
2 changes: 2 additions & 0 deletions ci/install_apoc_docker
@@ -0,0 +1,2 @@
+FROM neo4j:5.17-community
+RUN cp /var/lib/neo4j/labs/apoc-5.17.0-core.jar /var/lib/neo4j/plugins/apoc-5.17.8-core.jar
8 changes: 6 additions & 2 deletions docs/sphinx/commands.rst
@@ -105,10 +105,14 @@ Arguments:
 Arguments:
   WF_ID [required]

-``beeflow dag``: Export a directed acyclic graph (DAG) of a submitted workflow. This command can be run at any point of the workflow. To see the DAG of a workflow before it runs, submit the workflow with the ``--no-start`` flag and then use the dag command. The DAGs are exported to ~/.beeflow/dags. See :ref:`workflow-visualization` for more information.
+``beeflow dag``: Export a directed acyclic graph (DAG) of a submitted workflow. This command can be run at any point of the workflow. To see the DAG of a workflow before it runs, submit the workflow with the ``--no-start`` flag and then use the dag command. The DAGs are exported to $OUTPUT_DIR/$WF_ID-dags by default. If the ``--no-dag-dir`` flag is specified when the dag command is run, the DAG is exported directly to $OUTPUT_DIR. Each run of the dag command keeps the earlier DAGs as numbered versions: the most recent version is $WF_ID.png and the older ones are $WF_ID_v1.png, $WF_ID_v2.png, ..., where v1 is the oldest. See :ref:`workflow-visualization` for more information.

 Arguments:
-  WF_ID [required]
+  - WF_ID [required]
+  - OUTPUT_DIR, Directory for the output [required]
+
+Options:
+  ``--no-dag-dir``: Do not make a subdirectory within OUTPUT_DIR for the DAGs.

 Generating and Managing Configuration Files
 ===========================================
11 changes: 8 additions & 3 deletions docs/sphinx/visualization.rst
@@ -4,7 +4,7 @@ Workflow Visualization
 **********************

 BEE includes a simple command for viewing BEE workflows. By using the ``beeflow
-dag $ID`` command, you can view the directed acyclic graph (DAG) of any submitted
+dag $ID $OUTPUT_DIR`` command, you can view the directed acyclic graph (DAG) of any submitted
 workflow.

 Creating DAGs
@@ -13,8 +13,13 @@ Creating DAGs
 The dag command can be run at any point of the workflow, and can
 be run multiple times. To see the DAG of a workflow before it runs, submit
 the workflow with the ``--no-start`` flag and then use the dag command. The
-DAGs are exported in PNG format to ~/.beeflow/dags. They follow the naming
-convention ``$ID.png``
+DAGs are exported in PNG format to $OUTPUT_DIR/$WF_ID-dags by default. If the
+``--no-dag-dir`` flag is specified when the dag command is run, the DAG is
+exported directly to $OUTPUT_DIR. Each run of the dag command keeps the earlier
+DAGs as numbered versions: the most recent version is $WF_ID.png and the older
+ones are $WF_ID_v1.png, $WF_ID_v2.png, ..., where v1 is the oldest. The GraphML
+files used to make the DAGs are saved in the workflow archive with their version
+numbers. They can be useful for debugging when DAG creation fails.

 Example DAG
 ===========
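
To make the documented output locations concrete, here is a small sketch that computes where the newest PNG would land for a given workflow ID and output directory. The function name `dag_png_path` is hypothetical. It follows the `generate_viz` code in this PR, which builds names from the first six characters of the workflow ID, so the `$WF_ID` in the docs corresponds to that short ID in this sketch.

```python
import os


def dag_png_path(wf_id, output_dir, no_dag_dir=False):
    """Return the path of the most recent DAG PNG for a workflow.

    Hypothetical helper mirroring the naming in generate_viz():
    the short ID is the first six characters of the workflow ID.
    """
    short_id = wf_id[:6]
    if no_dag_dir:
        # --no-dag-dir: write straight into the output directory
        dags_dir = output_dir
    else:
        # Default: use a per-workflow "<short_id>-dags" subdirectory
        dags_dir = os.path.join(output_dir, f"{short_id}-dags")
    return os.path.join(dags_dir, f"{short_id}.png")
```

For example, with workflow ID `abcdef123456` and output directory `out`, the default location is `out/abcdef-dags/abcdef.png`, and with `no_dag_dir=True` it is `out/abcdef.png`.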