diff --git a/beeflow/client/core.py b/beeflow/client/core.py index 74fb71c51..7b91e0554 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -590,7 +590,7 @@ def pull_deps(outdir: str = typer.Option('.', '--outdir', '-o', """Pull required BEE containers and store in outdir.""" load_check_charliecloud() neo4j_path = os.path.join(os.path.realpath(outdir), 'neo4j.tar.gz') - pull_to_tar('neo4j:3.5.22', neo4j_path) + pull_to_tar('neo4j:5.17', neo4j_path) redis_path = os.path.join(os.path.realpath(outdir), 'redis.tar.gz') pull_to_tar('redis', redis_path) print() diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index 8a48d0059..85c0614f8 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -239,11 +239,11 @@ def get_task_by_id(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ task_query = "MATCH (t:Task {id: $task_id}) RETURN t" - return tx.run(task_query, task_id=task_id).single() + return tx.run(task_query, task_id=task_id).single()['t'] def get_task_hints(tx, task_id): @@ -251,11 +251,11 @@ def get_task_hints(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ hints_query = "MATCH (:Task {id: $task_id})<-[:HINT_OF]-(h:Hint) RETURN h" - return tx.run(hints_query, task_id=task_id) + return [rec['h'] for rec in tx.run(hints_query, task_id=task_id)] def get_task_requirements(tx, task_id): @@ -263,11 +263,11 @@ def get_task_requirements(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ reqs_query = "MATCH (:Task {id: $task_id})<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r" - return tx.run(reqs_query, task_id=task_id) + return [rec['r'] for rec in tx.run(reqs_query, task_id=task_id)] def get_task_inputs(tx, task_id): @@ -275,11 +275,11 @@ def get_task_inputs(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ inputs_query = "MATCH (:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) RETURN i" - return tx.run(inputs_query, task_id=task_id) + return [rec['i'] for rec in tx.run(inputs_query, task_id=task_id)] def get_task_outputs(tx, task_id): @@ -287,71 +287,71 @@ def get_task_outputs(tx, task_id): :param task_id: the task's ID :type task_id: str - :rtype: BoltStatementResult + :rtype: neo4j.Result """ outputs_query = "MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output) RETURN o" - return tx.run(outputs_query, task_id=task_id) + return [rec['o'] for rec in tx.run(outputs_query, task_id=task_id)] def get_workflow_description(tx): """Get the workflow description from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ workflow_desc_query = "MATCH (w:Workflow) RETURN w" - return tx.run(workflow_desc_query).single() + return tx.run(workflow_desc_query).single()['w'] def get_workflow_tasks(tx): """Get workflow tasks from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ workflow_query = "MATCH (t:Task) RETURN t" - return tx.run(workflow_query) + return [rec['t'] for rec in tx.run(workflow_query)] def get_workflow_requirements(tx): """Get workflow requirements from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ requirements_query = "MATCH (:Workflow)<-[:REQUIREMENT_OF]-(r:Requirement) RETURN r" - return tx.run(requirements_query) + return [rec['r'] for rec in tx.run(requirements_query)] def get_workflow_hints(tx): """Get workflow hints from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ hints_query = "MATCH (:Workflow)<-[:HINT_OF]-(h:Hint) RETURN h" - return tx.run(hints_query) + return [rec['h'] for rec in tx.run(hints_query)] def get_workflow_inputs(tx): """Get workflow inputs from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ inputs_query = "MATCH (:Workflow)<-[:INPUT_OF]-(i:Input) RETURN i" - return tx.run(inputs_query) + return [rec['i'] for rec in tx.run(inputs_query)] def get_workflow_outputs(tx): """Get workflow outputs from the Neo4j database. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ outputs_query = "MATCH (:Workflow)<-[:OUTPUT_OF]-(o:Output) RETURN o" - return tx.run(outputs_query) + return [rec['o'] for rec in tx.run(outputs_query)] def get_workflow_state(tx): @@ -378,11 +378,11 @@ def set_workflow_state(tx, state): def get_ready_tasks(tx): """Get all tasks that are ready to execute. - :rtype: BoltStatementResult + :rtype: neo4j.Result """ get_ready_query = "MATCH (:Metadata {state: 'READY'})-[:DESCRIBES]->(t:Task) RETURN t" - return tx.run(get_ready_query) + return [rec['t'] for rec in tx.run(get_ready_query)] def get_dependent_tasks(tx, task): @@ -390,11 +390,11 @@ def get_dependent_tasks(tx, task): :param task: the task whose dependencies to obtain :type task: Task - :rtype: BoltStatementResult + :rtype: neo4j.Result """ dependents_query = "MATCH (t:Task)-[:DEPENDS_ON]->(:Task {id: $task_id}) RETURN t" - return tx.run(dependents_query, task_id=task.id) + return [rec['t'] for rec in tx.run(dependents_query, task_id=task.id)] def get_task_state(tx, task): @@ -428,11 +428,11 @@ def get_task_metadata(tx, task): :param task: the task whose metadata to get :type task: Task - :rtype: BoltStatementResult + :rtype: neo4j.Result """ metadata_query = "MATCH (m:Metadata)-[:DESCRIBES]->(:Task {id: $task_id}) RETURN m" - return tx.run(metadata_query, task_id=task.id).single() + return dict(tx.run(metadata_query, task_id=task.id).single()['m']) def set_task_metadata(tx, task, metadata): @@ -466,7 +466,7 @@ def get_task_input(tx, task, input_id): input_query = ("MATCH (t:Task {id: $task_id})<-[:INPUT_OF]-(i:Input {id: $input_id}) " "RETURN i") - return tx.run(input_query, task_id=task.id, input_id=input_id).single() + return dict(tx.run(input_query, task_id=task.id, input_id=input_id).single()['i']) def set_task_input(tx, task, input_id, value): @@ -496,7 +496,7 @@ def get_task_output(tx, task, output_id): output_query = ("MATCH (:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output {id: $output_id}) " "RETURN o") - return tx.run(output_query, task_id=task.id, output_id=output_id).single() + return dict(tx.run(output_query, task_id=task.id, output_id=output_id).single()['o']) def set_task_output(tx, task, output_id, value): diff --git a/beeflow/common/gdb/neo4j_driver.py b/beeflow/common/gdb/neo4j_driver.py index 3b3222576..6e100aa96 100644 --- a/beeflow/common/gdb/neo4j_driver.py +++ b/beeflow/common/gdb/neo4j_driver.py @@ -6,7 +6,7 @@ """ from neo4j import GraphDatabase as Neo4jDatabase -from neobolt.exceptions import ServiceUnavailable +from neo4j.exceptions import ServiceUnavailable from beeflow.common.gdb.gdb_driver import GraphDatabaseDriver from beeflow.common.gdb import neo4j_cypher as tx @@ -307,7 +307,7 @@ def get_task_input(self, task, input_id): :rtype: StepInput """ input_record = self._read_transaction(tx.get_task_input, task=task, input_id=input_id) - return _reconstruct_task_input(input_record["i"]) + return _reconstruct_task_input(input_record) def set_task_input(self, task, input_id, value): """Set the value of a task input. @@ -330,7 +330,7 @@ def get_task_output(self, task, output_id): :rtype: StepOutput """ output_record = self._read_transaction(tx.get_task_output, task=task, output_id=output_id) - return _reconstruct_task_output(output_record["o"]) + return _reconstruct_task_output(output_record) def set_task_output(self, task, output_id, value): """Set the value of a task output. @@ -401,13 +401,13 @@ def _get_task_data_tuples(self, task_records): with self._driver.session() as session: trecords = list(task_records) hint_records = [session.read_transaction(tx.get_task_hints, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] req_records = [session.read_transaction(tx.get_task_requirements, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] input_records = [session.read_transaction(tx.get_task_inputs, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] output_records = [session.read_transaction(tx.get_task_outputs, - task_id=rec["t"]["id"]) for rec in trecords] + task_id=rec["id"]) for rec in trecords] hints = [_reconstruct_hints(hint_record) for hint_record in hint_records] reqs = [_reconstruct_requirements(req_record) for req_record in req_records] @@ -447,9 +447,8 @@ def _reconstruct_requirements(req_records): :type req_records: BoltStatementResult :rtype: list of Requirement """ - recs = [req_record["r"] for req_record in req_records] return [Requirement(rec["class"], {k: v for k, v in rec.items() if k != "class"}) - for rec in recs] + for rec in req_records] def _reconstruct_hints(hint_records): @@ -459,8 +458,8 @@ def _reconstruct_hints(hint_records): :type hint_records: BoltStatementResult :rtype: list of Hint """ - recs = [hint_record["h"] for hint_record in hint_records] - return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"}) for rec in recs] + return [Hint(rec["class"], {k: v for k, v in rec.items() if k != "class"}) + for rec in hint_records] def _reconstruct_workflow_inputs(input_records): @@ -470,8 +469,7 @@ def _reconstruct_workflow_inputs(input_records): :type input_records: BoltStatementResult :rtype: list of InputParameter """ - recs = [input_record["i"] for input_record in input_records] - return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in recs] + return [InputParameter(rec["id"], rec["type"], rec["value"]) for rec in input_records] def _reconstruct_workflow_outputs(output_records): @@ -481,8 +479,8 @@ def _reconstruct_workflow_outputs(output_records): :type output_records: BoltStatementResult :rtype: list of OutputParameter """ - recs = [output_record["o"] for output_record in output_records] - return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"]) for rec in recs] + return [OutputParameter(rec["id"], rec["type"], rec["value"], rec["source"]) + for rec in output_records] def _reconstruct_task_inputs(input_records): @@ -492,8 +490,7 @@ def _reconstruct_task_inputs(input_records): :type input_records: BoltStatementResult :rtype: list of StepInput """ - recs = [input_record["i"] for input_record in input_records] - return [_reconstruct_task_input(rec) for rec in recs] + return [_reconstruct_task_input(rec) for rec in input_records] def _reconstruct_task_input(rec): @@ -514,8 +511,7 @@ def _reconstruct_task_outputs(output_records): :type output_records: BoltStatementResult :rtype: list of StepOutput """ - recs = [output_record["o"] for output_record in output_records] - return [_reconstruct_task_output(rec) for rec in recs] + return [_reconstruct_task_output(rec) for rec in output_records] def _reconstruct_task_output(rec): @@ -543,9 +539,8 @@ def _reconstruct_workflow(workflow_record, hints, requirements, inputs, outputs) :type outputs: list of OutputParameter :rtype: Workflow """ - rec = workflow_record["w"] - return Workflow(name=rec["name"], hints=hints, requirements=requirements, inputs=inputs, - outputs=outputs, workflow_id=rec["id"]) + return Workflow(name=workflow_record["name"], hints=hints, requirements=requirements, + inputs=inputs, outputs=outputs, workflow_id=workflow_record["id"]) def _reconstruct_task(task_record, hints, requirements, inputs, outputs): @@ -563,10 +558,10 @@ def _reconstruct_task(task_record, hints, requirements, inputs, outputs): :type outputs: list of StepOutput :rtype: Task """ - rec = task_record["t"] - return Task(name=rec["name"], base_command=rec["base_command"], hints=hints, - requirements=requirements, inputs=inputs, outputs=outputs, stdout=rec["stdout"], - stderr=rec["stderr"], workflow_id=rec["workflow_id"], task_id=rec["id"]) + return Task(name=task_record["name"], base_command=task_record["base_command"], + hints=hints, requirements=requirements, inputs=inputs, outputs=outputs, + stdout=task_record["stdout"], stderr=task_record["stderr"], + workflow_id=task_record["workflow_id"], task_id=task_record["id"]) def _reconstruct_metadata(metadata_record): @@ -578,8 +573,7 @@ def _reconstruct_metadata(metadata_record): :type keys: iterable of str :rtype: dict """ - rec = metadata_record["m"] - return {key: val for key, val in rec.items() if key != "state"} + return {key: val for key, val in metadata_record.items() if key != "state"} # Ignore E1129: External module is missing proper resource context manager methods. # pylama:ignore=E1129 diff --git a/beeflow/wf_manager/common/dep_manager.py b/beeflow/wf_manager/common/dep_manager.py index c9dbbfc43..34a369c84 100755 --- a/beeflow/wf_manager/common/dep_manager.py +++ b/beeflow/wf_manager/common/dep_manager.py @@ -95,11 +95,11 @@ def setup_gdb_configs(mount_dir, bolt_port, http_port, https_port): with open(gdb_configfile, "rt", encoding="utf8") as cfile: data = cfile.read() - bolt_config = r'#(dbms.connector.bolt.listen_address=):[0-9]*' + bolt_config = r'#(server.bolt.listen_address=):[0-9]*' data = re.sub(bolt_config, rf'\1:{bolt_port}', data) - http_config = r'#(dbms.connector.http.listen_address=):[0-9]*' + http_config = r'#(server.http.listen_address=):[0-9]*' data = re.sub(http_config, rf'\1:{http_port}', data) - https_config = r'#(dbms.connector.https.listen_address=):[0-9]*' + https_config = r'#(server.https.listen_address=):[0-9]*' data = re.sub(https_config, rf'\1:{https_port}', data) with open(gdb_configfile, "wt", encoding="utf8") as cfile: cfile.write(data) @@ -155,7 +155,7 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False): container_path = get_container_dir() if not reexecute: try: - command = ['neo4j-admin', 'set-initial-password', str(db_password)] + command = ['neo4j-admin', 'dbms', 'set-initial-password', str(db_password)] subprocess.run([ "ch-run", "--set-env=" + container_path + "/ch/environment", @@ -187,7 +187,8 @@ def start_gdb(mount_dir, bolt_port, http_port, https_port, reexecute=False): "--", *command ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: output = proc.stdout.read().decode('utf-8') - pid = re.search(r'pid ([0-9]*)', output).group(1) + dep_log.info(output) + pid = re.search(r'\(pid:([0-9]*)\)', output).group(1) return pid except FileNotFoundError: dep_log.error("Neo4j failed to start.") diff --git a/ci/bee_install.sh b/ci/bee_install.sh index 175a25b58..c044e0b78 100755 --- a/ci/bee_install.sh +++ b/ci/bee_install.sh @@ -9,8 +9,8 @@ printf "**Setting up BEE containers**\n" printf "\n\n" mkdir -p $HOME/img # Pull the Neo4j container -ch-image pull neo4j:3.5.22 || exit 1 -ch-convert -i ch-image -o tar neo4j:3.5.22 $NEO4J_CONTAINER || exit 1 +ch-image pull neo4j:5.17 || exit 1 +ch-convert -i ch-image -o tar neo4j:5.17 $NEO4J_CONTAINER || exit 1 # Pull the Redis container ch-image pull redis || exit 1 ch-convert -i ch-image -o tar redis $REDIS_CONTAINER || exit 1 diff --git a/docs/sphinx/installation.rst b/docs/sphinx/installation.rst index 3bad41f97..6325e1115 100644 --- a/docs/sphinx/installation.rst +++ b/docs/sphinx/installation.rst @@ -15,9 +15,9 @@ Requirements: * **Containers**: - Two Charliecloud dependency containers are currently required for BEE: one for the Neo4j graph database and another for Redis. The paths to these containers will need to be set in the BEE configuration later, using the ``neo4j_image`` and the ``redis_image`` options respectively. BEE only supports Neo4j 3.5.x. We are currently using the latest version of Redis supplied on Docker Hub (as of 2023). + Two Charliecloud dependency containers are currently required for BEE: one for the Neo4j graph database and another for Redis. The paths to these containers will need to be set in the BEE configuration later, using the ``neo4j_image`` and the ``redis_image`` options respectively. BEE only supports Neo4j 5.x. We are currently using the latest version of Redis supplied on Docker Hub (as of 2023). - For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j-3-5-17-ch.tar.gz**, **/usr/projects/BEE/redis.tar.gz**. + For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j-5-17.tar.gz**, **/usr/projects/BEE/redis.tar.gz**. For other users, these containers can be pulled from Docker Hub (after following `Installation:`_ below) using ``beeflow core pull-deps``, which will download and report the container paths to be set in the config later. @@ -83,9 +83,10 @@ To check the status of the bee components run: beeflow core status .. code-block:: - beeflow components: + redis ... RUNNING scheduler ... RUNNING + celery ... RUNNING slurmrestd ... RUNNING wf_manager ... RUNNING task_manager ... RUNNING diff --git a/pyproject.toml b/pyproject.toml index 9dc9b6935..018b157b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,7 @@ python = ">=3.8.3,<=3.12.2" # Package dependencies Flask = { version = "^2.0" } -neo4j = { version = "^1.7.4" } +neo4j = { version = "^5" } PyYAML = { version = "^6.0.1" } flask_restful = "0.3.9" cwl-utils = "^0.16"