From 2fee4b24d8128e3faf08352e363438309eea35ca Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Mon, 18 Mar 2024 13:27:29 -0600 Subject: [PATCH 1/3] Make initial updates for using neo4j 5.x This updates the neo4j container management code and driver version. The gdb code has also been updated to work with the new driver version, which made breaking changes to how transaction functions are handled. No changes have been made to the cypher queries yet, although they seemed to work for the cat-grep-tar workflow. --- beeflow/common/gdb/neo4j_cypher.py | 60 ++++++++++++------------ beeflow/common/gdb/neo4j_driver.py | 50 +++++++++----------- beeflow/wf_manager/common/dep_manager.py | 11 +++-- pyproject.toml | 2 +- 4 files changed, 59 insertions(+), 64 deletions(-) diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index 8a48d0059..85c0614f8 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -239,11 +239,11 @@ def get_task_by_id(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ task_query = "MATCH (t:Task {id: $task_id}) RETURN t" - return tx.run(task_query, task_id=task_id).single() + return tx.run(task_query, task_id=task_id).single()['t'] def get_task_hints(tx, task_id): @@ -251,11 +251,11 @@ def get_task_hints(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ hints_query = "MATCH (:Task {id: $task_id})<-[:HINT_OF]-(h:Hint) RETURN h" - return tx.run(hints_query, task_id=task_id) + return [rec['h'] for rec in tx.run(hints_query, task_id=task_id)] def get_task_requirements(tx, task_id): @@ -263,11 +263,11 @@ def get_task_requirements(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ reqs_query = "MATCH (:Task {id: $task_id})<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r" - return tx.run(reqs_query, task_id=task_id) + return [rec['r'] for rec in tx.run(reqs_query, task_id=task_id)] def get_task_inputs(tx, task_id): @@ -275,11 +275,11 @@ def get_task_inputs(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ inputs_query = "MATCH (:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) RETURN i" - return tx.run(inputs_query, task_id=task_id) + return [rec['i'] for rec in tx.run(inputs_query, task_id=task_id)] def get_task_outputs(tx, task_id): @@ -287,71 +287,71 @@ def get_task_outputs(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ outputs_query = "MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output) RETURN o" - return tx.run(outputs_query, task_id=task_id) + return [rec['o'] for rec in tx.run(outputs_query, task_id=task_id)] def get_workflow_description(tx): """Get the workflow description from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ workflow_desc_query = "MATCH (w:Workflow) RETURN w" - return tx.run(workflow_desc_query).single() + return tx.run(workflow_desc_query).single()['w'] def get_workflow_tasks(tx): """Get workflow tasks from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ workflow_query = "MATCH (t:Task) RETURN t" - return tx.run(workflow_query) + return [rec['t'] for rec in tx.run(workflow_query)] def get_workflow_requirements(tx): """Get workflow requirements from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ requirements_query = "MATCH (:Workflow)<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r" - return tx.run(requirements_query) + return [rec['r'] for rec in tx.run(requirements_query)] def get_workflow_hints(tx): """Get workflow hints from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ hints_query = "MATCH (:Workflow)<-[:HINT_OF]-(h:Hint) RETURN h" - return tx.run(hints_query) + return [rec['h'] for rec in tx.run(hints_query)] def get_workflow_inputs(tx): """Get workflow inputs from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ inputs_query = "MATCH (:Workflow)<-[:INPUT_OF]-(i:Input) RETURN i" - return tx.run(inputs_query) + return [rec['i'] for rec in tx.run(inputs_query)] def get_workflow_outputs(tx): """Get workflow outputs from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ outputs_query = "MATCH (:Workflow)<-[:OUTPUT_OF]-(o:Output) RETURN o" - return tx.run(outputs_query) + return [rec['o'] for rec in tx.run(outputs_query)] def get_workflow_state(tx): @@ -378,11 +378,11 @@ def set_workflow_state(tx, state): def get_ready_tasks(tx): """Get all tasks that are ready to execute. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ get_ready_query = "MATCH (:Metadata {state: 'READY'})-[:DESCRIBES]->(t:Task) RETURN t" - return tx.run(get_ready_query) + return [rec['t'] for rec in tx.run(get_ready_query)] def get_dependent_tasks(tx, task): @@ -390,11 +390,11 @@ def get_dependent_tasks(tx, task): :param task: the task whose dependencies to obtain :type task: Task - :rtype: BoltStatementResult + :rtype: neo4j.Result """ dependents_query = "MATCH (t:Task)-[:DEPENDS_ON]->(:Task {id: $task_id}) RETURN t" - return tx.run(dependents_query, task_id=task.id) + return [rec['t'] for rec in tx.run(dependents_query, task_id=task.id)] def get_task_state(tx, task): @@ -428,11 +428,11 @@ def get_task_metadata(tx, task): :param task: the task whose metadata to get :type task: Task - :rtype: BoltStatementResult + :rtype: neo4j.Result """ metadata_query = "MATCH (m:Metadata)-[:DESCRIBES]->(:Task {id: $task_id}) RETURN m" - return tx.run(metadata_query, task_id=task.id).single() + return dict(tx.run(metadata_query, task_id=task.id).single()['m']) def set_task_metadata(tx, task, metadata): @@ -466,7 +466,7 @@ def get_task_input(tx, task, input_id): input_query = ("MATCH (t:Task {id: $task_id})<-[:INPUT_OF]-(i:Input {id: $input_id}) " "RETURN i") - return tx.run(input_query, task_id=task.id, input_id=input_id).single() + return dict(tx.run(input_query, task_id=task.id, input_id=input_id).single()['i']) def set_task_input(tx, task, input_id, value): @@ -496,7 +496,7 @@ def get_task_output(tx, task, output_id): output_query = ("MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output {id: $output_id}) " "RETURN o") - return tx.run(output_query, task_id=task.id, output_id=output_id).single() + return dict(tx.run(output_query, task_id=task.id, output_id=output_id).single()['o']) def set_task_output(tx, task, output_id, value): diff --git a/beeflow/common/gdb/neo4j_driver.py b/beeflow/common/gdb/neo4j_driver.py index 3b3222576..6e100aa96 100644 --- a/beeflow/common/gdb/neo4j_driver.py +++ b/beeflow/common/gdb/neo4j_driver.py @@ -6,7 +6,7 @@ """ from neo4j import GraphDatabase as Neo4jDatabase -from neobolt.exceptions import ServiceUnavailable +from neo4j.exceptions import ServiceUnavailable from beeflow.common.gdb.gdb_driver import GraphDatabaseDriver from beeflow.common.gdb import neo4j_cypher as tx @@ -307,7 +307,7 @@ def get_task_input(self, task, input_id): :rtype: StepInput """ input_record = self._read_transaction(tx.get_task_input, task=task, input_id=input_id) - return _reconstruct_task_input(input_record["i"]) + return _reconstruct_task_input(input_record) def set_task_input(self, task, input_id, value): """Set the value of a task input. @@ -330,7 +330,7 @@ def get_task_output(self, task, output_id): :rtype: StepOutput """ output_record = self._read_transaction(tx.get_task_output, task=task, output_id=output_id) - return _reconstruct_task_output(output_record["o"]) + return _reconstruct_task_output(output_record) def set_task_output(self, task, output_id, value): """Set the value of a task output. @@ -401,13 +401,13 @@ def _get_task_data_tuples(self, task_records): with self._driver.session() as session: trecords = list(task_records) hint_records = [session.read_transaction(tx.get_task_hints, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] req_records = [session.read_transaction(tx.get_task_requirements, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] input_records = [session.read_transaction(tx.get_task_inputs, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] output_records = [session.read_transaction(tx.get_task_outputs, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] hints = [_reconstruct_hints(hint_record) for hint_record in hint_records] reqs = [_reconstruct_requirements(req_record) for req_record in req_records] @@ -447,9 +447,8 @@ def _reconstruct_requirements(req_records): :type req_records: BoltStatementResult :rtype: list of Requirement """ - recs = [req_record["r"] for req_record in req_records] return [Requirement(rec["class"], {k: v for k, v in rec.items() if k != "class"}) - for rec in recs] + for rec in req_records] def _reconstruct_hints(hint_records): @@ -459,8 +458,8 @@ def _reconstruct_hints(hint_records): :type hint_records: BoltStatementResult :rtype: list of Hint """ - recs = [hint_record["h"] for hint_record in hint_records] - return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"}) for rec in recs] + return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"}) + for rec in hint_records] def _reconstruct_workflow_inputs(input_records): @@ -470,8 +469,7 @@ def _reconstruct_workflow_inputs(input_records): :type input_records: BoltStatementResult :rtype: list of InputParameter """ - recs = [input_record["i"] for input_record in input_records] - return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in recs] + return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in input_records] def _reconstruct_workflow_outputs(output_records): @@ -481,8 +479,8 @@ def _reconstruct_workflow_outputs(output_records): :type output_records: BoltStatementResult :rtype: list of OutputParameter """ - recs = [output_record["o"] for output_record in output_records] - return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"]) for rec in recs] + return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"]) + for rec in output_records] def _reconstruct_task_inputs(input_records): @@ -492,8 +490,7 @@ def _reconstruct_task_inputs(input_records): :type input_records: BoltStatementResult :rtype: list of StepInput """ - recs = [input_record["i"] for input_record in input_records] - return [_reconstruct_task_input(rec) for rec in recs] + return [_reconstruct_task_input(rec) for rec in input_records] def _reconstruct_task_input(rec): @@ -514,8 +511,7 @@ def _reconstruct_task_outputs(output_records): :type output_records: BoltStatementResult :rtype: list of StepOutput """ - recs = [output_record["o"] for output_record in output_records] - return [_reconstruct_task_output(rec) for rec in recs] + return [_reconstruct_task_output(rec) for rec in output_records] def _reconstruct_task_output(rec): @@ -543,9 +539,8 @@ def _reconstruct_workflow(workflow_record, hints, requirements, inputs, outputs) :type outputs: list of OutputParameter :rtype: Workflow """ - rec = workflow_record["w"] - return Workflow(name=rec["name"], hints=hints, requirements=requirements, inputs=inputs, - outputs=outputs, workflow_id=rec["id"]) + return Workflow(name=workflow_record["name"], hints=hints, requirements=requirements, + inputs=inputs, outputs=outputs, workflow_id=workflow_record["id"]) def _reconstruct_task(task_record, hints, requirements, inputs, outputs): @@ -563,10 +558,10 @@ def _reconstruct_task(task_record, hints, requirements, inputs, outputs): :type outputs: list of StepOutput :rtype: Task """ - rec = task_record["t"] - return Task(name=rec["name"], base_command=rec["base_command"], hints=hints, - requirements=requirements, inputs=inputs, outputs=outputs, stdout=rec["stdout"], - stderr=rec["stderr"], workflow_id=rec["workflow_id"], task_id=rec["id"]) + return Task(name=task_record["name"], base_command=task_record["base_command"], + hints=hints, requirements=requirements, inputs=inputs, outputs=outputs, + stdout=task_record["stdout"], stderr=task_record["stderr"], + workflow_id=task_record["workflow_id"], task_id=task_record["id"]) def _reconstruct_metadata(metadata_record): @@ -578,8 +573,7 @@ def _reconstruct_metadata(metadata_record): :type keys: iterable of str :rtype: dict """ - rec = metadata_record["m"] - return {key: val for key, val in rec.items() if key != "state"} + return {key: val for key, val in metadata_record.items() if key != "state"} # Ignore E1129: External module is missing proper resource context manager methods. # pylama:ignore=E1129 diff --git a/beeflow/wf_manager/common/dep_manager.py b/beeflow/wf_manager/common/dep_manager.py index c9dbbfc43..34a369c84 100755 --- a/beeflow/wf_manager/common/dep_manager.py +++ b/beeflow/wf_manager/common/dep_manager.py @@ -95,11 +95,11 @@ def setup_gdb_configs(mount_dir, bolt_port, http_port, https_port): with open(gdb_configfile, "rt", encoding="utf8") as cfile: data = cfile.read() - bolt_config = r'#(dbms.connector.bolt.listen_address=):[0-9]*' + bolt_config = r'#(server.bolt.listen_address=):[0-9]*' data = re.sub(bolt_config, rf'\1:{bolt_port}', data) - http_config = r'#(dbms.connector.http.listen_address=):[0-9]*' + http_config = r'#(server.http.listen_address=):[0-9]*' data = re.sub(http_config, rf'\1:{http_port}', data) - https_config = r'#(dbms.connector.https.listen_address=):[0-9]*' + https_config = r'#(server.https.listen_address=):[0-9]*' data = re.sub(https_config, rf'\1:{https_port}', data) with open(gdb_configfile, "wt", encoding="utf8") as cfile: cfile.write(data) @@ -155,7 +155,7 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False): container_path = get_container_dir() if not reexecute: try: - command = ['neo4j-admin', 'set-initial-password', str(db_password)] + command = ['neo4j-admin', 'dbms', 'set-initial-password', str(db_password)] subprocess.run([ "ch-run", "--set-env=" + container_path + "/ch/environment", @@ -187,7 +187,8 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False): "--", *command ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: output = proc.stdout.read().decode('utf-8') - pid = re.search(r'pid ([0-9]*)', output).group(1) + dep_log.info(output) + pid = re.search(r'\(pid:([0-9]*)\)', output).group(1) return pid except FileNotFoundError: dep_log.error("Neo4j failed to start.") diff --git a/pyproject.toml b/pyproject.toml index 9dc9b6935..018b157b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,7 @@ python = ">=3.8.3,<=3.12.2" # Package dependencies Flask = { version = "^2.0" } -neo4j = { version = "^1.7.4" } +neo4j = { version = "^5" } PyYAML = { version = "^6.0.1" } flask_restful = "0.3.9" cwl-utils = "^0.16" From 7a9ec6005d562b9b2c220a537eac71113fe8d937 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 10 Apr 2024 15:51:17 -0600 Subject: [PATCH 2/3] Update CI version of neo4j to 5.17 --- ci/bee_install.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/bee_install.sh b/ci/bee_install.sh index 175a25b58..c044e0b78 100755 --- a/ci/bee_install.sh +++ b/ci/bee_install.sh @@ -9,8 +9,8 @@ printf "**Setting up BEE containers**\n" printf "\n\n" mkdir -p $HOME/img # Pull the Neo4j container -ch-image pull neo4j:3.5.22 || exit 1 -ch-convert -i ch-image -o tar neo4j:3.5.22 $NEO4J_CONTAINER || exit 1 +ch-image pull neo4j:5.17 || exit 1 +ch-convert -i ch-image -o tar neo4j:5.17 $NEO4J_CONTAINER || exit 1 # Pull the Redis container ch-image pull redis || exit 1 ch-convert -i ch-image -o tar redis $REDIS_CONTAINER || exit 1 From 5054a936a9bda336c1e971e69a60aa5d63691f87 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Tue, 16 Apr 2024 16:21:41 -0600 Subject: [PATCH 3/3] Fix documentation and beeflow core pull to use updated neo4j version --- beeflow/client/core.py | 2 +- docs/sphinx/installation.rst | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/beeflow/client/core.py b/beeflow/client/core.py index 74fb71c51..7b91e0554 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -590,7 +590,7 @@ def pull_deps(outdir: str = typer.Option('.', '--outdir', '-o', """Pull required BEE containers and store in outdir.""" load_check_charliecloud() neo4j_path = os.path.join(os.path.realpath(outdir), 'neo4j.tar.gz') - pull_to_tar('neo4j:3.5.22', neo4j_path) + pull_to_tar('neo4j:5.17', neo4j_path) redis_path = os.path.join(os.path.realpath(outdir), 'redis.tar.gz') pull_to_tar('redis', redis_path) print() diff --git a/docs/sphinx/installation.rst b/docs/sphinx/installation.rst index 3bad41f97..6325e1115 100644 --- a/docs/sphinx/installation.rst +++ b/docs/sphinx/installation.rst @@ -15,9 +15,9 @@ Requirements: * **Containers**: - Two Charliecloud dependency containers are currently required for BEE: one for the Neo4j graph database and another for Redis. The paths to these containers will need to be set in the BEE configuration later, using the ``neo4j_image`` and the ``redis_image`` options respectively. BEE only supports Neo4j 3.5.x. We are currently using the latest version of Redis supplied on Docker Hub (as of 2023). + Two Charliecloud dependency containers are currently required for BEE: one for the Neo4j graph database and another for Redis. The paths to these containers will need to be set in the BEE configuration later, using the ``neo4j_image`` and the ``redis_image`` options respectively. BEE only supports Neo4j 5.x. We are currently using the latest version of Redis supplied on Docker Hub (as of 2023). - For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j-3-5-17-ch.tar.gz**, **/usr/projects/BEE/redis.tar.gz**. + For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j-5-17.tar.gz**, **/usr/projects/BEE/redis.tar.gz**. For other users, these containers can be pulled from Docker Hub (after following `Installation:`_ below) using ``beeflow core pull-deps``, which will download and report the container paths to be set in the config later. @@ -83,9 +83,10 @@ To check the status of the bee components run: beeflow core status .. code-block:: - beeflow components: + redis ... RUNNING scheduler ... RUNNING + celery ... RUNNING slurmrestd ... RUNNING wf_manager ... RUNNING task_manager ... RUNNING