Merge pull request #141 from m-appel/log-rework
Logging rework
romain-fontugne authored Sep 12, 2024
2 parents 0bc91ff + 9a41a47 commit 078e5de
Showing 34 changed files with 104 additions and 213 deletions.
8 changes: 4 additions & 4 deletions create_db.py
@@ -38,7 +38,7 @@ def main():
os.makedirs(dump_dir, exist_ok=True)

# Initialize logging
-FORMAT = '%(asctime)s %(processName)s %(message)s'
+FORMAT = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(
format=FORMAT,
filename=os.path.join(dump_dir, f'iyp-{date}.log'),
@@ -59,7 +59,7 @@ def main():

# ######### Start a new docker image ##########

-logging.warning('Starting new container...')
+logging.info('Starting new container...')
container = client.containers.run(
'neo4j:' + NEO4J_VERSION,
name=f'iyp-{date}',
@@ -142,7 +142,7 @@ def __init__(self, message):
send_email(relation_count_error)
except Exception as e:
no_error = False
-logging.error('crawler crashed!')
+logging.error('Crawler crashed!')
status[module_name] = e
send_email(e)

@@ -162,7 +162,7 @@ def __init__(self, message):

except Exception as e:
no_error = False
-logging.error('crawler crashed!')
+logging.error('Crawler crashed!')
logging.error(e)
status[module_name] = e

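For readers skimming the diff, here is a minimal, runnable sketch of the logging setup `create_db.py` converges on after this change. The `dump_dir`, `date`, and `level` values are illustrative assumptions; only the format string and the log messages are taken from the diff.

```python
import logging
import os

# Illustrative stand-ins; create_db.py derives these from its own config.
dump_dir = 'dumps'
date = '2024-09-12'
os.makedirs(dump_dir, exist_ok=True)

# The format now records the log level instead of the process name.
FORMAT = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(
    format=FORMAT,
    filename=os.path.join(dump_dir, f'iyp-{date}.log'),
    level=logging.INFO,  # assumption: INFO, so the demoted messages still appear
)

# Routine progress is now logged at INFO rather than WARNING.
logging.info('Starting new container...')
```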
10 changes: 9 additions & 1 deletion iyp/__init__.py
@@ -268,8 +268,10 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T
prop_set = set(map(prop_formatters[prop_name], prop_set))

if all:
+logging.info(f'Fetching all {label_str} nodes.')
existing_nodes = self.tx.run(f'MATCH (n:{label_str}) RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id')
else:
+logging.info(f'Fetching up to {len(prop_set)} {label_str} nodes.')
list_prop = list(prop_set)
existing_nodes = self.tx.run(f"""
WITH $list_prop AS list_prop
@@ -283,7 +285,8 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T
missing_nodes = [{prop_name: val} for val in missing_props]

# Create missing nodes
-if create:
+if create and missing_nodes:
+    logging.info(f'Creating {len(missing_nodes)} {label_str} nodes.')
for i in range(0, len(missing_nodes), BATCH_SIZE):
batch = missing_nodes[i:i + BATCH_SIZE]

@@ -479,6 +482,8 @@ def batch_add_node_label(self, node_ids, label):
if isinstance(label, list):
label_str = ':'.join(label)

+logging.info(f'Adding label "{label_str}" to {len(node_ids)} nodes.')
+
for i in range(0, len(node_ids), BATCH_SIZE):
batch = node_ids[i:i + BATCH_SIZE]

@@ -532,6 +537,9 @@ def batch_add_links(self, type, links, action='create'):

self.__create_range_index(type, 'reference_name', on_relationship=True)

+action_str = 'Creating' if action == 'create' else 'Merging'
+logging.info(f'{action_str} {len(links)} {type} relationships.')
+
# Create links in batches
for i in range(0, len(links), BATCH_SIZE):
batch = links[i:i + BATCH_SIZE]
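A condensed sketch of the guarded batch pattern the hunks above introduce: node creation, and its log line, is skipped entirely when nothing is missing. `BATCH_SIZE` and the push step are stand-ins for the real Neo4j internals in `iyp/__init__.py`.

```python
import logging

BATCH_SIZE = 10_000  # assumption; the real constant lives elsewhere in iyp/__init__.py

def create_missing_nodes(missing_nodes, label_str, create=True):
    created = []
    # 'if create and missing_nodes' avoids a misleading 'Creating 0 ... nodes.' message.
    if create and missing_nodes:
        logging.info(f'Creating {len(missing_nodes)} {label_str} nodes.')
        for i in range(0, len(missing_nodes), BATCH_SIZE):
            batch = missing_nodes[i:i + BATCH_SIZE]
            created.extend(batch)  # stand-in for the UNWIND query that pushes the batch
    return created
```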
13 changes: 4 additions & 9 deletions iyp/crawlers/alice_lg/__init__.py
@@ -319,15 +319,14 @@ def __fetch_routes(self) -> None:
if failed_neighbors:
logging.warning(f'Failed to fetch routes for {len(failed_neighbors)} neighbors: {failed_neighbors}')
if failed_pages:
-logging.warning(
-    f'Failed to fetch {sum(failed_pages.values())} pages for {len(failed_pages)} neighbors:')
+logging.warning(f'Failed to fetch {sum(failed_pages.values())} pages for {len(failed_pages)} '
+                f'neighbors:')
for key, count in failed_pages.items():
logging.warning(f' {key}: {count}')

def fetch(self) -> None:
tmp_dir = self.get_tmp_dir()
if not os.path.exists(tmp_dir):
-logging.info(f'Creating tmp dir: {tmp_dir}')
self.create_tmp_dir()

self.__fetch_routeservers()
@@ -360,7 +359,7 @@ def run(self) -> None:
member_ip = neighbor['address']
n = peering_lans.search_best(member_ip)
if n is None:
-logging.warning(f'Failed to map member IP to peering LAN: {member_ip}')
+logging.debug(f'Failed to map member IP to peering LAN: {member_ip}')
continue
member_asn = neighbor['asn']
if not member_asn or not isinstance(member_asn, int):
@@ -419,11 +418,9 @@ def run(self) -> None:
'props': [flattened_route, self.reference.copy()]})

# Get/create nodes.
-logging.info(f'Getting {len(asns)} AS nodes.')
asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False)
prefix_id = dict()
if prefixes:
-logging.info(f'Getting {len(prefixes)} Prefix nodes.')
prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False)

# Translate raw values to QID.
Expand All @@ -437,11 +434,9 @@ def run(self) -> None:
relationship['dst_id'] = prefix_id[prefix]

# Push relationships.
-logging.info(f'Pushing {len(member_of_rels)} MEMBER_OF relationships.')
self.iyp.batch_add_links('MEMBER_OF', member_of_rels)
if originate_rels:
-logging.info(f'Pushing {len(originate_rels)} ORIGINATE relationships.')
self.iyp.batch_add_links('ORIGINATE', originate_rels)

def unit_test(self):
-return super().unit_test(['MEMBER_OF', 'ORIGINATE', 'MANAGED_BY'])
+return super().unit_test(['MEMBER_OF'])
5 changes: 3 additions & 2 deletions iyp/crawlers/apnic/eyeball.py
@@ -28,8 +28,9 @@ def run(self):

processed_asn = set()

+logging.info(f'Processing {len(self.countries)} countries...')
for cc, country in self.countries.items():
-logging.info(f'processing {country}')
+logging.debug(f'processing {country}')

# Get the QID of the country and corresponding ranking
cc_qid = self.iyp.get_node('Country', {'country_code': cc})
@@ -46,7 +47,7 @@ def run(self):
names = set()

ranking = req.json()
-logging.info(f'{len(ranking)} eyeball ASes')
+logging.debug(f'{len(ranking)} eyeball ASes')

# Collect all ASNs and names
# and make sure the ranking is sorted and add rank field
2 changes: 0 additions & 2 deletions iyp/crawlers/bgpkit/pfx2asn.py
@@ -45,7 +45,6 @@ def run(self):

req.close()

-logging.info('Pushing nodes to neo4j...')
# get ASNs and prefixes IDs
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
self.prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes)
@@ -58,7 +57,6 @@ def run(self):

links.append({'src_id': asn_qid, 'dst_id': prefix_qid, 'props': [self.reference, entry]})

-logging.info('Pushing links to neo4j...')
# Push all links to IYP
self.iyp.batch_add_links('ORIGINATE', links)

2 changes: 0 additions & 2 deletions iyp/crawlers/bgptools/anycast_prefixes.py
@@ -65,14 +65,12 @@ def run(self):
ipv4_prefixes_response = fetch_dataset(ipv4_prefixes_url)
logging.info('IPv4 prefixes fetched successfully.')
self.update(ipv4_prefixes_response, ipv4_prefixes_filename)
-logging.info('IPv4 prefixes pushed to IYP.')

self.reference['reference_url_data'] = ipv6_prefixes_url
self.reference['reference_time_modification'] = get_commit_datetime(self.repo, self.v6_file)
ipv6_prefixes_response = fetch_dataset(ipv6_prefixes_url)
logging.info('IPv6 prefixes fetched successfully.')
self.update(ipv6_prefixes_response, ipv6_prefixes_filename)
-logging.info('IPv6 prefixes pushed to IYP.')

def update(self, res, filename: str):
with open(filename, 'w') as file:
12 changes: 3 additions & 9 deletions iyp/crawlers/bgptools/tags.py
@@ -60,7 +60,7 @@ def run(self):

req = requests.get(url, headers=self.headers)
if req.status_code != 200:
-print(req.text)
+logging.error(req.text)
raise RequestStatusError('Error while fetching AS names')

self.tag_qid = self.iyp.get_node('Tag', {'label': label})
@@ -74,14 +74,8 @@ def run(self):
asn_qid = self.iyp.get_node('AS', {'asn': asn[2:]})
statements = [['CATEGORIZED', self.tag_qid, self.reference]] # Set AS name

-try:
-    # Update AS name and country
-    self.iyp.add_links(asn_qid, statements)
-
-except Exception as error:
-    # print errors and continue running
-    print('Error for: ', line)
-    print(error)
+# Update AS name and country
+self.iyp.add_links(asn_qid, statements)

def unit_test(self):
return super().unit_test(['CATEGORIZED'])
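The tags.py hunk above drops the local try/except, so a failing `add_links` now propagates instead of being printed and swallowed. A sketch of the resulting flow, mirroring the central handler visible in the create_db.py hunks; `crawler`, `module_name`, `status`, and `send_email` are the runner's own names as they appear there:

```python
import logging

def run_crawler(crawler, module_name, status, send_email):
    try:
        crawler.run()
    except Exception as e:
        # Matches the handler shown in the create_db.py diff above.
        logging.error('Crawler crashed!')
        logging.error(e)
        status[module_name] = e
        send_email(e)
```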
10 changes: 2 additions & 8 deletions iyp/crawlers/caida/asrank.py
@@ -22,16 +22,15 @@ def __init__(self, organization, url, name):

def run(self):
"""Fetch networks information from ASRank and push to IYP."""
-print('Fetching CAIDA AS Rank', file=sys.stderr)

nodes = list()

has_next = True
i = 0
+logging.info('Fetching AS Ranks...')
while has_next:
url = URL + f'&offset={i * 10000}'
i += 1
-logging.info(f'Fetching {url}')
+logging.debug(f'Fetching {url}')
req = requests.get(url)
if req.status_code != 200:
logging.error(f'Request failed with status: {req.status_code}')
@@ -42,7 +41,6 @@ def run(self):

nodes += ranking['edges']

-print(f'Fetched {len(nodes):,d} ranks.', file=sys.stderr)
logging.info(f'Fetched {len(nodes):,d} ranks.')

# Collect all ASNs, names, and countries
@@ -59,8 +57,6 @@ def run(self):
asns.add(int(asn['asn']))

# Get/create ASNs, names, and country nodes
-print('Pushing nodes.', file=sys.stderr)
-logging.info('Pushing nodes.')
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
self.country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries)
self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names, all=False)
@@ -94,8 +90,6 @@ def run(self):
rank_links.append({'src_id': asn_qid, 'dst_id': self.asrank_qid, 'props': [self.reference, flat_asn]})

# Push all links to IYP
-print('Pushing links.', file=sys.stderr)
-logging.info('Pushing links.')
self.iyp.batch_add_links('NAME', name_links)
self.iyp.batch_add_links('COUNTRY', country_links)
self.iyp.batch_add_links('RANK', rank_links)
2 changes: 1 addition & 1 deletion iyp/crawlers/caida/ix_asns.py
@@ -38,7 +38,7 @@ def __init__(self, organization, url, name):
raise Exception('No recent CAIDA ix-asns file available')
date = date.datetime.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)

-logging.info('going to use this URL: ' + url)
+logging.info(f'Fetching data from: {url}')
super().__init__(organization, url, name)
self.reference['reference_url_info'] = 'https://publicdata.caida.org/datasets/ixps/README.txt'
self.reference['reference_time_modification'] = date
6 changes: 1 addition & 5 deletions iyp/crawlers/cisco/umbrella_top1m.py
@@ -44,7 +44,7 @@ def __set_modification_time(self):
# date now points to the last available historical file , which means the
# current file is the day after this date.
self.reference['reference_time_modification'] = date + timedelta(days=1)
-logging.info(self.reference)
+logging.info(f'Got list for date {self.reference["reference_time_modification"].strftime("%Y-%m-%d")}')

def run(self):
"""Fetch Umbrella top 1M and push to IYP."""
@@ -69,7 +69,6 @@ def run(self):
links.append({'src_name': domain, 'dst_id': self.cisco_qid,
'props': [self.reference, {'rank': int(rank)}]})

-logging.info('Fetching DomainName/HostName nodes...')
domain_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name')
host_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name')

@@ -103,10 +102,8 @@ def run(self):
new_host_names.add(name)

if new_domain_names:
-logging.info(f'Pushing {len(new_domain_names)} additional DomainName nodes...')
domain_id.update(self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', new_domain_names, all=False))
if new_host_names:
-logging.info(f'Pushing {len(new_host_names)} additional HostName nodes...')
host_id.update(self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', new_host_names, all=False))

for link in unprocessed_links:
Expand All @@ -120,7 +117,6 @@ def run(self):
processed_links.append(link)

# Push all links to IYP
-logging.info(f'Pushing {len(processed_links)} RANK relationships...')
self.iyp.batch_add_links('RANK', processed_links)

def unit_test(self):
3 changes: 0 additions & 3 deletions iyp/crawlers/cloudflare/dns_top_locations.py
@@ -138,15 +138,12 @@ def run(self):
self.compute_link(domain_top)

if i % 100 == 0:
-sys.stderr.write(f'Pushing link batch #{int(i / 100)}...\r')
self.iyp.batch_add_links('QUERIED_FROM', self.statements)
self.statements = []

if self.statements:
self.iyp.batch_add_links('QUERIED_FROM', self.statements)

-sys.stderr.write('\n')
-
def compute_link(self, param):
"""Compute link for the given domain name' top countries and corresponding
properties."""
4 changes: 0 additions & 4 deletions iyp/crawlers/cloudflare/ranking_bucket.py
@@ -98,14 +98,11 @@ def run(self):
# Note: Since we do not specify all=False in batch_get_nodes we will get the IDs
# of _all_ DomainName nodes, so we must not create relationships for all
# domain_ids, but iterate over the domains set instead.
logging.info(f'Adding/retrieving {len(all_domains)} DomainName nodes.')
-print(f'Adding/retrieving {len(all_domains)} DomainName nodes')
domain_ids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', all_domains)

for dataset, domains in datasets:
dataset_title = f'Cloudflare {dataset["title"]}'
logging.info(f'Processing dataset: {dataset_title}')
-print(f'Processing dataset: {dataset_title}')
ranking_id = self.iyp.get_node('Ranking',
{
'name': dataset_title,
@@ -119,7 +116,6 @@ def run(self):
for domain in domains]
if domain_links:
# Push RANK relationships to IYP
-print(f'Adding {len(domain_links)} RANK relationships', file=sys.stderr)
self.iyp.batch_add_links('RANK', domain_links)

def unit_test(self):
7 changes: 3 additions & 4 deletions iyp/crawlers/cloudflare/top100.py
@@ -43,7 +43,7 @@ def run(self):

req = requests.get(self.reference['reference_url_data'], headers=headers)
if req.status_code != 200:
-print(f'Cannot download data {req.status_code}: {req.text}')
+logging.error(f'Cannot download data {req.status_code}: {req.text}')
raise RequestStatusError(f'Error while fetching data file: {req.status_code}')

results = req.json()['result']
@@ -56,9 +56,8 @@ def run(self):
logging.warning(f'Failed to get modification time: {e}')

# Process line one after the other
-for i, _ in enumerate(map(self.update, results['top'])):
-    sys.stderr.write(f'\rProcessed {i} lines')
-sys.stderr.write('\n')
+processed = list(map(self.update, results['top']))
+logging.info(f'Processed {len(processed)} lines')

def update(self, entry):
"""Add the entry to IYP if it's not already there and update its properties."""
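The top100.py hunk replaces the carriage-return progress counter with a single summary log line. A sketch of the new idiom; `update` and `results` mirror the crawler's own names:

```python
import logging

def process_top(results, update):
    # list() forces the lazy map iterator, so every entry is processed
    # before the count is logged.
    processed = list(map(update, results['top']))
    logging.info(f'Processed {len(processed)} lines')
    return processed
```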
5 changes: 0 additions & 5 deletions iyp/crawlers/iana/root_zone.py
@@ -35,11 +35,9 @@ def run(self):
# NAME, TTL, CLASS, TYPE, RDATA
if len(l) < 5:
logging.warning(f'DNS record line is too short: {l}')
-print(f'DNS record line is too short: {l}', file=sys.stderr)
continue
if l[2] != 'IN':
logging.warning(f'Unexpected DNS record class: "{l[2]}". Expecting only IN records.')
-print(f'Unexpected DNS record class: "{l[2]}". Expecting only IN records.', file=sys.stderr)
continue
record_type = l[3]
if record_type not in {'A', 'AAAA', 'NS'}:
@@ -54,7 +52,6 @@ def run(self):
nsdname = rdata.rstrip('.')
if not nsdname:
logging.warning(f'NS record points to root node? {l}')
-print(f'NS record points to root node? {l}', file=sys.stderr)
continue
if nsdname not in domainnames:
domainnames.add(nsdname)
@@ -72,8 +69,6 @@ def run(self):
except ValueError as e:
logging.warning(f'Invalid IP address in A/AAAA record: {l}')
logging.warning(e)
-print(f'Invalid IP address in A/AAAA record: {l}', file=sys.stderr)
-print(e, file=sys.stderr)
continue
if ip not in ips:
ips.add(ip)
3 changes: 1 addition & 2 deletions iyp/crawlers/ihr/README.md
@@ -73,5 +73,4 @@ The country geo-location is provided by Maxmind.

## Dependence

-`rov.py` assumes ASes and prefixes are already registered in the database, it becomes very slow if
-this is not the case. Running `bgpkit.pfx2asn` before makes it much faster.
+These crawlers do not depend on other crawlers.