-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feature]: Integrate RIPE Atlas Probe Measurements Data Source #101
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
import argparse | ||
import ipaddress | ||
import json | ||
import logging | ||
import os | ||
import sys | ||
|
||
import flatdict | ||
import requests | ||
from requests import Session | ||
from requests.adapters import HTTPAdapter | ||
from urllib3.util.retry import Retry | ||
|
||
from iyp import BaseCrawler | ||
|
||
# Organization providing the data.
ORG = 'RIPE NCC'

# RIPE Atlas measurement API (v2) endpoint.
URL = 'https://atlas.ripe.net/api/v2/measurements'
# Crawler name identifying this data source in IYP references.
NAME = 'ripe.atlas_measurements'
||
class Crawler(BaseCrawler):
    """Crawl RIPE Atlas measurement metadata and model it in IYP.

    Creates (:AtlasMeasurement) nodes and connects them to their targets
    ((:IP) and (:DomainName) nodes via TARGET relationships) and to the
    participating (:AtlasProbe) nodes via PART_OF relationships.
    """

    def __init__(self, organization, url, name):
        # Set up the retrying HTTP session before the base-class init,
        # which may already perform requests.
        self.__initialize_session()
        super().__init__(organization, url, name)

    def __initialize_session(self) -> None:
        """Create a requests session that retries transparently on common
        transient errors (429/5xx) and honors Retry-After headers."""
        self.session = Session()
        retry = Retry(
            backoff_factor=0.1,
            status_forcelist=(429, 500, 502, 503, 504),
            respect_retry_after_header=True
        )
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    @staticmethod
    def __process_response(response: requests.Response):
        """Validate an API response and return a (next_url, results) tuple.

        next_url is the empty string once the last page has been reached.

        Raises:
            requests.HTTPError: if the response has an error status code.
            ValueError: if the body is not JSON or lacks expected keys.

        We raise instead of calling sys.exit so that a failing crawler does
        not terminate the entire create_db process.
        """
        response.raise_for_status()
        try:
            data = response.json()
        except json.decoder.JSONDecodeError as e:
            raise ValueError(f'Decoding JSON reply from {response.url} failed with exception: {e}')
        if 'next' not in data or 'results' not in data:
            raise ValueError('"next" or "results" key missing from response data.')

        next_url = data['next']
        if not next_url:
            logging.info('Reached end of list')
            next_url = str()
        return next_url, data['results']

    def __execute_query(self, url: str):
        """Fetch one result page and return the (next_url, results) tuple."""
        logging.info(f'Querying {url}')
        r = self.session.get(url)
        return self.__process_response(r)

    @staticmethod
    def __transform_data(data):
        """Restructure raw measurement dicts in place for flattening.

        Empty lists are replaced with None as the flatdict library converts
        them to FlatterDict objects, which would cause a problem when
        loading data into neo4j. Target- and group-related fields are nested
        below a single key each, because flatdict fails on sibling keys
        sharing a prefix (e.g. 'group' and 'group_id') when both are None.
        """
        for item in data:
            for key in item:
                if isinstance(item[key], list) and len(item[key]) == 0:
                    item[key] = None

            # Nest all target-related fields below 'target'.
            item['target'] = {
                'domain': item.pop('target', None),
                'asn': item.pop('target_asn', None),
                'ip': item.pop('target_ip', None),
                'prefix': item.pop('target_prefix', None),
                'resolved_ips': item.pop('resolved_ips', None)
            }

            # Nest group-related fields below 'group'.
            item['group'] = {
                'value': item.pop('group', None),
                'id': item.pop('group_id', None)
            }

    @staticmethod
    def __is_valid_ip(ip):
        """Return True if ip is a well-formed IPv4 or IPv6 address."""
        try:
            ipaddress.ip_address(ip)
            return True
        except ValueError:
            return False

    @staticmethod
    def __normalize_target_ips(probe_measurement):
        """Compress the measurement's target IPs in place.

        Compression canonicalizes the representation (mainly relevant for
        IPv6) so that the node-creation and link-creation passes see
        identical values. Empty or invalid addresses (the API sometimes
        returns '' or null) are replaced with None; invalid ones are logged
        since they should never occur.
        """
        def compress(ip):
            if not ip:
                return None
            try:
                return ipaddress.ip_address(ip).compressed
            except ValueError:
                logging.warning(f'Ignoring invalid target IP: {ip}')
                return None

        target = probe_measurement['target']
        if isinstance(target['resolved_ips'], list):
            target['resolved_ips'] = [compress(ip) for ip in target['resolved_ips']]
        else:
            target['resolved_ips'] = compress(target['resolved_ips'])
        target['ip'] = compress(target['ip'])

    @staticmethod
    def __get_all_resolved_ips(probe_measurement):
        """Return the measurement's target IPs as a list.

        'resolved_ips' takes precedence over 'target_ip'. The returned list
        may contain None entries for invalid/empty addresses; callers must
        skip these.
        """
        resolved_ips = probe_measurement['target']['resolved_ips'] or probe_measurement['target']['ip'] or []
        resolved_ips = [resolved_ips] if not isinstance(resolved_ips, list) else resolved_ips
        return resolved_ips

    @staticmethod
    def __add_if_not_none(v, s: set):
        """Add v to set s unless it is None or the empty string."""
        if v is not None and v != '' and v not in s:
            s.add(v)

    @staticmethod
    def __add_if_not_none_values(lst, s: set):
        """Add every non-empty value of list lst to set s."""
        if not isinstance(lst, list):
            return
        for item in lst:
            Crawler.__add_if_not_none(item, s)

    def run(self):
        """Fetch all public ongoing measurements and push nodes/links to IYP."""
        params = {'format': 'json',
                  'is_public': True,
                  'status': 2,  # Ongoing measurements only.
                  'optional_fields': 'current_probes',
                  'page_size': 500}
        r = self.session.get(URL, params=params)
        next_url, data = self.__process_response(r)
        # Follow the pagination until the API signals the last page.
        while next_url:
            next_url, next_data = self.__execute_query(next_url)
            data += next_data
            logging.info(f'Added {len(next_data)} measurements. Total: {len(data)}')

        print(f'Fetched {len(data)} probe measurements', file=sys.stderr)

        # Transform the data to be compatible with the flatdict format.
        self.__transform_data(data)

        # Compute nodes
        probe_measurement_ids = set()
        probe_ids = set()
        ips = set()
        ases = set()
        domains = set()

        valid_probe_measurements = list()

        for probe_measurement in data:
            probe_measurement_id = probe_measurement['id']
            if not probe_measurement_id:
                logging.error(f'Probe measurement without ID. Should never happen: {probe_measurement}.')
                continue
            if probe_measurement_id in probe_measurement_ids:
                logging.warning(f'Duplicate probe measurement ID: {probe_measurement_id}.')
                continue

            # Canonicalize the target IPs in place so that the
            # link-computation pass below sees exactly the values used here
            # for node creation.
            self.__normalize_target_ips(probe_measurement)
            resolved_ips = self.__get_all_resolved_ips(probe_measurement)

            # A target can be given as a domain name or as an IP address;
            # only keep real domain names in the 'domain' field.
            domain = probe_measurement['target']['domain']
            if self.__is_valid_ip(domain) or domain == '':
                domain = None
                probe_measurement['target']['domain'] = None

            asn = probe_measurement['target']['asn']
            probe_ids_participated = probe_measurement['current_probes']

            self.__add_if_not_none(probe_measurement_id, probe_measurement_ids)
            self.__add_if_not_none(domain, domains)
            self.__add_if_not_none(asn, ases)
            self.__add_if_not_none_values(resolved_ips, ips)
            self.__add_if_not_none_values(probe_ids_participated, probe_ids)

            valid_probe_measurements.append(probe_measurement)

        # push nodes
        logging.info('Fetching/pushing nodes')

        attrs_flattened = []
        for probe_measurement in valid_probe_measurements:
            # current_probes are modeled as PART_OF relationships below, so
            # do not duplicate them as (potentially huge) node properties.
            to_flatten = {k: v for k, v in probe_measurement.items() if k != 'current_probes'}
            probe_measurement_flattened = dict(flatdict.FlatterDict(to_flatten, delimiter='_'))
            attrs_flattened.append(probe_measurement_flattened)

        probe_measurement_ids = self.iyp.batch_get_nodes('AtlasMeasurement', attrs_flattened, ['id'], create=True)
        probe_ids = self.iyp.batch_get_nodes_by_single_prop('AtlasProbe', 'id', probe_ids, all=False, create=True)
        ip_ids = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ips, all=False, create=True)
        domain_ids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domains, all=False, create=True)
        _ = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', ases, all=False, create=True)
        # NOTE: Prefix nodes are deliberately not created. It is unclear
        # where Atlas gets the target_prefix information from, so it is only
        # kept as a node property for now.

        # compute links
        target_links = list()
        part_of_links = list()

        for probe_measurement in valid_probe_measurements:
            probe_measurement_qid = probe_measurement_ids[probe_measurement['id']]

            # Copy the shared reference per measurement so each link can
            # point to the exact measurement URL without overwriting the
            # URL of previously created links.
            reference = self.reference.copy()
            reference['reference_url'] = f'{URL}/{probe_measurement["id"]}'

            probe_measurement_domain = probe_measurement['target']['domain']
            if probe_measurement_domain:
                domain_qid = domain_ids[probe_measurement_domain]
                target_links.append({'src_id': probe_measurement_qid, 'dst_id': domain_qid, 'props': [reference]})

            probe_measurement_ips = self.__get_all_resolved_ips(probe_measurement)
            for probe_measurement_ip in probe_measurement_ips:
                if not probe_measurement_ip:
                    # Empty/invalid targets were dropped during normalization.
                    continue
                ip_qid = ip_ids[probe_measurement_ip]
                target_links.append({'src_id': probe_measurement_qid, 'dst_id': ip_qid, 'props': [reference]})

            probe_ids_participated = probe_measurement['current_probes']
            if probe_ids_participated:
                for probe_id in probe_ids_participated:
                    probe_qid = probe_ids[probe_id]
                    part_of_links.append({'src_id': probe_qid, 'dst_id': probe_measurement_qid,
                                          'props': [reference]})

        # Push all links to IYP
        logging.info('Fetching/pushing relationships')
        self.iyp.batch_add_links('TARGET', target_links)
        self.iyp.batch_add_links('PART_OF', part_of_links)
||
def main() -> None:
    """Entry point: parse arguments, configure logging, run the crawler."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--unit-test', action='store_true')
    args = parser.parse_args()

    # Derive the log file name from the script name (strip the '.py' suffix).
    scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3]
    logging.basicConfig(
        format='%(asctime)s %(levelname)s %(message)s',
        filename='log/' + scriptname + '.log',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    logging.info(f'Started: {sys.argv}')
    print('Fetching RIPE Atlas probe measurements', file=sys.stderr)

    crawler = Crawler(ORG, URL, NAME)
    if args.unit_test:
        crawler.unit_test(logging)
    else:
        crawler.run()
        crawler.close()
    logging.info(f'Finished: {sys.argv}')
||
# Script entry point: run the crawler and exit with a success status.
if __name__ == '__main__':
    main()
    sys.exit(0)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this is from the probe crawler (and it is on our list of ToDos), but our crawlers should not call `sys.exit`, since this terminates the entire `create_db` process. You can just raise an exception with `response.raise_for_status()` (or raise your own with a descriptive message, either is fine). We are planning to write a best-practices kind of thing for crawlers at some point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this makes much sense. Acknowledged!