From baca82057d7256ba281585039dc790f4e89a53eb Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Sun, 17 Dec 2023 06:15:33 +0000
Subject: [PATCH 1/5] Query function rework

This PR reworks the main functions used to query/create nodes. Both the
get_node and batch_get_nodes functions now have a similar interface and
behavior. Added support for multi-label nodes.

- batch_get_nodes_by_single_prop
  - Renamed from batch_get_nodes.
  - Added create parameter for consistency.
- get_node
  - Changed default value of create parameter to True. There was no instance
    where this function was called with create=False, and it is now
    consistent with batch_get_nodes.
  - Changed interface to be the same as batch_get_nodes.
  - Id properties used as the search predicate must now be specified
    explicitly instead of being inferred from constraints.
- batch_get_nodes
  - New function that supports querying/creating nodes with multiple
    properties, where a subset of the properties can be used as the search
    predicate.
- batch_add_node_label
  - New function to add additional labels to existing nodes.
- batch_create_nodes
  - Deleted due to inflexibility and danger of creating duplicate nodes.
---
 iyp/__init__.py                              | 267 +++++++++++++------
 iyp/crawlers/apnic/eyeball.py                |   8 +-
 iyp/crawlers/bgpkit/as2rel.py                |   2 +-
 iyp/crawlers/bgpkit/peerstats.py             |   5 +-
 iyp/crawlers/bgpkit/pfx2asn.py               |   4 +-
 iyp/crawlers/bgptools/anycast_prefixes.py    |   4 +-
 iyp/crawlers/bgptools/as_names.py            |   4 +-
 iyp/crawlers/bgptools/tags.py                |   4 +-
 iyp/crawlers/caida/asrank.py                 |   8 +-
 iyp/crawlers/cisco/umbrella_top1M.py         |   4 +-
 iyp/crawlers/citizenlab/urldb.py             |   4 +-
 iyp/crawlers/cloudflare/dns_top_ases.py      |   2 +-
 iyp/crawlers/cloudflare/dns_top_locations.py |   2 +-
 iyp/crawlers/cloudflare/ranking_bucket.py    |   4 +-
 iyp/crawlers/cloudflare/top100.py            |   4 +-
 iyp/crawlers/emileaben/as_names.py           |   4 +-
 iyp/crawlers/example/crawler.py              |   5 +-
 iyp/crawlers/ihr/__init__.py                 |   6 +-
 iyp/crawlers/ihr/country_dependency.py       |   8 +-
 iyp/crawlers/ihr/rov.py                      |  20 +-
 iyp/crawlers/inetintel/as_org.py             |   4 +-
 iyp/crawlers/manrs/members.py                |   9 +-
 iyp/crawlers/nro/delegated_stats.py          |   8 +-
 iyp/crawlers/openintel/__init__.py           |  10 +-
 iyp/crawlers/pch/__init__.py                 |   4 +-
 iyp/crawlers/peeringdb/fac.py                |  10 +-
 iyp/crawlers/peeringdb/ix.py                 |  24 +-
 iyp/crawlers/peeringdb/org.py                |  10 +-
 iyp/crawlers/ripe/as_names.py                |   6 +-
 iyp/crawlers/ripe/atlas_probes.py            |   8 +-
 iyp/crawlers/ripe/roa.py                     |   4 +-
 iyp/crawlers/stanford/asdb.py                |   4 +-
 iyp/crawlers/tranco/top1M.py                 |   4 +-
 iyp/post/country_information.py              |   2 +-
 iyp/post/dns_hierarchy.py                    |   4 +-
 iyp/post/ip2prefix.py                        |   4 +-
 iyp/post/url2domain.py                       |   4 +-
 37 files changed, 287 insertions(+), 201 deletions(-)

diff --git a/iyp/__init__.py b/iyp/__init__.py
index 76749f9..91ac46a 100644
--- a/iyp/__init__.py
+++ b/iyp/__init__.py
@@ -7,8 +7,6 @@
 from datetime import datetime, time, timezone
 from shutil import rmtree
 
-from neo4j.exceptions import ConstraintError
-
 from neo4j import GraphDatabase
 
 # Usual constraints on nodes' properties
@@ -182,27 +180,29 @@ def rollback(self):
         self.tx.rollback()
         self.tx = self.session.begin_transaction()
 
-    def batch_get_nodes(self, type, prop_name, prop_set=set(), all=True):
-        """Find the ID of all nodes in the graph for the given type (label) and check
-        that a node exists for each value in prop_set for the property prop. Create
-        these nodes if they don't exist.
+ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=True, create=True): + """Find the ID of all nodes in the graph for the given label and check that a + node exists for each value in prop_set for the property prop. Create these nodes + if they don't exist. Notice: this is a costly operation if there is a lot of nodes for the given type. To return only the nodes corresponding to prop_set values set all=False. - This method commit changes to neo4j. + This method commits changes to the database. """ + if type(label) is list and create: + raise NotImplementedError('Can not implicitly create multi-label nodes.') if prop_set and prop_name in prop_formatters: prop_set = set(map(prop_formatters[prop_name], prop_set)) if all: - existing_nodes = self.tx.run(f'MATCH (n:{type}) RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id') + existing_nodes = self.tx.run(f'MATCH (n:{label}) RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id') else: list_prop = list(prop_set) existing_nodes = self.tx.run(f""" WITH $list_prop AS list_prop - MATCH (n:{type}) + MATCH (n:{label}) WHERE n.{prop_name} IN list_prop RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id""", list_prop=list_prop) @@ -212,119 +212,210 @@ def batch_get_nodes(self, type, prop_name, prop_set=set(), all=True): missing_nodes = [{prop_name: val} for val in missing_props] # Create missing nodes - for i in range(0, len(missing_nodes), BATCH_SIZE): - batch = missing_nodes[i:i + BATCH_SIZE] + if create: + for i in range(0, len(missing_nodes), BATCH_SIZE): + batch = missing_nodes[i:i + BATCH_SIZE] - create_query = f"""WITH $batch AS batch - UNWIND batch AS item CREATE (n:{type}) - SET n += item RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id""" + create_query = f"""WITH $batch AS batch + UNWIND batch AS item CREATE (n:{label}) + SET n = item RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id""" - new_nodes = self.tx.run(create_query, batch=batch) + new_nodes = self.tx.run(create_query, batch=batch) - for node in new_nodes: - ids[node[prop_name]] = node['_id'] + for node in new_nodes: + ids[node[prop_name]] = node['_id'] self.commit() return ids - def get_node(self, type, prop, create=False): + def batch_get_nodes(self, label, properties, id_properties=list(), create=True): + """Find the IDs of all nodes in the graph for the given label and properties. + + label: a str for a single label or a list of str for multiple labels. Multiple + labels are only supported with create=False. + properties: a list of dicts containing the node properties that should be + fetched/set. + id_properties: a list of keys from properties that should be used as the search + predicate. Can be empty if only one node property is given. The order of keys in + this list also defines the order of values for the returned id map. + create: a bool specifying if new nodes shall be created for missing properties. + """ + # HOW TO USE THIS FUNCTION + # + # To _only get_ nodes: + # Call with create=False. + # You can specify a list of labels. + # When getting nodes based on a single property, id_properties can be empty as + # the property name will be inferred automatically. + # When getting nodes based on multiple properties, all of them have to be + # specified in id_properties. PROPERTIES THAT ARE NOT LISTED IN id_properties + # WILL BE IGNORED! 
+        #
+        # For example:
+        # properties = [{'id': 1, 'asn_v4': 64496}, {'id': 2, 'asn_v4': 64497}]
+        # batch_get_nodes('AtlasProbe', properties, ['id', 'asn_v4'], create=False)
+        # This would return the node ids for these nodes (if they exist) as a dict
+        # like this (assuming x and y are the node's ids):
+        # {(1, 64496): x, (2, 64497): y}
+        #
+        #
+        # To get/update/create nodes:
+        # Call with create=True.
+        # Only a single label string can be specified.
+        # This function guarantees that all properties are assigned to nodes. If
+        # needed, nodes are created.
+        # Like above, if there is only one property specified, id_properties can be
+        # empty.
+        # In contrast to above, if there are multiple properties, not all of them have
+        # to be present in id_properties. id_properties specifies which properties are
+        # used as a filtering predicate, whereas all of them will be assigned.
+        #
+        # For example:
+        # properties = [{'id': 1, 'asn_v4': 64496}, {'id': 2, 'asn_v4': 64497}]
+        # batch_get_nodes('AtlasProbe', properties, ['id'])
+        # Assuming (:AtlasProbe {'id': 1}) already exists, then this function would
+        # set the asn_v4 property of the existing node to 64496 and it would create a
+        # new node (:AtlasProbe {'id': 2}) and set the asn_v4 property of that node to
+        # 64497.
+        # The returned id map would be:
+        # {1: x, 2: y}
+        if isinstance(label, list) and create:
+            raise NotImplementedError('Can not implicitly create multi-label nodes.')
+
+        properties = [format_properties(props) for props in properties]
+
+        # Assemble label
+        label_str = str(label)
+        if isinstance(label, list):
+            label_str = ':'.join(label)
+
+        if not id_properties:
+            # We assume that all property dicts have the same keys.
+            example_props = properties[0]
+            # Implicit id property.
+            if len(example_props) != 1:
+                # In the single get_node case we return the id of the node directly, but
+                # here we return a map of id_properties to id. If there is more than one
+                # property, the order of the keys in the dictionary is not really clear,
+                # so the user should pass an explicit order in id_properties instead.
+                raise ValueError('batch_get_nodes only supports implicit id property if a single property is passed.')
+            id_properties = list(example_props.keys())
+
+        # Assemble "WHERE" and RETURN clauses.
+        # The WHERE clause in this case is not an explicit WHERE clause, but the
+        # predicate that is contained within the node specification.
+        # For id_properties = ['x', 'y'] this will result in
+        # {x: prop.x, y: prop.y}
+        # The RETURN clause is actually only a part, namely
+        # a.x AS x, a.y AS y
+        # for the example above.
+        where_clause = ['{']
+        return_clause = list()
+        for prop in id_properties:
+            where_clause += [f'{prop}: prop.{prop}', ',']
+            return_clause += [f'a.{prop} AS {prop}', ',']
+        where_clause.pop()
+        where_clause.append('}')
+        where_clause_str = ''.join(where_clause)
+        return_clause.pop()
+        return_clause_str = ''.join(return_clause)
+
+        action = 'MATCH'
+        set_line = str()
+        if create:
+            action = 'MERGE'
+            set_line = 'SET a += prop'
+
+        query = f"""UNWIND $props AS prop
+                    {action} (a:{label_str} {where_clause_str})
+                    {set_line}
+                    RETURN {return_clause_str}, ID(a) AS _id"""
+
+        ids = dict()
+        for i in range(0, len(properties), BATCH_SIZE):
+            props = properties[i: i + BATCH_SIZE]
+            results = self.tx.run(query, props=props)
+            if len(id_properties) == 1:
+                # Single id property results in a simple key-to-value mapping.
+ for r in results: + ids[r[id_properties[0]]] = r['_id'] + else: + # Multiple id properties result in a tuple-to-value mapping where the + # order of values in the tuple is defined by the order of keys in + # id_properties. + for r in results: + id_key = tuple([r[prop] for prop in id_properties]) + ids[id_key] = r['_id'] + self.commit() + return ids + + def get_node(self, label, properties, id_properties=list(), create=True): """Find the ID of a node in the graph with the possibility to create it if it is not in the graph. - type: either a string or list of strings giving the type(s) of the node. - prop: dictionary of attributes for the node. + label: either a string or list of strings giving the node label(s). A list + (multiple labels) can only be used with create=False. + properties: dictionary of node properties. + id_properties: list of keys from properties that should be used as the search + predicate. If empty, all properties will be used. create: if the node doesn't exist, the node can be added to the database by setting create=True. Return the node ID or None if the node does not exist and create=False. """ - prop = format_properties(prop) + if isinstance(label, list) and create: + raise NotImplementedError('Can not implicitly create multi-label nodes.') + + properties = format_properties(properties) # put type in a list - type_str = str(type) - if isinstance(type, list): - type_str = ':'.join(type) - else: - type = [type] + label_str = str(label) + if isinstance(label, list): + label_str = ':'.join(label) if create: - has_constraints = NODE_CONSTRAINTS_LABELS.intersection(type) - if len(has_constraints): - # MERGE node with constraints - # Search on the constraints and set other values - label = has_constraints.pop() - constraint_prop = dict([(c, prop[c]) for c in NODE_CONSTRAINTS[label].keys()]) - - # values = ', '.join([ f"a.{p} = {val}" for p, val in prop.items() ]) - labels = ', '.join([f'a:{label}' for label in type]) - - # TODO: fix this. Not working as expected. e.g. getting prefix - # with a descr in prop - try: - result = self.tx.run( - f"""MERGE (a:{label} {dict2str(constraint_prop)}) - ON MATCH - SET {dict2str(prop, eq='=', pfx='a.')[1:-1]}, {labels} - ON CREATE - SET {dict2str(prop, eq='=', pfx='a.')[1:-1]}, {labels} - RETURN ID(a)""" - ).single() - except ConstraintError: - sys.stderr.write(f'cannot merge {prop}') - result = self.tx.run( - f"""MATCH (a:{label} {dict2str(constraint_prop)}) RETURN ID(a)""").single() - + # No explicit id properties means all specified properties should be treated + # as id properties. + if not id_properties: + id_property_dict = properties else: - # MERGE node without constraints - result = self.tx.run(f'MERGE (a:{type_str} {dict2str(prop)}) RETURN ID(a)').single() + id_property_dict = {prop: properties[prop] for prop in id_properties} + result = self.tx.run( + f"""MERGE (a:{label} {dict2str(id_property_dict)}) + SET a += {dict2str(properties)} + RETURN ID(a)""" + ).single() else: # MATCH node - result = self.tx.run(f'MATCH (a:{type_str} {dict2str(prop)}) RETURN ID(a)').single() + result = self.tx.run(f'MATCH (a:{label_str} {dict2str(properties)}) RETURN ID(a)').single() if result is not None: return result[0] else: return None - def batch_create_nodes(self, type, id_prop: str, node_props: list): - """Create multiple nodes in batches based on the entries in node_props. - - type: either a string or list of strings giving the type(s) of the node. 
- id_prop: the name of the property whose value should be used as key for the - returned ID map. - node_props: list of dictionaries of attributes for the nodes. + def batch_add_node_label(self, node_ids, label): + """Add additional labels to existing nodes. - Return a map of node IDs mapping the value of id_prop to the ID of the - corresponding node. + node_ids: list of node ids + label: label string or list of label strings """ - - node_props = [format_properties(prop) for prop in node_props] - - # put type in a list - type_str = str(type) - if isinstance(type, list): - type_str = ':'.join(type) - - ids = dict() - # create nodes in batches - for i in range(0, len(node_props), BATCH_SIZE): - batch = node_props[i:i + BATCH_SIZE] - - create_query = f"""WITH $batch AS batch - UNWIND batch AS item CREATE (n:{type_str}) - SET n = item RETURN n.{id_prop} AS {id_prop}, ID(n) AS _id""" - - new_nodes = self.tx.run(create_query, batch=batch) - - for node in new_nodes: - ids[node[id_prop]] = node['_id'] - + label_str = str(label) + if type(label) is list: + label_str = ':'.join(label) + + for i in range(0, len(node_ids), BATCH_SIZE): + batch = node_ids[i:i + BATCH_SIZE] + + self.tx.run(f"""WITH $batch AS batch + MATCH (n) + WHERE ID(n) IN batch + SET n:{label_str}""", + batch=batch) self.commit() - return ids - def batch_get_node_extid(self, id_type): """Find all nodes in the graph which have an EXTERNAL_ID relationship with the given id_type. diff --git a/iyp/crawlers/apnic/eyeball.py b/iyp/crawlers/apnic/eyeball.py index 594ff8f..4c043db 100644 --- a/iyp/crawlers/apnic/eyeball.py +++ b/iyp/crawlers/apnic/eyeball.py @@ -32,8 +32,8 @@ def run(self): logging.info(f'processing {country}') # Get the QID of the country and corresponding ranking - cc_qid = self.iyp.get_node('Country', {'country_code': cc}, create=True) - ranking_qid = self.iyp.get_node('Ranking', {'name': f'APNIC eyeball estimates ({cc})'}, create=True) + cc_qid = self.iyp.get_node('Country', {'country_code': cc}) + ranking_qid = self.iyp.get_node('Ranking', {'name': f'APNIC eyeball estimates ({cc})'}) statements = [['COUNTRY', cc_qid, self.reference]] self.iyp.add_links(ranking_qid, statements) @@ -57,8 +57,8 @@ def run(self): names.add(asn['autnum']) # Get node IDs - self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns, all=False) - self.name_id = self.iyp.batch_get_nodes('Name', 'name', names, all=False) + self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False) + self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names, all=False) # Compute links country_links = [] diff --git a/iyp/crawlers/bgpkit/as2rel.py b/iyp/crawlers/bgpkit/as2rel.py index 8bd89a5..9b2f9d6 100644 --- a/iyp/crawlers/bgpkit/as2rel.py +++ b/iyp/crawlers/bgpkit/as2rel.py @@ -34,7 +34,7 @@ def run(self): rels.append(rel) # get ASNs IDs - self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns) + self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) # Compute links links = [] diff --git a/iyp/crawlers/bgpkit/peerstats.py b/iyp/crawlers/bgpkit/peerstats.py index 14de495..b3fc7d2 100644 --- a/iyp/crawlers/bgpkit/peerstats.py +++ b/iyp/crawlers/bgpkit/peerstats.py @@ -63,8 +63,7 @@ def run(self): stats = json.load(bz2.open(req.raw)) collector_qid = self.iyp.get_node( 'BGPCollector', - {'name': stats['collector'], 'project': stats['project']}, - create=True + {'name': stats['collector'], 'project': stats['project']} ) self.reference['reference_url'] = url @@ -75,7 +74,7 @@ def run(self): 
asns.add(peer['asn']) # get ASNs' IDs - self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns, all=False) + self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False) # Compute links links = [] diff --git a/iyp/crawlers/bgpkit/pfx2asn.py b/iyp/crawlers/bgpkit/pfx2asn.py index c82d08e..bd1025c 100644 --- a/iyp/crawlers/bgpkit/pfx2asn.py +++ b/iyp/crawlers/bgpkit/pfx2asn.py @@ -37,8 +37,8 @@ def run(self): logging.info('Pushing nodes to neo4j...\n') # get ASNs and prefixes IDs - self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns) - self.prefix_id = self.iyp.batch_get_nodes('Prefix', 'prefix', prefixes) + self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) + self.prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes) # Compute links links = [] diff --git a/iyp/crawlers/bgptools/anycast_prefixes.py b/iyp/crawlers/bgptools/anycast_prefixes.py index 2967a97..2661bb8 100644 --- a/iyp/crawlers/bgptools/anycast_prefixes.py +++ b/iyp/crawlers/bgptools/anycast_prefixes.py @@ -76,8 +76,8 @@ def update(self, res, filename: str): prefixes.add(line) lines.append(line) - prefix_id = self.iyp.batch_get_nodes('Prefix', 'prefix', prefixes) - tag_id = self.iyp.get_node('Tag', {'label': 'Anycast'}, create=True) + prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes) + tag_id = self.iyp.get_node('Tag', {'label': 'Anycast'}) links = [] for line in lines: diff --git a/iyp/crawlers/bgptools/as_names.py b/iyp/crawlers/bgptools/as_names.py index 5b76d4e..a977185 100644 --- a/iyp/crawlers/bgptools/as_names.py +++ b/iyp/crawlers/bgptools/as_names.py @@ -46,8 +46,8 @@ def run(self): lines.append([asn, name]) # get ASNs and names IDs - self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns) - self.name_id = self.iyp.batch_get_nodes('Name', 'name', names) + self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) + self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names) # Compute links links = [] diff --git a/iyp/crawlers/bgptools/tags.py b/iyp/crawlers/bgptools/tags.py index 47ff390..d12bd5d 100644 --- a/iyp/crawlers/bgptools/tags.py +++ b/iyp/crawlers/bgptools/tags.py @@ -63,7 +63,7 @@ def run(self): print(req.text) sys.exit('Error while fetching AS names') - self.tag_qid = self.iyp.get_node('Tag', {'label': label}, create=True) + self.tag_qid = self.iyp.get_node('Tag', {'label': label}) for line in req.text.splitlines(): # skip header if line.startswith('asn'): @@ -71,7 +71,7 @@ def run(self): # Parse given line to get ASN, name, and country code asn, _, _ = line.partition(',') - asn_qid = self.iyp.get_node('AS', {'asn': asn[2:]}, create=True) + asn_qid = self.iyp.get_node('AS', {'asn': asn[2:]}) statements = [['CATEGORIZED', self.tag_qid, self.reference]] # Set AS name try: diff --git a/iyp/crawlers/caida/asrank.py b/iyp/crawlers/caida/asrank.py index 3b18393..b05de64 100644 --- a/iyp/crawlers/caida/asrank.py +++ b/iyp/crawlers/caida/asrank.py @@ -59,10 +59,10 @@ def run(self): # Get/create ASNs, names, and country nodes print('Pushing nodes.', file=sys.stderr) logging.info('Pushing nodes.') - self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns) - self.country_id = self.iyp.batch_get_nodes('Country', 'country_code', countries) - self.name_id = self.iyp.batch_get_nodes('Name', 'name', names, all=False) - self.asrank_qid = self.iyp.get_node('Ranking', {'name': 'CAIDA ASRank'}, create=True) + self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) + 
self.country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries) + self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names, all=False) + self.asrank_qid = self.iyp.get_node('Ranking', {'name': 'CAIDA ASRank'}) # Compute links country_links = list() diff --git a/iyp/crawlers/cisco/umbrella_top1M.py b/iyp/crawlers/cisco/umbrella_top1M.py index 07f2bab..fd2c968 100644 --- a/iyp/crawlers/cisco/umbrella_top1M.py +++ b/iyp/crawlers/cisco/umbrella_top1M.py @@ -20,7 +20,7 @@ class Crawler(BaseCrawler): def run(self): """Fetch Umbrella top 1M and push to IYP.""" - self.cisco_qid = self.iyp.get_node('Ranking', {'name': 'Cisco Umbrella Top 1 million'}, create=True) + self.cisco_qid = self.iyp.get_node('Ranking', {'name': 'Cisco Umbrella Top 1 million'}) sys.stderr.write('Downloading latest list...\n') req = requests.get(URL) @@ -40,7 +40,7 @@ def run(self): links.append({'src_name': domain, 'dst_id': self.cisco_qid, 'props': [self.reference, {'rank': int(rank)}]}) - name_id = self.iyp.batch_get_nodes('DomainName', 'name', domains) + name_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domains) for link in links: link['src_id'] = name_id[link['src_name']] diff --git a/iyp/crawlers/citizenlab/urldb.py b/iyp/crawlers/citizenlab/urldb.py index 7310409..c4c69aa 100644 --- a/iyp/crawlers/citizenlab/urldb.py +++ b/iyp/crawlers/citizenlab/urldb.py @@ -68,8 +68,8 @@ def run(self): continue lines.append([url, category]) - url_id = self.iyp.batch_get_nodes('URL', 'url', urls) - category_id = self.iyp.batch_get_nodes('Tag', 'label', categories) + url_id = self.iyp.batch_get_nodes_by_single_prop('URL', 'url', urls) + category_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label', categories) links = [] for (url, category) in lines: diff --git a/iyp/crawlers/cloudflare/dns_top_ases.py b/iyp/crawlers/cloudflare/dns_top_ases.py index 683bbda..f24c952 100644 --- a/iyp/crawlers/cloudflare/dns_top_ases.py +++ b/iyp/crawlers/cloudflare/dns_top_ases.py @@ -18,7 +18,7 @@ class Crawler(Crawler): def run(self): """Push data to IYP.""" - self.as_id = self.iyp.batch_get_nodes('AS', 'asn') + self.as_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn') super().run() diff --git a/iyp/crawlers/cloudflare/dns_top_locations.py b/iyp/crawlers/cloudflare/dns_top_locations.py index e58cd8d..d31629e 100644 --- a/iyp/crawlers/cloudflare/dns_top_locations.py +++ b/iyp/crawlers/cloudflare/dns_top_locations.py @@ -103,7 +103,7 @@ def run(self): # FIXME this should be called before/separately self.fetch() - self.country_id = self.iyp.batch_get_nodes('Country', 'country_code') + self.country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code') self.statements = [] tmp_dir = self.get_tmp_dir() diff --git a/iyp/crawlers/cloudflare/ranking_bucket.py b/iyp/crawlers/cloudflare/ranking_bucket.py index b4b1c75..51ce721 100644 --- a/iyp/crawlers/cloudflare/ranking_bucket.py +++ b/iyp/crawlers/cloudflare/ranking_bucket.py @@ -83,7 +83,7 @@ def run(self): # domain_ids, but iterate over the domains set instead. 
logging.info(f'Adding/retrieving {len(all_domains)} DomainName nodes.') print(f'Adding/retrieving {len(all_domains)} DomainName nodes') - domain_ids = self.iyp.batch_get_nodes('DomainName', 'name', all_domains) + domain_ids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', all_domains) for dataset, domains in datasets: dataset_title = f'Cloudflare {dataset["title"]}' @@ -96,7 +96,7 @@ def run(self): 'description': dataset['description'], 'top': dataset['meta']['top'] }, - create=True) + id_properties={'name'}) # Create RANK relationships domain_links = [{'src_id': domain_ids[domain], 'dst_id': ranking_id, 'props': [self.reference]} diff --git a/iyp/crawlers/cloudflare/top100.py b/iyp/crawlers/cloudflare/top100.py index 4970eca..fb976d5 100644 --- a/iyp/crawlers/cloudflare/top100.py +++ b/iyp/crawlers/cloudflare/top100.py @@ -26,7 +26,7 @@ def run(self): """Fetch data and push to IYP.""" self.cf_qid = self.iyp.get_node( - 'Ranking', {'name': 'Cloudflare top 100 domains'}, create=True) + 'Ranking', {'name': 'Cloudflare top 100 domains'}) # Fetch data headers = { @@ -52,7 +52,7 @@ def update(self, entry): # Commit to IYP # Get the AS's node ID (create if it is not yet registered) and commit changes - domain_qid = self.iyp.get_node('DomainName', {'name': entry['domain']}, create=True) + domain_qid = self.iyp.get_node('DomainName', {'name': entry['domain']}) self.iyp.add_links(domain_qid, statements) diff --git a/iyp/crawlers/emileaben/as_names.py b/iyp/crawlers/emileaben/as_names.py index 231a2d2..76114f9 100644 --- a/iyp/crawlers/emileaben/as_names.py +++ b/iyp/crawlers/emileaben/as_names.py @@ -48,8 +48,8 @@ def run(self): as_names.add(as_name) lines.append(values) - asns_id = self.iyp.batch_get_nodes('AS', 'asn', asns, all=False) - as_names_id = self.iyp.batch_get_nodes('Name', 'name', as_names, all=False) + asns_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False) + as_names_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', as_names, all=False) links = [] diff --git a/iyp/crawlers/example/crawler.py b/iyp/crawlers/example/crawler.py index c23b4f1..bad4cb1 100644 --- a/iyp/crawlers/example/crawler.py +++ b/iyp/crawlers/example/crawler.py @@ -44,8 +44,7 @@ def update(self, one_line): { 'example_property_0': value, 'example_property_1': value, - }, - create=True + } ) # set relationship @@ -53,7 +52,7 @@ def update(self, one_line): # Commit to IYP # Get the AS's node ID (create if it is not yet registered) and commit changes - as_qid = self.iyp.get_node('AS', {'asn': asn}, create=True) + as_qid = self.iyp.get_node('AS', {'asn': asn}) self.iyp.add_links(as_qid, statements) diff --git a/iyp/crawlers/ihr/__init__.py b/iyp/crawlers/ihr/__init__.py index 02a1d5c..c8fa727 100644 --- a/iyp/crawlers/ihr/__init__.py +++ b/iyp/crawlers/ihr/__init__.py @@ -64,7 +64,7 @@ def run(self): self.csv = lz4Csv(local_filename) self.timebin = None - asn_id = self.iyp.batch_get_nodes('AS', 'asn', set()) + asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', set()) links = [] @@ -83,11 +83,11 @@ def run(self): originasn = int(rec['originasn']) if originasn not in asn_id: - asn_id[originasn] = self.iyp.get_node('AS', {'asn': originasn}, create=True) + asn_id[originasn] = self.iyp.get_node('AS', {'asn': originasn}) asn = int(rec['asn']) if asn not in asn_id: - asn_id[asn] = self.iyp.get_node('AS', {'asn': asn}, create=True) + asn_id[asn] = self.iyp.get_node('AS', {'asn': asn}) links.append({ 'src_id': asn_id[originasn], diff --git 
a/iyp/crawlers/ihr/country_dependency.py b/iyp/crawlers/ihr/country_dependency.py index fe92148..07d4f23 100644 --- a/iyp/crawlers/ihr/country_dependency.py +++ b/iyp/crawlers/ihr/country_dependency.py @@ -62,8 +62,7 @@ def run(self): country_qid = self.iyp.get_node('Country', { 'country_code': cc, - }, - create=True + } ) countryrank_statements = [] @@ -81,8 +80,7 @@ def run(self): for metric, weight in [('Total eyeball', 'eyeball'), ('Total AS', 'as')]: self.countryrank_qid = self.iyp.get_node('Ranking', - {'name': f'IHR country ranking: {metric} ({cc})'}, - create=True + {'name': f'IHR country ranking: {metric} ({cc})'} ) self.iyp.add_links(self.countryrank_qid, countryrank_statements) @@ -101,7 +99,7 @@ def run(self): asns.add(asn['asn']) asn['rank'] = i + 1 - self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns, all=False) + self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False) # Compute links for asn in selected: diff --git a/iyp/crawlers/ihr/rov.py b/iyp/crawlers/ihr/rov.py index 3f24125..76c2857 100644 --- a/iyp/crawlers/ihr/rov.py +++ b/iyp/crawlers/ihr/rov.py @@ -74,10 +74,10 @@ def run(self): self.csv = lz4Csv(local_filename) logging.warning('Getting node IDs from neo4j...\n') - asn_id = self.iyp.batch_get_nodes('AS', 'asn') - prefix_id = self.iyp.batch_get_nodes('Prefix', 'prefix') - tag_id = self.iyp.batch_get_nodes('Tag', 'label') - country_id = self.iyp.batch_get_nodes('Country', 'country_code') + asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn') + prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix') + tag_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label') + country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code') orig_links = [] tag_links = [] @@ -98,26 +98,26 @@ def run(self): prefix = rec['prefix'] if prefix not in prefix_id: - prefix_id[prefix] = self.iyp.get_node('Prefix', {'prefix': prefix}, create=True) + prefix_id[prefix] = self.iyp.get_node('Prefix', {'prefix': prefix}) # make status/country/origin links only for lines where asn=originasn if rec['asn_id'] == rec['originasn_id']: # Make sure all nodes exist originasn = int(rec['originasn_id']) if originasn not in asn_id: - asn_id[originasn] = self.iyp.get_node('AS', {'asn': originasn}, create=True) + asn_id[originasn] = self.iyp.get_node('AS', {'asn': originasn}) rpki_status = 'RPKI ' + rec['rpki_status'] if rpki_status not in tag_id: - tag_id[rpki_status] = self.iyp.get_node('Tag', {'label': rpki_status}, create=True) + tag_id[rpki_status] = self.iyp.get_node('Tag', {'label': rpki_status}) irr_status = 'IRR ' + rec['irr_status'] if irr_status not in tag_id: - tag_id[irr_status] = self.iyp.get_node('Tag', {'label': irr_status}, create=True) + tag_id[irr_status] = self.iyp.get_node('Tag', {'label': irr_status}) cc = rec['country_id'] if cc not in country_id: - country_id[cc] = self.iyp.get_node('Country', {'country_code': cc}, create=True) + country_id[cc] = self.iyp.get_node('Country', {'country_code': cc}) # Compute links orig_links.append({ @@ -147,7 +147,7 @@ def run(self): # Dependency links asn = int(rec['asn_id']) if asn not in asn_id: - asn_id[asn] = self.iyp.get_node('AS', {'asn': asn}, create=True) + asn_id[asn] = self.iyp.get_node('AS', {'asn': asn}) dep_links.append({ 'src_id': prefix_id[prefix], diff --git a/iyp/crawlers/inetintel/as_org.py b/iyp/crawlers/inetintel/as_org.py index 041ce55..a9f6596 100644 --- a/iyp/crawlers/inetintel/as_org.py +++ b/iyp/crawlers/inetintel/as_org.py @@ -108,8 +108,8 @@ def 
run(self): batch_lines.append([asn, url, sibling_asns, pdb_orgs]) count_rows += 1 - asn_id = self.iyp.batch_get_nodes('AS', 'asn', batch_asns, all=False) - url_id = self.iyp.batch_get_nodes('URL', 'url', batch_urls, all=False) + asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', batch_asns, all=False) + url_id = self.iyp.batch_get_nodes_by_single_prop('URL', 'url', batch_urls, all=False) asn_to_url_links = [] asn_to_sibling_asn_links = [] diff --git a/iyp/crawlers/manrs/members.py b/iyp/crawlers/manrs/members.py index 420edd9..d2eb6a8 100644 --- a/iyp/crawlers/manrs/members.py +++ b/iyp/crawlers/manrs/members.py @@ -23,8 +23,7 @@ def __init__(self, organization, url, name): self.manrs_qid = self.iyp.get_node( 'Organization', - {'name': 'MANRS'}, - create=True + {'name': 'MANRS'} ) # Actions defined by MANRS @@ -55,7 +54,7 @@ def __init__(self, organization, url, name): 'name': action['label'], 'description': action['description'] }, - create=True + id_properties={'name'} ) # Reference information for data pushed to IYP @@ -94,7 +93,7 @@ def update_net(self, one_line): # set countries for cc in areas.split(';'): - country_qid = self.iyp.get_node('Country', {'country_code': cc}, create=True) + country_qid = self.iyp.get_node('Country', {'country_code': cc}) statements.append(['COUNTRY', country_qid, self.reference]) # set actions @@ -106,7 +105,7 @@ def update_net(self, one_line): for asn in asns.split(';'): if asn: # ignore organizations with no ASN # Get the AS QID (create if AS is not yet registered) and commit changes - as_qid = self.iyp.get_node('AS', {'asn': str(asn)}, create=True) + as_qid = self.iyp.get_node('AS', {'asn': str(asn)}) self.iyp.add_links(as_qid, statements) diff --git a/iyp/crawlers/nro/delegated_stats.py b/iyp/crawlers/nro/delegated_stats.py index c8f31cf..2bcd3c0 100644 --- a/iyp/crawlers/nro/delegated_stats.py +++ b/iyp/crawlers/nro/delegated_stats.py @@ -27,7 +27,7 @@ def run(self): if req.status_code != 200: sys.exit('Error while fetching delegated file') - asn_id = self.iyp.batch_get_nodes('AS', 'asn') + asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn') # Read delegated-stats file. 
see documentation: # https://www.nro.net/wp-content/uploads/nro-extended-stats-readme5.txt @@ -66,9 +66,9 @@ def run(self): # Create all nodes logging.warning('Pushing nodes to neo4j...\n') - opaqueid_id = self.iyp.batch_get_nodes('OpaqueID', 'id', opaqueids) - prefix_id = self.iyp.batch_get_nodes('Prefix', 'prefix', prefixes) - country_id = self.iyp.batch_get_nodes('Country', 'country_code', countries) + opaqueid_id = self.iyp.batch_get_nodes_by_single_prop('OpaqueID', 'id', opaqueids) + prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes) + country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries) # Compute links country_links = [] diff --git a/iyp/crawlers/openintel/__init__.py b/iyp/crawlers/openintel/__init__.py index bf661d6..2524e73 100644 --- a/iyp/crawlers/openintel/__init__.py +++ b/iyp/crawlers/openintel/__init__.py @@ -175,11 +175,11 @@ def run(self): print('Read {} unique A records from {} Parquet file(s).'.format(len(df), len(self.pandas_df_list))) - domain_id = self.iyp.batch_get_nodes(self.domain_type, 'name', set(df['query_name'])) - ns_id = self.iyp.batch_get_nodes('AuthoritativeNameServer', 'name', - set(df[df.ns_address.notnull()]['ns_address'])) - ip4_id = self.iyp.batch_get_nodes('IP', 'ip', set(df[df.ip4_address.notnull()]['ip4_address'])) - ip6_id = self.iyp.batch_get_nodes('IP', 'ip', set(df[df.ip6_address.notnull()]['ip6_address'])) + domain_id = self.iyp.batch_get_nodes_by_single_prop(self.domain_type, 'name', set(df['query_name'])) + ns_id = self.iyp.batch_get_nodes_by_single_prop('AuthoritativeNameServer', 'name', + set(df[df.ns_address.notnull()]['ns_address'])) + ip4_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', set(df[df.ip4_address.notnull()]['ip4_address'])) + ip6_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', set(df[df.ip6_address.notnull()]['ip6_address'])) res_links = [] mng_links = [] diff --git a/iyp/crawlers/pch/__init__.py b/iyp/crawlers/pch/__init__.py index 49923e9..7071c6a 100644 --- a/iyp/crawlers/pch/__init__.py +++ b/iyp/crawlers/pch/__init__.py @@ -306,8 +306,8 @@ def run(self) -> None: # Get/push nodes. logging.info(f'Fetching {len(ases)} AS and {len(prefixes)} Prefix nodes.') print(f'Fetching {len(ases)} AS and {len(prefixes)} Prefix nodes.') - as_ids = self.iyp.batch_get_nodes('AS', 'asn', ases, all=False) - prefix_ids = self.iyp.batch_get_nodes('Prefix', 'prefix', prefixes, all=False) + as_ids = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', ases, all=False) + prefix_ids = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False) # Push relationships. 
relationships = list() diff --git a/iyp/crawlers/peeringdb/fac.py b/iyp/crawlers/peeringdb/fac.py index 471a166..786c790 100644 --- a/iyp/crawlers/peeringdb/fac.py +++ b/iyp/crawlers/peeringdb/fac.py @@ -69,11 +69,11 @@ def run(self): handle_social_media(fac, websites) # push nodes - self.fac_id = self.iyp.batch_get_nodes('Facility', 'name', facs) - self.name_id = self.iyp.batch_get_nodes('Name', 'name', names) - self.website_id = self.iyp.batch_get_nodes('URL', 'url', websites) - self.country_id = self.iyp.batch_get_nodes('Country', 'country_code', countries) - self.facid_id = self.iyp.batch_get_nodes(FACID_LABEL, 'id', facids) + self.fac_id = self.iyp.batch_get_nodes_by_single_prop('Facility', 'name', facs) + self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names) + self.website_id = self.iyp.batch_get_nodes_by_single_prop('URL', 'url', websites) + self.country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries) + self.facid_id = self.iyp.batch_get_nodes_by_single_prop(FACID_LABEL, 'id', facids) # get organization nodes self.org_id = self.iyp.batch_get_node_extid(ORGID_LABEL) diff --git a/iyp/crawlers/peeringdb/ix.py b/iyp/crawlers/peeringdb/ix.py index 5beb171..366f0e8 100644 --- a/iyp/crawlers/peeringdb/ix.py +++ b/iyp/crawlers/peeringdb/ix.py @@ -96,7 +96,7 @@ def run(self): # get organization, country nodes self.org_id = self.iyp.batch_get_node_extid(ORGID_LABEL) self.fac_id = self.iyp.batch_get_node_extid(FACID_LABEL) - self.country_id = self.iyp.batch_get_nodes('Country', 'country_code') + self.country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code') req = self.requests.get(URL_PDB_IXS, headers=self.headers) if req.status_code != 200: @@ -145,8 +145,8 @@ def register_net_fac(self): for netfac in self.netfacs: if netfac['net_id'] not in net_id: - as_qid = self.iyp.get_node('AS', {'asn': netfac['local_asn']}, create=True) - extid_qid = self.iyp.get_node(NETID_LABEL, {'id': netfac['net_id']}, create=True) + as_qid = self.iyp.get_node('AS', {'asn': netfac['local_asn']}) + extid_qid = self.iyp.get_node(NETID_LABEL, {'id': netfac['net_id']}) links = [['EXTERNAL_ID', extid_qid, self.reference_netfac]] self.iyp.add_links(as_qid, links) net_id[netfac['net_id']] = as_qid @@ -195,11 +195,11 @@ def register_ix_membership(self): handle_social_media(network, net_website) # TODO add the type PEERING_LAN? 
may break the unique constraint - self.prefix_id = self.iyp.batch_get_nodes('Prefix', 'prefix', prefixes) - self.name_id = self.iyp.batch_get_nodes('Name', 'name', net_names) - self.website_id = self.iyp.batch_get_nodes('URL', 'url', net_website) - self.netid_id = self.iyp.batch_get_nodes(NETID_LABEL, 'id', net_extid) - self.asn_id = self.iyp.batch_get_nodes('AS', 'asn', net_asn) + self.prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes) + self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', net_names) + self.website_id = self.iyp.batch_get_nodes_by_single_prop('URL', 'url', net_website) + self.netid_id = self.iyp.batch_get_nodes_by_single_prop(NETID_LABEL, 'id', net_extid) + self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', net_asn) # compute links prefix_links = [] @@ -287,10 +287,10 @@ def register_ixs(self): all_ixs_website.add(ix['website']) handle_social_media(ix, all_ixs_website) - self.ixext_id = self.iyp.batch_get_nodes(IXID_LABEL, 'id', all_ixs_id) - self.ix_id = self.iyp.batch_get_nodes('IXP', 'name', all_ixs_name) - self.website_id = self.iyp.batch_get_nodes('URL', 'url', all_ixs_website) - self.name_id = self.iyp.batch_get_nodes('Name', 'name', all_ixs_name) + self.ixext_id = self.iyp.batch_get_nodes_by_single_prop(IXID_LABEL, 'id', all_ixs_id) + self.ix_id = self.iyp.batch_get_nodes_by_single_prop('IXP', 'name', all_ixs_name) + self.website_id = self.iyp.batch_get_nodes_by_single_prop('URL', 'url', all_ixs_website) + self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', all_ixs_name) # Compute links name_links = [] diff --git a/iyp/crawlers/peeringdb/org.py b/iyp/crawlers/peeringdb/org.py index 483673f..17c6eac 100644 --- a/iyp/crawlers/peeringdb/org.py +++ b/iyp/crawlers/peeringdb/org.py @@ -65,11 +65,11 @@ def run(self): handle_social_media(org, websites) # push nodes - self.org_id = self.iyp.batch_get_nodes('Organization', 'name', orgs) - self.name_id = self.iyp.batch_get_nodes('Name', 'name', names) - self.website_id = self.iyp.batch_get_nodes('URL', 'url', websites) - self.country_id = self.iyp.batch_get_nodes('Country', 'country_code', countries) - self.orgid_id = self.iyp.batch_get_nodes(ORGID_LABEL, 'id', orgids) + self.org_id = self.iyp.batch_get_nodes_by_single_prop('Organization', 'name', orgs) + self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names) + self.website_id = self.iyp.batch_get_nodes_by_single_prop('URL', 'url', websites) + self.country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries) + self.orgid_id = self.iyp.batch_get_nodes_by_single_prop(ORGID_LABEL, 'id', orgids) # compute links name_links = [] diff --git a/iyp/crawlers/ripe/as_names.py b/iyp/crawlers/ripe/as_names.py index cd5a11b..53b8b86 100644 --- a/iyp/crawlers/ripe/as_names.py +++ b/iyp/crawlers/ripe/as_names.py @@ -44,9 +44,9 @@ def run(self): countries.add(cc) # get node IDs for ASNs, names, and countries - asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns) - name_id = self.iyp.batch_get_nodes('Name', 'name', names) - country_id = self.iyp.batch_get_nodes('Country', 'country_code', countries) + asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) + name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names) + country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries) # Compute links name_links = [] diff --git a/iyp/crawlers/ripe/atlas_probes.py b/iyp/crawlers/ripe/atlas_probes.py index 
eee1d1b..907a062 100644 --- a/iyp/crawlers/ripe/atlas_probes.py +++ b/iyp/crawlers/ripe/atlas_probes.py @@ -121,10 +121,10 @@ def run(self): probe_id = dict() # Each probe is a JSON object with nested fields, so we need to flatten it. flattened_probes = [dict(flatdict.FlatterDict(probe, delimiter='_')) for probe in valid_probes] - probe_id = self.iyp.batch_create_nodes('AtlasProbe', 'id', flattened_probes) - ip_id = self.iyp.batch_get_nodes('IP', 'ip', ips, all=False) - as_id = self.iyp.batch_get_nodes('AS', 'asn', ases, all=False) - country_id = self.iyp.batch_get_nodes('Country', 'country_code', countries) + probe_id = self.iyp.batch_get_nodes('AtlasProbe', flattened_probes, ['id']) + ip_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ips, all=False) + as_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', ases, all=False) + country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries) # compute links assigned_links = list() diff --git a/iyp/crawlers/ripe/roa.py b/iyp/crawlers/ripe/roa.py index f0e03b7..9ed5aae 100644 --- a/iyp/crawlers/ripe/roa.py +++ b/iyp/crawlers/ripe/roa.py @@ -67,8 +67,8 @@ def run(self): 'end': end}) # get ASNs and prefixes IDs - asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns) - prefix_id = self.iyp.batch_get_nodes('Prefix', 'prefix', set(prefix_info.keys())) + asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) + prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', set(prefix_info.keys())) links = [] for prefix, attributes in prefix_info.items(): diff --git a/iyp/crawlers/stanford/asdb.py b/iyp/crawlers/stanford/asdb.py index c84f6a4..58a620d 100644 --- a/iyp/crawlers/stanford/asdb.py +++ b/iyp/crawlers/stanford/asdb.py @@ -61,8 +61,8 @@ def run(self): lines.append([asn, category]) # get ASNs and names IDs - asn_id = self.iyp.batch_get_nodes('AS', 'asn', asns) - category_id = self.iyp.batch_get_nodes('Tag', 'label', categories) + asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) + category_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label', categories) # Compute links links = [] diff --git a/iyp/crawlers/tranco/top1M.py b/iyp/crawlers/tranco/top1M.py index ef94bd5..e6214c5 100644 --- a/iyp/crawlers/tranco/top1M.py +++ b/iyp/crawlers/tranco/top1M.py @@ -20,7 +20,7 @@ class Crawler(BaseCrawler): def run(self): """Fetch Tranco top 1M and push to IYP.""" - self.tranco_qid = self.iyp.get_node('Ranking', {'name': 'Tranco top 1M'}, create=True) + self.tranco_qid = self.iyp.get_node('Ranking', {'name': 'Tranco top 1M'}) sys.stderr.write('Downloading latest list...\n') req = requests.get(URL) @@ -40,7 +40,7 @@ def run(self): links.append({'src_name': domain, 'dst_id': self.tranco_qid, 'props': [self.reference, {'rank': int(rank)}]}) - name_id = self.iyp.batch_get_nodes('DomainName', 'name', domains) + name_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domains) for link in links: link['src_id'] = name_id[link['src_name']] diff --git a/iyp/post/country_information.py b/iyp/post/country_information.py index be2b6e4..3d6ccae 100644 --- a/iyp/post/country_information.py +++ b/iyp/post/country_information.py @@ -13,7 +13,7 @@ def run(self): """Enrich Country nodes with additional information like alpha-3 codes and country names.""" - country_id = self.iyp.batch_get_nodes('Country', 'country_code') + country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code') for country_code in country_id: if country_code not in 
iso3166.countries_by_alpha2: diff --git a/iyp/post/dns_hierarchy.py b/iyp/post/dns_hierarchy.py index e791977..458e350 100644 --- a/iyp/post/dns_hierarchy.py +++ b/iyp/post/dns_hierarchy.py @@ -21,7 +21,7 @@ def run(self): print('Building DNS hierarchy.', file=sys.stderr) # Fetch all existing DomainName nodes. - dns_id = self.iyp.batch_get_nodes('DomainName', 'name') + dns_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name') logging.info(f'Fetched {len(dns_id):,d} DomainName nodes.') print(f'Fetched {len(dns_id):,d} DomainName nodes.', file=sys.stderr) @@ -42,7 +42,7 @@ def run(self): # Create new nodes. logging.info(f'Creating {len(new_nodes):,d} new DomainName nodes.') print(f'Creating {len(new_nodes):,d} new DomainName nodes.', file=sys.stderr) - dns_id.update(self.iyp.batch_get_nodes('DomainName', 'name', new_nodes, all=False)) + dns_id.update(self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', new_nodes, all=False)) # Build relationships and push to IYP. part_of_links = list() diff --git a/iyp/post/ip2prefix.py b/iyp/post/ip2prefix.py index f67f40d..2bef77e 100644 --- a/iyp/post/ip2prefix.py +++ b/iyp/post/ip2prefix.py @@ -26,7 +26,7 @@ def run(self): prefix.""" # Get all prefixes in a radix tree - prefix_id = self.iyp.batch_get_nodes('Prefix', 'prefix') + prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix') additional_properties = list() rtree = radix.Radix() @@ -41,7 +41,7 @@ def run(self): self.iyp.batch_add_properties(additional_properties) # Get all IP nodes - ip_id = self.iyp.batch_get_nodes('IP', 'ip') + ip_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip') # Compute links for IPs links = [] diff --git a/iyp/post/url2domain.py b/iyp/post/url2domain.py index 93550c2..6a89f94 100644 --- a/iyp/post/url2domain.py +++ b/iyp/post/url2domain.py @@ -11,10 +11,10 @@ def run(self): """Link URLs and their corresponding DomainNames.""" # Get all URL nodes. - url_id = self.iyp.batch_get_nodes('URL', 'url') + url_id = self.iyp.batch_get_nodes_by_single_prop('URL', 'url') # Get all DomainName Nodes - domain_id = self.iyp.batch_get_nodes('DomainName', 'name') + domain_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name') # Compute links links = [] From 3a97a5cbda57d77cfa8bbf16e4e074e43b7ec99e Mon Sep 17 00:00:00 2001 From: mtashiro Date: Mon, 18 Dec 2023 16:47:14 +0900 Subject: [PATCH 2/5] Update bug_report.md --- .github/ISSUE_TEMPLATE/bug_report.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 1550068..54f79f7 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -6,6 +6,11 @@ labels: "" assignees: "" --- +*NOTE (Delete after reading): There is no need to open bug reports based on +error messages in the log of the weekly database dump. We usually notice them +and can judge if a simple rerun of the crawler suffices (e.g., due to a +temporary connectivity issue), or if there is a bug in the crawler.* + **Describe the bug** A clear and concise description of what the bug is. 
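The reworked interface from PATCH 1/5 can be summarized in a short usage
sketch. This is an illustration, not code from the PR: it assumes an
initialized IYP instance named iyp, and the extra 'MeasurementDevice' label
is a hypothetical example; only the method names and signatures introduced
above are used.

    # Minimal sketch (assumption: `iyp` is an initialized IYP instance).

    # Fetch-or-create nodes by a single property; create=True is now the default.
    asn_id = iyp.batch_get_nodes_by_single_prop('AS', 'asn', {64496, 64497})

    # Fetch-or-create nodes with multiple properties. Only 'id' acts as the
    # search predicate; 'asn_v4' is set on the matched or created nodes.
    props = [{'id': 1, 'asn_v4': 64496}, {'id': 2, 'asn_v4': 64497}]
    probe_id = iyp.batch_get_nodes('AtlasProbe', props, ['id'])

    # Multi-label nodes can not be created implicitly; create single-label
    # nodes first, then attach additional labels ('MeasurementDevice' is a
    # hypothetical example).
    iyp.batch_add_node_label(list(probe_id.values()), 'MeasurementDevice')

    # Pure lookups (create=False) may use multiple labels.
    node_id = iyp.get_node(['AtlasProbe', 'MeasurementDevice'], {'id': 1}, create=False)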
From 2be4f0baf35b7a46cecffa84687735c897e33e3a Mon Sep 17 00:00:00 2001 From: Malte Tashiro Date: Mon, 18 Dec 2023 02:56:30 +0000 Subject: [PATCH 3/5] Consistency and formatting --- iyp/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/iyp/__init__.py b/iyp/__init__.py index 91ac46a..28b21fe 100644 --- a/iyp/__init__.py +++ b/iyp/__init__.py @@ -190,7 +190,7 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T set all=False. This method commits changes to the database. """ - if type(label) is list and create: + if isinstance(label, list) and create: raise NotImplementedError('Can not implicitly create multi-label nodes.') if prop_set and prop_name in prop_formatters: @@ -280,6 +280,7 @@ def batch_get_nodes(self, label, properties, id_properties=list(), create=True): # 64497. # The returned id map would be: # {1: x, 2: y} + if isinstance(label, list) and create: raise NotImplementedError('Can not implicitly create multi-label nodes.') @@ -307,7 +308,7 @@ def batch_get_nodes(self, label, properties, id_properties=list(), create=True): # predicate that is contained within the node specification. # For id_properties = ['x', 'y'] this will result in # {x: prop.x, y: prop.y} - # The RETURN clause is actually only a part, namely + # The RETURN clause is actually only a part of it, namely # a.x AS x, a.y AS y # for the example above. where_clause = ['{'] From b65d2ede2f0631d856b01b510e8af3262c13a390 Mon Sep 17 00:00:00 2001 From: Malte Tashiro Date: Mon, 18 Dec 2023 03:54:46 +0000 Subject: [PATCH 4/5] Fix missing support for multi-label nodes in batch_get_nodes_by_single_prop --- iyp/__init__.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/iyp/__init__.py b/iyp/__init__.py index 28b21fe..1f8e27c 100644 --- a/iyp/__init__.py +++ b/iyp/__init__.py @@ -193,16 +193,21 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T if isinstance(label, list) and create: raise NotImplementedError('Can not implicitly create multi-label nodes.') + # Assemble label + label_str = str(label) + if isinstance(label, list): + label_str = ':'.join(label) + if prop_set and prop_name in prop_formatters: prop_set = set(map(prop_formatters[prop_name], prop_set)) if all: - existing_nodes = self.tx.run(f'MATCH (n:{label}) RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id') + existing_nodes = self.tx.run(f'MATCH (n:{label_str}) RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id') else: list_prop = list(prop_set) existing_nodes = self.tx.run(f""" WITH $list_prop AS list_prop - MATCH (n:{label}) + MATCH (n:{label_str}) WHERE n.{prop_name} IN list_prop RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id""", list_prop=list_prop) @@ -217,7 +222,7 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T batch = missing_nodes[i:i + BATCH_SIZE] create_query = f"""WITH $batch AS batch - UNWIND batch AS item CREATE (n:{label}) + UNWIND batch AS item CREATE (n:{label_str}) SET n = item RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id""" new_nodes = self.tx.run(create_query, batch=batch) From 63588f6ad55f19e1fa18faffa1dedbcf9a4a9a6a Mon Sep 17 00:00:00 2001 From: Malte Tashiro Date: Mon, 18 Dec 2023 03:56:04 +0000 Subject: [PATCH 5/5] Add implicit constraints and indexes Remove explicit constraints and indexes since these would need manual updates every time we add a new node or relationship type. 
Instead, we now automatically create UNIQUE constraints based on the id
properties used to create nodes. The rationale is that we (at least in the
context of crawlers) always get nodes based on unique properties. If there are
conflicts introduced by this, it should only reveal problems in our existing
code.

Similarly, we now also create an index on the reference_name field of all
relationship types automatically, since this is used very often and exists for
all relationship types.
---
 iyp/__init__.py | 134 ++++++++++++++++++++++++------------------------
 1 file changed, 67 insertions(+), 67 deletions(-)

diff --git a/iyp/__init__.py b/iyp/__init__.py
index 1f8e27c..7e393c5 100644
--- a/iyp/__init__.py
+++ b/iyp/__init__.py
@@ -9,41 +9,6 @@
 
 from neo4j import GraphDatabase
 
-# Usual constraints on nodes' properties
-NODE_CONSTRAINTS = {
-    'AS': {
-        'asn': set(['UNIQUE', 'NOT NULL'])
-    },
-    'Prefix': {
-        'prefix': set(['UNIQUE', 'NOT NULL']),
-        # 'af': set(['NOT NULL'])
-    },
-    'IP': {
-        'ip': set(['UNIQUE', 'NOT NULL']),
-        # 'af': set(['NOT NULL'])
-    },
-    'DomainName': {
-        'name': set(['UNIQUE', 'NOT NULL'])
-    },
-    'Country': {
-        'country_code': set(['UNIQUE', 'NOT NULL'])
-    },
-    'Organization': {
-        'name': set(['NOT NULL'])
-    },
-    'AtlasProbe': {
-        'id': set(['UNIQUE', 'NOT NULL'])
-    }
-}
-
-# Properties that may be frequently queried and that are not constraints
-NODE_INDEXES = {
-    'PeeringdbOrgID': ['id']
-}
-
-# Set of node labels with constrains (ease search for node merging)
-NODE_CONSTRAINTS_LABELS = set(NODE_CONSTRAINTS.keys())
-
 BATCH_SIZE = 50000
 
 prop_formatters = {
@@ -137,34 +102,57 @@ def __init__(self):
 
         self.session = self.db.session()
 
-        self._db_init()
         self.tx = self.session.begin_transaction()
 
-    def _db_init(self):
-        """Add constraints and indexes."""
+    def __create_unique_constraint(self, label, prop):
+        """Create a UNIQUE constraint on the given properties for the given node label.
+
+        label: a string specifying the node label.
+        prop: a string or list of strings specifying the property name(s). A list of
+        properties with more than one entry will create a combined constraint.
+        """
+        # The Neo4j Community Edition only supports UNIQUE constraints, i.e., no reason
+        # to make this function more flexible.
+        if isinstance(prop, list):
+            require_str = '(' + ','.join([f'a.{p}' for p in prop]) + ')'
+            prop = '_'.join(prop)
+        else:
+            require_str = f'a.{prop}'
+
+        # Schema modifications are not allowed in the same transaction as writes.
+        self.commit()
+        self.tx.run(f"""CREATE CONSTRAINT {label}_UNIQUE_{prop} IF NOT EXISTS
+                    FOR (a:{label})
+                    REQUIRE {require_str} IS UNIQUE""")
+        self.commit()
 
-        # Create constraints (implicitly add corresponding indexes)
-        for label, prop_constraints in NODE_CONSTRAINTS.items():
-            for property, constraints in prop_constraints.items():
+    def __create_range_index(self, label_type, prop, on_relationship):
+        """Create a RANGE index (the default) on the given properties for the given node
+        label or relationship type.
 
-                for constraint in constraints:
-                    # neo4j-community only implements the UNIQUE constraint
-                    if not self.neo4j_enterprise and constraint != 'UNIQUE':
-                        continue
+        label_type: a string specifying a node label or a relationship type.
+        prop: a string or list of strings specifying the property name(s). A list of
+        properties with more than one entry will create a combined index.
+        on_relationship: a bool specifying if label_type refers to a relationship type
+        (True) or a node label (False).
+        """
+        if isinstance(prop, list):
+            on_str = '(' + ','.join([f'a.{p}' for p in prop]) + ')'
+            prop = '_'.join(prop)
+        else:
+            on_str = f'a.{prop}'
 
+        if on_relationship:
+            for_str = f'()-[a:{label_type}]-()'
+        else:
+            for_str = f'(a:{label_type})'
 
-        # Create indexes
-        for label, indexes in NODE_INDEXES.items():
-            for index in indexes:
-                self.session.run(
-                    f' CREATE INDEX {label}_INDEX_{index} IF NOT EXISTS '
-                    f' FOR (n:{label}) '
-                    f' ON (n.{index}) ')
+        # Schema modifications are not allowed in the same transaction as writes.
+        self.commit()
+        self.tx.run(f"""CREATE INDEX {label_type}_INDEX_{prop} IF NOT EXISTS
+                    FOR {for_str}
+                    ON {on_str}""")
+        self.commit()
 
     def commit(self):
         """Commit all pending queries (node/link creation) and start a new
@@ -180,6 +168,12 @@ def rollback(self):
         self.tx.rollback()
         self.tx = self.session.begin_transaction()
 
+    def close(self):
+        """Commit pending queries and close IYP."""
+        self.tx.commit()
+        self.session.close()
+        self.db.close()
+
     def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=True, create=True):
         """Find the ID of all nodes in the graph for the given label and check that a
         node exists for each value in prop_set for the property prop. Create these nodes
@@ -193,6 +187,10 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T
         if isinstance(label, list) and create:
             raise NotImplementedError('Can not implicitly create multi-label nodes.')
 
+        if create:
+            # Ensure UNIQUE constraint on id property.
+            self.__create_unique_constraint(label, prop_name)
+
         # Assemble label
         label_str = str(label)
         if isinstance(label, list):
@@ -229,8 +227,7 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T
 
             for node in new_nodes:
                 ids[node[prop_name]] = node['_id']
-
-        self.commit()
+            self.commit()
 
         return ids
 
@@ -332,6 +329,7 @@ def batch_get_nodes(self, label, properties, id_properties=list(), create=True):
         if create:
             action = 'MERGE'
             set_line = 'SET a += prop'
+            self.__create_unique_constraint(label, id_properties)
 
         query = f"""UNWIND $props AS prop
                     {action} (a:{label_str} {where_clause_str})
                     {set_line}
                     RETURN {return_clause_str}, ID(a) AS _id"""
@@ -353,7 +351,7 @@ def batch_get_nodes(self, label, properties, id_properties=list(), create=True):
                 for r in results:
                     id_key = tuple([r[prop] for prop in id_properties])
                     ids[id_key] = r['_id']
-        self.commit()
+            self.commit()
         return ids
 
     def get_node(self, label, properties, id_properties=list(), create=True):
@@ -388,6 +386,7 @@ def get_node(self, label, properties, id_properties=list(), create=True):
                 id_property_dict = properties
             else:
                 id_property_dict = {prop: properties[prop] for prop in id_properties}
+            self.__create_unique_constraint(label, list(id_property_dict.keys()))
             result = self.tx.run(
                 f"""MERGE (a:{label} {dict2str(id_property_dict)})
                 SET a += {dict2str(properties)}
@@ -463,6 +462,8 @@ def batch_add_links(self, type, links, action='create'):
 
         batch_format_link_properties(links, inplace=True)
 
+        self.__create_range_index(type, 'reference_name', on_relationship=True)
+
         # Create links in batches
         for i in range(0, len(links), BATCH_SIZE):
            batch = links[i:i + BATCH_SIZE]
@@ -503,6 +504,10 @@ def add_links(self, src_node, links):
         if len(links) == 0:
             return
 
+        relationship_types = {e[0] for e in links}
+        for relationship_type in relationship_types:
+            self.__create_range_index(relationship_type, 'reference_name', on_relationship=True)
+
         matches = ' MATCH (x)'
         where = f' WHERE ID(x) = {src_node}'
         merges = ''
@@ -521,6 +526,7 @@ def add_links(self, src_node, links):
             merges += f' MERGE (x)-[:{type} {dict2str(prop)}]->(x{i}) '
 
         self.tx.run(matches + where + merges).consume()
+        self.commit()
 
     def batch_add_properties(self, id_prop_list):
         """Add properties to existing nodes.
@@ -542,13 +548,7 @@ def batch_add_properties(self, id_prop_list):
             res = self.tx.run(add_query, batch=batch)
             res.consume()
-
-        self.commit()
-
-    def close(self):
-        """Commit pending queries and close IYP."""
-        self.tx.commit()
-        self.session.close()
-        self.db.close()
+            self.commit()
 
 
 class BasePostProcess(object):
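For reference, the Cypher assembled by the new schema helpers in PATCH 5/5 can
be reproduced standalone. The sketch below is illustrative only: it mirrors
the string formatting of __create_unique_constraint and __create_range_index
shown above, with example inputs ('AtlasProbe' and the RANK relationship type
are values that appear elsewhere in the changeset).

    # Standalone sketch mirroring the query assembly of the new schema helpers.
    label, prop = 'AtlasProbe', ['id', 'asn_v4']

    # __create_unique_constraint: a combined UNIQUE constraint for a list of
    # id properties.
    require_str = '(' + ','.join([f'a.{p}' for p in prop]) + ')'  # (a.id,a.asn_v4)
    name = '_'.join(prop)                                         # id_asn_v4
    print(f'CREATE CONSTRAINT {label}_UNIQUE_{name} IF NOT EXISTS '
          f'FOR (a:{label}) REQUIRE {require_str} IS UNIQUE')

    # __create_range_index on a relationship type, as invoked by batch_add_links
    # and add_links for the reference_name property.
    rel_type, idx_prop = 'RANK', 'reference_name'
    print(f'CREATE INDEX {rel_type}_INDEX_{idx_prop} IF NOT EXISTS '
          f'FOR ()-[a:{rel_type}]-() ON a.{idx_prop}')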