From c4c4a0f04be38b175e7df29b189b50362039a719 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Mon, 25 Dec 2023 02:27:29 +0200 Subject: [PATCH 1/4] Integrate RIPE Atlas Measurements Data Source This PR integrates RIPE Atlas Measurements Data Source, adding the following relationships and node labels: - "(:AtlasProbe)-[:PART_OF]->(:AtlasMeasurement)-[:TARGET]->(:DomainName)" - "(:AtlasProbe)-[:PART_OF]->(:AtlasMeasurement)-[:TARGET]->(:IP)" --- config.json.example | 1 + iyp/crawlers/ripe/atlas_measurements.py | 246 ++++++++++++++++++++++++ 2 files changed, 247 insertions(+) create mode 100644 iyp/crawlers/ripe/atlas_measurements.py diff --git a/config.json.example b/config.json.example index 57348e7..76229cc 100644 --- a/config.json.example +++ b/config.json.example @@ -69,6 +69,7 @@ "iyp.crawlers.pch.daily_routing_snapshots_v6", "iyp.crawlers.emileaben.as_names", "iyp.crawlers.ripe.atlas_probes", + "iyp.crawlers.ripe.atlas_measurements", "iyp.crawlers.iana.root_zone", "iyp.crawlers.alice_lg.amsix", "iyp.crawlers.alice_lg.bcix", diff --git a/iyp/crawlers/ripe/atlas_measurements.py b/iyp/crawlers/ripe/atlas_measurements.py new file mode 100644 index 0000000..d3270f7 --- /dev/null +++ b/iyp/crawlers/ripe/atlas_measurements.py @@ -0,0 +1,246 @@ +import argparse +import ipaddress +import json +import logging +import os +import sys + +import flatdict +import iso3166 +import requests +from requests import Session +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from iyp import BaseCrawler + +ORG = 'RIPE NCC' + +URL = 'https://atlas.ripe.net/api/v2/measurements' +NAME = 'ripe.atlas_measurements' + + +class Crawler(BaseCrawler): + def __init__(self, organization, url, name): + self.__initialize_session() + super().__init__(organization, url, name) + + def __initialize_session(self) -> None: + self.session = Session() + retry = Retry( + backoff_factor=0.1, + status_forcelist=(429, 500, 502, 503, 504), + respect_retry_after_header=True 
+ ) + adapter = HTTPAdapter(max_retries=retry) + self.session.mount('http://', adapter) + self.session.mount('https://', adapter) + + @staticmethod + def __process_response(response: requests.Response): + if response.status_code != requests.codes.ok: + sys.exit(f'Request to {response.url} failed with status: {response.status_code}') + try: + data = response.json() + except json.decoder.JSONDecodeError as e: + sys.exit(f'Decoding JSON reply from {response.url} failed with exception: {e}') + if 'next' not in data or 'results' not in data: + sys.exit('"next" or "results" key missing from response data.') + + next_url = data['next'] + if not next_url: + logging.info('Reached end of list') + next_url = str() + return next_url, data['results'] + + def __execute_query(self, url: str): + logging.info(f'Querying {url}') + r = self.session.get(url) + return self.__process_response(r) + + @staticmethod + def __transform_data(data): + for item in data: + # Convert empty lists to None as flatdict library converts it automatically + # to FlatterDict object which would cause a problem when loading data in neo4j. + for key in item: + if isinstance(item[key], list) and len(item[key]) == 0: + item[key] = None + + # Extracting target information + target_info = { + "domain": item.pop("target", None), + "asn": item.pop("target_asn", None), + "ip": item.pop("target_ip", None), + "prefix": item.pop("target_prefix", None), + "resolved_ips": item.pop("resolved_ips", None) + } + item["target"] = target_info + + # Extracting group information + group_info = { + "value": item.pop("group", None), + "id": item.pop("group_id", None) + } + item["group"] = group_info + + @staticmethod + def __is_valid_ip(ip): + try: + ipaddress.ip_address(ip) + return True + except ValueError: + return False + + @staticmethod + def __get_all_resolved_ips(probe_measurement): + # Ensure "resolved_ips" takes precedence over "target_ip". 
+ resolved_ips = probe_measurement['target']['resolved_ips'] or probe_measurement['target']['ip'] or [] + resolved_ips = [resolved_ips] if not isinstance(resolved_ips, list) else resolved_ips + return resolved_ips + + @staticmethod + def __add_if_not_none(v, s: set): + if v is not None and v != "" and v not in s: + s.add(v) + + @staticmethod + def __add_if_not_none_values(lst, s: set): + if not isinstance(lst, list): return + for item in lst: + Crawler.__add_if_not_none(item, s) + + def run(self): + params = {'format': 'json', + 'is_public': True, + 'status': 2, + 'optional_fields': 'current_probes', + 'page_size': 500} + r = self.session.get(URL, params=params) + next_url, data = self.__process_response(r) + while next_url: + next_url, next_data = self.__execute_query(next_url) + data += next_data + logging.info(f'Added {len(next_data)} probes. Total: {len(data)}') + print(f'Fetched {len(data)} probe measurements', file=sys.stderr) + + # Transform the data to be compatible with the flatdict format. + self.__transform_data(data) + + # Compute nodes + probe_measurement_ids = set() + probe_ids = set() + ips = set() + ases = set() + prefixes = set() + domains = set() + + valid_probe_measurements = list() + + for probe_measurement in data: + probe_measurement_id = probe_measurement['id'] + if not probe_measurement_id: + logging.error(f'Probe Measurement without ID. 
Should never happen: {probe_measurement}.') + continue + if probe_measurement_id in probe_measurement_ids: + logging.warning(f'Duplicate probe measurement ID: {probe_measurement_id}.') + continue + + resolved_ips = self.__get_all_resolved_ips(probe_measurement) + for i in range(len(resolved_ips)): + probe_af = int(probe_measurement['af']) + if probe_af == 6: + try: + resolved_ips[i] = ipaddress.ip_address(resolved_ips[i]).compressed + except ValueError: + resolved_ips[i] = None + + domain = probe_measurement['target']['domain'] + if self.__is_valid_ip(domain) or domain == "": + domain = None + probe_measurement['target']['domain'] = None + + asn = probe_measurement['target']['asn'] + prefix = probe_measurement['target']['prefix'] + probe_ids_participated = probe_measurement['current_probes'] + + self.__add_if_not_none(probe_measurement_id, probe_measurement_ids) + self.__add_if_not_none(domain, domains) + self.__add_if_not_none(asn, ases) + self.__add_if_not_none(prefix, prefixes) + self.__add_if_not_none_values(resolved_ips, ips) + self.__add_if_not_none_values(probe_ids_participated, probe_ids) + + valid_probe_measurements.append(probe_measurement) + + # push nodes + logging.info('Fetching/pushing nodes') + probe_measurement_ids = dict() + + flattened_probe_measurements = [dict(flatdict.FlatterDict(probe_measurement, delimiter='_')) for probe_measurement in valid_probe_measurements] + + probe_measurement_ids = self.iyp.batch_get_nodes('AtlasMeasurement', flattened_probe_measurements, ['id'], create=True) + probe_ids = self.iyp.batch_get_nodes_by_single_prop('AtlasProbe', 'id', probe_ids, all=False, create=True) + ip_ids = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ips, all=False, create=True) + domain_ids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domains, all=False, create=True) + _ = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', ases, all=False, create=True) + _ = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', 
prefixes, all=False, create=True) + + # compute links + target_links = list() + part_of_links = list() + + for probe_measurement in valid_probe_measurements: + probe_measurement_qid = probe_measurement_ids[probe_measurement['id']] + + probe_measurement_domain = probe_measurement['target']['domain'] + if probe_measurement_domain: + domain_qid = domain_ids[probe_measurement_domain] + target_links.append({'src_id': probe_measurement_qid, 'dst_id': domain_qid, 'props': [self.reference]}) + + probe_measurement_ips = self.__get_all_resolved_ips(probe_measurement) + for probe_measurement_ip in probe_measurement_ips: + ip_qid = ip_ids[probe_measurement_ip] + target_links.append({'src_id': probe_measurement_qid, 'dst_id': ip_qid, 'props': [self.reference]}) + + probe_ids_participated = probe_measurement['current_probes'] + if probe_ids_participated: + for probe_id in probe_ids_participated: + probe_qid = probe_ids[probe_id] + part_of_links.append({'src_id': probe_qid, 'dst_id': probe_measurement_qid, 'props': [self.reference]}) + + # Push all links to IYP + logging.info('Fetching/pushing relationships') + self.iyp.batch_add_links('TARGET', target_links) + self.iyp.batch_add_links('PART_OF', part_of_links) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--unit-test', action='store_true') + args = parser.parse_args() + + scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3] + FORMAT = '%(asctime)s %(levelname)s %(message)s' + logging.basicConfig( + format=FORMAT, + filename='log/' + scriptname + '.log', + level=logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S' + ) + + logging.info(f'Started: {sys.argv}') + print('Fetching RIPE Atlas probe measurements', file=sys.stderr) + + crawler = Crawler(ORG, URL, NAME) + if args.unit_test: + crawler.unit_test(logging) + else: + crawler.run() + crawler.close() + logging.info(f'Finished: {sys.argv}') + + +if __name__ == '__main__': + main() + sys.exit(0) From 
9eea7be088cde20ced6cd9d13ea381e4b583b8d2 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Mon, 25 Dec 2023 02:51:56 +0200 Subject: [PATCH 2/4] Fix linting issues --- iyp/crawlers/ripe/atlas_measurements.py | 28 +++++++++++++++---------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/iyp/crawlers/ripe/atlas_measurements.py b/iyp/crawlers/ripe/atlas_measurements.py index d3270f7..abeae34 100644 --- a/iyp/crawlers/ripe/atlas_measurements.py +++ b/iyp/crawlers/ripe/atlas_measurements.py @@ -6,7 +6,6 @@ import sys import flatdict -import iso3166 import requests from requests import Session from requests.adapters import HTTPAdapter @@ -57,12 +56,13 @@ def __execute_query(self, url: str): logging.info(f'Querying {url}') r = self.session.get(url) return self.__process_response(r) - + @staticmethod def __transform_data(data): for item in data: # Convert empty lists to None as flatdict library converts it automatically - # to FlatterDict object which would cause a problem when loading data in neo4j. + # to FlatterDict object which would cause a problem when + # loading data in neo4j. for key in item: if isinstance(item[key], list) and len(item[key]) == 0: item[key] = None @@ -91,7 +91,7 @@ def __is_valid_ip(ip): return True except ValueError: return False - + @staticmethod def __get_all_resolved_ips(probe_measurement): # Ensure "resolved_ips" takes precedence over "target_ip". @@ -103,10 +103,11 @@ def __get_all_resolved_ips(probe_measurement): def __add_if_not_none(v, s: set): if v is not None and v != "" and v not in s: s.add(v) - + @staticmethod def __add_if_not_none_values(lst, s: set): - if not isinstance(lst, list): return + if not isinstance(lst, list): + return for item in lst: Crawler.__add_if_not_none(item, s) @@ -122,6 +123,7 @@ def run(self): next_url, next_data = self.__execute_query(next_url) data += next_data logging.info(f'Added {len(next_data)} probes. 
Total: {len(data)}') + break print(f'Fetched {len(data)} probe measurements', file=sys.stderr) # Transform the data to be compatible with the flatdict format. @@ -177,9 +179,12 @@ def run(self): logging.info('Fetching/pushing nodes') probe_measurement_ids = dict() - flattened_probe_measurements = [dict(flatdict.FlatterDict(probe_measurement, delimiter='_')) for probe_measurement in valid_probe_measurements] + attrs_flattened = [] + for probe_measurement in valid_probe_measurements: + probe_measurement_flattened = dict(flatdict.FlatterDict(probe_measurement, delimiter='_')) + attrs_flattened.append(probe_measurement_flattened) - probe_measurement_ids = self.iyp.batch_get_nodes('AtlasMeasurement', flattened_probe_measurements, ['id'], create=True) + probe_measurement_ids = self.iyp.batch_get_nodes('AtlasMeasurement', attrs_flattened, ['id'], create=True) probe_ids = self.iyp.batch_get_nodes_by_single_prop('AtlasProbe', 'id', probe_ids, all=False, create=True) ip_ids = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ips, all=False, create=True) domain_ids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domains, all=False, create=True) @@ -192,12 +197,12 @@ def run(self): for probe_measurement in valid_probe_measurements: probe_measurement_qid = probe_measurement_ids[probe_measurement['id']] - + probe_measurement_domain = probe_measurement['target']['domain'] if probe_measurement_domain: domain_qid = domain_ids[probe_measurement_domain] target_links.append({'src_id': probe_measurement_qid, 'dst_id': domain_qid, 'props': [self.reference]}) - + probe_measurement_ips = self.__get_all_resolved_ips(probe_measurement) for probe_measurement_ip in probe_measurement_ips: ip_qid = ip_ids[probe_measurement_ip] @@ -207,7 +212,8 @@ def run(self): if probe_ids_participated: for probe_id in probe_ids_participated: probe_qid = probe_ids[probe_id] - part_of_links.append({'src_id': probe_qid, 'dst_id': probe_measurement_qid, 'props': [self.reference]}) + 
part_of_links.append({'src_id': probe_qid, 'dst_id': probe_measurement_qid, + 'props': [self.reference]}) # Push all links to IYP logging.info('Fetching/pushing relationships') From d4f40d208ffb73358ccedd4994d5167f27ae9475 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Wed, 27 Dec 2023 07:35:06 +0200 Subject: [PATCH 3/4] Resolve PR review --- iyp/crawlers/ripe/atlas_measurements.py | 88 ++++++++++++++++--------- 1 file changed, 56 insertions(+), 32 deletions(-) diff --git a/iyp/crawlers/ripe/atlas_measurements.py b/iyp/crawlers/ripe/atlas_measurements.py index abeae34..7c6320a 100644 --- a/iyp/crawlers/ripe/atlas_measurements.py +++ b/iyp/crawlers/ripe/atlas_measurements.py @@ -19,6 +19,24 @@ NAME = 'ripe.atlas_measurements' +class RequestStatusError(requests.HTTPError): + def __init__(self, message): + self.message = message + super().__init__(self.message) + + +class JSONDecodeError(ValueError): + def __init__(self, message): + self.message = message + super().__init__(self.message) + + +class MissingKeyError(Exception): + def __init__(self, message): + self.message = message + super().__init__(self.message) + + class Crawler(BaseCrawler): def __init__(self, organization, url, name): self.__initialize_session() @@ -38,13 +56,13 @@ def __initialize_session(self) -> None: @staticmethod def __process_response(response: requests.Response): if response.status_code != requests.codes.ok: - sys.exit(f'Request to {response.url} failed with status: {response.status_code}') + raise RequestStatusError(f'Request to {response.url} failed with status: {response.status_code}') try: data = response.json() except json.decoder.JSONDecodeError as e: - sys.exit(f'Decoding JSON reply from {response.url} failed with exception: {e}') + raise JSONDecodeError(f'Decoding JSON reply from {response.url} failed with exception: {e}') if 'next' not in data or 'results' not in data: - sys.exit('"next" or "results" key missing from response data.') + raise MissingKeyError('"next" or 
"results" key missing from response data.') next_url = data['next'] if not next_url: @@ -67,22 +85,26 @@ def __transform_data(data): if isinstance(item[key], list) and len(item[key]) == 0: item[key] = None - # Extracting target information + # Flatten the target information since the original data has multiple fields + # prefixed with 'target_'. Given that we flatten the dict + # on the '_' delimiter, this action would potentially + # cause a TypeError from flatdict if it isn't handled properly. target_info = { - "domain": item.pop("target", None), - "asn": item.pop("target_asn", None), - "ip": item.pop("target_ip", None), - "prefix": item.pop("target_prefix", None), - "resolved_ips": item.pop("resolved_ips", None) + 'domain': item.pop('target', None), + 'asn': item.pop('target_asn', None), + 'ip': item.pop('target_ip', None), + 'prefix': item.pop('target_prefix', None), + 'resolved_ips': item.pop('resolved_ips', None) } - item["target"] = target_info + item['target'] = target_info - # Extracting group information + # Flatten the group information, They are the same as target + # information as they are prefixed with 'group_'. group_info = { - "value": item.pop("group", None), - "id": item.pop("group_id", None) + 'value': item.pop('group', None), + 'id': item.pop('group_id', None) } - item["group"] = group_info + item['group'] = group_info @staticmethod def __is_valid_ip(ip): @@ -94,14 +116,15 @@ def __is_valid_ip(ip): @staticmethod def __get_all_resolved_ips(probe_measurement): - # Ensure "resolved_ips" takes precedence over "target_ip". + # Ensure 'resolved_ips' takes precedence over 'target_ip. 
resolved_ips = probe_measurement['target']['resolved_ips'] or probe_measurement['target']['ip'] or [] resolved_ips = [resolved_ips] if not isinstance(resolved_ips, list) else resolved_ips - return resolved_ips + valid_resolved_ips = [ip for ip in resolved_ips if ip is not None and ip != ''] + return valid_resolved_ips @staticmethod def __add_if_not_none(v, s: set): - if v is not None and v != "" and v not in s: + if v is not None and v != '' and v not in s: s.add(v) @staticmethod @@ -119,11 +142,13 @@ def run(self): 'page_size': 500} r = self.session.get(URL, params=params) next_url, data = self.__process_response(r) + count = 0 while next_url: next_url, next_data = self.__execute_query(next_url) data += next_data logging.info(f'Added {len(next_data)} probes. Total: {len(data)}') - break + if count == 5: + break print(f'Fetched {len(data)} probe measurements', file=sys.stderr) # Transform the data to be compatible with the flatdict format. @@ -134,7 +159,6 @@ def run(self): probe_ids = set() ips = set() ases = set() - prefixes = set() domains = set() valid_probe_measurements = list() @@ -151,25 +175,19 @@ def run(self): resolved_ips = self.__get_all_resolved_ips(probe_measurement) for i in range(len(resolved_ips)): probe_af = int(probe_measurement['af']) - if probe_af == 6: - try: - resolved_ips[i] = ipaddress.ip_address(resolved_ips[i]).compressed - except ValueError: - resolved_ips[i] = None + resolved_ips[i] = ipaddress.ip_address(resolved_ips[i]).compressed if probe_af == 6 else resolved_ips[i] domain = probe_measurement['target']['domain'] - if self.__is_valid_ip(domain) or domain == "": + if domain == '' or self.__is_valid_ip(domain): domain = None probe_measurement['target']['domain'] = None asn = probe_measurement['target']['asn'] - prefix = probe_measurement['target']['prefix'] probe_ids_participated = probe_measurement['current_probes'] self.__add_if_not_none(probe_measurement_id, probe_measurement_ids) self.__add_if_not_none(domain, domains) 
self.__add_if_not_none(asn, ases) - self.__add_if_not_none(prefix, prefixes) self.__add_if_not_none_values(resolved_ips, ips) self.__add_if_not_none_values(probe_ids_participated, probe_ids) @@ -181,7 +199,9 @@ def run(self): attrs_flattened = [] for probe_measurement in valid_probe_measurements: - probe_measurement_flattened = dict(flatdict.FlatterDict(probe_measurement, delimiter='_')) + probe_measurement_copy = probe_measurement.copy() + del probe_measurement_copy['current_probes'] + probe_measurement_flattened = dict(flatdict.FlatterDict(probe_measurement_copy, delimiter='_')) attrs_flattened.append(probe_measurement_flattened) probe_measurement_ids = self.iyp.batch_get_nodes('AtlasMeasurement', attrs_flattened, ['id'], create=True) @@ -189,7 +209,6 @@ def run(self): ip_ids = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ips, all=False, create=True) domain_ids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domains, all=False, create=True) _ = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', ases, all=False, create=True) - _ = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False, create=True) # compute links target_links = list() @@ -197,23 +216,28 @@ def run(self): for probe_measurement in valid_probe_measurements: probe_measurement_qid = probe_measurement_ids[probe_measurement['id']] + probe_measurement_reference = self.reference.copy() + probe_measurement_reference['reference_url'] = probe_measurement_reference['reference_url'] + \ + f'/{probe_measurement_qid}' probe_measurement_domain = probe_measurement['target']['domain'] if probe_measurement_domain: domain_qid = domain_ids[probe_measurement_domain] - target_links.append({'src_id': probe_measurement_qid, 'dst_id': domain_qid, 'props': [self.reference]}) + target_links.append({'src_id': probe_measurement_qid, 'dst_id': domain_qid, + 'props': [probe_measurement_reference]}) probe_measurement_ips = self.__get_all_resolved_ips(probe_measurement) for 
probe_measurement_ip in probe_measurement_ips: ip_qid = ip_ids[probe_measurement_ip] - target_links.append({'src_id': probe_measurement_qid, 'dst_id': ip_qid, 'props': [self.reference]}) + target_links.append({'src_id': probe_measurement_qid, 'dst_id': ip_qid, + 'props': [probe_measurement_reference]}) probe_ids_participated = probe_measurement['current_probes'] if probe_ids_participated: for probe_id in probe_ids_participated: probe_qid = probe_ids[probe_id] part_of_links.append({'src_id': probe_qid, 'dst_id': probe_measurement_qid, - 'props': [self.reference]}) + 'props': [probe_measurement_reference]}) # Push all links to IYP logging.info('Fetching/pushing relationships') From 938df2363b0cbf57f6069aba61b6bc63d55805e1 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Wed, 27 Dec 2023 07:36:26 +0200 Subject: [PATCH 4/4] Add atlas_measurements docs --- iyp/crawlers/ripe/README.md | 13 +++++++++++++ iyp/crawlers/ripe/atlas_measurements.py | 3 --- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/iyp/crawlers/ripe/README.md b/iyp/crawlers/ripe/README.md index b2a9538..3dcd4d6 100644 --- a/iyp/crawlers/ripe/README.md +++ b/iyp/crawlers/ripe/README.md @@ -47,6 +47,19 @@ ASN(s), and country. (:IP {ip: '202.214.97.16'})-[:ASSIGNED]->(:AtlasProbe {id: 6425}) ``` +### Atlas Measurements - `atlas_measurements.py` + +We fetch the [list of probe measurements](https://atlas.ripe.net/api/v2/measurements) to obtain the measurement data of the `AtlasProbe`. Probes are matched to measurements by probe ID and connected to them through the `PART_OF` relationship, while the `TARGET` relationship connects a measurement to what it measures. The `TARGET` relationship encompasses associations with both `DomainName` and `IP` nodes. 
+ +The Cypher patterns for these relationships are as follows: + +```Cypher +(:AtlasProbe)-[:PART_OF]->(:AtlasMeasurement)-[:TARGET]->(:DomainName) +(:AtlasProbe)-[:PART_OF]->(:AtlasMeasurement)-[:TARGET]->(:IP) +``` + +These patterns match `AtlasProbe` nodes linked via the `PART_OF` relationship to `AtlasMeasurement` nodes, which, in turn, are linked through `TARGET` to either a `DomainName` or an `IP` node. + ## Dependence This crawler is not depending on other crawlers. diff --git a/iyp/crawlers/ripe/atlas_measurements.py b/iyp/crawlers/ripe/atlas_measurements.py index 7c6320a..7acff60 100644 --- a/iyp/crawlers/ripe/atlas_measurements.py +++ b/iyp/crawlers/ripe/atlas_measurements.py @@ -142,13 +142,10 @@ def run(self): 'page_size': 500} r = self.session.get(URL, params=params) next_url, data = self.__process_response(r) - count = 0 while next_url: next_url, next_data = self.__execute_query(next_url) data += next_data logging.info(f'Added {len(next_data)} probes. Total: {len(data)}') - if count == 5: - break print(f'Fetched {len(data)} probe measurements', file=sys.stderr) # Transform the data to be compatible with the flatdict format.