Refactor ihr.rov crawler (#164)
- Proper lookback interval of one week
- Use requests instead of system wget call
- Keep file in memory
- Follow batch + push approach
m-appel authored Dec 18, 2024
1 parent d21d96c commit 4107e9a
Showing 1 changed file with 64 additions and 90 deletions.
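
The bullets above boil down to a small fetch helper: probe the dated archive URL with HEAD requests, stepping back one day at a time for up to a week, then download the lz4-compressed CSV with requests and decompress it in memory instead of shelling out to wget and writing a temporary file. A minimal standalone sketch of that flow, outside the IYP framework (the URL template and the one-week lookback come from the diff below; the helper names, the RuntimeError, and the __main__ block are illustrative assumptions):

import csv
import io
from datetime import datetime, timedelta, timezone

import lz4.frame
import requests

# URL template from the diff; strftime fills in the date parts.
URL = 'https://ihr-archive.iijlab.net/ihr/rov/%Y/%m/%d/ihr_rov_%Y-%m-%d.csv.lz4'


def find_latest_dump(template, lookback_days=7):
    """Probe the archive with HEAD requests, stepping back one day at a time."""
    day = datetime.now(tz=timezone.utc)
    oldest = day - timedelta(days=lookback_days)
    url = day.strftime(template)
    status = requests.head(url).status_code
    while status != 200 and day > oldest:
        day -= timedelta(days=1)
        url = day.strftime(template)
        status = requests.head(url).status_code
    if status != 200:
        raise RuntimeError('No dump found within the lookback interval.')
    return url


def fetch_rows(url):
    """Download the lz4-compressed CSV and parse it entirely in memory."""
    resp = requests.get(url)
    resp.raise_for_status()
    with lz4.frame.open(io.BytesIO(resp.content)) as f:
        lines = [line.decode('utf-8').rstrip() for line in f]
    return list(csv.DictReader(lines))


if __name__ == '__main__':
    rows = fetch_rows(find_latest_dump(URL))
    print(f'{len(rows)} ROV records')

The batch + push part of the refactor then avoids one database round-trip per record: the loop only collects ASNs, prefixes, tags, and country codes into sets and builds link dictionaries keyed by those values; node IDs are fetched once with batch_get_nodes_by_single_prop, swapped in by replace_link_ids, and the links are pushed with batch_add_links.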
154 changes: 64 additions & 90 deletions iyp/crawlers/ihr/rov.py
@@ -1,48 +1,27 @@
 import argparse
 import csv
+import io
 import logging
 import os
 import sys
-from datetime import timezone
+from datetime import datetime, timedelta, timezone
 from ipaddress import ip_network
 
-import arrow
 import lz4.frame
 import requests
 
-from iyp import BaseCrawler
-
-# NOTE: Assumes ASNs and Prefixes are already registered in the database. Run
-# bgpkit.pfx2asn before this one
+from iyp import BaseCrawler, DataNotAvailableError
 
 # URL to the API
-URL = 'https://ihr-archive.iijlab.net/ihr/rov/{year}/{month:02d}/{day:02d}/ihr_rov_{year}-{month:02d}-{day:02d}.csv.lz4'
+URL = 'https://ihr-archive.iijlab.net/ihr/rov/%Y/%m/%d/ihr_rov_%Y-%m-%d.csv.lz4'
 ORG = 'IHR'
 NAME = 'ihr.rov'
 
 
-class lz4Csv:
-    def __init__(self, filename):
-        """Start reading a lz4 compress csv file."""
-
-        self.fp = lz4.frame.open(filename)
-
-    def __iter__(self):
-        """Read file header line and set self.fields."""
-        line = self.fp.readline()
-        self.fields = line.decode('utf-8').rstrip().split(',')
-        return self
-
-    def __next__(self):
-        line = self.fp.readline().decode('utf-8').rstrip()
-
-        if len(line) > 0:
-            return line
-        else:
-            raise StopIteration
-
-    def close(self):
-        self.fp.close()
+def replace_link_ids(links: list, src_id: dict, dst_id: dict):
+    for link in links:
+        link['src_id'] = src_id[link['src_id']]
+        link['dst_id'] = dst_id[link['dst_id']]
 
 
 class Crawler(BaseCrawler):
@@ -52,50 +31,51 @@ def __init__(self, organization, url, name):
 
     def run(self):
         """Fetch data from file and push to IYP."""
-
-        today = arrow.utcnow()
-        url = URL.format(year=today.year, month=today.month, day=today.day)
+        today = datetime.now(tz=timezone.utc)
+        max_lookback = today - timedelta(days=7)
+        url = today.strftime(self.url)
+        logging.info(url)
         req = requests.head(url)
-        if req.status_code != 200:
-            today = today.shift(days=-1)
-            url = URL.format(year=today.year, month=today.month, day=today.day)
+        while req.status_code != 200 and today > max_lookback:
+            today -= timedelta(days=1)
+            url = today.strftime(self.url)
+            logging.info(url)
             req = requests.head(url)
-            if req.status_code != 200:
-                today = today.shift(days=-1)
-                url = URL.format(year=today.year, month=today.month, day=today.day)
+        if req.status_code != 200:
+            logging.error('Failed to find data within the specified lookback interval.')
+            raise DataNotAvailableError('Failed to find data within the specified lookback interval.')
 
         self.reference['reference_url_data'] = url
-        self.reference['reference_time_modification'] = today.datetime.replace(hour=0,
-                                                                               minute=0,
-                                                                               second=0,
-                                                                               microsecond=0,
-                                                                               tzinfo=timezone.utc)
+        self.reference['reference_time_modification'] = today.replace(hour=0,
+                                                                      minute=0,
+                                                                      second=0,
+                                                                      microsecond=0,
+                                                                      tzinfo=timezone.utc)
 
-        os.makedirs('tmp/', exist_ok=True)
-        os.system(f'wget {url} -P tmp/')
+        logging.info(f'Fetching data from: {url}')
+        req = requests.get(url)
+        req.raise_for_status()
 
-        local_filename = 'tmp/' + url.rpartition('/')[2]
-        self.csv = lz4Csv(local_filename)
+        with lz4.frame.open(io.BytesIO(req.content)) as f:
+            csv_lines = [l.decode('utf-8').rstrip() for l in f]
 
-        logging.info('Getting node IDs from neo4j...')
-        asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn')
-        prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix')
-        tag_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label')
-        country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code')
+        asns = set()
+        prefixes = set()
+        tags = set()
+        countries = set()
 
-        orig_links = []
-        tag_links = []
-        dep_links = []
-        country_links = []
+        orig_links = list()
+        tag_links = list()
+        dep_links = list()
+        country_links = list()
 
         logging.info('Computing links...')
-        for line in csv.reader(self.csv, quotechar='"', delimiter=',', skipinitialspace=True):
+        for rec in csv.DictReader(csv_lines):
             # header
             # id, timebin, prefix, hege, af, visibility, rpki_status, irr_status,
             # delegated_prefix_status, delegated_asn_status, descr, moas, asn_id,
             # country_id, originasn_id
 
-            rec = dict(zip(self.csv.fields, line))
             rec['hege'] = float(rec['hege'])
             rec['visibility'] = float(rec['visibility'])
             rec['af'] = int(rec['af'])
@@ -105,77 +85,71 @@ def run(self):
             except ValueError as e:
                 logging.warning(f'Ignoring malformed prefix: "{rec["prefix"]}": {e}')
                 continue
 
-            if prefix not in prefix_id:
-                prefix_id[prefix] = self.iyp.get_node('Prefix', {'prefix': prefix})
+            prefixes.add(prefix)
 
             # make status/country/origin links only for lines where asn=originasn
             if rec['asn_id'] == rec['originasn_id']:
-                # Make sure all nodes exist
                 originasn = int(rec['originasn_id'])
-                if originasn not in asn_id:
-                    asn_id[originasn] = self.iyp.get_node('AS', {'asn': originasn})
-
                 rpki_status = 'RPKI ' + rec['rpki_status']
-                if rpki_status not in tag_id:
-                    tag_id[rpki_status] = self.iyp.get_node('Tag', {'label': rpki_status})
-
                 irr_status = 'IRR ' + rec['irr_status']
-                if irr_status not in tag_id:
-                    tag_id[irr_status] = self.iyp.get_node('Tag', {'label': irr_status})
-
                 cc = rec['country_id']
-                if cc not in country_id:
-                    country_id[cc] = self.iyp.get_node('Country', {'country_code': cc})
-
+                asns.add(originasn)
+                tags.add(rpki_status)
+                tags.add(irr_status)
+                countries.add(cc)
 
                 # Compute links
                 orig_links.append({
-                    'src_id': asn_id[originasn],
-                    'dst_id': prefix_id[prefix],
+                    'src_id': originasn,
+                    'dst_id': prefix,
                     'props': [self.reference, rec]
                 })
 
                 tag_links.append({
-                    'src_id': prefix_id[prefix],
-                    'dst_id': tag_id[rpki_status],
+                    'src_id': prefix,
+                    'dst_id': rpki_status,
                     'props': [self.reference, rec]
                 })
 
                 tag_links.append({
-                    'src_id': prefix_id[prefix],
-                    'dst_id': tag_id[irr_status],
+                    'src_id': prefix,
+                    'dst_id': irr_status,
                     'props': [self.reference, rec]
                 })
 
                 country_links.append({
-                    'src_id': prefix_id[prefix],
-                    'dst_id': country_id[cc],
+                    'src_id': prefix,
+                    'dst_id': cc,
                     'props': [self.reference]
                 })
 
             # Dependency links
             asn = int(rec['asn_id'])
-            if asn not in asn_id:
-                asn_id[asn] = self.iyp.get_node('AS', {'asn': asn})
+            asns.add(asn)
 
             dep_links.append({
-                'src_id': prefix_id[prefix],
-                'dst_id': asn_id[asn],
+                'src_id': prefix,
+                'dst_id': asn,
                 'props': [self.reference, rec]
             })
 
-        self.csv.close()
+        asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
+        prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False)
+        tag_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label', tags, all=False)
+        country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries)
 
+        replace_link_ids(orig_links, asn_id, prefix_id)
+        replace_link_ids(tag_links, prefix_id, tag_id)
+        replace_link_ids(country_links, prefix_id, country_id)
+        replace_link_ids(dep_links, prefix_id, asn_id)
+
         # Push links to IYP
         logging.info('Pushing links to neo4j...')
         self.iyp.batch_add_links('ORIGINATE', orig_links)
         self.iyp.batch_add_links('CATEGORIZED', tag_links)
         self.iyp.batch_add_links('DEPENDS_ON', dep_links)
         self.iyp.batch_add_links('COUNTRY', country_links)
 
-        # Remove downloaded file
-        os.remove(local_filename)
-
     def unit_test(self):
         return super().unit_test(['ORIGINATE', 'CATEGORIZED', 'DEPENDS_ON', 'COUNTRY'])

