
Commit

Resolve PR review
mohamedawnallah committed Dec 27, 2023
1 parent 9eea7be commit d4f40d2
Showing 1 changed file with 56 additions and 32 deletions.
88 changes: 56 additions & 32 deletions iyp/crawlers/ripe/atlas_measurements.py
@@ -19,6 +19,24 @@
NAME = 'ripe.atlas_measurements'


class RequestStatusError(requests.HTTPError):
def __init__(self, message):
self.message = message
super().__init__(self.message)


class JSONDecodeError(ValueError):
def __init__(self, message):
self.message = message
super().__init__(self.message)


class MissingKeyError(Exception):
def __init__(self, message):
self.message = message
super().__init__(self.message)


class Crawler(BaseCrawler):
def __init__(self, organization, url, name):
self.__initialize_session()
@@ -38,13 +56,13 @@ def __initialize_session(self) -> None:
@staticmethod
def __process_response(response: requests.Response):
if response.status_code != requests.codes.ok:
sys.exit(f'Request to {response.url} failed with status: {response.status_code}')
raise RequestStatusError(f'Request to {response.url} failed with status: {response.status_code}')
try:
data = response.json()
except json.decoder.JSONDecodeError as e:
sys.exit(f'Decoding JSON reply from {response.url} failed with exception: {e}')
raise JSONDecodeError(f'Decoding JSON reply from {response.url} failed with exception: {e}')
if 'next' not in data or 'results' not in data:
sys.exit('"next" or "results" key missing from response data.')
raise MissingKeyError('"next" or "results" key missing from response data.')

next_url = data['next']
if not next_url:
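The three exception classes added above replace the earlier sys.exit() calls, so failures inside __process_response now surface as catchable errors instead of terminating the process. A minimal sketch of how a caller might handle them, assuming they propagate out of run(); the organization and URL values below are placeholders, not the crawler's real configuration:

    import logging
    import sys

    # Hypothetical driver code; in IYP the crawler is normally launched by the framework.
    crawler = Crawler('RIPE NCC', 'https://atlas.ripe.net/api/v2/measurements/', NAME)
    try:
        crawler.run()
    except (RequestStatusError, JSONDecodeError, MissingKeyError) as e:
        logging.error(f'Crawler failed: {e}')
        sys.exit(1)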
@@ -67,22 +85,26 @@ def __transform_data(data):
if isinstance(item[key], list) and len(item[key]) == 0:
item[key] = None

# Extracting target information
# Flatten the target information since the original data has multiple fields
# prefixed with 'target_'. Because the dict is later flattened on the '_'
# delimiter, leaving these fields at the top level could cause a TypeError
# from flatdict.
target_info = {
"domain": item.pop("target", None),
"asn": item.pop("target_asn", None),
"ip": item.pop("target_ip", None),
"prefix": item.pop("target_prefix", None),
"resolved_ips": item.pop("resolved_ips", None)
'domain': item.pop('target', None),
'asn': item.pop('target_asn', None),
'ip': item.pop('target_ip', None),
'prefix': item.pop('target_prefix', None),
'resolved_ips': item.pop('resolved_ips', None)
}
item["target"] = target_info
item['target'] = target_info

# Extracting group information
# Flatten the group information. It is handled like the target
# information, since these fields are prefixed with 'group_'.
group_info = {
"value": item.pop("group", None),
"id": item.pop("group_id", None)
'value': item.pop('group', None),
'id': item.pop('group_id', None)
}
item["group"] = group_info
item['group'] = group_info

@staticmethod
def __is_valid_ip(ip):
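To make the delimiter issue mentioned in the comments above concrete: flatdict joins nested keys with the chosen delimiter, so sibling keys such as 'target' (a plain string) and 'target_asn' would collide when flattened on '_'. Nesting them first, as __transform_data does, avoids that. A minimal sketch with made-up values:

    import flatdict

    # After __transform_data, the target fields live under a 'target' dict,
    # so flattening on '_' reproduces keys like 'target_asn' without collisions.
    item = {'id': 1, 'target': {'domain': 'example.com', 'asn': 2497, 'ip': None}}
    flat = dict(flatdict.FlatterDict(item, delimiter='_'))
    # flat == {'id': 1, 'target_domain': 'example.com', 'target_asn': 2497, 'target_ip': None}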
@@ -94,14 +116,15 @@ def __is_valid_ip(ip):

@staticmethod
def __get_all_resolved_ips(probe_measurement):
# Ensure "resolved_ips" takes precedence over "target_ip".
# Ensure 'resolved_ips' takes precedence over 'target_ip'.
resolved_ips = probe_measurement['target']['resolved_ips'] or probe_measurement['target']['ip'] or []
resolved_ips = [resolved_ips] if not isinstance(resolved_ips, list) else resolved_ips
return resolved_ips
valid_resolved_ips = [ip for ip in resolved_ips if ip is not None and ip != '']
return valid_resolved_ips

@staticmethod
def __add_if_not_none(v, s: set):
if v is not None and v != "" and v not in s:
if v is not None and v != '' and v not in s:
s.add(v)

@staticmethod
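A short walk-through of the precedence rule in __get_all_resolved_ips, using made-up values: 'resolved_ips' wins when present, a lone 'target_ip' is wrapped in a list otherwise, and empty entries are filtered out:

    target = {'resolved_ips': None, 'ip': '203.0.113.7'}
    resolved_ips = target['resolved_ips'] or target['ip'] or []    # falls back to the single IP
    resolved_ips = [resolved_ips] if not isinstance(resolved_ips, list) else resolved_ips
    valid_resolved_ips = [ip for ip in resolved_ips if ip is not None and ip != '']
    # valid_resolved_ips == ['203.0.113.7']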
@@ -119,11 +142,13 @@ def run(self):
'page_size': 500}
r = self.session.get(URL, params=params)
next_url, data = self.__process_response(r)
count = 0
while next_url:
next_url, next_data = self.__execute_query(next_url)
data += next_data
logging.info(f'Added {len(next_data)} probes. Total: {len(data)}')
break
if count == 5:
break
print(f'Fetched {len(data)} probe measurements', file=sys.stderr)

# Transform the data to be compatible with the flatdict format.
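For context, the fetch loop above follows the API's cursor pagination: every page carries a 'next' URL, which is null on the last page. A standalone sketch of the same pattern, with a placeholder endpoint and none of the crawler's headers or error handling:

    import requests

    session = requests.Session()
    url = 'https://atlas.ripe.net/api/v2/measurements/?page_size=500'  # placeholder endpoint
    results = []
    while url:
        page = session.get(url).json()
        results += page['results']
        url = page['next']  # becomes None on the last page and ends the loop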
@@ -134,7 +159,6 @@ def run(self):
probe_ids = set()
ips = set()
ases = set()
prefixes = set()
domains = set()

valid_probe_measurements = list()
@@ -151,25 +175,19 @@ def run(self):
resolved_ips = self.__get_all_resolved_ips(probe_measurement)
for i in range(len(resolved_ips)):
probe_af = int(probe_measurement['af'])
if probe_af == 6:
try:
resolved_ips[i] = ipaddress.ip_address(resolved_ips[i]).compressed
except ValueError:
resolved_ips[i] = None
resolved_ips[i] = ipaddress.ip_address(resolved_ips[i]).compressed if probe_af == 6 else resolved_ips[i]

domain = probe_measurement['target']['domain']
if self.__is_valid_ip(domain) or domain == "":
if domain == '' or self.__is_valid_ip(domain):
domain = None
probe_measurement['target']['domain'] = None

asn = probe_measurement['target']['asn']
prefix = probe_measurement['target']['prefix']
probe_ids_participated = probe_measurement['current_probes']

self.__add_if_not_none(probe_measurement_id, probe_measurement_ids)
self.__add_if_not_none(domain, domains)
self.__add_if_not_none(asn, ases)
self.__add_if_not_none(prefix, prefixes)
self.__add_if_not_none_values(resolved_ips, ips)
self.__add_if_not_none_values(probe_ids_participated, probe_ids)

@@ -181,39 +199,45 @@ def run(self):

attrs_flattened = []
for probe_measurement in valid_probe_measurements:
probe_measurement_flattened = dict(flatdict.FlatterDict(probe_measurement, delimiter='_'))
probe_measurement_copy = probe_measurement.copy()
del probe_measurement_copy['current_probes']
probe_measurement_flattened = dict(flatdict.FlatterDict(probe_measurement_copy, delimiter='_'))
attrs_flattened.append(probe_measurement_flattened)

probe_measurement_ids = self.iyp.batch_get_nodes('AtlasMeasurement', attrs_flattened, ['id'], create=True)
probe_ids = self.iyp.batch_get_nodes_by_single_prop('AtlasProbe', 'id', probe_ids, all=False, create=True)
ip_ids = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ips, all=False, create=True)
domain_ids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domains, all=False, create=True)
_ = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', ases, all=False, create=True)
_ = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False, create=True)

# compute links
target_links = list()
part_of_links = list()

for probe_measurement in valid_probe_measurements:
probe_measurement_qid = probe_measurement_ids[probe_measurement['id']]
probe_measurement_reference = self.reference.copy()
probe_measurement_reference['reference_url'] = probe_measurement_reference['reference_url'] + \
f'/{probe_measurement_qid}'

probe_measurement_domain = probe_measurement['target']['domain']
if probe_measurement_domain:
domain_qid = domain_ids[probe_measurement_domain]
target_links.append({'src_id': probe_measurement_qid, 'dst_id': domain_qid, 'props': [self.reference]})
target_links.append({'src_id': probe_measurement_qid, 'dst_id': domain_qid,
'props': [probe_measurement_reference]})

probe_measurement_ips = self.__get_all_resolved_ips(probe_measurement)
for probe_measurement_ip in probe_measurement_ips:
ip_qid = ip_ids[probe_measurement_ip]
target_links.append({'src_id': probe_measurement_qid, 'dst_id': ip_qid, 'props': [self.reference]})
target_links.append({'src_id': probe_measurement_qid, 'dst_id': ip_qid,
'props': [probe_measurement_reference]})

probe_ids_participated = probe_measurement['current_probes']
if probe_ids_participated:
for probe_id in probe_ids_participated:
probe_qid = probe_ids[probe_id]
part_of_links.append({'src_id': probe_qid, 'dst_id': probe_measurement_qid,
'props': [self.reference]})
'props': [probe_measurement_reference]})

# Push all links to IYP
logging.info('Fetching/pushing relationships')
