From 9a41a47606541500e74664069958716cb2a745e4 Mon Sep 17 00:00:00 2001 From: Malte Tashiro Date: Tue, 10 Sep 2024 09:28:56 +0000 Subject: [PATCH] Major logging rework All crawlers now log only using the logging module and not using stderr anymore. Also the IYP module now automatically logs node/relationship retrieval and creation. As a consequence, some crawlers are a bit chatty, which should be fixed in the future. The general log level was changed to INFO and the level name is now included in the logs instead of the process name. As parts of this also some crawlers got reworked: - alice_lg: Fixed unit test that checked for relationship types which are not created. - ihr: - Reference modification time is more precise, matching the exact timebin used - Data is handled in-memory, removing need for temporary files - Moved to batch creation, making the crawler faster and removing dependency on other crawlers --- create_db.py | 8 +- iyp/__init__.py | 10 +- iyp/crawlers/alice_lg/__init__.py | 13 +-- iyp/crawlers/apnic/eyeball.py | 5 +- iyp/crawlers/bgpkit/pfx2asn.py | 2 - iyp/crawlers/bgptools/anycast_prefixes.py | 2 - iyp/crawlers/bgptools/tags.py | 12 +-- iyp/crawlers/caida/asrank.py | 10 +- iyp/crawlers/caida/ix_asns.py | 2 +- iyp/crawlers/cisco/umbrella_top1m.py | 6 +- iyp/crawlers/cloudflare/dns_top_locations.py | 3 - iyp/crawlers/cloudflare/ranking_bucket.py | 4 - iyp/crawlers/cloudflare/top100.py | 7 +- iyp/crawlers/iana/root_zone.py | 5 - iyp/crawlers/ihr/README.md | 3 +- iyp/crawlers/ihr/__init__.py | 102 ++++++++----------- iyp/crawlers/ihr/local_hegemony_v4.py | 4 +- iyp/crawlers/ihr/local_hegemony_v6.py | 4 +- iyp/crawlers/inetintel/as_org.py | 10 +- iyp/crawlers/manrs/members.py | 3 - iyp/crawlers/nro/delegated_stats.py | 2 +- iyp/crawlers/openintel/__init__.py | 21 ++-- iyp/crawlers/pch/__init__.py | 16 --- iyp/crawlers/pch/show_bgp_parser.py | 10 +- iyp/crawlers/peeringdb/fac.py | 3 +- iyp/crawlers/peeringdb/ix.py | 2 - iyp/crawlers/peeringdb/org.py | 1 - iyp/crawlers/ripe/as_names.py | 2 +- iyp/crawlers/ripe/atlas_measurements.py | 19 +--- iyp/crawlers/ripe/atlas_probes.py | 12 +-- iyp/crawlers/tranco/top1m.py | 2 +- iyp/crawlers/virginiatech/rovista.py | 2 - iyp/post/country_information.py | 2 +- iyp/post/dns_hierarchy.py | 8 +- 34 files changed, 104 insertions(+), 213 deletions(-) diff --git a/create_db.py b/create_db.py index 8910274e..94c7fcfd 100644 --- a/create_db.py +++ b/create_db.py @@ -38,7 +38,7 @@ def main(): os.makedirs(dump_dir, exist_ok=True) # Initialize logging - FORMAT = '%(asctime)s %(processName)s %(message)s' + FORMAT = '%(asctime)s %(levelname)s %(message)s' logging.basicConfig( format=FORMAT, filename=os.path.join(dump_dir, f'iyp-{date}.log'), @@ -59,7 +59,7 @@ def main(): # ######### Start a new docker image ########## - logging.warning('Starting new container...') + logging.info('Starting new container...') container = client.containers.run( 'neo4j:' + NEO4J_VERSION, name=f'iyp-{date}', @@ -142,7 +142,7 @@ def __init__(self, message): send_email(relation_count_error) except Exception as e: no_error = False - logging.error('crawler crashed!') + logging.error('Crawler crashed!') status[module_name] = e send_email(e) @@ -162,7 +162,7 @@ def __init__(self, message): except Exception as e: no_error = False - logging.error('crawler crashed!') + logging.error('Crawler crashed!') logging.error(e) status[module_name] = e diff --git a/iyp/__init__.py b/iyp/__init__.py index 81b2d2b0..9514085f 100644 --- a/iyp/__init__.py +++ b/iyp/__init__.py @@ -268,8 +268,10 @@ 
def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T prop_set = set(map(prop_formatters[prop_name], prop_set)) if all: + logging.info(f'Fetching all {label_str} nodes.') existing_nodes = self.tx.run(f'MATCH (n:{label_str}) RETURN n.{prop_name} AS {prop_name}, ID(n) AS _id') else: + logging.info(f'Fetching up to {len(prop_set)} {label_str} nodes.') list_prop = list(prop_set) existing_nodes = self.tx.run(f""" WITH $list_prop AS list_prop @@ -283,7 +285,8 @@ def batch_get_nodes_by_single_prop(self, label, prop_name, prop_set=set(), all=T missing_nodes = [{prop_name: val} for val in missing_props] # Create missing nodes - if create: + if create and missing_nodes: + logging.info(f'Creating {len(missing_nodes)} {label_str} nodes.') for i in range(0, len(missing_nodes), BATCH_SIZE): batch = missing_nodes[i:i + BATCH_SIZE] @@ -479,6 +482,8 @@ def batch_add_node_label(self, node_ids, label): if isinstance(label, list): label_str = ':'.join(label) + logging.info(f'Adding label "{label_str}" to {len(node_ids)} nodes.') + for i in range(0, len(node_ids), BATCH_SIZE): batch = node_ids[i:i + BATCH_SIZE] @@ -532,6 +537,9 @@ def batch_add_links(self, type, links, action='create'): self.__create_range_index(type, 'reference_name', on_relationship=True) + action_str = 'Creating' if action == 'create' else 'Merging' + logging.info(f'{action_str} {len(links)} {type} relationships.') + # Create links in batches for i in range(0, len(links), BATCH_SIZE): batch = links[i:i + BATCH_SIZE] diff --git a/iyp/crawlers/alice_lg/__init__.py b/iyp/crawlers/alice_lg/__init__.py index 878bdf76..166f51cd 100644 --- a/iyp/crawlers/alice_lg/__init__.py +++ b/iyp/crawlers/alice_lg/__init__.py @@ -319,15 +319,14 @@ def __fetch_routes(self) -> None: if failed_neighbors: logging.warning(f'Failed to fetch routes for {len(failed_neighbors)} neighbors: {failed_neighbors}') if failed_pages: - logging.warning( - f'Failed to fetch {sum(failed_pages.values())} pages for {len(failed_pages)} neighbors:') + logging.warning(f'Failed to fetch {sum(failed_pages.values())} pages for {len(failed_pages)} ' + f'neighbors:') for key, count in failed_pages.items(): logging.warning(f' {key}: {count}') def fetch(self) -> None: tmp_dir = self.get_tmp_dir() if not os.path.exists(tmp_dir): - logging.info(f'Creating tmp dir: {tmp_dir}') self.create_tmp_dir() self.__fetch_routeservers() @@ -360,7 +359,7 @@ def run(self) -> None: member_ip = neighbor['address'] n = peering_lans.search_best(member_ip) if n is None: - logging.warning(f'Failed to map member IP to peering LAN: {member_ip}') + logging.debug(f'Failed to map member IP to peering LAN: {member_ip}') continue member_asn = neighbor['asn'] if not member_asn or not isinstance(member_asn, int): @@ -419,11 +418,9 @@ def run(self) -> None: 'props': [flattened_route, self.reference.copy()]}) # Get/create nodes. - logging.info(f'Getting {len(asns)} AS nodes.') asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False) prefix_id = dict() if prefixes: - logging.info(f'Getting {len(prefixes)} Prefix nodes.') prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False) # Translate raw values to QID. @@ -437,11 +434,9 @@ def run(self) -> None: relationship['dst_id'] = prefix_id[prefix] # Push relationships. 
- logging.info(f'Pushing {len(member_of_rels)} MEMBER_OF relationships.') self.iyp.batch_add_links('MEMBER_OF', member_of_rels) if originate_rels: - logging.info(f'Pushing {len(originate_rels)} ORIGINATE relationships.') self.iyp.batch_add_links('ORIGINATE', originate_rels) def unit_test(self): - return super().unit_test(['MEMBER_OF', 'ORIGINATE', 'MANAGED_BY']) + return super().unit_test(['MEMBER_OF']) diff --git a/iyp/crawlers/apnic/eyeball.py b/iyp/crawlers/apnic/eyeball.py index 80544023..c5b275a0 100644 --- a/iyp/crawlers/apnic/eyeball.py +++ b/iyp/crawlers/apnic/eyeball.py @@ -28,8 +28,9 @@ def run(self): processed_asn = set() + logging.info(f'Processing {len(self.countries)} countries...') for cc, country in self.countries.items(): - logging.info(f'processing {country}') + logging.debug(f'processing {country}') # Get the QID of the country and corresponding ranking cc_qid = self.iyp.get_node('Country', {'country_code': cc}) @@ -46,7 +47,7 @@ def run(self): names = set() ranking = req.json() - logging.info(f'{len(ranking)} eyeball ASes') + logging.debug(f'{len(ranking)} eyeball ASes') # Collect all ASNs and names # and make sure the ranking is sorted and add rank field diff --git a/iyp/crawlers/bgpkit/pfx2asn.py b/iyp/crawlers/bgpkit/pfx2asn.py index d83e8dbc..f9c85ac7 100644 --- a/iyp/crawlers/bgpkit/pfx2asn.py +++ b/iyp/crawlers/bgpkit/pfx2asn.py @@ -45,7 +45,6 @@ def run(self): req.close() - logging.info('Pushing nodes to neo4j...') # get ASNs and prefixes IDs self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) self.prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes) @@ -58,7 +57,6 @@ def run(self): links.append({'src_id': asn_qid, 'dst_id': prefix_qid, 'props': [self.reference, entry]}) - logging.info('Pushing links to neo4j...') # Push all links to IYP self.iyp.batch_add_links('ORIGINATE', links) diff --git a/iyp/crawlers/bgptools/anycast_prefixes.py b/iyp/crawlers/bgptools/anycast_prefixes.py index 824e5c45..e57589f3 100644 --- a/iyp/crawlers/bgptools/anycast_prefixes.py +++ b/iyp/crawlers/bgptools/anycast_prefixes.py @@ -65,14 +65,12 @@ def run(self): ipv4_prefixes_response = fetch_dataset(ipv4_prefixes_url) logging.info('IPv4 prefixes fetched successfully.') self.update(ipv4_prefixes_response, ipv4_prefixes_filename) - logging.info('IPv4 prefixes pushed to IYP.') self.reference['reference_url_data'] = ipv6_prefixes_url self.reference['reference_time_modification'] = get_commit_datetime(self.repo, self.v6_file) ipv6_prefixes_response = fetch_dataset(ipv6_prefixes_url) logging.info('IPv6 prefixes fetched successfully.') self.update(ipv6_prefixes_response, ipv6_prefixes_filename) - logging.info('IPv6 prefixes pushed to IYP.') def update(self, res, filename: str): with open(filename, 'w') as file: diff --git a/iyp/crawlers/bgptools/tags.py b/iyp/crawlers/bgptools/tags.py index 155f6446..a1e65251 100644 --- a/iyp/crawlers/bgptools/tags.py +++ b/iyp/crawlers/bgptools/tags.py @@ -60,7 +60,7 @@ def run(self): req = requests.get(url, headers=self.headers) if req.status_code != 200: - print(req.text) + logging.error(req.text) raise RequestStatusError('Error while fetching AS names') self.tag_qid = self.iyp.get_node('Tag', {'label': label}) @@ -74,14 +74,8 @@ def run(self): asn_qid = self.iyp.get_node('AS', {'asn': asn[2:]}) statements = [['CATEGORIZED', self.tag_qid, self.reference]] # Set AS name - try: - # Update AS name and country - self.iyp.add_links(asn_qid, statements) - - except Exception as error: - # print errors and continue 
running - print('Error for: ', line) - print(error) + # Update AS name and country + self.iyp.add_links(asn_qid, statements) def unit_test(self): return super().unit_test(['CATEGORIZED']) diff --git a/iyp/crawlers/caida/asrank.py b/iyp/crawlers/caida/asrank.py index 51e8e45d..6d797eb2 100644 --- a/iyp/crawlers/caida/asrank.py +++ b/iyp/crawlers/caida/asrank.py @@ -22,16 +22,15 @@ def __init__(self, organization, url, name): def run(self): """Fetch networks information from ASRank and push to IYP.""" - print('Fetching CAIDA AS Rank', file=sys.stderr) - nodes = list() has_next = True i = 0 + logging.info('Fetching AS Ranks...') while has_next: url = URL + f'&offset={i * 10000}' i += 1 - logging.info(f'Fetching {url}') + logging.debug(f'Fetching {url}') req = requests.get(url) if req.status_code != 200: logging.error(f'Request failed with status: {req.status_code}') @@ -42,7 +41,6 @@ def run(self): nodes += ranking['edges'] - print(f'Fetched {len(nodes):,d} ranks.', file=sys.stderr) logging.info(f'Fetched {len(nodes):,d} ranks.') # Collect all ASNs, names, and countries @@ -59,8 +57,6 @@ def run(self): asns.add(int(asn['asn'])) # Get/create ASNs, names, and country nodes - print('Pushing nodes.', file=sys.stderr) - logging.info('Pushing nodes.') self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) self.country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries) self.name_id = self.iyp.batch_get_nodes_by_single_prop('Name', 'name', names, all=False) @@ -94,8 +90,6 @@ def run(self): rank_links.append({'src_id': asn_qid, 'dst_id': self.asrank_qid, 'props': [self.reference, flat_asn]}) # Push all links to IYP - print('Pushing links.', file=sys.stderr) - logging.info('Pushing links.') self.iyp.batch_add_links('NAME', name_links) self.iyp.batch_add_links('COUNTRY', country_links) self.iyp.batch_add_links('RANK', rank_links) diff --git a/iyp/crawlers/caida/ix_asns.py b/iyp/crawlers/caida/ix_asns.py index 8e0f7f9c..82b3f9a0 100644 --- a/iyp/crawlers/caida/ix_asns.py +++ b/iyp/crawlers/caida/ix_asns.py @@ -38,7 +38,7 @@ def __init__(self, organization, url, name): raise Exception('No recent CAIDA ix-asns file available') date = date.datetime.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) - logging.info('going to use this URL: ' + url) + logging.info(f'Fetching data from: {url}') super().__init__(organization, url, name) self.reference['reference_url_info'] = 'https://publicdata.caida.org/datasets/ixps/README.txt' self.reference['reference_time_modification'] = date diff --git a/iyp/crawlers/cisco/umbrella_top1m.py b/iyp/crawlers/cisco/umbrella_top1m.py index 9ed18fba..fe99a6f6 100644 --- a/iyp/crawlers/cisco/umbrella_top1m.py +++ b/iyp/crawlers/cisco/umbrella_top1m.py @@ -44,7 +44,7 @@ def __set_modification_time(self): # date now points to the last available historical file , which means the # current file is the day after this date. 
self.reference['reference_time_modification'] = date + timedelta(days=1) - logging.info(self.reference) + logging.info(f'Got list for date {self.reference["reference_time_modification"].strftime("%Y-%m-%d")}') def run(self): """Fetch Umbrella top 1M and push to IYP.""" @@ -69,7 +69,6 @@ def run(self): links.append({'src_name': domain, 'dst_id': self.cisco_qid, 'props': [self.reference, {'rank': int(rank)}]}) - logging.info('Fetching DomainName/HostName nodes...') domain_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name') host_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name') @@ -103,10 +102,8 @@ def run(self): new_host_names.add(name) if new_domain_names: - logging.info(f'Pushing {len(new_domain_names)} additional DomainName nodes...') domain_id.update(self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', new_domain_names, all=False)) if new_host_names: - logging.info(f'Pushing {len(new_host_names)} additional HostName nodes...') host_id.update(self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', new_host_names, all=False)) for link in unprocessed_links: @@ -120,7 +117,6 @@ def run(self): processed_links.append(link) # Push all links to IYP - logging.info(f'Pushing {len(processed_links)} RANK relationships...') self.iyp.batch_add_links('RANK', processed_links) def unit_test(self): diff --git a/iyp/crawlers/cloudflare/dns_top_locations.py b/iyp/crawlers/cloudflare/dns_top_locations.py index 37eb0f42..cfe599ed 100644 --- a/iyp/crawlers/cloudflare/dns_top_locations.py +++ b/iyp/crawlers/cloudflare/dns_top_locations.py @@ -138,15 +138,12 @@ def run(self): self.compute_link(domain_top) if i % 100 == 0: - sys.stderr.write(f'Pushing link batch #{int(i / 100)}...\r') self.iyp.batch_add_links('QUERIED_FROM', self.statements) self.statements = [] if self.statements: self.iyp.batch_add_links('QUERIED_FROM', self.statements) - sys.stderr.write('\n') - def compute_link(self, param): """Compute link for the given domain name' top countries and corresponding properties.""" diff --git a/iyp/crawlers/cloudflare/ranking_bucket.py b/iyp/crawlers/cloudflare/ranking_bucket.py index c6b64c72..74a2b7dc 100644 --- a/iyp/crawlers/cloudflare/ranking_bucket.py +++ b/iyp/crawlers/cloudflare/ranking_bucket.py @@ -98,14 +98,11 @@ def run(self): # Note: Since we do not specify all=False in batch_get_nodes we will get the IDs # of _all_ DomainName nodes, so we must not create relationships for all # domain_ids, but iterate over the domains set instead. 
- logging.info(f'Adding/retrieving {len(all_domains)} DomainName nodes.') - print(f'Adding/retrieving {len(all_domains)} DomainName nodes') domain_ids = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', all_domains) for dataset, domains in datasets: dataset_title = f'Cloudflare {dataset["title"]}' logging.info(f'Processing dataset: {dataset_title}') - print(f'Processing dataset: {dataset_title}') ranking_id = self.iyp.get_node('Ranking', { 'name': dataset_title, @@ -119,7 +116,6 @@ def run(self): for domain in domains] if domain_links: # Push RANK relationships to IYP - print(f'Adding {len(domain_links)} RANK relationships', file=sys.stderr) self.iyp.batch_add_links('RANK', domain_links) def unit_test(self): diff --git a/iyp/crawlers/cloudflare/top100.py b/iyp/crawlers/cloudflare/top100.py index c44fbe24..afcfb360 100644 --- a/iyp/crawlers/cloudflare/top100.py +++ b/iyp/crawlers/cloudflare/top100.py @@ -43,7 +43,7 @@ def run(self): req = requests.get(self.reference['reference_url_data'], headers=headers) if req.status_code != 200: - print(f'Cannot download data {req.status_code}: {req.text}') + logging.error(f'Cannot download data {req.status_code}: {req.text}') raise RequestStatusError(f'Error while fetching data file: {req.status_code}') results = req.json()['result'] @@ -56,9 +56,8 @@ def run(self): logging.warning(f'Failed to get modification time: {e}') # Process line one after the other - for i, _ in enumerate(map(self.update, results['top'])): - sys.stderr.write(f'\rProcessed {i} lines') - sys.stderr.write('\n') + processed = list(map(self.update, results['top'])) + logging.info(f'Processed {len(processed)} lines') def update(self, entry): """Add the entry to IYP if it's not already there and update its properties.""" diff --git a/iyp/crawlers/iana/root_zone.py b/iyp/crawlers/iana/root_zone.py index 39d6224f..cabea1f4 100644 --- a/iyp/crawlers/iana/root_zone.py +++ b/iyp/crawlers/iana/root_zone.py @@ -35,11 +35,9 @@ def run(self): # NAME, TTL, CLASS, TYPE, RDATA if len(l) < 5: logging.warning(f'DNS record line is too short: {l}') - print(f'DNS record line is too short: {l}', file=sys.stderr) continue if l[2] != 'IN': logging.warning(f'Unexpected DNS record class: "{l[2]}". Expecting only IN records.') - print(f'Unexpected DNS record class: "{l[2]}". Expecting only IN records.', file=sys.stderr) continue record_type = l[3] if record_type not in {'A', 'AAAA', 'NS'}: @@ -54,7 +52,6 @@ def run(self): nsdname = rdata.rstrip('.') if not nsdname: logging.warning(f'NS record points to root node? {l}') - print(f'NS record points to root node? {l}', file=sys.stderr) continue if nsdname not in domainnames: domainnames.add(nsdname) @@ -72,8 +69,6 @@ def run(self): except ValueError as e: logging.warning(f'Invalid IP address in A/AAAA record: {l}') logging.warning(e) - print(f'Invalid IP address in A/AAAA record: {l}', file=sys.stderr) - print(e, file=sys.stderr) continue if ip not in ips: ips.add(ip) diff --git a/iyp/crawlers/ihr/README.md b/iyp/crawlers/ihr/README.md index 915c7163..b3ddbe75 100644 --- a/iyp/crawlers/ihr/README.md +++ b/iyp/crawlers/ihr/README.md @@ -73,5 +73,4 @@ The country geo-location is provided by Maxmind. ## Dependence -`rov.py` assumes ASes and prefixes are already registered in the database, it becomes very slow if -this is not the case. Running `bgpkit.pfx2asn` before makes it much faster. +These crawlers are not depending on other crawlers. 
diff --git a/iyp/crawlers/ihr/__init__.py b/iyp/crawlers/ihr/__init__.py index 67f08631..2338119a 100644 --- a/iyp/crawlers/ihr/__init__.py +++ b/iyp/crawlers/ihr/__init__.py @@ -1,33 +1,12 @@ import csv -import os -from datetime import timezone +import io +import logging +from datetime import datetime, timedelta, timezone -import arrow import lz4.frame import requests -from iyp import BaseCrawler - - -class lz4Csv: - def __init__(self, filename): - """Start reading a lz4 compress csv file.""" - - self.fp = lz4.frame.open(filename, 'rb') - - def __iter__(self): - """Read file header line and set self.fields.""" - line = self.fp.readline() - self.fields = line.decode('utf-8').rstrip().split(',') - return self - - def __next__(self): - line = self.fp.readline().decode('utf-8').rstrip() - - if len(line) > 0: - return line - else: - raise StopIteration +from iyp import BaseCrawler, DataNotAvailableError class HegemonyCrawler(BaseCrawler): @@ -39,68 +18,67 @@ def __init__(self, organization, url, name, af): def run(self): """Fetch data from file and push to IYP.""" - today = arrow.utcnow() - url = self.url.format(year=today.year, month=today.month, day=today.day) + today = datetime.now(tz=timezone.utc) + max_lookback = today - timedelta(days=7) + url = today.strftime(self.url) req = requests.head(url) - if req.status_code != 200: - today = today.shift(days=-1) - url = self.url.format(year=today.year, month=today.month, day=today.day) + while req.status_code != 200 and today > max_lookback: + today -= timedelta(days=1) + url = today.strftime(self.url) req = requests.head(url) - if req.status_code != 200: - today = today.shift(days=-1) - url = self.url.format(year=today.year, month=today.month, day=today.day) - req = requests.head(url) + if req.status_code != 200: + logging.error('Failed to find data within the specified lookback interval.') + raise DataNotAvailableError('Failed to find data within the specified lookback interval.') self.reference['reference_url_data'] = url - self.reference['reference_time_modification'] = today.datetime.replace(hour=0, - minute=0, - second=0, - microsecond=0, - tzinfo=timezone.utc) - - os.makedirs('tmp/', exist_ok=True) - os.system(f'wget {url} -P tmp/') - local_filename = 'tmp/' + url.rpartition('/')[2] - self.csv = lz4Csv(local_filename) + logging.info(f'Fetching data from: {url}') + req = requests.get(url) + req.raise_for_status() - self.timebin = None - asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', set()) + # lz4.frame.decompress() and splitlines() break the CSV parsing due to some + # weird input. 
+ with lz4.frame.open(io.BytesIO(req.content)) as f: + csv_lines = [l.decode('utf-8').rstrip() for l in f] - links = [] + timebin = None + asns = set() + links = list() - for line in csv.reader(self.csv, quotechar='"', delimiter=',', skipinitialspace=True): + logging.info('Computing links...') + for rec in csv.DictReader(csv_lines): # header # timebin,originasn,asn,hege - rec = dict(zip(self.csv.fields, line)) rec['hege'] = float(rec['hege']) rec['af'] = self.af - if self.timebin is None: - self.timebin = rec['timebin'] - elif self.timebin != rec['timebin']: + if timebin is None: + timebin = rec['timebin'] + mod_time = datetime.strptime(timebin, '%Y-%m-%d %H:%M:%S+00').replace(tzinfo=timezone.utc) + self.reference['reference_time_modification'] = mod_time + elif timebin != rec['timebin']: break originasn = int(rec['originasn']) - if originasn not in asn_id: - asn_id[originasn] = self.iyp.get_node('AS', {'asn': originasn}) - asn = int(rec['asn']) - if asn not in asn_id: - asn_id[asn] = self.iyp.get_node('AS', {'asn': asn}) + asns.add(originasn) + asns.add(asn) links.append({ - 'src_id': asn_id[originasn], - 'dst_id': asn_id[asn], + 'src_id': originasn, + 'dst_id': asn, 'props': [self.reference, rec] }) + asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False) + # Replace values in links with node IDs. + for link in links: + link['src_id'] = asn_id[link['src_id']] + link['dst_id'] = asn_id[link['dst_id']] + # Push links to IYP self.iyp.batch_add_links('DEPENDS_ON', links) - # Remove downloaded file - os.remove(local_filename) - def unit_test(self): return super().unit_test(['DEPENDS_ON']) diff --git a/iyp/crawlers/ihr/local_hegemony_v4.py b/iyp/crawlers/ihr/local_hegemony_v4.py index a61bd8e8..3438dcc0 100644 --- a/iyp/crawlers/ihr/local_hegemony_v4.py +++ b/iyp/crawlers/ihr/local_hegemony_v4.py @@ -6,9 +6,7 @@ from iyp.crawlers.ihr import HegemonyCrawler # URL to the API -URL = ('https://ihr-archive.iijlab.net/ihr/hegemony/ipv4/local/' - '{year}/{month:02d}/{day:02d}/' - 'ihr_hegemony_ipv4_local_{year}-{month:02d}-{day:02d}.csv.lz4') +URL = 'https://ihr-archive.iijlab.net/ihr/hegemony/ipv4/local/%Y/%m/%d/ihr_hegemony_ipv4_local_%Y-%m-%d.csv.lz4' ORG = 'IHR' NAME = 'ihr.local_hegemony_v4' diff --git a/iyp/crawlers/ihr/local_hegemony_v6.py b/iyp/crawlers/ihr/local_hegemony_v6.py index 7c243419..c86b4ce7 100644 --- a/iyp/crawlers/ihr/local_hegemony_v6.py +++ b/iyp/crawlers/ihr/local_hegemony_v6.py @@ -6,9 +6,7 @@ from iyp.crawlers.ihr import HegemonyCrawler # URL to the API -URL = ('https://ihr-archive.iijlab.net/ihr/hegemony/ipv6/local/' - '{year}/{month:02d}/{day:02d}/' - 'ihr_hegemony_ipv6_local_{year}-{month:02d}-{day:02d}.csv.lz4') +URL = 'https://ihr-archive.iijlab.net/ihr/hegemony/ipv6/local/%Y/%m/%d/ihr_hegemony_ipv6_local_%Y-%m-%d.csv.lz4' ORG = 'IHR' NAME = 'ihr.local_hegemony_v6' diff --git a/iyp/crawlers/inetintel/as_org.py b/iyp/crawlers/inetintel/as_org.py index a5720e3c..85dfd331 100644 --- a/iyp/crawlers/inetintel/as_org.py +++ b/iyp/crawlers/inetintel/as_org.py @@ -26,7 +26,7 @@ def get_latest_dataset_url(github_repo: str, data_dir: str, file_extension: str) latest_files = repo.get_contents(all_data_dir[-1]) for file in latest_files: if file.path.endswith(file_extension): - print(file.download_url) + logging.info(file.download_url) return file.download_url return '' @@ -79,12 +79,12 @@ def run(self): with open(self.filename, 'w') as file: file.write(req.text) - print('Dataset crawled and saved in a temporary file.') + logging.info('Dataset crawled and 
saved in a temporary file.') # The dataset is very large. Pandas has the ability to read JSON, and, in # theory, it could do it in a more memory-efficient way. df = pd.read_json(self.filename, orient='index') - print('Dataset has {} rows.'.format(len(df))) + logging.info('Dataset has {} rows.'.format(len(df))) # Optimized code batch_size = 10000 @@ -176,8 +176,8 @@ def run(self): count_rows_global += count_rows count_relationships_global += count_relationships - print('processed: {} rows and {} relationships' - .format(count_rows_global, count_relationships_global)) + logging.info('processed: {} rows and {} relationships' + .format(count_rows_global, count_relationships_global)) def close(self): super().close() diff --git a/iyp/crawlers/manrs/members.py b/iyp/crawlers/manrs/members.py index fa671407..6ad7a8ff 100644 --- a/iyp/crawlers/manrs/members.py +++ b/iyp/crawlers/manrs/members.py @@ -99,9 +99,6 @@ def run(self): if action_bool == 'Yes': implement_rel_set.add((asn, self.actions[j]['qid'])) - print(f'\rProcessed {i} organizations', file=sys.stderr, end='') - print() - # Get/create nodes. asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asn_set, all=False) country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', country_set) diff --git a/iyp/crawlers/nro/delegated_stats.py b/iyp/crawlers/nro/delegated_stats.py index ce074238..c8f5514d 100644 --- a/iyp/crawlers/nro/delegated_stats.py +++ b/iyp/crawlers/nro/delegated_stats.py @@ -76,7 +76,7 @@ def run(self): asn_status_links = defaultdict(list) prefix_status_links = defaultdict(list) - logging.info('Parsing file') + logging.info('Parsing file...') for line in req.text.splitlines(): # Skip comments. if line.strip().startswith('#'): diff --git a/iyp/crawlers/openintel/__init__.py b/iyp/crawlers/openintel/__init__.py index dc02acb7..9b79de33 100644 --- a/iyp/crawlers/openintel/__init__.py +++ b/iyp/crawlers/openintel/__init__.py @@ -111,11 +111,11 @@ def get_parquet(self): suffix='.parquet', delete=True) as tempFile: - print("Opened temporary file for object download: '{}'.".format(tempFile.name)) + logging.info("Opened temporary file for object download: '{}'.".format(tempFile.name)) WAREHOUSE_BUCKET.download_fileobj( Key=i_obj.key, Fileobj=tempFile, Config=boto3.s3.transfer.TransferConfig( multipart_chunksize=16 * 1024 * 1024)) - print("Downloaded '{}' [{:.2f}MiB] into '{}'.".format( + logging.info("Downloaded '{}' [{:.2f}MiB] into '{}'.".format( os.path.join(S3A_OPENINTEL_ENDPOINT, WAREHOUSE_BUCKET.name, i_obj.key), os.path.getsize(tempFile.name) / (1024 * 1024), tempFile.name @@ -164,7 +164,7 @@ def run(self): df.query_name = df.query_name.str[:-1] # Remove root '.' df.ns_address = df.ns_address.map(lambda x: x[:-1] if x is not None else None) # Remove root '.' 
- print(f'Read {len(df)} unique records from {len(self.pandas_df_list)} Parquet file(s).') + logging.info(f'Read {len(df)} unique records from {len(self.pandas_df_list)} Parquet file(s).') # query_names for NS records are domain names domain_names = set(df[df.response_type == 'NS']['query_name']) @@ -194,8 +194,8 @@ def run(self): df[df.ip4_address.notnull()]['ip4_address']), all=False) ip6_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ipv6_addresses, all=False) - print(f'Got {len(domain_id)} domains, {len(ns_id)} nameservers, {len(host_id)} hosts, {len(ip4_id)} IPv4, ' - f'{len(ip6_id)} IPv6') + logging.info(f'Got {len(domain_id)} domains, {len(ns_id)} nameservers, {len(host_id)} hosts, ' + f'{len(ip4_id)} IPv4, {len(ip6_id)} IPv6') # Compute links res_links = [] @@ -232,7 +232,7 @@ def run(self): for hd in host_names.intersection(domain_names): partof_links.append({'src_id': host_id[hd], 'dst_id': domain_id[hd], 'props': [self.reference]}) - print(f'Computed {len(res_links)} RESOLVES_TO links and {len(mng_links)} MANAGED_BY links') + logging.info(f'Computed {len(res_links)} RESOLVES_TO links and {len(mng_links)} MANAGED_BY links') # Push all links to IYP self.iyp.batch_add_links('RESOLVES_TO', res_links) @@ -240,7 +240,9 @@ def run(self): self.iyp.batch_add_links('PART_OF', partof_links) def unit_test(self): - # use different version depending on infra_ns vs others + # infra_ns only has RESOLVES_TO relationships. + if self.reference['reference_name'] == 'openintel.infra_ns': + return super().unit_test(['RESOLVES_TO']) return super().unit_test(['RESOLVES_TO', 'MANAGED_BY', 'PART_OF']) @@ -316,8 +318,6 @@ def run(self): unique_host_names.update(connections[connections[node_type] == 'HOSTNAME'][node_key].unique()) unique_ips.update(connections[connections[node_type] == 'IP'][node_key].unique()) - logging.info(f'Pushing/getting {len(unique_domain_names)} DomainName {len(unique_host_names)} HostName ' - f'{len(unique_ips)} IP nodes...') domains_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', unique_domain_names) hosts_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', unique_host_names) ips_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', unique_ips) @@ -374,8 +374,6 @@ def run(self): logging.error(f'Unknown relationship type: {connection.relation_name}') # Push all links to IYP - logging.info(f'Pushing {len(links_parent)} PARENT {len(links_part_of)} PART_OF {len(links_alias_of)} ALIAS_OF ' - f'{len(links_managed_by)} MANAGED_BY {len(links_resolves_to)} RESOLVES_TO relationships...') self.iyp.batch_add_links('PARENT', links_parent) self.iyp.batch_add_links('PART_OF', links_part_of) self.iyp.batch_add_links('ALIAS_OF', links_alias_of) @@ -384,7 +382,6 @@ def run(self): # Push the Authoritative NS Label ns_id = [link['dst_id'] for link in links_managed_by] - logging.info(f'Adding AuthoritativeNameServer label to {len(ns_id)} nodes') self.iyp.batch_add_node_label(ns_id, 'AuthoritativeNameServer') def unit_test(self): diff --git a/iyp/crawlers/pch/__init__.py b/iyp/crawlers/pch/__init__.py index db023a43..62302fe1 100644 --- a/iyp/crawlers/pch/__init__.py +++ b/iyp/crawlers/pch/__init__.py @@ -2,7 +2,6 @@ import json import logging import os -import sys from collections import defaultdict from datetime import datetime, timedelta, timezone from ipaddress import ip_network @@ -95,8 +94,6 @@ def fetch_urls(self, urls: list) -> Iterable: except ChunkedEncodingError as e: logging.error(f'Failed to retrieve data for {query}') logging.error(e) - 
print(f'Failed to retrieve data for {query}', file=sys.stderr) - print(e, file=sys.stderr) return False, str(), name def fetch_url(self, url: str, name: str = str()) -> Tuple[bool, str, str]: @@ -112,7 +109,6 @@ def fetch_collector_site(self) -> str: within the lookback interval. """ logging.info('Fetching list of collectors.') - print('Fetching list of collectors.') today = datetime.now(tz=timezone.utc) self.collector_site_url = self.url + today.strftime('%Y/%m/') resp = self.session.get(self.collector_site_url).result() @@ -160,7 +156,6 @@ def probe_latest_set(self, collector_name: str) -> datetime: Return None if no data is found within the valid interval. """ logging.info('Probing latest available dataset.') - print('Probing latest available dataset.') curr_date = datetime.now(tz=timezone.utc) max_lookback = curr_date - self.MAX_LOOKBACK while curr_date >= max_lookback: @@ -172,7 +167,6 @@ def probe_latest_set(self, collector_name: str) -> datetime: resp = self.session.head(probe_url).result() if resp.status_code == 200: logging.info(f'Latest available dataset: {curr_date.strftime("%Y-%m-%d")}') - print(f'Latest available dataset: {curr_date.strftime("%Y-%m-%d")}') return curr_date curr_date -= timedelta(days=1) logging.error('Failed to find current data.') @@ -194,7 +188,6 @@ def fetch(self) -> None: tmp_dir = self.get_tmp_dir() if not os.path.exists(tmp_dir): - logging.info(f'Creating tmp dir: {tmp_dir}') tmp_dir = self.create_tmp_dir() # Get a list of collector names @@ -238,8 +231,6 @@ def fetch(self) -> None: if to_fetch: logging.info(f'{len(self.collector_files)}/{len(collector_names)} collector files in cache, fetching ' f'{len(to_fetch)}') - print(f'{len(self.collector_files)}/{len(collector_names)} collector files in cache, fetching ' - f'{len(to_fetch)}') # If some collectors do not have current data available, # try again until the max lookback window is reached. @@ -267,8 +258,6 @@ def fetch(self) -> None: if failed_fetches: # Max lookback reached. logging.warning(f'Failed to find current data for {len(failed_fetches)} collectors: {failed_fetches}') - print(f'Failed to find current data for {len(failed_fetches)} collectors: {failed_fetches}', - file=sys.stderr) def run(self) -> None: """Fetch data from PCH, parse the files, and push nodes and relationships to the @@ -278,7 +267,6 @@ def run(self) -> None: # Parse files in parallel. logging.info(f'Parsing {len(self.collector_files)} collector files.') - print(f'Parsing {len(self.collector_files)} collector files.') fixtures = list() for collector_name, collector_file in self.collector_files.items(): fixtures.append((collector_name, collector_file)) @@ -306,8 +294,6 @@ def run(self) -> None: raw_links[(asn, prefix)].add(collector_name) # Get/push nodes. - logging.info(f'Fetching {len(ases)} AS and {len(prefixes)} Prefix nodes.') - print(f'Fetching {len(ases)} AS and {len(prefixes)} Prefix nodes.') as_ids = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', ases, all=False) prefix_ids = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False) @@ -320,8 +306,6 @@ def run(self) -> None: 'dst_id': prefix_ids[prefix], 'props': [props, self.reference]}) - logging.info(f'Pushing {len(relationships)} relationships.') - print(f'Pushing {len(relationships)} relationships.') self.iyp.batch_add_links('ORIGINATE', relationships) # Clear cache. 
diff --git a/iyp/crawlers/pch/show_bgp_parser.py b/iyp/crawlers/pch/show_bgp_parser.py index f1f92ab8..9eeaccfc 100644 --- a/iyp/crawlers/pch/show_bgp_parser.py +++ b/iyp/crawlers/pch/show_bgp_parser.py @@ -1,6 +1,5 @@ import logging import re -import sys from collections import defaultdict, namedtuple from ipaddress import (AddressValueError, IPv4Address, IPv4Network, IPv6Address, IPv6Network) @@ -52,7 +51,6 @@ def __handle_status_codes(self, code: str) -> set: for c in code: if c not in self.status_codes: logging.critical(f'{self.collector}: Invalid status code {c} in code {code}') - print(f'{self.collector}: Invalid status code {c} in code {code}', file=sys.stderr) return set() ret.add(self.status_codes[c]) return ret @@ -61,7 +59,6 @@ def __handle_origin_code(self, code: str) -> str: """Map origin code to its label.""" if code not in self.origin_codes: logging.critical(f'{self.collector}: Invalid origin code {code}') - print(f'{self.collector}: Invalid origin code {code}', file=sys.stderr) return str() return self.origin_codes[code] @@ -178,10 +175,8 @@ def __build_prefix_map(self, routes: list) -> dict: prefix_map[prefix].add(origin) if not_valid_routes: logging.info(f'{self.collector}: Ignored {not_valid_routes} not valid routes.') - print(f'{self.collector}: Ignored {not_valid_routes} not valid routes.', file=sys.stderr) if as_sets: - logging.info(f'{self.collector}: Ignored {as_sets} AS set origins.') - print(f'{self.collector}: Ignored {as_sets} AS set origins.', file=sys.stderr) + logging.debug(f'{self.collector}: Ignored {as_sets} AS set origins.') if incomplete_origin_routes: logging.info(f'{self.collector}: Ignored {incomplete_origin_routes} routes with incomplete origin.') return prefix_map @@ -201,7 +196,7 @@ def parse_parallel(self, fixture: tuple) -> Tuple[str, dict]: fixture: Tuple of (collector_name, input_str) """ collector_name, input_str = fixture - logging.info(collector_name) + logging.debug(collector_name) self.collector = collector_name return collector_name, self.parse(input_str) @@ -216,7 +211,6 @@ def parse(self, input_str: str) -> dict: pass except StopIteration: logging.warning(f'{self.collector}: Empty file.') - print(f'{self.collector}: Empty file.', file=sys.stderr) return dict() routes = list() last_pfx = str() diff --git a/iyp/crawlers/peeringdb/fac.py b/iyp/crawlers/peeringdb/fac.py index ebf012f6..170187f8 100644 --- a/iyp/crawlers/peeringdb/fac.py +++ b/iyp/crawlers/peeringdb/fac.py @@ -50,7 +50,7 @@ def __init__(self, organization, url, name): def run(self): """Fetch facilities information from PeeringDB and push to IYP.""" - sys.stderr.write('Fetching PeeringDB data...\n') + logging.info('Fetching PeeringDB data...') req = self.requests.get(URL, headers=self.headers) if req.status_code != 200: logging.error('Error while fetching peeringDB data') @@ -103,7 +103,6 @@ def run(self): try: flat_fac = dict(flatdict.FlatDict(fac)) except Exception as e: - sys.stderr.write(f'Cannot flatten dictionary {fac}\n{e}\n') logging.error(f'Cannot flatten dictionary {fac}\n{e}') facid_qid = self.facid_id[fac['id']] diff --git a/iyp/crawlers/peeringdb/ix.py b/iyp/crawlers/peeringdb/ix.py index 267c512d..cbd6964b 100644 --- a/iyp/crawlers/peeringdb/ix.py +++ b/iyp/crawlers/peeringdb/ix.py @@ -131,7 +131,6 @@ def run(self): self.ixs = result['data'] # Register IXPs - logging.warning('Pushing IXP info...') self.register_ixs() self.ix_id = self.iyp.batch_get_node_extid(IXID_LABEL) @@ -149,7 +148,6 @@ def run(self): for ixlan in ixlans: self.ixlans[ixlan['id']] = ixlan - 
logging.warning('Pushing IXP LAN and members...') self.register_ix_membership() self.iyp.commit() diff --git a/iyp/crawlers/peeringdb/org.py b/iyp/crawlers/peeringdb/org.py index aa0a2c70..7e429a43 100644 --- a/iyp/crawlers/peeringdb/org.py +++ b/iyp/crawlers/peeringdb/org.py @@ -95,7 +95,6 @@ def run(self): try: flat_org = dict(flatdict.FlatDict(org)) except Exception as e: - sys.stderr.write(f'Cannot flatten dictionary {org}\n{e}\n') logging.error(f'Cannot flatten dictionary {org}\n{e}') orgid_qid = self.orgid_id[org['id']] diff --git a/iyp/crawlers/ripe/as_names.py b/iyp/crawlers/ripe/as_names.py index 9e163ada..44e1efb0 100644 --- a/iyp/crawlers/ripe/as_names.py +++ b/iyp/crawlers/ripe/as_names.py @@ -36,7 +36,7 @@ def run(self): # Country codes are two digits if len(cc) > 2: - print(cc) + logging.warning(cc) continue asn = int(asn) diff --git a/iyp/crawlers/ripe/atlas_measurements.py b/iyp/crawlers/ripe/atlas_measurements.py index eb60f6cf..bb2f5763 100644 --- a/iyp/crawlers/ripe/atlas_measurements.py +++ b/iyp/crawlers/ripe/atlas_measurements.py @@ -52,12 +52,12 @@ def __process_response(response: requests.Response): next_url = data['next'] if not next_url: - logging.info('Reached end of list') + logging.debug('Reached end of list') next_url = str() return next_url, data['results'] def __execute_query(self, url: str): - logging.info(f'Querying {url}') + logging.debug(f'Querying {url}') r = self.session.get(url) return self.__process_response(r) @@ -140,8 +140,8 @@ def run(self): while next_url: next_url, next_data = self.__execute_query(next_url) data += next_data - logging.info(f'Added {len(next_data)} measurements. Total: {len(data)}') - print(f'Fetched {len(data)} measurements', file=sys.stderr) + logging.debug(f'Added {len(next_data)} measurements. Total: {len(data)}') + logging.info(f'Fetched {len(data)} measurements') # Transform the data to be compatible with the flatdict format. 
self.__transform_data(data) @@ -191,7 +191,6 @@ def run(self): valid_probe_measurements.append(probe_measurement) # push nodes - logging.info('Fetching/pushing nodes') probe_measurement_ids = dict() attrs_flattened = [] @@ -201,15 +200,10 @@ def run(self): probe_measurement_flattened = dict(flatdict.FlatterDict(probe_measurement_copy, delimiter='_')) attrs_flattened.append(probe_measurement_flattened) - logging.info(f'{len(attrs_flattened)} measurements') probe_measurement_ids = self.iyp.batch_get_nodes('AtlasMeasurement', attrs_flattened, ['id'], create=True) - logging.info(f'{len(probe_ids)} probes') probe_ids = self.iyp.batch_get_nodes_by_single_prop('AtlasProbe', 'id', probe_ids, all=False, create=True) - logging.info(f'{len(ips)} IPs') ip_ids = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ips, all=False, create=True) - logging.info(f'{len(hostnames)} hostnames') hostname_ids = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', hostnames, all=False, create=True) - logging.info(f'{len(ases)} ASNs') asn_ids = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', ases, all=False, create=True) # compute links @@ -251,12 +245,8 @@ def run(self): 'props': [probe_measurement_reference]}) # Push all links to IYP - logging.info('Fetching/pushing relationships') - logging.info(f'{len(target_links)} TARGET') self.iyp.batch_add_links('TARGET', target_links) - logging.info(f'{len(part_of_links)} PART_OF') self.iyp.batch_add_links('PART_OF', part_of_links) - logging.info('Done.') def unit_test(self): return super().unit_test(['PART_OF', 'TARGET']) @@ -277,7 +267,6 @@ def main() -> None: ) logging.info(f'Started: {sys.argv}') - print('Fetching RIPE Atlas probe measurements', file=sys.stderr) crawler = Crawler(ORG, URL, NAME) if args.unit_test: diff --git a/iyp/crawlers/ripe/atlas_probes.py b/iyp/crawlers/ripe/atlas_probes.py index 4782099e..de96d191 100644 --- a/iyp/crawlers/ripe/atlas_probes.py +++ b/iyp/crawlers/ripe/atlas_probes.py @@ -52,12 +52,12 @@ def __process_response(response: requests.Response): next_url = data['next'] if not next_url: - logging.info('Reached end of list') + logging.debug('Reached end of list') next_url = str() return next_url, data['results'] def __execute_query(self, url: str): - logging.info(f'Querying {url}') + logging.debug(f'Querying {url}') r = self.session.get(url) return self.__process_response(r) @@ -75,8 +75,8 @@ def run(self): while next_url: next_url, next_data = self.__execute_query(next_url) data += next_data - logging.info(f'Added {len(next_data)} probes. Total: {len(data)}') - print(f'Fetched {len(data)} probes.', file=sys.stderr) + logging.debug(f'Added {len(next_data)} probes. Total: {len(data)}') + logging.info(f'Fetched {len(data)} probes.') # Compute nodes probe_ids = set() @@ -125,7 +125,6 @@ def run(self): probe.pop('country_code') # push nodes - logging.info('Fetching/pushing nodes') probe_id = dict() # Each probe is a JSON object with nested fields, so we need to flatten it. 
flattened_probes = [dict(flatdict.FlatterDict(probe, delimiter='_')) for probe in valid_probes] @@ -170,7 +169,6 @@ def run(self): 'props': [self.reference]}) # Push all links to IYP - logging.info('Fetching/pushing relationships') self.iyp.batch_add_links('ASSIGNED', assigned_links) self.iyp.batch_add_links('LOCATED_IN', located_in_links) self.iyp.batch_add_links('COUNTRY', country_links) @@ -194,7 +192,7 @@ def main() -> None: ) logging.info(f'Started: {sys.argv}') - print('Fetching RIPE Atlas probes', file=sys.stderr) + print('Fetching RIPE Atlas probes') crawler = Crawler(ORG, URL, NAME) if args.unit_test: diff --git a/iyp/crawlers/tranco/top1m.py b/iyp/crawlers/tranco/top1m.py index c2cfb374..cb9b71d9 100644 --- a/iyp/crawlers/tranco/top1m.py +++ b/iyp/crawlers/tranco/top1m.py @@ -36,7 +36,7 @@ def run(self): self.tranco_qid = self.iyp.get_node('Ranking', {'name': 'Tranco top 1M'}) - sys.stderr.write('Downloading latest list...\n') + logging.info('Downloading latest list...') req = requests.get(URL) if req.status_code != 200: raise RequestStatusError('Error while fetching Tranco csv file') diff --git a/iyp/crawlers/virginiatech/rovista.py b/iyp/crawlers/virginiatech/rovista.py index c31585c2..08180edb 100644 --- a/iyp/crawlers/virginiatech/rovista.py +++ b/iyp/crawlers/virginiatech/rovista.py @@ -55,7 +55,6 @@ def run(self): if len(data) < batch_size: break - logging.info('Pushing nodes to neo4j...') # get ASNs and prefixes IDs self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns) tag_id_not_valid = self.iyp.get_node('Tag', {'label': 'Not Validating RPKI ROV'}) @@ -71,7 +70,6 @@ def run(self): links.append({'src_id': asn_qid, 'dst_id': tag_id_not_valid, 'props': [self.reference, {'ratio': entry['ratio']}]}) - logging.info('Pushing links to neo4j...') # Push all links to IYP self.iyp.batch_add_links('CATEGORIZED', links) diff --git a/iyp/post/country_information.py b/iyp/post/country_information.py index 3d6ccaec..6a5fa3ae 100644 --- a/iyp/post/country_information.py +++ b/iyp/post/country_information.py @@ -17,7 +17,7 @@ def run(self): for country_code in country_id: if country_code not in iso3166.countries_by_alpha2: - logging.error(f'Country code {country_code} is not ISO 3166-1 alpha-2 conform.') + logging.error(f'Country code "{country_code}" is not ISO 3166-1 alpha-2 conform.') continue country_info = iso3166.countries_by_alpha2[country_code] new_props = {'name': country_info.apolitical_name, diff --git a/iyp/post/dns_hierarchy.py b/iyp/post/dns_hierarchy.py index 458e3501..78f84cb3 100644 --- a/iyp/post/dns_hierarchy.py +++ b/iyp/post/dns_hierarchy.py @@ -18,12 +18,10 @@ def run(self): """ self.reference['reference_name'] = 'iyp.post.dns_hierarchy' - print('Building DNS hierarchy.', file=sys.stderr) + logging.info('Building DNS hierarchy.') # Fetch all existing DomainName nodes. dns_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name') - logging.info(f'Fetched {len(dns_id):,d} DomainName nodes.') - print(f'Fetched {len(dns_id):,d} DomainName nodes.', file=sys.stderr) # Build hierarchical relationships and keep track of new nodes that have to be # created. @@ -40,8 +38,6 @@ def run(self): dns_name = parent # Create new nodes. - logging.info(f'Creating {len(new_nodes):,d} new DomainName nodes.') - print(f'Creating {len(new_nodes):,d} new DomainName nodes.', file=sys.stderr) dns_id.update(self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', new_nodes, all=False)) # Build relationships and push to IYP. 
@@ -49,8 +45,6 @@ def run(self): for child, parent in link_tuples: part_of_links.append({'src_id': dns_id[child], 'dst_id': dns_id[parent], 'props': [self.reference]}) - logging.info(f'Creating {len(part_of_links):,d} PART_OF relationships.') - print(f'Creating {len(part_of_links):,d} PART_OF relationships.', file=sys.stderr) self.iyp.batch_add_links('PART_OF', part_of_links) def count_relation(self):
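Not part of the patch itself, but a minimal sketch of the logging setup the commit message describes (level name instead of process name in the format string, general level raised to INFO). It mirrors the create_db.py hunk above; the log file name used here is only illustrative, the real script writes to dump_dir/iyp-<date>.log.

    import logging

    # Level name replaces the process name in each record; general level is INFO.
    FORMAT = '%(asctime)s %(levelname)s %(message)s'
    logging.basicConfig(
        format=FORMAT,
        filename='iyp-example.log',  # illustrative path, not the one used by create_db.py
        level=logging.INFO,
    )

    logging.info('Starting new container...')  # recorded
    logging.debug('Querying next page...')     # suppressed at INFO level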
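Likewise not part of the patch: a condensed sketch of the reworked IHR hegemony flow (strftime-based URL lookback, in-memory lz4 decompression, CSV parsing, then batch node/link creation), following the iyp/crawlers/ihr/__init__.py hunk above. The URL constant and seven-day lookback are taken from the patch; error handling and the per-timebin cut-off are trimmed, and the commented-out `iyp` calls assume an initialized IYP session as provided by BaseCrawler.

    import csv
    import io
    from datetime import datetime, timedelta, timezone

    import lz4.frame
    import requests

    URL = 'https://ihr-archive.iijlab.net/ihr/hegemony/ipv4/local/%Y/%m/%d/ihr_hegemony_ipv4_local_%Y-%m-%d.csv.lz4'

    # Walk back day by day (up to a week) until a file exists.
    day = datetime.now(tz=timezone.utc)
    max_lookback = day - timedelta(days=7)
    url = day.strftime(URL)
    while requests.head(url).status_code != 200 and day > max_lookback:
        day -= timedelta(days=1)
        url = day.strftime(URL)

    # Decompress in memory instead of writing a temporary file.
    req = requests.get(url)
    req.raise_for_status()
    with lz4.frame.open(io.BytesIO(req.content)) as f:
        csv_lines = [line.decode('utf-8').rstrip() for line in f]

    # Collect ASNs and links first, then resolve node IDs in a single batch.
    asns = set()
    links = list()
    for rec in csv.DictReader(csv_lines):  # header: timebin,originasn,asn,hege
        originasn, asn = int(rec['originasn']), int(rec['asn'])
        asns.update((originasn, asn))
        links.append({'src_id': originasn, 'dst_id': asn, 'props': [dict(rec)]})

    # Batch retrieval/creation; the IYP module now logs these steps itself.
    # asn_id = iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False)
    # for link in links:
    #     link['src_id'], link['dst_id'] = asn_id[link['src_id']], asn_id[link['dst_id']]
    # iyp.batch_add_links('DEPENDS_ON', links)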