Skip to content

Commit

Permalink
Merge pull request #841 from neo4j-contrib/rc/5.4.1
Browse files Browse the repository at this point in the history
Rc/5.4.1
  • Loading branch information
mariusconjeaud authored Nov 29, 2024
2 parents d2e6704 + f9344d5 commit b4c8e5e
Show file tree
Hide file tree
Showing 14 changed files with 378 additions and 99 deletions.
4 changes: 4 additions & 0 deletions Changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 5.4.1 2024-11
* Add support for Cypher parallel runtime
* Add options for intermediate_transform : distinct, include_in_return, use a prop as source

Version 5.4.0 2024-11
* Traversal option for filtering and ordering
* Insert raw Cypher for ordering
Expand Down
37 changes: 34 additions & 3 deletions doc/source/advanced_query_operations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,41 @@ As discussed in the note above, this is for example useful when you need to orde
# This will return all Coffee nodes, with their most expensive supplier
Coffee.nodes.traverse_relations(suppliers="suppliers")
.intermediate_transform(
{"suppliers": "suppliers"}, ordering=["suppliers.delivery_cost"]
{"suppliers": {"source": "suppliers"}}, ordering=["suppliers.delivery_cost"]
)
.annotate(supps=Last(Collect("suppliers")))

Options for `intermediate_transform` *variables* are:

- `source`: `string`or `Resolver` - the variable to use as source for the transformation. Works with resolvers (see below).
- `source_prop`: `string` - optionally, a property of the source variable to use as source for the transformation.
- `include_in_return`: `bool` - whether to include the variable in the return statement. Defaults to False.

Additional options for the `intermediate_transform` method are:
- `distinct`: `bool` - whether to deduplicate the results. Defaults to False.

Here is a full example::

await Coffee.nodes.fetch_relations("suppliers")
.intermediate_transform(
{
"coffee": "coffee",
"suppliers": NodeNameResolver("suppliers"),
"r": RelationNameResolver("suppliers"),
"coffee": {"source": "coffee", "include_in_return": True}, # Only coffee will be returned
"suppliers": {"source": NodeNameResolver("suppliers")},
"r": {"source": RelationNameResolver("suppliers")},
"cost": {
"source": NodeNameResolver("suppliers"),
"source_prop": "delivery_cost",
},
},
distinct=True,
ordering=["-r.since"],
)
.annotate(oldest_supplier=Last(Collect("suppliers")))
.all()

Subqueries
----------

Expand All @@ -71,7 +102,7 @@ The `subquery` method allows you to perform a `Cypher subquery <https://neo4j.co
.subquery(
Coffee.nodes.traverse_relations(suppliers="suppliers")
.intermediate_transform(
{"suppliers": "suppliers"}, ordering=["suppliers.delivery_cost"]
{"suppliers": {"source": "suppliers"}}, ordering=["suppliers.delivery_cost"]
)
.annotate(supps=Last(Collect("suppliers"))),
["supps"],
Expand Down Expand Up @@ -108,4 +139,4 @@ In some cases though, it is not possible to set explicit aliases, for example wh

.. note::

When using the resolvers in combination with a traversal as in the example above, it will resolve the variable name of the last element in the traversal - the Species node for NodeNameResolver, and Coffee--Species relationship for RelationshipNameResolver.
When using the resolvers in combination with a traversal as in the example above, it will resolve the variable name of the last element in the traversal - the Species node for NodeNameResolver, and Coffee--Species relationship for RelationshipNameResolver.
2 changes: 1 addition & 1 deletion doc/source/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Adjust driver configuration - these options are only available for this connecti
config.MAX_TRANSACTION_RETRY_TIME = 30.0 # default
config.RESOLVER = None # default
config.TRUST = neo4j.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES # default
config.USER_AGENT = neomodel/v5.4.0 # default
config.USER_AGENT = neomodel/v5.4.1 # default

Setting the database name, if different from the default one::

Expand Down
24 changes: 21 additions & 3 deletions doc/source/transactions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Explicit Transactions
Neomodel also supports `explicit transactions <https://neo4j.com/docs/
api/python-driver/current/transactions.html>`_ that are pre-designated as either *read* or *write*.

This is vital when using neomodel over a `Neo4J causal cluster <https://neo4j.com/docs/
This is vital when using neomodel over a `Neo4j causal cluster <https://neo4j.com/docs/
operations-manual/current/clustering/>`_ because internally, queries will be rerouted to different
servers depending on their designation.

Expand Down Expand Up @@ -168,7 +168,7 @@ Impersonation

*Neo4j Enterprise feature*

Impersonation (`see Neo4j driver documentation <https://neo4j.com/docs/api/python-driver/current/api.html#impersonated-user-ref>``)
Impersonation (`see Neo4j driver documentation <https://neo4j.com/docs/api/python-driver/current/api.html#impersonated-user-ref>`_)
can be enabled via a context manager::

from neomodel import db
Expand Down Expand Up @@ -197,4 +197,22 @@ This can be mixed with other context manager like transactions::

@db.transaction()
def func2():
...
...


Parallel runtime
----------------

As of version 5.13, Neo4j *Enterprise Edition* supports parallel runtime for read transactions.

To use it, you can simply use the `parallel_read_transaction` context manager::

from neomodel import db

with db.parallel_read_transaction:
# It works for both neomodel-generated and custom Cypher queries
parallel_count_1 = len(Coffee.nodes)
parallel_count_2 = db.cypher_query("MATCH (n:Coffee) RETURN count(n)")

It is worth noting that the parallel runtime is only available for read transactions and that it is not enabled by default, because it is not always the fastest option. It is recommended to test it in your specific use case to see if it improves performance, and read the general considerations in the `Neo4j official documentation <https://neo4j.com/docs/cypher-manual/current/planning-and-tuning/runtimes/concepts/#runtimes-parallel-runtime-considerations>`_.

2 changes: 1 addition & 1 deletion neomodel/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "5.4.0"
__version__ = "5.4.1"
34 changes: 32 additions & 2 deletions neomodel/async_/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self):
self._database_version = None
self._database_edition = None
self.impersonated_user = None
self._parallel_runtime = False

async def set_connection(self, url: str = None, driver: AsyncDriver = None):
"""
Expand Down Expand Up @@ -239,6 +240,10 @@ def write_transaction(self):
def read_transaction(self):
return AsyncTransactionProxy(self, access_mode="READ")

@property
def parallel_read_transaction(self):
return AsyncTransactionProxy(self, access_mode="READ", parallel_runtime=True)

async def impersonate(self, user: str) -> "ImpersonationHandler":
"""All queries executed within this context manager will be executed as impersonated user
Expand Down Expand Up @@ -454,7 +459,6 @@ async def cypher_query(
:return: A tuple containing a list of results and a tuple of headers.
"""

if self._active_transaction:
# Use current session is a transaction is currently active
results, meta = await self._run_cypher_query(
Expand Down Expand Up @@ -493,6 +497,8 @@ async def _run_cypher_query(
try:
# Retrieve the data
start = time.time()
if self._parallel_runtime:
query = "CYPHER runtime=parallel " + query
response: AsyncResult = await session.run(query, params)
results, meta = [list(r.values()) async for r in response], response.keys()
end = time.time()
Expand Down Expand Up @@ -598,6 +604,18 @@ async def edition_is_enterprise(self) -> bool:
edition = await self.database_edition
return edition == "enterprise"

@ensure_connection
async def parallel_runtime_available(self) -> bool:
"""Returns true if the database supports parallel runtime
Returns:
bool: True if the database supports parallel runtime
"""
return (
await self.version_is_higher_than("5.13")
and await self.edition_is_enterprise()
)

async def change_neo4j_password(self, user, new_password):
await self.cypher_query(f"ALTER USER {user} SET PASSWORD '{new_password}'")

Expand Down Expand Up @@ -1168,17 +1186,29 @@ async def install_all_labels(stdout=None):
class AsyncTransactionProxy:
bookmarks: Optional[Bookmarks] = None

def __init__(self, db: AsyncDatabase, access_mode=None):
def __init__(
self, db: AsyncDatabase, access_mode: str = None, parallel_runtime: bool = False
):
self.db = db
self.access_mode = access_mode
self.parallel_runtime = parallel_runtime

@ensure_connection
async def __aenter__(self):
if self.parallel_runtime and not await self.db.parallel_runtime_available():
warnings.warn(
"Parallel runtime is only available in Neo4j Enterprise Edition 5.13 and above. "
"Reverting to default runtime.",
UserWarning,
)
self.parallel_runtime = False
self.db._parallel_runtime = self.parallel_runtime
await self.db.begin(access_mode=self.access_mode, bookmarks=self.bookmarks)
self.bookmarks = None
return self

async def __aexit__(self, exc_type, exc_value, traceback):
self.db._parallel_runtime = False
if exc_value:
await self.db.rollback()

Expand Down
87 changes: 51 additions & 36 deletions neomodel/async_/match.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import inspect
import re
import string
import warnings
from dataclasses import dataclass
from typing import Any, Dict, List
from typing import Optional as TOptional
Expand All @@ -12,6 +13,7 @@
from neomodel.exceptions import MultipleNodesReturned
from neomodel.match_q import Q, QBase
from neomodel.properties import AliasProperty, ArrayProperty, Property
from neomodel.typing import Transformation
from neomodel.util import INCOMING, OUTGOING

CYPHER_ACTIONS_WITH_SIDE_EFFECT_EXPR = re.compile(r"(?i:MERGE|CREATE|DELETE|DETACH)")
Expand Down Expand Up @@ -588,9 +590,11 @@ def build_traversal_from_path(
}
else:
existing_rhs_name = subgraph[part][
"rel_variable_name"
if relation.get("relation_filtering")
else "variable_name"
(
"rel_variable_name"
if relation.get("relation_filtering")
else "variable_name"
)
]
if relation["include_in_return"] and not already_present:
self._additional_return(rel_ident)
Expand Down Expand Up @@ -838,32 +842,27 @@ def build_query(self) -> str:
query += " WITH "
query += self._ast.with_clause

returned_items: list[str] = []
if hasattr(self.node_set, "_intermediate_transforms"):
for transform in self.node_set._intermediate_transforms:
query += " WITH "
query += "DISTINCT " if transform.get("distinct") else ""
injected_vars: list = []
# Reset return list since we'll probably invalidate most variables
self._ast.return_clause = ""
self._ast.additional_return = []
for name, source in transform["vars"].items():
if type(source) is str:
injected_vars.append(f"{source} AS {name}")
elif isinstance(source, RelationNameResolver):
result = self.lookup_query_variable(
source.relation, return_relation=True
)
if not result:
raise ValueError(
f"Unable to resolve variable name for relation {source.relation}."
)
injected_vars.append(f"{result[0]} AS {name}")
elif isinstance(source, NodeNameResolver):
result = self.lookup_query_variable(source.node)
if not result:
raise ValueError(
f"Unable to resolve variable name for node {source.node}."
)
injected_vars.append(f"{result[0]} AS {name}")
for name, varprops in transform["vars"].items():
source = varprops["source"]
if isinstance(source, (NodeNameResolver, RelationNameResolver)):
transformation = source.resolve(self)
else:
transformation = source
if varprops.get("source_prop"):
transformation += f".{varprops['source_prop']}"
transformation += f" AS {name}"
if varprops.get("include_in_return"):
returned_items += [name]
injected_vars.append(transformation)
query += ",".join(injected_vars)
if not transform["ordering"]:
continue
Expand All @@ -879,7 +878,6 @@ def build_query(self) -> str:
ordering.append(item)
query += ",".join(ordering)

returned_items: list[str] = []
if hasattr(self.node_set, "_subqueries"):
for subquery, return_set in self.node_set._subqueries:
outer_primary_var = self._ast.return_clause
Expand Down Expand Up @@ -978,7 +976,9 @@ async def _execute(self, lazy: bool = False, dict_output: bool = False):
]
query = self.build_query()
results, prop_names = await adb.cypher_query(
query, self._query_params, resolve_objects=True
query,
self._query_params,
resolve_objects=True,
)
if dict_output:
for item in results:
Expand Down Expand Up @@ -1098,6 +1098,14 @@ class RelationNameResolver:

relation: str

def resolve(self, qbuilder: AsyncQueryBuilder) -> str:
result = qbuilder.lookup_query_variable(self.relation, True)
if result is None:
raise ValueError(
f"Unable to resolve variable name for relation {self.relation}"
)
return result[0]


@dataclass
class NodeNameResolver:
Expand All @@ -1111,6 +1119,12 @@ class NodeNameResolver:

node: str

def resolve(self, qbuilder: AsyncQueryBuilder) -> str:
result = qbuilder.lookup_query_variable(self.node)
if result is None:
raise ValueError(f"Unable to resolve variable name for node {self.node}")
return result[0]


@dataclass
class BaseFunction:
Expand All @@ -1123,15 +1137,10 @@ def get_internal_name(self) -> str:
return self._internal_name

def resolve_internal_name(self, qbuilder: AsyncQueryBuilder) -> str:
if isinstance(self.input_name, NodeNameResolver):
result = qbuilder.lookup_query_variable(self.input_name.node)
elif isinstance(self.input_name, RelationNameResolver):
result = qbuilder.lookup_query_variable(self.input_name.relation, True)
if isinstance(self.input_name, (NodeNameResolver, RelationNameResolver)):
self._internal_name = self.input_name.resolve(qbuilder)
else:
result = (str(self.input_name), None)
if result is None:
raise ValueError(f"Unknown variable {self.input_name} used in Collect()")
self._internal_name = result[0]
self._internal_name = str(self.input_name)
return self._internal_name

def render(self, qbuilder: AsyncQueryBuilder) -> str:
Expand Down Expand Up @@ -1538,20 +1547,26 @@ async def subquery(
return self

def intermediate_transform(
self, vars: Dict[str, Any], ordering: TOptional[list] = None
self,
vars: Dict[str, Transformation],
distinct: bool = False,
ordering: TOptional[list] = None,
) -> "AsyncNodeSet":
if not vars:
raise ValueError(
"You must provide one variable at least when calling intermediate_transform()"
)
for name, source in vars.items():
for name, props in vars.items():
source = props["source"]
if type(source) is not str and not isinstance(
source, (NodeNameResolver, RelationNameResolver)
source, (NodeNameResolver, RelationNameResolver, RawCypher)
):
raise ValueError(
f"Wrong source type specified for variable '{name}', should be a string or an instance of NodeNameResolver or RelationNameResolver"
)
self._intermediate_transforms.append({"vars": vars, "ordering": ordering})
self._intermediate_transforms.append(
{"vars": vars, "distinct": distinct, "ordering": ordering}
)
return self


Expand Down
Loading

0 comments on commit b4c8e5e

Please sign in to comment.