Major logging rework
All crawlers now log exclusively via the logging module instead of
writing to stderr. The IYP module also automatically logs node and
relationship retrieval and creation. As a consequence, some crawlers
are a bit chatty; this should be fixed in the future.

The general log level was changed to INFO and the level name is now
included in the logs instead of the process name.
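
To illustrate, a minimal sketch of what the new configuration amounts to; the `level=logging.INFO` argument and the concrete paths are assumptions for illustration, not copied from the commit:

```python
import logging
import os

dump_dir = 'dumps'       # hypothetical directory, for illustration only
date = '2024-09-10'      # hypothetical date string
os.makedirs(dump_dir, exist_ok=True)

# The level name replaces the process name in each record, and the general
# threshold is INFO; how the level is passed is assumed here.
FORMAT = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(
    format=FORMAT,
    filename=os.path.join(dump_dir, f'iyp-{date}.log'),
    level=logging.INFO,
)

logging.info('Starting new container...')  # previously emitted as a warning
```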

As part of this, some crawlers were also reworked:

- alice_lg: Fixed the unit test, which checked for relationship types
  that are not created by this crawler.
- ihr:
  - Reference modification time is more precise, matching the exact
    timebin used
  - Data is handled in memory, removing the need for temporary files
  - Moved to batch creation, making the crawler faster and removing the
    dependency on other crawlers (a rough sketch of the pattern is shown
    after this list)
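
For the batch-creation point above, here is a rough sketch (not the actual
ihr code) of the pattern used elsewhere in this diff: collect all property
values in memory, resolve or create the nodes with
`batch_get_nodes_by_single_prop`, then push the relationships with
`batch_add_links`. The placeholder entries and the `ORIGINATE` relationship
type are illustrative assumptions.

```python
def run(self):
    # Sketch only: `self.iyp` and `self.reference` come from the crawler base
    # class; `entries` stands in for data the crawler fetched into memory.
    entries = [{'asn': 2497, 'prefix': '192.0.2.0/24'}]  # placeholder data

    asns = {e['asn'] for e in entries}
    prefixes = {e['prefix'] for e in entries}

    # One round-trip per node label instead of one get/create call per item.
    asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False)
    prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False)

    links = [{'src_id': asn_id[e['asn']],
              'dst_id': prefix_id[e['prefix']],
              'props': [self.reference]}
             for e in entries]

    # One batched write; the relationship type here is illustrative.
    self.iyp.batch_add_links('ORIGINATE', links)
```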
m-appel committed Sep 10, 2024
1 parent 0bc91ff commit 9a41a47
Showing 34 changed files with 104 additions and 213 deletions.
8 changes: 4 additions & 4 deletions create_db.py
@@ -38,7 +38,7 @@ def main():
os.makedirs(dump_dir, exist_ok=True)

# Initialize logging
FORMAT = '%(asctime)s %(processName)s %(message)s'
FORMAT = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(
format=FORMAT,
filename=os.path.join(dump_dir, f'iyp-{date}.log'),
@@ -59,7 +59,7 @@ def main():

# ######### Start a new docker image ##########

logging.warning('Starting new container...')
logging.info('Starting new container...')
container = client.containers.run(
'neo4j:' + NEO4J_VERSION,
name=f'iyp-{date}',
@@ -142,7 +142,7 @@ def __init__(self, message):
send_email(relation_count_error)
except Exception as e:
no_error = False
logging.error('crawler crashed!')
logging.error('Crawler crashed!')
status[module_name] = e
send_email(e)

@@ -162,7 +162,7 @@ def __init__(self, message):

except Exception as e:
no_error = False
logging.error('crawler crashed!')
logging.error('Crawler crashed!')
logging.error(e)
status[module_name] = e

10 changes: 9 additions & 1 deletion iyp/__init__.py
@@ -268,8 +268,10 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T
prop_set = set(map(prop_formatters[prop_name], prop_set))

if all:
logging.info(f'Fetching all {label_str} nodes.')
existing_nodes = self.tx.run(f'MATCH (n:{label_str}) RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id')
else:
logging.info(f'Fetching up to {len(prop_set)} {label_str} nodes.')
list_prop = list(prop_set)
existing_nodes = self.tx.run(f"""
WITH $list_prop AS list_prop
@@ -283,7 +285,8 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T
missing_nodes = [{prop_name: val} for val in missing_props]

# Create missing nodes
if create:
if create and missing_nodes:
logging.info(f'Creating {len(missing_nodes)} {label_str} nodes.')
for i in range(0, len(missing_nodes), BATCH_SIZE):
batch = missing_nodes[i:i + BATCH_SIZE]

@@ -479,6 +482,8 @@ def batch_add_node_label(self, node_ids, label):
if isinstance(label, list):
label_str = ':'.join(label)

logging.info(f'Adding label "{label_str}" to {len(node_ids)} nodes.')

for i in range(0, len(node_ids), BATCH_SIZE):
batch = node_ids[i:i + BATCH_SIZE]

@@ -532,6 +537,9 @@ def batch_add_links(self, type, links, action='create'):

self.__create_range_index(type, 'reference_name', on_relationship=True)

action_str = 'Creating' if action == 'create' else 'Merging'
logging.info(f'{action_str} {len(links)} {type} relationships.')

# Create links in batches
for i in range(0, len(links), BATCH_SIZE):
batch = links[i:i + BATCH_SIZE]
13 changes: 4 additions & 9 deletions iyp/crawlers/alice_lg/__init__.py
@@ -319,15 +319,14 @@ def __fetch_routes(self) -> None:
if failed_neighbors:
logging.warning(f'Failed to fetch routes for {len(failed_neighbors)} neighbors: {failed_neighbors}')
if failed_pages:
logging.warning(
f'Failed to fetch {sum(failed_pages.values())} pages for {len(failed_pages)} neighbors:')
logging.warning(f'Failed to fetch {sum(failed_pages.values())} pages for {len(failed_pages)} '
f'neighbors:')
for key, count in failed_pages.items():
logging.warning(f' {key}: {count}')

def fetch(self) -> None:
tmp_dir = self.get_tmp_dir()
if not os.path.exists(tmp_dir):
logging.info(f'Creating tmp dir: {tmp_dir}')
self.create_tmp_dir()

self.__fetch_routeservers()
@@ -360,7 +359,7 @@ def run(self) -> None:
member_ip = neighbor['address']
n = peering_lans.search_best(member_ip)
if n is None:
logging.warning(f'Failed to map member IP to peering LAN: {member_ip}')
logging.debug(f'Failed to map member IP to peering LAN: {member_ip}')
continue
member_asn = neighbor['asn']
if not member_asn or not isinstance(member_asn, int):
@@ -419,11 +418,9 @@ def run(self) -> None:
'props': [flattened_route, self.reference.copy()]})

# Get/create nodes.
logging.info(f'Getting {len(asns)} AS nodes.')
asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False)
prefix_id = dict()
if prefixes:
logging.info(f'Getting {len(prefixes)} Prefix nodes.')
prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False)

# Translate raw values to QID.
@@ -437,11 +434,9 @@ def run(self) -> None:
relationship['dst_id'] = prefix_id[prefix]

# Push relationships.
logging.info(f'Pushing {len(member_of_rels)} MEMBER_OF relationships.')
self.iyp.batch_add_links('MEMBER_OF', member_of_rels)
if originate_rels:
logging.info(f'Pushing {len(originate_rels)} ORIGINATE relationships.')
self.iyp.batch_add_links('ORIGINATE', originate_rels)

def unit_test(self):
return super().unit_test(['MEMBER_OF', 'ORIGINATE', 'MANAGED_BY'])
return super().unit_test(['MEMBER_OF'])
5 changes: 3 additions & 2 deletions iyp/crawlers/apnic/eyeball.py
@@ -28,8 +28,9 @@ def run(self):

processed_asn = set()

logging.info(f'Processing {len(self.countries)} countries...')
for cc, country in self.countries.items():
logging.info(f'processing {country}')
logging.debug(f'processing {country}')

# Get the QID of the country and corresponding ranking
cc_qid = self.iyp.get_node('Country', {'country_code': cc})
@@ -46,7 +47,7 @@ def run(self):
names = set()

ranking = req.json()
logging.info(f'{len(ranking)} eyeball ASes')
logging.debug(f'{len(ranking)} eyeball ASes')

# Collect all ASNs and names
# and make sure the ranking is sorted and add rank field
2 changes: 0 additions & 2 deletions iyp/crawlers/bgpkit/pfx2asn.py
@@ -45,7 +45,6 @@ def run(self):

req.close()

logging.info('Pushing nodes to neo4j...')
# get ASNs and prefixes IDs
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
self.prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes)
@@ -58,7 +57,6 @@ def run(self):

links.append({'src_id': asn_qid, 'dst_id': prefix_qid, 'props': [self.reference, entry]})

logging.info('Pushing links to neo4j...')
# Push all links to IYP
self.iyp.batch_add_links('ORIGINATE', links)

2 changes: 0 additions & 2 deletions iyp/crawlers/bgptools/anycast_prefixes.py
@@ -65,14 +65,12 @@ def run(self):
ipv4_prefixes_response = fetch_dataset(ipv4_prefixes_url)
logging.info('IPv4 prefixes fetched successfully.')
self.update(ipv4_prefixes_response, ipv4_prefixes_filename)
logging.info('IPv4 prefixes pushed to IYP.')

self.reference['reference_url_data'] = ipv6_prefixes_url
self.reference['reference_time_modification'] = get_commit_datetime(self.repo, self.v6_file)
ipv6_prefixes_response = fetch_dataset(ipv6_prefixes_url)
logging.info('IPv6 prefixes fetched successfully.')
self.update(ipv6_prefixes_response, ipv6_prefixes_filename)
logging.info('IPv6 prefixes pushed to IYP.')

def update(self, res, filename: str):
with open(filename, 'w') as file:
12 changes: 3 additions & 9 deletions iyp/crawlers/bgptools/tags.py
@@ -60,7 +60,7 @@ def run(self):

req = requests.get(url, headers=self.headers)
if req.status_code != 200:
print(req.text)
logging.error(req.text)
raise RequestStatusError('Error while fetching AS names')

self.tag_qid = self.iyp.get_node('Tag', {'label': label})
@@ -74,14 +74,8 @@ def run(self):
asn_qid = self.iyp.get_node('AS', {'asn': asn[2:]})
statements = [['CATEGORIZED', self.tag_qid, self.reference]] # Set AS name

try:
# Update AS name and country
self.iyp.add_links(asn_qid, statements)

except Exception as error:
# print errors and continue running
print('Error for: ', line)
print(error)
# Update AS name and country
self.iyp.add_links(asn_qid, statements)

def unit_test(self):
return super().unit_test(['CATEGORIZED'])
10 changes: 2 additions & 8 deletions iyp/crawlers/caida/asrank.py
@@ -22,16 +22,15 @@ def __init__(self, organization, url, name):

def run(self):
"""Fetch networks information from ASRank and push to IYP."""
print('Fetching CAIDA AS Rank', file=sys.stderr)

nodes = list()

has_next = True
i = 0
logging.info('Fetching AS Ranks...')
while has_next:
url = URL + f'&offset={i * 10000}'
i += 1
logging.info(f'Fetching {url}')
logging.debug(f'Fetching {url}')
req = requests.get(url)
if req.status_code != 200:
logging.error(f'Request failed with status: {req.status_code}')
@@ -42,7 +41,6 @@ def run(self):

nodes += ranking['edges']

print(f'Fetched {len(nodes):,d} ranks.', file=sys.stderr)
logging.info(f'Fetched {len(nodes):,d} ranks.')

# Collect all ASNs, names, and countries
@@ -59,8 +57,6 @@ def run(self):
asns.add(int(asn['asn']))

# Get/create ASNs, names, and country nodes
print('Pushing nodes.', file=sys.stderr)
logging.info('Pushing nodes.')
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
self.country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries)
self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names, all=False)
@@ -94,8 +90,6 @@ def run(self):
rank_links.append({'src_id': asn_qid, 'dst_id': self.asrank_qid, 'props': [self.reference, flat_asn]})

# Push all links to IYP
print('Pushing links.', file=sys.stderr)
logging.info('Pushing links.')
self.iyp.batch_add_links('NAME', name_links)
self.iyp.batch_add_links('COUNTRY', country_links)
self.iyp.batch_add_links('RANK', rank_links)
2 changes: 1 addition & 1 deletion iyp/crawlers/caida/ix_asns.py
@@ -38,7 +38,7 @@ def __init__(self, organization, url, name):
raise Exception('No recent CAIDA ix-asns file available')
date = date.datetime.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)

logging.info('going to use this URL: ' + url)
logging.info(f'Fetching data from: {url}')
super().__init__(organization, url, name)
self.reference['reference_url_info'] = 'https://publicdata.caida.org/datasets/ixps/README.txt'
self.reference['reference_time_modification'] = date
6 changes: 1 addition & 5 deletions iyp/crawlers/cisco/umbrella_top1m.py
@@ -44,7 +44,7 @@ def __set_modification_time(self):
# date now points to the last available historical file , which means the
# current file is the day after this date.
self.reference['reference_time_modification'] = date + timedelta(days=1)
logging.info(self.reference)
logging.info(f'Got list for date {self.reference["reference_time_modification"].strftime("%Y-%m-%d")}')

def run(self):
"""Fetch Umbrella top 1M and push to IYP."""
@@ -69,7 +69,6 @@ def run(self):
links.append({'src_name': domain, 'dst_id': self.cisco_qid,
'props': [self.reference, {'rank': int(rank)}]})

logging.info('Fetching DomainName/HostName nodes...')
domain_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name')
host_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name')

@@ -103,10 +102,8 @@ def run(self):
new_host_names.add(name)

if new_domain_names:
logging.info(f'Pushing {len(new_domain_names)} additional DomainName nodes...')
domain_id.update(self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', new_domain_names, all=False))
if new_host_names:
logging.info(f'Pushing {len(new_host_names)} additional HostName nodes...')
host_id.update(self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', new_host_names, all=False))

for link in unprocessed_links:
@@ -120,7 +117,6 @@ def run(self):
processed_links.append(link)

# Push all links to IYP
logging.info(f'Pushing {len(processed_links)} RANK relationships...')
self.iyp.batch_add_links('RANK', processed_links)

def unit_test(self):
3 changes: 0 additions & 3 deletions iyp/crawlers/cloudflare/dns_top_locations.py
@@ -138,15 +138,12 @@ def run(self):
self.compute_link(domain_top)

if i % 100 == 0:
sys.stderr.write(f'Pushing link batch #{int(i / 100)}...\r')
self.iyp.batch_add_links('QUERIED_FROM', self.statements)
self.statements = []

if self.statements:
self.iyp.batch_add_links('QUERIED_FROM', self.statements)

sys.stderr.write('\n')

def compute_link(self, param):
"""Compute link for the given domain name' top countries and corresponding
properties."""
4 changes: 0 additions & 4 deletions iyp/crawlers/cloudflare/ranking_bucket.py
@@ -98,14 +98,11 @@ def run(self):
# Note: Since we do not specify all=False in batch_get_nodes we will get the IDs
# of _all_ DomainName nodes, so we must not create relationships for all
# domain_ids, but iterate over the domains set instead.
logging.info(f'Adding/retrieving {len(all_domains)} DomainName nodes.')
print(f'Adding/retrieving {len(all_domains)} DomainName nodes')
domain_ids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', all_domains)

for dataset, domains in datasets:
dataset_title = f'Cloudflare {dataset["title"]}'
logging.info(f'Processing dataset: {dataset_title}')
print(f'Processing dataset: {dataset_title}')
ranking_id = self.iyp.get_node('Ranking',
{
'name': dataset_title,
@@ -119,7 +116,6 @@ def run(self):
for domain in domains]
if domain_links:
# Push RANK relationships to IYP
print(f'Adding {len(domain_links)} RANK relationships', file=sys.stderr)
self.iyp.batch_add_links('RANK', domain_links)

def unit_test(self):
7 changes: 3 additions & 4 deletions iyp/crawlers/cloudflare/top100.py
@@ -43,7 +43,7 @@ def run(self):

req = requests.get(self.reference['reference_url_data'], headers=headers)
if req.status_code != 200:
print(f'Cannot download data {req.status_code}: {req.text}')
logging.error(f'Cannot download data {req.status_code}: {req.text}')
raise RequestStatusError(f'Error while fetching data file: {req.status_code}')

results = req.json()['result']
@@ -56,9 +56,8 @@ def run(self):
logging.warning(f'Failed to get modification time: {e}')

# Process line one after the other
for i, _ in enumerate(map(self.update, results['top'])):
sys.stderr.write(f'\rProcessed {i} lines')
sys.stderr.write('\n')
processed = list(map(self.update, results['top']))
logging.info(f'Processed {len(processed)} lines')

def update(self, entry):
"""Add the entry to IYP if it's not already there and update its properties."""
5 changes: 0 additions & 5 deletions iyp/crawlers/iana/root_zone.py
@@ -35,11 +35,9 @@ def run(self):
# NAME, TTL, CLASS, TYPE, RDATA
if len(l) < 5:
logging.warning(f'DNS record line is too short: {l}')
print(f'DNS record line is too short: {l}', file=sys.stderr)
continue
if l[2] != 'IN':
logging.warning(f'Unexpected DNS record class: "{l[2]}". Expecting only IN records.')
print(f'Unexpected DNS record class: "{l[2]}". Expecting only IN records.', file=sys.stderr)
continue
record_type = l[3]
if record_type not in {'A', 'AAAA', 'NS'}:
@@ -54,7 +52,6 @@ def run(self):
nsdname = rdata.rstrip('.')
if not nsdname:
logging.warning(f'NS record points to root node? {l}')
print(f'NS record points to root node? {l}', file=sys.stderr)
continue
if nsdname not in domainnames:
domainnames.add(nsdname)
@@ -72,8 +69,6 @@ def run(self):
except ValueError as e:
logging.warning(f'Invalid IP address in A/AAAA record: {l}')
logging.warning(e)
print(f'Invalid IP address in A/AAAA record: {l}', file=sys.stderr)
print(e, file=sys.stderr)
continue
if ip not in ips:
ips.add(ip)
3 changes: 1 addition & 2 deletions iyp/crawlers/ihr/README.md
@@ -73,5 +73,4 @@ The country geo-location is provided by Maxmind.

## Dependence

`rov.py` assumes ASes and prefixes are already registered in the database, it becomes very slow if
this is not the case. Running `bgpkit.pfx2asn` before makes it much faster.
These crawlers do not depend on other crawlers.