Skip to content

Commit

Permalink
Merge pull request #819 from lanl/neo4j-update
Browse files Browse the repository at this point in the history
Update Neo4j to 5.x
  • Loading branch information
pagrubel authored Apr 16, 2024
2 parents 46d2e40 + 5054a93 commit e90da17
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 70 deletions.
2 changes: 1 addition & 1 deletion beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ def pull_deps(outdir: str = typer.Option('.', '--outdir', '-o',
"""Pull required BEE containers and store in outdir."""
load_check_charliecloud()
neo4j_path = os.path.join(os.path.realpath(outdir), 'neo4j.tar.gz')
pull_to_tar('neo4j:3.5.22', neo4j_path)
pull_to_tar('neo4j:5.17', neo4j_path)
redis_path = os.path.join(os.path.realpath(outdir), 'redis.tar.gz')
pull_to_tar('redis', redis_path)
print()
Expand Down
60 changes: 30 additions & 30 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,119 +239,119 @@ def get_task_by_id(tx, task_id):
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: neo4j.graph.Node
"""
task_query = "MATCH (t:Task {id: $task_id}) RETURN t"

return tx.run(task_query, task_id=task_id).single()
return tx.run(task_query, task_id=task_id).single()['t']


def get_task_hints(tx, task_id):
"""Get task hints from the Neo4j database by the task's ID.
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
hints_query = "MATCH (:Task {id: $task_id})<-[:HINT_OF]-(h:Hint) RETURN h"

return tx.run(hints_query, task_id=task_id)
return [rec['h'] for rec in tx.run(hints_query, task_id=task_id)]


def get_task_requirements(tx, task_id):
"""Get task requirements from the Neo4j database by the task's ID.
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
reqs_query = "MATCH (:Task {id: $task_id})<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r"

return tx.run(reqs_query, task_id=task_id)
return [rec['r'] for rec in tx.run(reqs_query, task_id=task_id)]


def get_task_inputs(tx, task_id):
"""Get task inputs from the Neo4j database by the task's ID.
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
inputs_query = "MATCH (:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) RETURN i"

return tx.run(inputs_query, task_id=task_id)
return [rec['i'] for rec in tx.run(inputs_query, task_id=task_id)]


def get_task_outputs(tx, task_id):
"""Get task outputs from the Neo4j database by the task's ID.
:param task_id: the task's ID
:type task_id: str
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
outputs_query = "MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output) RETURN o"

return tx.run(outputs_query, task_id=task_id)
return [rec['o'] for rec in tx.run(outputs_query, task_id=task_id)]


def get_workflow_description(tx):
"""Get the workflow description from the Neo4j database.
:rtype: BoltStatementResult
:rtype: neo4j.graph.Node
"""
workflow_desc_query = "MATCH (w:Workflow) RETURN w"

return tx.run(workflow_desc_query).single()
return tx.run(workflow_desc_query).single()['w']


def get_workflow_tasks(tx):
"""Get workflow tasks from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
workflow_query = "MATCH (t:Task) RETURN t"

return tx.run(workflow_query)
return [rec['t'] for rec in tx.run(workflow_query)]


def get_workflow_requirements(tx):
"""Get workflow requirements from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
requirements_query = "MATCH (:Workflow)<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r"

return tx.run(requirements_query)
return [rec['r'] for rec in tx.run(requirements_query)]


def get_workflow_hints(tx):
"""Get workflow hints from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
hints_query = "MATCH (:Workflow)<-[:HINT_OF]-(h:Hint) RETURN h"

return tx.run(hints_query)
return [rec['h'] for rec in tx.run(hints_query)]


def get_workflow_inputs(tx):
"""Get workflow inputs from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
inputs_query = "MATCH (:Workflow)<-[:INPUT_OF]-(i:Input) RETURN i"

return tx.run(inputs_query)
return [rec['i'] for rec in tx.run(inputs_query)]


def get_workflow_outputs(tx):
"""Get workflow outputs from the Neo4j database.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
outputs_query = "MATCH (:Workflow)<-[:OUTPUT_OF]-(o:Output) RETURN o"

return tx.run(outputs_query)
return [rec['o'] for rec in tx.run(outputs_query)]


def get_workflow_state(tx):
Expand All @@ -378,23 +378,23 @@ def set_workflow_state(tx, state):
def get_ready_tasks(tx):
"""Get all tasks that are ready to execute.
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
get_ready_query = "MATCH (:Metadata {state: 'READY'})-[:DESCRIBES]->(t:Task) RETURN t"

return tx.run(get_ready_query)
return [rec['t'] for rec in tx.run(get_ready_query)]


def get_dependent_tasks(tx, task):
"""Get the tasks that depend on a specified task.
:param task: the task whose dependencies to obtain
:type task: Task
:rtype: BoltStatementResult
:rtype: list of neo4j.graph.Node
"""
dependents_query = "MATCH (t:Task)-[:DEPENDS_ON]->(:Task {id: $task_id}) RETURN t"

return tx.run(dependents_query, task_id=task.id)
return [rec['t'] for rec in tx.run(dependents_query, task_id=task.id)]


def get_task_state(tx, task):
Expand Down Expand Up @@ -428,11 +428,11 @@ def get_task_metadata(tx, task):
:param task: the task whose metadata to get
:type task: Task
:rtype: BoltStatementResult
:rtype: dict
"""
metadata_query = "MATCH (m:Metadata)-[:DESCRIBES]->(:Task {id: $task_id}) RETURN m"

return tx.run(metadata_query, task_id=task.id).single()
return dict(tx.run(metadata_query, task_id=task.id).single()['m'])


def set_task_metadata(tx, task, metadata):
Expand Down Expand Up @@ -466,7 +466,7 @@ def get_task_input(tx, task, input_id):
input_query = ("MATCH (t:Task {id: $task_id})<-[:INPUT_OF]-(i:Input {id: $input_id}) "
"RETURN i")

return tx.run(input_query, task_id=task.id, input_id=input_id).single()
return dict(tx.run(input_query, task_id=task.id, input_id=input_id).single()['i'])


def set_task_input(tx, task, input_id, value):
Expand Down Expand Up @@ -496,7 +496,7 @@ def get_task_output(tx, task, output_id):
output_query = ("MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output {id: $output_id}) "
"RETURN o")

return tx.run(output_query, task_id=task.id, output_id=output_id).single()
return dict(tx.run(output_query, task_id=task.id, output_id=output_id).single()['o'])


def set_task_output(tx, task, output_id, value):
Expand Down
50 changes: 22 additions & 28 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

from neo4j import GraphDatabase as Neo4jDatabase
from neobolt.exceptions import ServiceUnavailable
from neo4j.exceptions import ServiceUnavailable

from beeflow.common.gdb.gdb_driver import GraphDatabaseDriver
from beeflow.common.gdb import neo4j_cypher as tx
Expand Down Expand Up @@ -307,7 +307,7 @@ def get_task_input(self, task, input_id):
:rtype: StepInput
"""
input_record = self._read_transaction(tx.get_task_input, task=task, input_id=input_id)
return _reconstruct_task_input(input_record["i"])
return _reconstruct_task_input(input_record)

def set_task_input(self, task, input_id, value):
"""Set the value of a task input.
Expand All @@ -330,7 +330,7 @@ def get_task_output(self, task, output_id):
:rtype: StepOutput
"""
output_record = self._read_transaction(tx.get_task_output, task=task, output_id=output_id)
return _reconstruct_task_output(output_record["o"])
return _reconstruct_task_output(output_record)

def set_task_output(self, task, output_id, value):
"""Set the value of a task output.
Expand Down Expand Up @@ -401,13 +401,13 @@ def _get_task_data_tuples(self, task_records):
with self._driver.session() as session:
trecords = list(task_records)
hint_records = [session.read_transaction(tx.get_task_hints,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]
req_records = [session.read_transaction(tx.get_task_requirements,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]
input_records = [session.read_transaction(tx.get_task_inputs,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]
output_records = [session.read_transaction(tx.get_task_outputs,
task_id=rec["t"]["id"]) for rec in trecords]
task_id=rec["id"]) for rec in trecords]

hints = [_reconstruct_hints(hint_record) for hint_record in hint_records]
reqs = [_reconstruct_requirements(req_record) for req_record in req_records]
Expand Down Expand Up @@ -447,9 +447,8 @@ def _reconstruct_requirements(req_records):
:type req_records: list of neo4j.graph.Node
:rtype: list of Requirement
"""
recs = [req_record["r"] for req_record in req_records]
return [Requirement(rec["class"], {k: v for k, v in rec.items() if k != "class"})
for rec in recs]
for rec in req_records]


def _reconstruct_hints(hint_records):
Expand All @@ -459,8 +458,8 @@ def _reconstruct_hints(hint_records):
:type hint_records: BoltStatementResult
:rtype: list of Hint
"""
recs = [hint_record["h"] for hint_record in hint_records]
return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"}) for rec in recs]
return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"})
for rec in hint_records]


def _reconstruct_workflow_inputs(input_records):
Expand All @@ -470,8 +469,7 @@ def _reconstruct_workflow_inputs(input_records):
:type input_records: list of neo4j.graph.Node
:rtype: list of InputParameter
"""
recs = [input_record["i"] for input_record in input_records]
return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in recs]
return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in input_records]


def _reconstruct_workflow_outputs(output_records):
Expand All @@ -481,8 +479,8 @@ def _reconstruct_workflow_outputs(output_records):
:type output_records: list of neo4j.graph.Node
:rtype: list of OutputParameter
"""
recs = [output_record["o"] for output_record in output_records]
return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"]) for rec in recs]
return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"])
for rec in output_records]


def _reconstruct_task_inputs(input_records):
Expand All @@ -492,8 +490,7 @@ def _reconstruct_task_inputs(input_records):
:type input_records: list of neo4j.graph.Node
:rtype: list of StepInput
"""
recs = [input_record["i"] for input_record in input_records]
return [_reconstruct_task_input(rec) for rec in recs]
return [_reconstruct_task_input(rec) for rec in input_records]


def _reconstruct_task_input(rec):
Expand All @@ -514,8 +511,7 @@ def _reconstruct_task_outputs(output_records):
:type output_records: list of neo4j.graph.Node
:rtype: list of StepOutput
"""
recs = [output_record["o"] for output_record in output_records]
return [_reconstruct_task_output(rec) for rec in recs]
return [_reconstruct_task_output(rec) for rec in output_records]


def _reconstruct_task_output(rec):
Expand Down Expand Up @@ -543,9 +539,8 @@ def _reconstruct_workflow(workflow_record, hints, requirements, inputs, outputs)
:type outputs: list of OutputParameter
:rtype: Workflow
"""
rec = workflow_record["w"]
return Workflow(name=rec["name"], hints=hints, requirements=requirements, inputs=inputs,
outputs=outputs, workflow_id=rec["id"])
return Workflow(name=workflow_record["name"], hints=hints, requirements=requirements,
inputs=inputs, outputs=outputs, workflow_id=workflow_record["id"])


def _reconstruct_task(task_record, hints, requirements, inputs, outputs):
Expand All @@ -563,10 +558,10 @@ def _reconstruct_task(task_record, hints, requirements, inputs, outputs):
:type outputs: list of StepOutput
:rtype: Task
"""
rec = task_record["t"]
return Task(name=rec["name"], base_command=rec["base_command"], hints=hints,
requirements=requirements, inputs=inputs, outputs=outputs, stdout=rec["stdout"],
stderr=rec["stderr"], workflow_id=rec["workflow_id"], task_id=rec["id"])
return Task(name=task_record["name"], base_command=task_record["base_command"],
hints=hints, requirements=requirements, inputs=inputs, outputs=outputs,
stdout=task_record["stdout"], stderr=task_record["stderr"],
workflow_id=task_record["workflow_id"], task_id=task_record["id"])


def _reconstruct_metadata(metadata_record):
Expand All @@ -578,8 +573,7 @@ def _reconstruct_metadata(metadata_record):
:type keys: iterable of str
:rtype: dict
"""
rec = metadata_record["m"]
return {key: val for key, val in rec.items() if key != "state"}
return {key: val for key, val in metadata_record.items() if key != "state"}

# Ignore E1129: External module is missing proper resource context manager methods.
# pylama:ignore=E1129
11 changes: 6 additions & 5 deletions beeflow/wf_manager/common/dep_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ def setup_gdb_configs(mount_dir, bolt_port, http_port, https_port):
with open(gdb_configfile, "rt", encoding="utf8") as cfile:
data = cfile.read()

bolt_config = r'#(dbms.connector.bolt.listen_address=):[0-9]*'
bolt_config = r'#(server.bolt.listen_address=):[0-9]*'
data = re.sub(bolt_config, rf'\1:{bolt_port}', data)
http_config = r'#(dbms.connector.http.listen_address=):[0-9]*'
http_config = r'#(server.http.listen_address=):[0-9]*'
data = re.sub(http_config, rf'\1:{http_port}', data)
https_config = r'#(dbms.connector.https.listen_address=):[0-9]*'
https_config = r'#(server.https.listen_address=):[0-9]*'
data = re.sub(https_config, rf'\1:{https_port}', data)
with open(gdb_configfile, "wt", encoding="utf8") as cfile:
cfile.write(data)
Expand Down Expand Up @@ -155,7 +155,7 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False):
container_path = get_container_dir()
if not reexecute:
try:
command = ['neo4j-admin', 'set-initial-password', str(db_password)]
command = ['neo4j-admin', 'dbms', 'set-initial-password', str(db_password)]
subprocess.run([
"ch-run",
"--set-env=" + container_path + "/ch/environment",
Expand Down Expand Up @@ -187,7 +187,8 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False):
"--", *command
], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
output = proc.stdout.read().decode('utf-8')
pid = re.search(r'pid ([0-9]*)', output).group(1)
dep_log.info(output)
pid = re.search(r'\(pid:([0-9]*)\)', output).group(1)
return pid
except FileNotFoundError:
dep_log.error("Neo4j failed to start.")
Expand Down
Loading

0 comments on commit e90da17

Please sign in to comment.