Skip to content

Commit

Permalink
Query function rework
Browse files Browse the repository at this point in the history
This PR reworks the main functions used to query/create nodes.
Both get_node and batch_get_nodes functions now have a similar interface
and behavior.
Added support for multi-label nodes.

- batch_get_nodes_by_single_prop
  - Renamed from batch_get_nodes
  - Added create parameter for consistency
- get_node
  - Change default value of create parameter to True. There was no
    instance where this function was called with create=False and now it
    is consistent with batch_get_nodes.
  - Change interface to be the same as batch_get_nodes.
  - Id properties used as the search predicate must now be specified
    explicitly instead of being inferred from constraints.
- batch_get_nodes
  - New function that supports querying/creating nodes with multiple
    properties, where as subset of the properties can be used as the
    search predicate.
- batch_add_node_label
  - New function to add additional labels to existing nodes.
- batch_create_nodes
  - Deleted due to inflexibility and danger of creating duplicate nodes.
  • Loading branch information
m-appel committed Dec 17, 2023
1 parent b358b06 commit baca820
Show file tree
Hide file tree
Showing 37 changed files with 287 additions and 201 deletions.
267 changes: 179 additions & 88 deletions iyp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from datetime import datetime, time, timezone
from shutil import rmtree

from neo4j.exceptions import ConstraintError

from neo4j import GraphDatabase

# Usual constraints on nodes' properties
Expand Down Expand Up @@ -182,27 +180,29 @@ def rollback(self):
self.tx.rollback()
self.tx = self.session.begin_transaction()

def batch_get_nodes(self, type, prop_name, prop_set=set(), all=True):
"""Find the ID of all nodes in the graph for the given type (label) and check
that a node exists for each value in prop_set for the property prop. Create
these nodes if they don't exist.
def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=True, create=True):
"""Find the ID of all nodes in the graph for the given label and check that a
node exists for each value in prop_set for the property prop. Create these nodes
if they don't exist.
Notice: this is a costly operation if there is a lot of nodes for the
given type. To return only the nodes corresponding to prop_set values
set all=False.
This method commit changes to neo4j.
This method commits changes to the database.
"""
if type(label) is list and create:
raise NotImplementedError('Can not implicitly create multi-label nodes.')

if prop_set and prop_name in prop_formatters:
prop_set = set(map(prop_formatters[prop_name], prop_set))

if all:
existing_nodes = self.tx.run(f'MATCH (n:{type}) RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id')
existing_nodes = self.tx.run(f'MATCH (n:{label}) RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id')
else:
list_prop = list(prop_set)
existing_nodes = self.tx.run(f"""
WITH $list_prop AS list_prop
MATCH (n:{type})
MATCH (n:{label})
WHERE n.{prop_name} IN list_prop
RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id""", list_prop=list_prop)

Expand All @@ -212,119 +212,210 @@ def batch_get_nodes(self, type, prop_name, prop_set=set(), all=True):
missing_nodes = [{prop_name: val} for val in missing_props]

# Create missing nodes
for i in range(0, len(missing_nodes), BATCH_SIZE):
batch = missing_nodes[i:i + BATCH_SIZE]
if create:
for i in range(0, len(missing_nodes), BATCH_SIZE):
batch = missing_nodes[i:i + BATCH_SIZE]

create_query = f"""WITH $batch AS batch
UNWIND batch AS item CREATE (n:{type})
SET n += item RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id"""
create_query = f"""WITH $batch AS batch
UNWIND batch AS item CREATE (n:{label})
SET n = item RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id"""

new_nodes = self.tx.run(create_query, batch=batch)
new_nodes = self.tx.run(create_query, batch=batch)

for node in new_nodes:
ids[node[prop_name]] = node['_id']
for node in new_nodes:
ids[node[prop_name]] = node['_id']

self.commit()

return ids

def get_node(self, type, prop, create=False):
def batch_get_nodes(self, label, properties, id_properties=list(), create=True):
"""Find the IDs of all nodes in the graph for the given label and properties.
label: a str for a single label or a list of str for multiple labels. Multiple
labels are only supported with create=False.
properties: a list of dicts containing the node properties that should be
fetched/set.
id_properties: a list of keys from properties that should be used as the search
predicate. Can be empty if only one node property is given. The order of keys in
this list also defines the order of values for the returned id map.
create: a bool specifying if new nodes shall be created for missing properties.
"""
# HOW TO USE THIS FUNCTION
#
# To _only get_ nodes:
# Call with create=False.
# You can specify a list of labels.
# When getting nodes based on a single property, id_properties can be empty as
# the property name will be inferred automatically.
# When getting nodes based on multiple properties, all of them have to be
# specified in id_properties. PROPERTIES THAT ARE NOT LISTED IN id_properties
# WILL BE IGNORED!
#
# For example:
# properties = [{'id': 1, 'asn_v4': 64496}, {'id': 2, 'asn_v4': 64497}]
# batch_get_nodes('AtlasProbe', properties, ['id', 'asn_v4'], create=False)
# This would return the node ids for these nodes (if they exist) as a dict
# like this (assuming x and y are the node's ids):
# {(1, 64496): x, (2, 64497): y}
#
#
# To get/update/create nodes:
# Call with create=True.
# Only a single label string can be specified.
# This function guarantees that all properties are assigned to nodes. If
# needed, nodes are created.
# Like above, if there is only one property specified, id_properties can be
# empty.
# In contrast to above, if there are multiple properties not all of them have
# to be present in id_properties. id_properties specifies which properties are
# used as a filtering predicate, whereas all of them will be assigned.
#
# For example:
# properties = [{'id': 1, 'asn_v4': 64496}, {'id': 2, 'asn_v4': 64497}]
# batch_get_nodes('AtlasProbe', properties, ['id'])
# Assuming (:AtlasProbe {'id': 1}) already exists, then this function would
# set the asn_v4 property of the existing node to 64496 and it would create a
# new node (:AtlasProbe {'id': 2}) and set the asn_v4 property of that node to
# 64497.
# The returned id map would be:
# {1: x, 2: y}
if isinstance(label, list) and create:
raise NotImplementedError('Can not implicitly create multi-label nodes.')

properties = [format_properties(props) for props in properties]

# Assemble label
label_str = str(label)
if isinstance(label, list):
label_str = ':'.join(label)

if not id_properties:
# We assume that all property dicts have the same keys.
example_props = properties[0]
# Implicit id property.
if len(example_props) != 1:
# In the single get_node case we return the id of the node directly, but
# here we return a map of id_properties to id. If there is more than one
# property, the order of the keys in the dictionary is not really clear,
# so the user should pass an explicit order in id_properties instead.
raise ValueError('batch_get_nodes only supports implicit id property if a single property is passed.')
id_properties = list(example_props.keys())

# Assemble "WHERE" and RETURN clauses.
# The WHERE clause in this case in not an explicit WHERE clause, but the
# predicate that is contained within the node specification.
# For id_properties = ['x', 'y'] this will result in
# {x: prop.x, y: prop.y}
# The RETURN clause is actually only a part, namely
# a.x AS x, a.y AS y
# for the example above.
where_clause = ['{']
return_clause = list()
for prop in id_properties:
where_clause += [f'{prop}: prop.{prop}', ',']
return_clause += [f'a.{prop} AS {prop}', ',']
where_clause.pop()
where_clause.append('}')
where_clause_str = ''.join(where_clause)
return_clause.pop()
return_clause_str = ''.join(return_clause)

action = 'MATCH'
set_line = str()
if create:
action = 'MERGE'
set_line = 'SET a += prop'

query = f"""UNWIND $props AS prop
{action} (a:{label_str} {where_clause_str})
{set_line}
RETURN {return_clause_str}, ID(a) AS _id"""

ids = dict()
for i in range(0, len(properties), BATCH_SIZE):
props = properties[i: i + BATCH_SIZE]
results = self.tx.run(query, props=props)
if len(id_properties) == 1:
# Single id property results in a simple key-to-value mapping.
for r in results:
ids[r[id_properties[0]]] = r['_id']
else:
# Multiple id properties result in a tuple-to-value mapping where the
# order of values in the tuple is defined by the order of keys in
# id_properties.
for r in results:
id_key = tuple([r[prop] for prop in id_properties])
ids[id_key] = r['_id']
self.commit()
return ids

def get_node(self, label, properties, id_properties=list(), create=True):
"""Find the ID of a node in the graph with the possibility to create it if it
is not in the graph.
type: either a string or list of strings giving the type(s) of the node.
prop: dictionary of attributes for the node.
label: either a string or list of strings giving the node label(s). A list
(multiple labels) can only be used with create=False.
properties: dictionary of node properties.
id_properties: list of keys from properties that should be used as the search
predicate. If empty, all properties will be used.
create: if the node doesn't exist, the node can be added to the database
by setting create=True.
Return the node ID or None if the node does not exist and create=False.
"""

prop = format_properties(prop)
if isinstance(label, list) and create:
raise NotImplementedError('Can not implicitly create multi-label nodes.')

properties = format_properties(properties)

# put type in a list
type_str = str(type)
if isinstance(type, list):
type_str = ':'.join(type)
else:
type = [type]
label_str = str(label)
if isinstance(label, list):
label_str = ':'.join(label)

if create:
has_constraints = NODE_CONSTRAINTS_LABELS.intersection(type)
if len(has_constraints):
# MERGE node with constraints
# Search on the constraints and set other values
label = has_constraints.pop()
constraint_prop = dict([(c, prop[c]) for c in NODE_CONSTRAINTS[label].keys()])

# values = ', '.join([ f"a.{p} = {val}" for p, val in prop.items() ])
labels = ', '.join([f'a:{label}' for label in type])

# TODO: fix this. Not working as expected. e.g. getting prefix
# with a descr in prop
try:
result = self.tx.run(
f"""MERGE (a:{label} {dict2str(constraint_prop)})
ON MATCH
SET {dict2str(prop, eq='=', pfx='a.')[1:-1]}, {labels}
ON CREATE
SET {dict2str(prop, eq='=', pfx='a.')[1:-1]}, {labels}
RETURN ID(a)"""
).single()
except ConstraintError:
sys.stderr.write(f'cannot merge {prop}')
result = self.tx.run(
f"""MATCH (a:{label} {dict2str(constraint_prop)}) RETURN ID(a)""").single()

# No explicit id properties means all specified properties should be treated
# as id properties.
if not id_properties:
id_property_dict = properties
else:
# MERGE node without constraints
result = self.tx.run(f'MERGE (a:{type_str} {dict2str(prop)}) RETURN ID(a)').single()
id_property_dict = {prop: properties[prop] for prop in id_properties}
result = self.tx.run(
f"""MERGE (a:{label} {dict2str(id_property_dict)})
SET a += {dict2str(properties)}
RETURN ID(a)"""
).single()
else:
# MATCH node
result = self.tx.run(f'MATCH (a:{type_str} {dict2str(prop)}) RETURN ID(a)').single()
result = self.tx.run(f'MATCH (a:{label_str} {dict2str(properties)}) RETURN ID(a)').single()

if result is not None:
return result[0]
else:
return None

def batch_create_nodes(self, type, id_prop: str, node_props: list):
"""Create multiple nodes in batches based on the entries in node_props.
type: either a string or list of strings giving the type(s) of the node.
id_prop: the name of the property whose value should be used as key for the
returned ID map.
node_props: list of dictionaries of attributes for the nodes.
def batch_add_node_label(self, node_ids, label):
"""Add additional labels to existing nodes.
Return a map of node IDs mapping the value of id_prop to the ID of the
corresponding node.
node_ids: list of node ids
label: label string or list of label strings
"""

node_props = [format_properties(prop) for prop in node_props]

# put type in a list
type_str = str(type)
if isinstance(type, list):
type_str = ':'.join(type)

ids = dict()
# create nodes in batches
for i in range(0, len(node_props), BATCH_SIZE):
batch = node_props[i:i + BATCH_SIZE]

create_query = f"""WITH $batch AS batch
UNWIND batch AS item CREATE (n:{type_str})
SET n = item RETURN n.{id_prop} AS {id_prop}, ID(n) AS _id"""

new_nodes = self.tx.run(create_query, batch=batch)

for node in new_nodes:
ids[node[id_prop]] = node['_id']

label_str = str(label)
if type(label) is list:
label_str = ':'.join(label)

for i in range(0, len(node_ids), BATCH_SIZE):
batch = node_ids[i:i + BATCH_SIZE]

self.tx.run(f"""WITH $batch AS batch
MATCH (n)
WHERE ID(n) IN batch
SET n:{label_str}""",
batch=batch)
self.commit()

return ids

def batch_get_node_extid(self, id_type):
"""Find all nodes in the graph which have an EXTERNAL_ID relationship with the
given id_type.
Expand Down
8 changes: 4 additions & 4 deletions iyp/crawlers/apnic/eyeball.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def run(self):
logging.info(f'processing {country}')

# Get the QID of the country and corresponding ranking
cc_qid = self.iyp.get_node('Country', {'country_code': cc}, create=True)
ranking_qid = self.iyp.get_node('Ranking', {'name': f'APNIC eyeball estimates ({cc})'}, create=True)
cc_qid = self.iyp.get_node('Country', {'country_code': cc})
ranking_qid = self.iyp.get_node('Ranking', {'name': f'APNIC eyeball estimates ({cc})'})
statements = [['COUNTRY', cc_qid, self.reference]]
self.iyp.add_links(ranking_qid, statements)

Expand All @@ -57,8 +57,8 @@ def run(self):
names.add(asn['autnum'])

# Get node IDs
self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns, all=False)
self.name_id = self.iyp.batch_get_nodes('Name', 'name', names, all=False)
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False)
self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names, all=False)

# Compute links
country_links = []
Expand Down
2 changes: 1 addition & 1 deletion iyp/crawlers/bgpkit/as2rel.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def run(self):
rels.append(rel)

# get ASNs IDs
self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns)
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)

# Compute links
links = []
Expand Down
5 changes: 2 additions & 3 deletions iyp/crawlers/bgpkit/peerstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ def run(self):
stats = json.load(bz2.open(req.raw))
collector_qid = self.iyp.get_node(
'BGPCollector',
{'name': stats['collector'], 'project': stats['project']},
create=True
{'name': stats['collector'], 'project': stats['project']}
)
self.reference['reference_url'] = url

Expand All @@ -75,7 +74,7 @@ def run(self):
asns.add(peer['asn'])

# get ASNs' IDs
self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns, all=False)
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False)

# Compute links
links = []
Expand Down
4 changes: 2 additions & 2 deletions iyp/crawlers/bgpkit/pfx2asn.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def run(self):

logging.info('Pushing nodes to neo4j...\n')
# get ASNs and prefixes IDs
self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns)
self.prefix_id = self.iyp.batch_get_nodes('Prefix', 'prefix', prefixes)
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
self.prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes)

# Compute links
links = []
Expand Down
Loading

0 comments on commit baca820

Please sign in to comment.