Skip to content

Commit

Permalink
Make initial updates for using neo4j 5.x
Browse files Browse the repository at this point in the history
This updates the neo4j container management code and driver version.
The gdb code has also been updated to work with the new driver version,
which made breaking changes to how transaction functions are handled.

No changes have been made to the cypher queries yet, although they
seemed to work for the cat-grep-tar workflow.
  • Loading branch information
jtronge committed Apr 2, 2024
1 parent 5acf885 commit f649b4c
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 64 deletions.
60 changes: 30 additions & 30 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,119 +239,119 @@ def get_task_by_id(tx, task_id):
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: neo4j.graph.Node
"""
task_query = "MATCH (t:Task {id: $task_id}) RETURN t"

return tx.run(task_query, task_id=task_id).single()
return tx.run(task_query, task_id=task_id).single()['t']


def get_task_hints(tx, task_id):
"""Get task hints from the Neo4j database by the task's ID.
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
hints_query = "MATCH (:Task {id: $task_id})<-[:HINT_OF]-(h:Hint) RETURN h"

return tx.run(hints_query, task_id=task_id)
return [rec['h'] for rec in tx.run(hints_query, task_id=task_id)]


def get_task_requirements(tx, task_id):
"""Get task requirements from the Neo4j database by the task's ID.
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
reqs_query = "MATCH (:Task {id: $task_id})<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r"

return tx.run(reqs_query, task_id=task_id)
return [rec['r'] for rec in tx.run(reqs_query, task_id=task_id)]


def get_task_inputs(tx, task_id):
"""Get task inputs from the Neo4j database by the task's ID.
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
inputs_query = "MATCH (:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) RETURN i"

return tx.run(inputs_query, task_id=task_id)
return [rec['i'] for rec in tx.run(inputs_query, task_id=task_id)]


def get_task_outputs(tx, task_id):
"""Get task outputs from the Neo4j database by the task's ID.
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
outputs_query = "MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output) RETURN o"

return tx.run(outputs_query, task_id=task_id)
return [rec['o'] for rec in tx.run(outputs_query, task_id=task_id)]


def get_workflow_description(tx):
"""Get the workflow description from the Neo4j database.
:rtype: BoltStatementResult
:rtype: neo4j.graph.Node
"""
workflow_desc_query = "MATCH (w:Workflow) RETURN w"

return tx.run(workflow_desc_query).single()
return tx.run(workflow_desc_query).single()['w']


def get_workflow_tasks(tx):
"""Get workflow tasks from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
workflow_query = "MATCH (t:Task) RETURN t"

return tx.run(workflow_query)
return [rec['t'] for rec in tx.run(workflow_query)]


def get_workflow_requirements(tx):
"""Get workflow requirements from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
requirements_query = "MATCH (:Workflow)<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r"

return tx.run(requirements_query)
return [rec['r'] for rec in tx.run(requirements_query)]


def get_workflow_hints(tx):
"""Get workflow hints from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
hints_query = "MATCH (:Workflow)<-[:HINT_OF]-(h:Hint) RETURN h"

return tx.run(hints_query)
return [rec['h'] for rec in tx.run(hints_query)]


def get_workflow_inputs(tx):
"""Get workflow inputs from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
inputs_query = "MATCH (:Workflow)<-[:INPUT_OF]-(i:Input) RETURN i"

return tx.run(inputs_query)
return [rec['i'] for rec in tx.run(inputs_query)]


def get_workflow_outputs(tx):
"""Get workflow outputs from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
outputs_query = "MATCH (:Workflow)<-[:OUTPUT_OF]-(o:Output) RETURN o"

return tx.run(outputs_query)
return [rec['o'] for rec in tx.run(outputs_query)]


def get_workflow_state(tx):
Expand All @@ -378,23 +378,23 @@ def set_workflow_state(tx, state):
def get_ready_tasks(tx):
"""Get all tasks that are ready to execute.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
get_ready_query = "MATCH (:Metadata {state: 'READY'})-[:DESCRIBES]->(t:Task) RETURN t"

return tx.run(get_ready_query)
return [rec['t'] for rec in tx.run(get_ready_query)]


def get_dependent_tasks(tx, task):
"""Get the tasks that depend on a specified task.
:param task: the task whose dependencies to obtain
:type task: Task
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
dependents_query = "MATCH (t:Task)-[:DEPENDS_ON]->(:Task {id: $task_id}) RETURN t"

return tx.run(dependents_query, task_id=task.id)
return [rec['t'] for rec in tx.run(dependents_query, task_id=task.id)]


def get_task_state(tx, task):
Expand Down Expand Up @@ -428,11 +428,11 @@ def get_task_metadata(tx, task):
:param task: the task whose metadata to get
:type task: Task
:rtype: BoltStatementResult
:rtype: dict
"""
metadata_query = "MATCH (m:Metadata)-[:DESCRIBES]->(:Task {id: $task_id}) RETURN m"

return tx.run(metadata_query, task_id=task.id).single()
return dict(tx.run(metadata_query, task_id=task.id).single()['m'])


def set_task_metadata(tx, task, metadata):
Expand Down Expand Up @@ -466,7 +466,7 @@ def get_task_input(tx, task, input_id):
input_query = ("MATCH (t:Task {id: $task_id})<-[:INPUT_OF]-(i:Input {id: $input_id}) "
"RETURN i")

return tx.run(input_query, task_id=task.id, input_id=input_id).single()
return dict(tx.run(input_query, task_id=task.id, input_id=input_id).single()['i'])


def set_task_input(tx, task, input_id, value):
Expand Down Expand Up @@ -496,7 +496,7 @@ def get_task_output(tx, task, output_id):
output_query = ("MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output {id: $output_id}) "
"RETURN o")

return tx.run(output_query, task_id=task.id, output_id=output_id).single()
return dict(tx.run(output_query, task_id=task.id, output_id=output_id).single()['o'])


def set_task_output(tx, task, output_id, value):
Expand Down
50 changes: 22 additions & 28 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

from neo4j import GraphDatabase as Neo4jDatabase
from neobolt.exceptions import ServiceUnavailable
from neo4j.exceptions import ServiceUnavailable

from beeflow.common.gdb.gdb_driver import GraphDatabaseDriver
from beeflow.common.gdb import neo4j_cypher as tx
Expand Down Expand Up @@ -307,7 +307,7 @@ def get_task_input(self, task, input_id):
:rtype: StepInput
"""
input_record = self._read_transaction(tx.get_task_input, task=task, input_id=input_id)
return _reconstruct_task_input(input_record["i"])
return _reconstruct_task_input(input_record)

def set_task_input(self, task, input_id, value):
"""Set the value of a task input.
Expand All @@ -330,7 +330,7 @@ def get_task_output(self, task, output_id):
:rtype: StepOutput
"""
output_record = self._read_transaction(tx.get_task_output, task=task, output_id=output_id)
return _reconstruct_task_output(output_record["o"])
return _reconstruct_task_output(output_record)

def set_task_output(self, task, output_id, value):
"""Set the value of a task output.
Expand Down Expand Up @@ -401,13 +401,13 @@ def _get_task_data_tuples(self, task_records):
with self._driver.session() as session:
trecords = list(task_records)
hint_records = [session.read_transaction(tx.get_task_hints,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]
req_records = [session.read_transaction(tx.get_task_requirements,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]
input_records = [session.read_transaction(tx.get_task_inputs,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]
output_records = [session.read_transaction(tx.get_task_outputs,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]

hints = [_reconstruct_hints(hint_record) for hint_record in hint_records]
reqs = [_reconstruct_requirements(req_record) for req_record in req_records]
Expand Down Expand Up @@ -447,9 +447,8 @@ def _reconstruct_requirements(req_records):
:type req_records: list of neo4j.graph.Node
:rtype: list of Requirement
"""
recs = [req_record["r"] for req_record in req_records]
return [Requirement(rec["class"], {k: v for k, v in rec.items() if k != "class"})
for rec in recs]
for rec in req_records]


def _reconstruct_hints(hint_records):
Expand All @@ -459,8 +458,8 @@ def _reconstruct_hints(hint_records):
:type hint_records: list of neo4j.graph.Node
:rtype: list of Hint
"""
recs = [hint_record["h"] for hint_record in hint_records]
return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"}) for rec in recs]
return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"})
for rec in hint_records]


def _reconstruct_workflow_inputs(input_records):
Expand All @@ -470,8 +469,7 @@ def _reconstruct_workflow_inputs(input_records):
:type input_records: list of neo4j.graph.Node
:rtype: list of InputParameter
"""
recs = [input_record["i"] for input_record in input_records]
return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in recs]
return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in input_records]


def _reconstruct_workflow_outputs(output_records):
Expand All @@ -481,8 +479,8 @@ def _reconstruct_workflow_outputs(output_records):
:type output_records: BoltStatementResult
:rtype: list of OutputParameter
"""
recs = [output_record["o"] for output_record in output_records]
return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"]) for rec in recs]
return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"])
for rec in output_records]


def _reconstruct_task_inputs(input_records):
Expand All @@ -492,8 +490,7 @@ def _reconstruct_task_inputs(input_records):
:type input_records: list of neo4j.graph.Node
:rtype: list of StepInput
"""
recs = [input_record["i"] for input_record in input_records]
return [_reconstruct_task_input(rec) for rec in recs]
return [_reconstruct_task_input(rec) for rec in input_records]


def _reconstruct_task_input(rec):
Expand All @@ -514,8 +511,7 @@ def _reconstruct_task_outputs(output_records):
:type output_records: list of neo4j.graph.Node
:rtype: list of StepOutput
"""
recs = [output_record["o"] for output_record in output_records]
return [_reconstruct_task_output(rec) for rec in recs]
return [_reconstruct_task_output(rec) for rec in output_records]


def _reconstruct_task_output(rec):
Expand Down Expand Up @@ -543,9 +539,8 @@ def _reconstruct_workflow(workflow_record, hints, requirements, inputs, outputs)
:type outputs: list of OutputParameter
:rtype: Workflow
"""
rec = workflow_record["w"]
return Workflow(name=rec["name"], hints=hints, requirements=requirements, inputs=inputs,
outputs=outputs, workflow_id=rec["id"])
return Workflow(name=workflow_record["name"], hints=hints, requirements=requirements,
inputs=inputs, outputs=outputs, workflow_id=workflow_record["id"])


def _reconstruct_task(task_record, hints, requirements, inputs, outputs):
Expand All @@ -563,10 +558,10 @@ def _reconstruct_task(task_record, hints, requirements, inputs, outputs):
:type outputs: list of StepOutput
:rtype: Task
"""
rec = task_record["t"]
return Task(name=rec["name"], base_command=rec["base_command"], hints=hints,
requirements=requirements, inputs=inputs, outputs=outputs, stdout=rec["stdout"],
stderr=rec["stderr"], workflow_id=rec["workflow_id"], task_id=rec["id"])
return Task(name=task_record["name"], base_command=task_record["base_command"],
hints=hints, requirements=requirements, inputs=inputs, outputs=outputs,
stdout=task_record["stdout"], stderr=task_record["stderr"],
workflow_id=task_record["workflow_id"], task_id=task_record["id"])


def _reconstruct_metadata(metadata_record):
Expand All @@ -578,8 +573,7 @@ def _reconstruct_metadata(metadata_record):
:type keys: iterable of str
:rtype: dict
"""
rec = metadata_record["m"]
return {key: val for key, val in rec.items() if key != "state"}
return {key: val for key, val in metadata_record.items() if key != "state"}

# Ignore E1129: External module is missing proper resource context manager methods.
# pylama:ignore=E1129
11 changes: 6 additions & 5 deletions beeflow/wf_manager/common/dep_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ def setup_gdb_configs(mount_dir, bolt_port, http_port, https_port):
with open(gdb_configfile, "rt", encoding="utf8") as cfile:
data = cfile.read()

bolt_config = r'#(dbms.connector.bolt.listen_address=):[0-9]*'
bolt_config = r'#(server.bolt.listen_address=):[0-9]*'
data = re.sub(bolt_config, rf'\1:{bolt_port}', data)
http_config = r'#(dbms.connector.http.listen_address=):[0-9]*'
http_config = r'#(server.http.listen_address=):[0-9]*'
data = re.sub(http_config, rf'\1:{http_port}', data)
https_config = r'#(dbms.connector.https.listen_address=):[0-9]*'
https_config = r'#(server.https.listen_address=):[0-9]*'
data = re.sub(https_config, rf'\1:{https_port}', data)
with open(gdb_configfile, "wt", encoding="utf8") as cfile:
cfile.write(data)
Expand Down Expand Up @@ -155,7 +155,7 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False):
container_path = get_container_dir()
if not reexecute:
try:
command = ['neo4j-admin', 'set-initial-password', str(db_password)]
command = ['neo4j-admin', 'dbms', 'set-initial-password', str(db_password)]
subprocess.run([
"ch-run",
"--set-env=" + container_path + "/ch/environment",
Expand Down Expand Up @@ -187,7 +187,8 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False):
"--", *command
], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
output = proc.stdout.read().decode('utf-8')
pid = re.search(r'pid ([0-9]*)', output).group(1)
dep_log.info(output)
pid = re.search(r'\(pid:([0-9]*)\)', output).group(1)
return pid
except FileNotFoundError:
dep_log.error("Neo4j failed to start.")
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ python = ">=3.8.3,<=3.12.2"

# Package dependencies
Flask = { version = "^2.0" }
neo4j = { version = "^1.7.4" }
neo4j = { version = "^5" }
PyYAML = { version = "^6.0.1" }
flask_restful = "0.3.9"
cwl-utils = "^0.16"
Expand Down

0 comments on commit f649b4c

Please sign in to comment.