Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Neo4j to 5.x #819

Merged
merged 3 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ def pull_deps(outdir: str = typer.Option('.', '--outdir', '-o',
"""Pull required BEE containers and store in outdir."""
load_check_charliecloud()
neo4j_path = os.path.join(os.path.realpath(outdir), 'neo4j.tar.gz')
pull_to_tar('neo4j:3.5.22', neo4j_path)
pull_to_tar('neo4j:5.17', neo4j_path)
redis_path = os.path.join(os.path.realpath(outdir), 'redis.tar.gz')
pull_to_tar('redis', redis_path)
print()
Expand Down
60 changes: 30 additions & 30 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,119 +239,119 @@ def get_task_by_id(tx, task_id):

:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
task_query = "MATCH (t:Task {id: $task_id}) RETURN t"

return tx.run(task_query, task_id=task_id).single()
return tx.run(task_query, task_id=task_id).single()['t']


def get_task_hints(tx, task_id):
"""Get task hints from the Neo4j database by the task's ID.

:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
hints_query = "MATCH (:Task {id: $task_id})<-[:HINT_OF]-(h:Hint) RETURN h"

return tx.run(hints_query, task_id=task_id)
return [rec['h'] for rec in tx.run(hints_query, task_id=task_id)]


def get_task_requirements(tx, task_id):
"""Get task requirements from the Neo4j database by the task's ID.

:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
reqs_query = "MATCH (:Task {id: $task_id})<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r"

return tx.run(reqs_query, task_id=task_id)
return [rec['r'] for rec in tx.run(reqs_query, task_id=task_id)]


def get_task_inputs(tx, task_id):
"""Get task inputs from the Neo4j database by the task's ID.

:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
inputs_query = "MATCH (:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) RETURN i"

return tx.run(inputs_query, task_id=task_id)
return [rec['i'] for rec in tx.run(inputs_query, task_id=task_id)]


def get_task_outputs(tx, task_id):
"""Get task outputs from the Neo4j database by the task's ID.

:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
outputs_query = "MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output) RETURN o"

return tx.run(outputs_query, task_id=task_id)
return [rec['o'] for rec in tx.run(outputs_query, task_id=task_id)]


def get_workflow_description(tx):
"""Get the workflow description from the Neo4j database.

:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
workflow_desc_query = "MATCH (w:Workflow) RETURN w"

return tx.run(workflow_desc_query).single()
return tx.run(workflow_desc_query).single()['w']


def get_workflow_tasks(tx):
"""Get workflow tasks from the Neo4j database.

:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
workflow_query = "MATCH (t:Task) RETURN t"

return tx.run(workflow_query)
return [rec['t'] for rec in tx.run(workflow_query)]


def get_workflow_requirements(tx):
"""Get workflow requirements from the Neo4j database.

:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
requirements_query = "MATCH (:Workflow)<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r"

return tx.run(requirements_query)
return [rec['r'] for rec in tx.run(requirements_query)]


def get_workflow_hints(tx):
"""Get workflow hints from the Neo4j database.

:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
hints_query = "MATCH (:Workflow)<-[:HINT_OF]-(h:Hint) RETURN h"

return tx.run(hints_query)
return [rec['h'] for rec in tx.run(hints_query)]


def get_workflow_inputs(tx):
"""Get workflow inputs from the Neo4j database.

:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
inputs_query = "MATCH (:Workflow)<-[:INPUT_OF]-(i:Input) RETURN i"

return tx.run(inputs_query)
return [rec['i'] for rec in tx.run(inputs_query)]


def get_workflow_outputs(tx):
"""Get workflow outputs from the Neo4j database.

:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
outputs_query = "MATCH (:Workflow)<-[:OUTPUT_OF]-(o:Output) RETURN o"

return tx.run(outputs_query)
return [rec['o'] for rec in tx.run(outputs_query)]


def get_workflow_state(tx):
Expand All @@ -378,23 +378,23 @@ def set_workflow_state(tx, state):
def get_ready_tasks(tx):
"""Get all tasks that are ready to execute.

:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
get_ready_query = "MATCH (:Metadata {state: 'READY'})-[:DESCRIBES]->(t:Task) RETURN t"

return tx.run(get_ready_query)
return [rec['t'] for rec in tx.run(get_ready_query)]


def get_dependent_tasks(tx, task):
"""Get the tasks that depend on a specified task.

:param task: the task whose dependents to obtain
:type task: Task
:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
dependents_query = "MATCH (t:Task)-[:DEPENDS_ON]->(:Task {id: $task_id}) RETURN t"

return tx.run(dependents_query, task_id=task.id)
return [rec['t'] for rec in tx.run(dependents_query, task_id=task.id)]


def get_task_state(tx, task):
Expand Down Expand Up @@ -428,11 +428,11 @@ def get_task_metadata(tx, task):

:param task: the task whose metadata to get
:type task: Task
:rtype: BoltStatementResult
:rtype: neo4j.Result
"""
metadata_query = "MATCH (m:Metadata)-[:DESCRIBES]->(:Task {id: $task_id}) RETURN m"

return tx.run(metadata_query, task_id=task.id).single()
return dict(tx.run(metadata_query, task_id=task.id).single()['m'])


def set_task_metadata(tx, task, metadata):
Expand Down Expand Up @@ -466,7 +466,7 @@ def get_task_input(tx, task, input_id):
input_query = ("MATCH (t:Task {id: $task_id})<-[:INPUT_OF]-(i:Input {id: $input_id}) "
"RETURN i")

return tx.run(input_query, task_id=task.id, input_id=input_id).single()
return dict(tx.run(input_query, task_id=task.id, input_id=input_id).single()['i'])


def set_task_input(tx, task, input_id, value):
Expand Down Expand Up @@ -496,7 +496,7 @@ def get_task_output(tx, task, output_id):
output_query = ("MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output {id: $output_id}) "
"RETURN o")

return tx.run(output_query, task_id=task.id, output_id=output_id).single()
return dict(tx.run(output_query, task_id=task.id, output_id=output_id).single()['o'])


def set_task_output(tx, task, output_id, value):
Expand Down
50 changes: 22 additions & 28 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

from neo4j import GraphDatabase as Neo4jDatabase
from neobolt.exceptions import ServiceUnavailable
from neo4j.exceptions import ServiceUnavailable

from beeflow.common.gdb.gdb_driver import GraphDatabaseDriver
from beeflow.common.gdb import neo4j_cypher as tx
Expand Down Expand Up @@ -307,7 +307,7 @@ def get_task_input(self, task, input_id):
:rtype: StepInput
"""
input_record = self._read_transaction(tx.get_task_input, task=task, input_id=input_id)
return _reconstruct_task_input(input_record["i"])
return _reconstruct_task_input(input_record)

def set_task_input(self, task, input_id, value):
"""Set the value of a task input.
Expand All @@ -330,7 +330,7 @@ def get_task_output(self, task, output_id):
:rtype: StepOutput
"""
output_record = self._read_transaction(tx.get_task_output, task=task, output_id=output_id)
return _reconstruct_task_output(output_record["o"])
return _reconstruct_task_output(output_record)

def set_task_output(self, task, output_id, value):
"""Set the value of a task output.
Expand Down Expand Up @@ -401,13 +401,13 @@ def _get_task_data_tuples(self, task_records):
with self._driver.session() as session:
trecords = list(task_records)
hint_records = [session.read_transaction(tx.get_task_hints,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]
req_records = [session.read_transaction(tx.get_task_requirements,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]
input_records = [session.read_transaction(tx.get_task_inputs,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]
output_records = [session.read_transaction(tx.get_task_outputs,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]

hints = [_reconstruct_hints(hint_record) for hint_record in hint_records]
reqs = [_reconstruct_requirements(req_record) for req_record in req_records]
Expand Down Expand Up @@ -447,9 +447,8 @@ def _reconstruct_requirements(req_records):
:type req_records: BoltStatementResult
:rtype: list of Requirement
"""
recs = [req_record["r"] for req_record in req_records]
return [Requirement(rec["class"], {k: v for k, v in rec.items() if k != "class"})
for rec in recs]
for rec in req_records]


def _reconstruct_hints(hint_records):
Expand All @@ -459,8 +458,8 @@ def _reconstruct_hints(hint_records):
:type hint_records: BoltStatementResult
:rtype: list of Hint
"""
recs = [hint_record["h"] for hint_record in hint_records]
return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"}) for rec in recs]
return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"})
for rec in hint_records]


def _reconstruct_workflow_inputs(input_records):
Expand All @@ -470,8 +469,7 @@ def _reconstruct_workflow_inputs(input_records):
:type input_records: BoltStatementResult
:rtype: list of InputParameter
"""
recs = [input_record["i"] for input_record in input_records]
return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in recs]
return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in input_records]


def _reconstruct_workflow_outputs(output_records):
Expand All @@ -481,8 +479,8 @@ def _reconstruct_workflow_outputs(output_records):
:type output_records: BoltStatementResult
:rtype: list of OutputParameter
"""
recs = [output_record["o"] for output_record in output_records]
return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"]) for rec in recs]
return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"])
for rec in output_records]


def _reconstruct_task_inputs(input_records):
Expand All @@ -492,8 +490,7 @@ def _reconstruct_task_inputs(input_records):
:type input_records: BoltStatementResult
:rtype: list of StepInput
"""
recs = [input_record["i"] for input_record in input_records]
return [_reconstruct_task_input(rec) for rec in recs]
return [_reconstruct_task_input(rec) for rec in input_records]


def _reconstruct_task_input(rec):
Expand All @@ -514,8 +511,7 @@ def _reconstruct_task_outputs(output_records):
:type output_records: BoltStatementResult
:rtype: list of StepOutput
"""
recs = [output_record["o"] for output_record in output_records]
return [_reconstruct_task_output(rec) for rec in recs]
return [_reconstruct_task_output(rec) for rec in output_records]


def _reconstruct_task_output(rec):
Expand Down Expand Up @@ -543,9 +539,8 @@ def _reconstruct_workflow(workflow_record, hints, requirements, inputs, outputs)
:type outputs: list of OutputParameter
:rtype: Workflow
"""
rec = workflow_record["w"]
return Workflow(name=rec["name"], hints=hints, requirements=requirements, inputs=inputs,
outputs=outputs, workflow_id=rec["id"])
return Workflow(name=workflow_record["name"], hints=hints, requirements=requirements,
inputs=inputs, outputs=outputs, workflow_id=workflow_record["id"])


def _reconstruct_task(task_record, hints, requirements, inputs, outputs):
Expand All @@ -563,10 +558,10 @@ def _reconstruct_task(task_record, hints, requirements, inputs, outputs):
:type outputs: list of StepOutput
:rtype: Task
"""
rec = task_record["t"]
return Task(name=rec["name"], base_command=rec["base_command"], hints=hints,
requirements=requirements, inputs=inputs, outputs=outputs, stdout=rec["stdout"],
stderr=rec["stderr"], workflow_id=rec["workflow_id"], task_id=rec["id"])
return Task(name=task_record["name"], base_command=task_record["base_command"],
hints=hints, requirements=requirements, inputs=inputs, outputs=outputs,
stdout=task_record["stdout"], stderr=task_record["stderr"],
workflow_id=task_record["workflow_id"], task_id=task_record["id"])


def _reconstruct_metadata(metadata_record):
Expand All @@ -578,8 +573,7 @@ def _reconstruct_metadata(metadata_record):
:type keys: iterable of str
:rtype: dict
"""
rec = metadata_record["m"]
return {key: val for key, val in rec.items() if key != "state"}
return {key: val for key, val in metadata_record.items() if key != "state"}

# Ignore E1129: External module is missing proper resource context manager methods.
# pylama:ignore=E1129
11 changes: 6 additions & 5 deletions beeflow/wf_manager/common/dep_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ def setup_gdb_configs(mount_dir, bolt_port, http_port, https_port):
with open(gdb_configfile, "rt", encoding="utf8") as cfile:
data = cfile.read()

bolt_config = r'#(dbms.connector.bolt.listen_address=):[0-9]*'
bolt_config = r'#(server.bolt.listen_address=):[0-9]*'
data = re.sub(bolt_config, rf'\1:{bolt_port}', data)
http_config = r'#(dbms.connector.http.listen_address=):[0-9]*'
http_config = r'#(server.http.listen_address=):[0-9]*'
data = re.sub(http_config, rf'\1:{http_port}', data)
https_config = r'#(dbms.connector.https.listen_address=):[0-9]*'
https_config = r'#(server.https.listen_address=):[0-9]*'
data = re.sub(https_config, rf'\1:{https_port}', data)
with open(gdb_configfile, "wt", encoding="utf8") as cfile:
cfile.write(data)
Expand Down Expand Up @@ -155,7 +155,7 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False):
container_path = get_container_dir()
if not reexecute:
try:
command = ['neo4j-admin', 'set-initial-password', str(db_password)]
command = ['neo4j-admin', 'dbms', 'set-initial-password', str(db_password)]
subprocess.run([
"ch-run",
"--set-env=" + container_path + "/ch/environment",
Expand Down Expand Up @@ -187,7 +187,8 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False):
"--", *command
], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
output = proc.stdout.read().decode('utf-8')
pid = re.search(r'pid ([0-9]*)', output).group(1)
dep_log.info(output)
pid = re.search(r'\(pid:([0-9]*)\)', output).group(1)
return pid
except FileNotFoundError:
dep_log.error("Neo4j failed to start.")
Expand Down
Loading