diff --git a/config.json.example b/config.json.example index 57348e7..76229cc 100644 --- a/config.json.example +++ b/config.json.example @@ -69,6 +69,7 @@ "iyp.crawlers.pch.daily_routing_snapshots_v6", "iyp.crawlers.emileaben.as_names", "iyp.crawlers.ripe.atlas_probes", + "iyp.crawlers.ripe.atlas_measurements", "iyp.crawlers.iana.root_zone", "iyp.crawlers.alice_lg.amsix", "iyp.crawlers.alice_lg.bcix", diff --git a/iyp/crawlers/ripe/README.md b/iyp/crawlers/ripe/README.md index b2a9538..3dcd4d6 100644 --- a/iyp/crawlers/ripe/README.md +++ b/iyp/crawlers/ripe/README.md @@ -47,6 +47,19 @@ ASN(s), and country. (:IP {ip: '202.214.97.16'})-[:ASSIGNED]->(:AtlasProbe {id: 6425}) ``` +### Atlas Measurements - `atlas_measurements.py` + +We fetch the [list of probe measurements](https://atlas.ripe.net/api/v2/measurements) to obtain the measurement data of the `AtlasProbe`. This data is based on the probe's ID, connected through relationships defined by `PART_OF` and `TARGET`. The `TARGET` relationship encompasses associations with both `DomainName` and `IP` nodes. + +The Cypher query for these relationships appears as follows: + +```Cypher +(:AtlasProbe)-[:PART_OF]->(:AtlasMeasurement)-[:TARGET]->(:DomainName) +(:AtlasProbe)-[:PART_OF]->(:AtlasMeasurement)-[:TARGET]->(:IP) +``` + +This query is designed to identify `AtlasProbes` linked via the `PART_OF` relationship to `AtlasMeasurements`, which, in turn, are linked through `TARGET` to either a `DomainName` or an `IP`. + ## Dependence This crawler is not depending on other crawlers. 
"""Crawler for RIPE Atlas measurements.

Fetches the list of public, running Atlas measurements from the RIPE Atlas
v2 API and pushes them into IYP as:

    (:AtlasProbe)-[:PART_OF]->(:AtlasMeasurement)-[:TARGET]->(:DomainName)
    (:AtlasProbe)-[:PART_OF]->(:AtlasMeasurement)-[:TARGET]->(:IP)
"""
import argparse
import ipaddress
import json
import logging
import os
import sys

import flatdict
import requests
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from iyp import BaseCrawler

ORG = 'RIPE NCC'

URL = 'https://atlas.ripe.net/api/v2/measurements'
NAME = 'ripe.atlas_measurements'


class RequestStatusError(requests.HTTPError):
    """Raised when the Atlas API replies with a non-OK HTTP status."""

    def __init__(self, message):
        self.message = message
        super().__init__(self.message)


class JSONDecodeError(ValueError):
    """Raised when an Atlas API reply cannot be decoded as JSON."""

    def __init__(self, message):
        self.message = message
        super().__init__(self.message)


class MissingKeyError(Exception):
    """Raised when an expected key is missing from the API response."""

    def __init__(self, message):
        self.message = message
        super().__init__(self.message)


class Crawler(BaseCrawler):
    def __init__(self, organization, url, name):
        self.__initialize_session()
        super().__init__(organization, url, name)

    def __initialize_session(self) -> None:
        """Create a requests Session that retries transient HTTP errors."""
        self.session = Session()
        retry = Retry(
            backoff_factor=0.1,
            status_forcelist=(429, 500, 502, 503, 504),
            respect_retry_after_header=True
        )
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    @staticmethod
    def __process_response(response: requests.Response):
        """Validate one page of a paginated API reply.

        Returns a (next_url, results) tuple; next_url is an empty string
        once the last page is reached.

        Raises:
            RequestStatusError: non-OK HTTP status.
            JSONDecodeError: body is not valid JSON.
            MissingKeyError: pagination keys absent from the reply.
        """
        if response.status_code != requests.codes.ok:
            raise RequestStatusError(f'Request to {response.url} failed with status: {response.status_code}')
        try:
            data = response.json()
        except json.decoder.JSONDecodeError as e:
            # Chain the original exception so the root cause is not lost.
            raise JSONDecodeError(f'Decoding JSON reply from {response.url} failed with exception: {e}') from e
        if 'next' not in data or 'results' not in data:
            raise MissingKeyError('"next" or "results" key missing from response data.')

        next_url = data['next']
        if not next_url:
            logging.info('Reached end of list')
            next_url = str()
        return next_url, data['results']

    def __execute_query(self, url: str):
        """GET one page and return (next_url, results)."""
        logging.info(f'Querying {url}')
        r = self.session.get(url)
        return self.__process_response(r)

    @staticmethod
    def __transform_data(data):
        """Reshape raw measurement dicts in place so they flatten cleanly."""
        for item in data:
            # Convert empty lists to None as flatdict library converts it automatically
            # to FlatterDict object which would cause a problem when
            # loading data in neo4j.
            for key in item:
                if isinstance(item[key], list) and len(item[key]) == 0:
                    item[key] = None

            # Flatten the target information since the original data has multiple fields
            # prefixed with 'target_'. Given that we flatten the dict
            # on the '_' delimiter, this action would potentially
            # cause a TypeError from flatdict if it isn't handled properly.
            target_info = {
                'domain': item.pop('target', None),
                'asn': item.pop('target_asn', None),
                'ip': item.pop('target_ip', None),
                'prefix': item.pop('target_prefix', None),
                'resolved_ips': item.pop('resolved_ips', None)
            }
            item['target'] = target_info

            # Flatten the group information. They are the same as target
            # information as they are prefixed with 'group_'.
            group_info = {
                'value': item.pop('group', None),
                'id': item.pop('group_id', None)
            }
            item['group'] = group_info

    @staticmethod
    def __is_valid_ip(ip):
        """Return True if ip parses as an IPv4/IPv6 address."""
        try:
            ipaddress.ip_address(ip)
            return True
        except ValueError:
            return False

    @staticmethod
    def __get_all_resolved_ips(probe_measurement):
        """Return the measurement's resolved IPs as a list of non-empty strings.

        'resolved_ips' takes precedence over 'target_ip'.
        """
        resolved_ips = probe_measurement['target']['resolved_ips'] or probe_measurement['target']['ip'] or []
        resolved_ips = [resolved_ips] if not isinstance(resolved_ips, list) else resolved_ips
        valid_resolved_ips = [ip for ip in resolved_ips if ip is not None and ip != '']
        return valid_resolved_ips

    @staticmethod
    def __add_if_not_none(v, s: set):
        """Add v to set s unless it is None or the empty string."""
        if v is not None and v != '' and v not in s:
            s.add(v)

    @staticmethod
    def __add_if_not_none_values(lst, s: set):
        """Add every non-None/non-empty item of lst to set s."""
        if not isinstance(lst, list):
            return
        for item in lst:
            Crawler.__add_if_not_none(item, s)

    def run(self):
        """Fetch all public running measurements and push nodes/links to IYP."""
        params = {'format': 'json',
                  'is_public': True,
                  'status': 2,
                  'optional_fields': 'current_probes',
                  'page_size': 500}
        r = self.session.get(URL, params=params)
        next_url, data = self.__process_response(r)
        while next_url:
            next_url, next_data = self.__execute_query(next_url)
            data += next_data
            logging.info(f'Added {len(next_data)} probes. Total: {len(data)}')
        print(f'Fetched {len(data)} probe measurements', file=sys.stderr)

        # Transform the data to be compatible with the flatdict format.
        self.__transform_data(data)

        # Compute nodes
        probe_measurement_ids = set()
        probe_ids = set()
        ips = set()
        ases = set()
        domains = set()

        valid_probe_measurements = list()

        for probe_measurement in data:
            probe_measurement_id = probe_measurement['id']
            if not probe_measurement_id:
                logging.error(f'Probe Measurement without ID. Should never happen: {probe_measurement}.')
                continue
            if probe_measurement_id in probe_measurement_ids:
                logging.warning(f'Duplicate probe measurement ID: {probe_measurement_id}.')
                continue

            resolved_ips = self.__get_all_resolved_ips(probe_measurement)
            if resolved_ips and int(probe_measurement['af']) == 6:
                # Normalize IPv6 addresses to their compressed form so node
                # keys and later link lookups agree.
                resolved_ips = [ipaddress.ip_address(ip).compressed for ip in resolved_ips]
            # Write the normalized list back: the link-creation phase below
            # re-reads the IPs via __get_all_resolved_ips, and it must see the
            # same strings that were used to create the IP nodes (otherwise a
            # non-compressed IPv6 string from the API causes a KeyError).
            probe_measurement['target']['resolved_ips'] = resolved_ips

            domain = probe_measurement['target']['domain']
            if domain == '' or self.__is_valid_ip(domain):
                # The API sometimes reports an IP in the target field; it is
                # not a domain name, so drop it.
                domain = None
                probe_measurement['target']['domain'] = None

            asn = probe_measurement['target']['asn']
            probe_ids_participated = probe_measurement['current_probes']

            self.__add_if_not_none(probe_measurement_id, probe_measurement_ids)
            self.__add_if_not_none(domain, domains)
            self.__add_if_not_none(asn, ases)
            self.__add_if_not_none_values(resolved_ips, ips)
            self.__add_if_not_none_values(probe_ids_participated, probe_ids)

            valid_probe_measurements.append(probe_measurement)

        # push nodes
        logging.info('Fetching/pushing nodes')
        attrs_flattened = []
        for probe_measurement in valid_probe_measurements:
            probe_measurement_copy = probe_measurement.copy()
            # current_probes becomes PART_OF relationships, not a node property.
            del probe_measurement_copy['current_probes']
            attrs_flattened.append(dict(flatdict.FlatterDict(probe_measurement_copy, delimiter='_')))

        probe_measurement_qids = self.iyp.batch_get_nodes('AtlasMeasurement', attrs_flattened, ['id'], create=True)
        probe_qids = self.iyp.batch_get_nodes_by_single_prop('AtlasProbe', 'id', probe_ids, all=False, create=True)
        ip_qids = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ips, all=False, create=True)
        domain_qids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domains, all=False, create=True)
        _ = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', ases, all=False, create=True)

        # compute links
        target_links = list()
        part_of_links = list()

        for probe_measurement in valid_probe_measurements:
            probe_measurement_id = probe_measurement['id']
            probe_measurement_qid = probe_measurement_qids[probe_measurement_id]
            probe_measurement_reference = self.reference.copy()
            # Point the reference at the public Atlas measurement ID, not the
            # internal node QID (the QID is meaningless outside neo4j).
            probe_measurement_reference['reference_url'] = probe_measurement_reference['reference_url'] + \
                f'/{probe_measurement_id}'

            probe_measurement_domain = probe_measurement['target']['domain']
            if probe_measurement_domain:
                domain_qid = domain_qids[probe_measurement_domain]
                target_links.append({'src_id': probe_measurement_qid, 'dst_id': domain_qid,
                                     'props': [probe_measurement_reference]})

            for probe_measurement_ip in self.__get_all_resolved_ips(probe_measurement):
                ip_qid = ip_qids[probe_measurement_ip]
                target_links.append({'src_id': probe_measurement_qid, 'dst_id': ip_qid,
                                     'props': [probe_measurement_reference]})

            probe_ids_participated = probe_measurement['current_probes']
            if probe_ids_participated:
                for probe_id in probe_ids_participated:
                    probe_qid = probe_qids[probe_id]
                    part_of_links.append({'src_id': probe_qid, 'dst_id': probe_measurement_qid,
                                          'props': [probe_measurement_reference]})

        # Push all links to IYP
        logging.info('Fetching/pushing relationships')
        self.iyp.batch_add_links('TARGET', target_links)
        self.iyp.batch_add_links('PART_OF', part_of_links)


def main() -> None:
    """Entry point: parse args, configure logging, and run the crawler."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--unit-test', action='store_true')
    args = parser.parse_args()

    scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3]
    FORMAT = '%(asctime)s %(levelname)s %(message)s'
    logging.basicConfig(
        format=FORMAT,
        filename='log/' + scriptname + '.log',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    logging.info(f'Started: {sys.argv}')
    print('Fetching RIPE Atlas probe measurements', file=sys.stderr)

    crawler = Crawler(ORG, URL, NAME)
    if args.unit_test:
        crawler.unit_test(logging)
    else:
        crawler.run()
        crawler.close()
    logging.info(f'Finished: {sys.argv}')


if __name__ == '__main__':
    main()
    sys.exit(0)