diff --git a/iyp/crawlers/ihr/rov.py b/iyp/crawlers/ihr/rov.py
index 3e9dfd7..3c40d46 100644
--- a/iyp/crawlers/ihr/rov.py
+++ b/iyp/crawlers/ihr/rov.py
@@ -1,48 +1,27 @@
 import argparse
 import csv
+import io
 import logging
 import os
 import sys
-from datetime import timezone
+from datetime import datetime, timedelta, timezone
 from ipaddress import ip_network
 
-import arrow
 import lz4.frame
 import requests
 
-from iyp import BaseCrawler
-
-# NOTE: Assumes ASNs and Prefixes are already registered in the database. Run
-# bgpkit.pfx2asn before this one
+from iyp import BaseCrawler, DataNotAvailableError
 
 # URL to the API
-URL = 'https://ihr-archive.iijlab.net/ihr/rov/{year}/{month:02d}/{day:02d}/ihr_rov_{year}-{month:02d}-{day:02d}.csv.lz4'
+URL = 'https://ihr-archive.iijlab.net/ihr/rov/%Y/%m/%d/ihr_rov_%Y-%m-%d.csv.lz4'
 
 ORG = 'IHR'
 NAME = 'ihr.rov'
 
-class lz4Csv:
-    def __init__(self, filename):
-        """Start reading a lz4 compress csv file."""
-
-        self.fp = lz4.frame.open(filename)
-
-    def __iter__(self):
-        """Read file header line and set self.fields."""
-        line = self.fp.readline()
-        self.fields = line.decode('utf-8').rstrip().split(',')
-        return self
-
-    def __next__(self):
-        line = self.fp.readline().decode('utf-8').rstrip()
-
-        if len(line) > 0:
-            return line
-        else:
-            raise StopIteration
-
-    def close(self):
-        self.fp.close()
+def replace_link_ids(links: list, src_id: dict, dst_id: dict):
+    for link in links:
+        link['src_id'] = src_id[link['src_id']]
+        link['dst_id'] = dst_id[link['dst_id']]
 
 
 class Crawler(BaseCrawler):
@@ -52,50 +31,51 @@ def __init__(self, organization, url, name):
 
     def run(self):
         """Fetch data from file and push to IYP."""
-
-        today = arrow.utcnow()
-        url = URL.format(year=today.year, month=today.month, day=today.day)
+        today = datetime.now(tz=timezone.utc)
+        max_lookback = today - timedelta(days=7)
+        url = today.strftime(self.url)
+        logging.info(url)
         req = requests.head(url)
-        if req.status_code != 200:
-            today = today.shift(days=-1)
-            url = URL.format(year=today.year, month=today.month, day=today.day)
+        while req.status_code != 200 and today > max_lookback:
+            today -= timedelta(days=1)
+            url = today.strftime(self.url)
+            logging.info(url)
             req = requests.head(url)
-            if req.status_code != 200:
-                today = today.shift(days=-1)
-                url = URL.format(year=today.year, month=today.month, day=today.day)
+        if req.status_code != 200:
+            logging.error('Failed to find data within the specified lookback interval.')
+            raise DataNotAvailableError('Failed to find data within the specified lookback interval.')
 
         self.reference['reference_url_data'] = url
-        self.reference['reference_time_modification'] = today.datetime.replace(hour=0,
-                                                                               minute=0,
-                                                                               second=0,
-                                                                               microsecond=0,
-                                                                               tzinfo=timezone.utc)
+        self.reference['reference_time_modification'] = today.replace(hour=0,
+                                                                      minute=0,
+                                                                      second=0,
+                                                                      microsecond=0,
+                                                                      tzinfo=timezone.utc)
 
-        os.makedirs('tmp/', exist_ok=True)
-        os.system(f'wget {url} -P tmp/')
+        logging.info(f'Fetching data from: {url}')
+        req = requests.get(url)
+        req.raise_for_status()
 
-        local_filename = 'tmp/' + url.rpartition('/')[2]
-        self.csv = lz4Csv(local_filename)
+        with lz4.frame.open(io.BytesIO(req.content)) as f:
+            csv_lines = [l.decode('utf-8').rstrip() for l in f]
 
-        logging.info('Getting node IDs from neo4j...')
-        asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn')
-        prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix')
-        tag_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label')
-        country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code')
+        asns = set()
+        prefixes = set()
+        tags = set()
+        countries = set()
 
-        orig_links = []
-        tag_links = []
-        dep_links = []
-        country_links = []
+        orig_links = list()
+        tag_links = list()
+        dep_links = list()
+        country_links = list()
 
         logging.info('Computing links...')
-        for line in csv.reader(self.csv, quotechar='"', delimiter=',', skipinitialspace=True):
+        for rec in csv.DictReader(csv_lines):
             # header
             # id, timebin, prefix, hege, af, visibility, rpki_status, irr_status,
             # delegated_prefix_status, delegated_asn_status, descr, moas, asn_id,
             # country_id, originasn_id
 
-            rec = dict(zip(self.csv.fields, line))
             rec['hege'] = float(rec['hege'])
             rec['visibility'] = float(rec['visibility'])
             rec['af'] = int(rec['af'])
@@ -105,77 +85,71 @@ def run(self):
             except ValueError as e:
                 logging.warning(f'Ignoring malformed prefix: "{rec["prefix"]}": {e}')
                 continue
-
-            if prefix not in prefix_id:
-                prefix_id[prefix] = self.iyp.get_node('Prefix', {'prefix': prefix})
+            prefixes.add(prefix)
 
             # make status/country/origin links only for lines where asn=originasn
             if rec['asn_id'] == rec['originasn_id']:
                 # Make sure all nodes exist
                 originasn = int(rec['originasn_id'])
-                if originasn not in asn_id:
-                    asn_id[originasn] = self.iyp.get_node('AS', {'asn': originasn})
-
                 rpki_status = 'RPKI ' + rec['rpki_status']
-                if rpki_status not in tag_id:
-                    tag_id[rpki_status] = self.iyp.get_node('Tag', {'label': rpki_status})
-
                 irr_status = 'IRR ' + rec['irr_status']
-                if irr_status not in tag_id:
-                    tag_id[irr_status] = self.iyp.get_node('Tag', {'label': irr_status})
-
                 cc = rec['country_id']
-                if cc not in country_id:
-                    country_id[cc] = self.iyp.get_node('Country', {'country_code': cc})
+
+                asns.add(originasn)
+                tags.add(rpki_status)
+                tags.add(irr_status)
+                countries.add(cc)
 
                 # Compute links
                 orig_links.append({
-                    'src_id': asn_id[originasn],
-                    'dst_id': prefix_id[prefix],
+                    'src_id': originasn,
+                    'dst_id': prefix,
                     'props': [self.reference, rec]
                 })
 
                 tag_links.append({
-                    'src_id': prefix_id[prefix],
-                    'dst_id': tag_id[rpki_status],
+                    'src_id': prefix,
+                    'dst_id': rpki_status,
                     'props': [self.reference, rec]
                 })
 
                 tag_links.append({
-                    'src_id': prefix_id[prefix],
-                    'dst_id': tag_id[irr_status],
+                    'src_id': prefix,
+                    'dst_id': irr_status,
                     'props': [self.reference, rec]
                 })
 
                 country_links.append({
-                    'src_id': prefix_id[prefix],
-                    'dst_id': country_id[cc],
+                    'src_id': prefix,
+                    'dst_id': cc,
                     'props': [self.reference]
                 })
 
             # Dependency links
             asn = int(rec['asn_id'])
-            if asn not in asn_id:
-                asn_id[asn] = self.iyp.get_node('AS', {'asn': asn})
+            asns.add(asn)
 
             dep_links.append({
-                'src_id': prefix_id[prefix],
-                'dst_id': asn_id[asn],
+                'src_id': prefix,
+                'dst_id': asn,
                 'props': [self.reference, rec]
             })
 
-        self.csv.close()
+        asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
+        prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False)
+        tag_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label', tags, all=False)
+        country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries)
+
+        replace_link_ids(orig_links, asn_id, prefix_id)
+        replace_link_ids(tag_links, prefix_id, tag_id)
+        replace_link_ids(country_links, prefix_id, country_id)
+        replace_link_ids(dep_links, prefix_id, asn_id)
 
-        # Push links to IYP
-        logging.info('Pushing links to neo4j...')
         self.iyp.batch_add_links('ORIGINATE', orig_links)
         self.iyp.batch_add_links('CATEGORIZED', tag_links)
        self.iyp.batch_add_links('DEPENDS_ON', dep_links)
         self.iyp.batch_add_links('COUNTRY', country_links)
 
-        # Remove downloaded file
-        os.remove(local_filename)
-
     def unit_test(self):
         return super().unit_test(['ORIGINATE', 'CATEGORIZED', 'DEPENDS_ON', 'COUNTRY'])
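
Two notes for reviewers, each with a small standalone sketch.

First, the download path: instead of shelling out to `wget` and removing a temp file afterwards, the archive is now fetched with `requests` and decompressed entirely in memory via `lz4.frame.open()` over a `BytesIO`. The sketch below mirrors that flow, with one assumption flagged up front: it fabricates a tiny lz4-compressed CSV in place of the bytes that `requests.get(url).content` would return, so it runs without network access.

```python
# Standalone sketch of the in-memory decompress-and-parse path in run().
# The payload below is a stand-in for req.content; everything else
# follows the pattern in the diff.
import csv
import io

import lz4.frame

raw = b'prefix,rpki_status,asn_id\n192.0.2.0/24,Valid,2497\n'
payload = lz4.frame.compress(raw)  # stand-in for requests.get(url).content

# lz4.frame.open() accepts a file object, so the data never touches disk.
with lz4.frame.open(io.BytesIO(payload)) as f:
    csv_lines = [line.decode('utf-8').rstrip() for line in f]

# DictReader consumes the header line and yields one dict per record,
# which is what made the old lz4Csv helper class redundant.
for rec in csv.DictReader(csv_lines):
    print(rec['prefix'], rec['rpki_status'], rec['asn_id'])
```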
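Second, the structural change: per-record `get_node()` round trips are replaced by a collect-then-resolve pass. Property values are gathered into sets while the links are built, node IDs are fetched once per label with `batch_get_nodes_by_single_prop`, and `replace_link_ids()` rewrites each link's endpoints in place. A minimal sketch of that pattern follows; the two dict literals are hypothetical stand-ins for what `batch_get_nodes_by_single_prop` is assumed to return (a mapping from property value to internal node ID).

```python
# Sketch of the collect-then-resolve pattern. The lookup dicts below are
# mocked; in the crawler they come from batch_get_nodes_by_single_prop().
def replace_link_ids(links: list, src_id: dict, dst_id: dict):
    """Swap the property values stored in each link for resolved node IDs."""
    for link in links:
        link['src_id'] = src_id[link['src_id']]
        link['dst_id'] = dst_id[link['dst_id']]

# Pass 1: gather keys and build links keyed by property values.
asns, prefixes = set(), set()
orig_links = []
for asn, prefix in [(2497, '192.0.2.0/24'), (64511, '198.51.100.0/24')]:
    asns.add(asn)
    prefixes.add(prefix)
    orig_links.append({'src_id': asn, 'dst_id': prefix, 'props': [{}]})

# Pass 2: one (mocked) batched lookup per node label, then rewrite the
# endpoints in place so batch_add_links() sees node IDs, not raw values.
asn_id = {a: f'as-{i}' for i, a in enumerate(sorted(asns))}
prefix_id = {p: f'pfx-{i}' for i, p in enumerate(sorted(prefixes))}
replace_link_ids(orig_links, asn_id, prefix_id)
print(orig_links)
```

The payoff is one database round trip per node label instead of one per CSV record, which matters at the scale of a full ROV dump.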