From f649b4c232c4440205ef00debe60b5512359ebd8 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Mon, 18 Mar 2024 13:27:29 -0600 Subject: [PATCH] Make initial updates for using neo4j 5.x This updates the neo4j container management code and driver version. The gdb code has also been updated to work with the new driver version, which made breaking changes to how transaction functions are handled. No changes have been made to the cypher queries yet, although they seemed to work for the cat-grep-tar workflow. --- beeflow/common/gdb/neo4j_cypher.py | 60 ++++++++++++------------ beeflow/common/gdb/neo4j_driver.py | 50 +++++++++----------- beeflow/wf_manager/common/dep_manager.py | 11 +++-- pyproject.toml | 2 +- 4 files changed, 59 insertions(+), 64 deletions(-) diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index 8a48d0059..85c0614f8 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -239,11 +239,11 @@ def get_task_by_id(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ task_query = "MATCH (t:Task {id: $task_id}) RETURN t" - return tx.run(task_query, task_id=task_id).single() + return tx.run(task_query, task_id=task_id).single()['t'] def get_task_hints(tx, task_id): @@ -251,11 +251,11 @@ def get_task_hints(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ hints_query = "MATCH (:Task {id: $task_id})<-[:HINT_OF]-(h:Hint) RETURN h" - return tx.run(hints_query, task_id=task_id) + return [rec['h'] for rec in tx.run(hints_query, task_id=task_id)] def get_task_requirements(tx, task_id): @@ -263,11 +263,11 @@ def get_task_requirements(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ reqs_query = "MATCH (:Task {id: $task_id})<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r" - return tx.run(reqs_query, task_id=task_id) + return [rec['r'] for rec in tx.run(reqs_query, task_id=task_id)] def get_task_inputs(tx, task_id): @@ -275,11 +275,11 @@ def get_task_inputs(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ inputs_query = "MATCH (:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) RETURN i" - return tx.run(inputs_query, task_id=task_id) + return [rec['i'] for rec in tx.run(inputs_query, task_id=task_id)] def get_task_outputs(tx, task_id): @@ -287,71 +287,71 @@ def get_task_outputs(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ outputs_query = "MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output) RETURN o" - return tx.run(outputs_query, task_id=task_id) + return [rec['o'] for rec in tx.run(outputs_query, task_id=task_id)] def get_workflow_description(tx): """Get the workflow description from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ workflow_desc_query = "MATCH (w:Workflow) RETURN w" - return tx.run(workflow_desc_query).single() + return tx.run(workflow_desc_query).single()['w'] def get_workflow_tasks(tx): """Get workflow tasks from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ workflow_query = "MATCH (t:Task) RETURN t" - return tx.run(workflow_query) + return [rec['t'] for rec in tx.run(workflow_query)] def get_workflow_requirements(tx): """Get workflow requirements from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ requirements_query = "MATCH (:Workflow)<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r" - return tx.run(requirements_query) + return [rec['r'] for rec in tx.run(requirements_query)] def get_workflow_hints(tx): """Get workflow hints from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ hints_query = "MATCH (:Workflow)<-[:HINT_OF]-(h:Hint) RETURN h" - return tx.run(hints_query) + return [rec['h'] for rec in tx.run(hints_query)] def get_workflow_inputs(tx): """Get workflow inputs from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ inputs_query = "MATCH (:Workflow)<-[:INPUT_OF]-(i:Input) RETURN i" - return tx.run(inputs_query) + return [rec['i'] for rec in tx.run(inputs_query)] def get_workflow_outputs(tx): """Get workflow outputs from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ outputs_query = "MATCH (:Workflow)<-[:OUTPUT_OF]-(o:Output) RETURN o" - return tx.run(outputs_query) + return [rec['o'] for rec in tx.run(outputs_query)] def get_workflow_state(tx): @@ -378,11 +378,11 @@ def set_workflow_state(tx, state): def get_ready_tasks(tx): """Get all tasks that are ready to execute. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ get_ready_query = "MATCH (:Metadata {state: 'READY'})-[:DESCRIBES]->(t:Task) RETURN t" - return tx.run(get_ready_query) + return [rec['t'] for rec in tx.run(get_ready_query)] def get_dependent_tasks(tx, task): @@ -390,11 +390,11 @@ def get_dependent_tasks(tx, task): :param task: the task whose dependencies to obtain :type task: Task - :rtype: BoltStatementResult + :rtype: neo4j.Result """ dependents_query = "MATCH (t:Task)-[:DEPENDS_ON]->(:Task {id: $task_id}) RETURN t" - return tx.run(dependents_query, task_id=task.id) + return [rec['t'] for rec in tx.run(dependents_query, task_id=task.id)] def get_task_state(tx, task): @@ -428,11 +428,11 @@ def get_task_metadata(tx, task): :param task: the task whose metadata to get :type task: Task - :rtype: BoltStatementResult + :rtype: neo4j.Result """ metadata_query = "MATCH (m:Metadata)-[:DESCRIBES]->(:Task {id: $task_id}) RETURN m" - return tx.run(metadata_query, task_id=task.id).single() + return dict(tx.run(metadata_query, task_id=task.id).single()['m']) def set_task_metadata(tx, task, metadata): @@ -466,7 +466,7 @@ def get_task_input(tx, task, input_id): input_query = ("MATCH (t:Task {id: $task_id})<-[:INPUT_OF]-(i:Input {id: $input_id}) " "RETURN i") - return tx.run(input_query, task_id=task.id, input_id=input_id).single() + return dict(tx.run(input_query, task_id=task.id, input_id=input_id).single()['i']) def set_task_input(tx, task, input_id, value): @@ -496,7 +496,7 @@ def get_task_output(tx, task, output_id): output_query = ("MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output {id: $output_id}) " "RETURN o") - return tx.run(output_query, task_id=task.id, output_id=output_id).single() + return dict(tx.run(output_query, task_id=task.id, output_id=output_id).single()['o']) def set_task_output(tx, task, output_id, value): diff --git a/beeflow/common/gdb/neo4j_driver.py b/beeflow/common/gdb/neo4j_driver.py index 3b3222576..6e100aa96 100644 --- a/beeflow/common/gdb/neo4j_driver.py +++ b/beeflow/common/gdb/neo4j_driver.py @@ -6,7 +6,7 @@ """ from neo4j import GraphDatabase as Neo4jDatabase -from neobolt.exceptions import ServiceUnavailable +from neo4j.exceptions import ServiceUnavailable from beeflow.common.gdb.gdb_driver import GraphDatabaseDriver from beeflow.common.gdb import neo4j_cypher as tx @@ -307,7 +307,7 @@ def get_task_input(self, task, input_id): :rtype: StepInput """ input_record = self._read_transaction(tx.get_task_input, task=task, input_id=input_id) - return _reconstruct_task_input(input_record["i"]) + return _reconstruct_task_input(input_record) def set_task_input(self, task, input_id, value): """Set the value of a task input. @@ -330,7 +330,7 @@ def get_task_output(self, task, output_id): :rtype: StepOutput """ output_record = self._read_transaction(tx.get_task_output, task=task, output_id=output_id) - return _reconstruct_task_output(output_record["o"]) + return _reconstruct_task_output(output_record) def set_task_output(self, task, output_id, value): """Set the value of a task output. @@ -401,13 +401,13 @@ def _get_task_data_tuples(self, task_records): with self._driver.session() as session: trecords = list(task_records) hint_records = [session.read_transaction(tx.get_task_hints, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] req_records = [session.read_transaction(tx.get_task_requirements, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] input_records = [session.read_transaction(tx.get_task_inputs, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] output_records = [session.read_transaction(tx.get_task_outputs, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] hints = [_reconstruct_hints(hint_record) for hint_record in hint_records] reqs = [_reconstruct_requirements(req_record) for req_record in req_records] @@ -447,9 +447,8 @@ def _reconstruct_requirements(req_records): :type req_records: BoltStatementResult :rtype: list of Requirement """ - recs = [req_record["r"] for req_record in req_records] return [Requirement(rec["class"], {k: v for k, v in rec.items() if k != "class"}) - for rec in recs] + for rec in req_records] def _reconstruct_hints(hint_records): @@ -459,8 +458,8 @@ def _reconstruct_hints(hint_records): :type hint_records: BoltStatementResult :rtype: list of Hint """ - recs = [hint_record["h"] for hint_record in hint_records] - return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"}) for rec in recs] + return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"}) + for rec in hint_records] def _reconstruct_workflow_inputs(input_records): @@ -470,8 +469,7 @@ def _reconstruct_workflow_inputs(input_records): :type input_records: BoltStatementResult :rtype: list of InputParameter """ - recs = [input_record["i"] for input_record in input_records] - return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in recs] + return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in input_records] def _reconstruct_workflow_outputs(output_records): @@ -481,8 +479,8 @@ def _reconstruct_workflow_outputs(output_records): :type output_records: BoltStatementResult :rtype: list of OutputParameter """ - recs = [output_record["o"] for output_record in output_records] - return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"]) for rec in recs] + return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"]) + for rec in output_records] def _reconstruct_task_inputs(input_records): @@ -492,8 +490,7 @@ def _reconstruct_task_inputs(input_records): :type input_records: BoltStatementResult :rtype: list of StepInput """ - recs = [input_record["i"] for input_record in input_records] - return [_reconstruct_task_input(rec) for rec in recs] + return [_reconstruct_task_input(rec) for rec in input_records] def _reconstruct_task_input(rec): @@ -514,8 +511,7 @@ def _reconstruct_task_outputs(output_records): :type output_records: BoltStatementResult :rtype: list of StepOutput """ - recs = [output_record["o"] for output_record in output_records] - return [_reconstruct_task_output(rec) for rec in recs] + return [_reconstruct_task_output(rec) for rec in output_records] def _reconstruct_task_output(rec): @@ -543,9 +539,8 @@ def _reconstruct_workflow(workflow_record, hints, requirements, inputs, outputs) :type outputs: list of OutputParameter :rtype: Workflow """ - rec = workflow_record["w"] - return Workflow(name=rec["name"], hints=hints, requirements=requirements, inputs=inputs, - outputs=outputs, workflow_id=rec["id"]) + return Workflow(name=workflow_record["name"], hints=hints, requirements=requirements, + inputs=inputs, outputs=outputs, workflow_id=workflow_record["id"]) def _reconstruct_task(task_record, hints, requirements, inputs, outputs): @@ -563,10 +558,10 @@ def _reconstruct_task(task_record, hints, requirements, inputs, outputs): :type outputs: list of StepOutput :rtype: Task """ - rec = task_record["t"] - return Task(name=rec["name"], base_command=rec["base_command"], hints=hints, - requirements=requirements, inputs=inputs, outputs=outputs, stdout=rec["stdout"], - stderr=rec["stderr"], workflow_id=rec["workflow_id"], task_id=rec["id"]) + return Task(name=task_record["name"], base_command=task_record["base_command"], + hints=hints, requirements=requirements, inputs=inputs, outputs=outputs, + stdout=task_record["stdout"], stderr=task_record["stderr"], + workflow_id=task_record["workflow_id"], task_id=task_record["id"]) def _reconstruct_metadata(metadata_record): @@ -578,8 +573,7 @@ def _reconstruct_metadata(metadata_record): :type keys: iterable of str :rtype: dict """ - rec = metadata_record["m"] - return {key: val for key, val in rec.items() if key != "state"} + return {key: val for key, val in metadata_record.items() if key != "state"} # Ignore E1129: External module is missing proper resource context manager methods. # pylama:ignore=E1129 diff --git a/beeflow/wf_manager/common/dep_manager.py b/beeflow/wf_manager/common/dep_manager.py index c9dbbfc43..34a369c84 100755 --- a/beeflow/wf_manager/common/dep_manager.py +++ b/beeflow/wf_manager/common/dep_manager.py @@ -95,11 +95,11 @@ def setup_gdb_configs(mount_dir, bolt_port, http_port, https_port): with open(gdb_configfile, "rt", encoding="utf8") as cfile: data = cfile.read() - bolt_config = r'#(dbms.connector.bolt.listen_address=):[0-9]*' + bolt_config = r'#(server.bolt.listen_address=):[0-9]*' data = re.sub(bolt_config, rf'\1:{bolt_port}', data) - http_config = r'#(dbms.connector.http.listen_address=):[0-9]*' + http_config = r'#(server.http.listen_address=):[0-9]*' data = re.sub(http_config, rf'\1:{http_port}', data) - https_config = r'#(dbms.connector.https.listen_address=):[0-9]*' + https_config = r'#(server.https.listen_address=):[0-9]*' data = re.sub(https_config, rf'\1:{https_port}', data) with open(gdb_configfile, "wt", encoding="utf8") as cfile: cfile.write(data) @@ -155,7 +155,7 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False): container_path = get_container_dir() if not reexecute: try: - command = ['neo4j-admin', 'set-initial-password', str(db_password)] + command = ['neo4j-admin', 'dbms', 'set-initial-password', str(db_password)] subprocess.run([ "ch-run", "--set-env=" + container_path + "/ch/environment", @@ -187,7 +187,8 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False): "--", *command ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: output = proc.stdout.read().decode('utf-8') - pid = re.search(r'pid ([0-9]*)', output).group(1) + dep_log.info(output) + pid = re.search(r'\(pid:([0-9]*)\)', output).group(1) return pid except FileNotFoundError: dep_log.error("Neo4j failed to start.") diff --git a/pyproject.toml b/pyproject.toml index b18e0bbfa..063a7a5d3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,7 @@ python = ">=3.8.3,<=3.12.2" # Package dependencies Flask = { version = "^2.0" } -neo4j = { version = "^1.7.4" } +neo4j = { version = "^5" } PyYAML = { version = "^6.0.1" } flask_restful = "0.3.9" cwl-utils = "^0.16"