Refactor ihr.rov crawler #164

Merged · 1 commit · Dec 18, 2024
154 changes: 64 additions & 90 deletions iyp/crawlers/ihr/rov.py
@@ -1,48 +1,27 @@
 import argparse
 import csv
+import io
 import logging
-import os
 import sys
-from datetime import timezone
+from datetime import datetime, timedelta, timezone
 from ipaddress import ip_network
 
-import arrow
 import lz4.frame
 import requests
 
-from iyp import BaseCrawler
-
-# NOTE: Assumes ASNs and Prefixes are already registered in the database. Run
-# bgpkit.pfx2asn before this one
+from iyp import BaseCrawler, DataNotAvailableError
 
 # URL to the API
-URL = 'https://ihr-archive.iijlab.net/ihr/rov/{year}/{month:02d}/{day:02d}/ihr_rov_{year}-{month:02d}-{day:02d}.csv.lz4'
+URL = 'https://ihr-archive.iijlab.net/ihr/rov/%Y/%m/%d/ihr_rov_%Y-%m-%d.csv.lz4'
 ORG = 'IHR'
 NAME = 'ihr.rov'
 
 
-class lz4Csv:
-    def __init__(self, filename):
-        """Start reading a lz4 compress csv file."""
-
-        self.fp = lz4.frame.open(filename)
-
-    def __iter__(self):
-        """Read file header line and set self.fields."""
-        line = self.fp.readline()
-        self.fields = line.decode('utf-8').rstrip().split(',')
-        return self
-
-    def __next__(self):
-        line = self.fp.readline().decode('utf-8').rstrip()
-
-        if len(line) > 0:
-            return line
-        else:
-            raise StopIteration
-
-    def close(self):
-        self.fp.close()
+def replace_link_ids(links: list, src_id: dict, dst_id: dict):
+    for link in links:
+        link['src_id'] = src_id[link['src_id']]
+        link['dst_id'] = dst_id[link['dst_id']]
 
 
 class Crawler(BaseCrawler):
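The new URL constant is a strftime template rather than a str.format template, so any datetime can render the full archive path in one call. A minimal sketch of the idea (the date below is only an example; the crawler starts from the current UTC day):

    from datetime import datetime, timezone

    URL = 'https://ihr-archive.iijlab.net/ihr/rov/%Y/%m/%d/ihr_rov_%Y-%m-%d.csv.lz4'

    # Illustrative date, for demonstration only.
    day = datetime(2024, 12, 18, tzinfo=timezone.utc)
    print(day.strftime(URL))
    # https://ihr-archive.iijlab.net/ihr/rov/2024/12/18/ihr_rov_2024-12-18.csv.lz4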
@@ -52,50 +31,51 @@ def __init__(self, organization, url, name):

     def run(self):
         """Fetch data from file and push to IYP."""
-
-        today = arrow.utcnow()
-        url = URL.format(year=today.year, month=today.month, day=today.day)
+        today = datetime.now(tz=timezone.utc)
+        max_lookback = today - timedelta(days=7)
+        url = today.strftime(self.url)
         logging.info(url)
         req = requests.head(url)
-        if req.status_code != 200:
-            today = today.shift(days=-1)
-            url = URL.format(year=today.year, month=today.month, day=today.day)
+        while req.status_code != 200 and today > max_lookback:
+            today -= timedelta(days=1)
+            url = today.strftime(self.url)
             logging.info(url)
             req = requests.head(url)
-            if req.status_code != 200:
-                today = today.shift(days=-1)
-                url = URL.format(year=today.year, month=today.month, day=today.day)
+        if req.status_code != 200:
+            logging.error('Failed to find data within the specified lookback interval.')
+            raise DataNotAvailableError('Failed to find data within the specified lookback interval.')
 
         self.reference['reference_url_data'] = url
-        self.reference['reference_time_modification'] = today.datetime.replace(hour=0,
-                                                                               minute=0,
-                                                                               second=0,
-                                                                               microsecond=0,
-                                                                               tzinfo=timezone.utc)
+        self.reference['reference_time_modification'] = today.replace(hour=0,
+                                                                      minute=0,
+                                                                      second=0,
+                                                                      microsecond=0,
+                                                                      tzinfo=timezone.utc)
 
-        os.makedirs('tmp/', exist_ok=True)
-        os.system(f'wget {url} -P tmp/')
+        logging.info(f'Fetching data from: {url}')
+        req = requests.get(url)
+        req.raise_for_status()
 
-        local_filename = 'tmp/' + url.rpartition('/')[2]
-        self.csv = lz4Csv(local_filename)
+        with lz4.frame.open(io.BytesIO(req.content)) as f:
+            csv_lines = [l.decode('utf-8').rstrip() for l in f]
 
-        logging.info('Getting node IDs from neo4j...')
-        asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn')
-        prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix')
-        tag_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label')
-        country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code')
+        asns = set()
+        prefixes = set()
+        tags = set()
+        countries = set()
 
-        orig_links = []
-        tag_links = []
-        dep_links = []
-        country_links = []
+        orig_links = list()
+        tag_links = list()
+        dep_links = list()
+        country_links = list()
 
         logging.info('Computing links...')
-        for line in csv.reader(self.csv, quotechar='"', delimiter=',', skipinitialspace=True):
+        for rec in csv.DictReader(csv_lines):
             # header
             # id, timebin, prefix, hege, af, visibility, rpki_status, irr_status,
             # delegated_prefix_status, delegated_asn_status, descr, moas, asn_id,
             # country_id, originasn_id
 
-            rec = dict(zip(self.csv.fields, line))
             rec['hege'] = float(rec['hege'])
             rec['visibility'] = float(rec['visibility'])
             rec['af'] = int(rec['af'])
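The download path above no longer shells out to wget or touches tmp/: the LZ4 payload is decompressed in memory straight from the response body. A self-contained sketch of that pattern, using a locally compressed buffer in place of a real download:

    import io

    import lz4.frame

    # Stand-in for req.content, the bytes of a downloaded .csv.lz4 file.
    payload = lz4.frame.compress(b'id,prefix\n1,192.0.2.0/24\n')

    with lz4.frame.open(io.BytesIO(payload)) as f:
        csv_lines = [line.decode('utf-8').rstrip() for line in f]
    print(csv_lines)  # ['id,prefix', '1,192.0.2.0/24']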
@@ -105,77 +85,71 @@ def run(self):
             except ValueError as e:
                 logging.warning(f'Ignoring malformed prefix: "{rec["prefix"]}": {e}')
                 continue
 
-            if prefix not in prefix_id:
-                prefix_id[prefix] = self.iyp.get_node('Prefix', {'prefix': prefix})
+            prefixes.add(prefix)
 
             # make status/country/origin links only for lines where asn=originasn
             if rec['asn_id'] == rec['originasn_id']:
-                # Make sure all nodes exist
                 originasn = int(rec['originasn_id'])
-                if originasn not in asn_id:
-                    asn_id[originasn] = self.iyp.get_node('AS', {'asn': originasn})
 
                 rpki_status = 'RPKI ' + rec['rpki_status']
-                if rpki_status not in tag_id:
-                    tag_id[rpki_status] = self.iyp.get_node('Tag', {'label': rpki_status})
 
                 irr_status = 'IRR ' + rec['irr_status']
-                if irr_status not in tag_id:
-                    tag_id[irr_status] = self.iyp.get_node('Tag', {'label': irr_status})
 
                 cc = rec['country_id']
-                if cc not in country_id:
-                    country_id[cc] = self.iyp.get_node('Country', {'country_code': cc})
 
+                asns.add(originasn)
+                tags.add(rpki_status)
+                tags.add(irr_status)
+                countries.add(cc)
+
                 # Compute links
                 orig_links.append({
-                    'src_id': asn_id[originasn],
-                    'dst_id': prefix_id[prefix],
+                    'src_id': originasn,
+                    'dst_id': prefix,
                     'props': [self.reference, rec]
                 })
 
                 tag_links.append({
-                    'src_id': prefix_id[prefix],
-                    'dst_id': tag_id[rpki_status],
+                    'src_id': prefix,
+                    'dst_id': rpki_status,
                     'props': [self.reference, rec]
                 })
 
                 tag_links.append({
-                    'src_id': prefix_id[prefix],
-                    'dst_id': tag_id[irr_status],
+                    'src_id': prefix,
+                    'dst_id': irr_status,
                     'props': [self.reference, rec]
                 })
 
                 country_links.append({
-                    'src_id': prefix_id[prefix],
-                    'dst_id': country_id[cc],
+                    'src_id': prefix,
+                    'dst_id': cc,
                     'props': [self.reference]
                 })
 
             # Dependency links
             asn = int(rec['asn_id'])
-            if asn not in asn_id:
-                asn_id[asn] = self.iyp.get_node('AS', {'asn': asn})
+            asns.add(asn)
 
             dep_links.append({
-                'src_id': prefix_id[prefix],
-                'dst_id': asn_id[asn],
+                'src_id': prefix,
+                'dst_id': asn,
                 'props': [self.reference, rec]
             })
 
-        self.csv.close()
+        asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
+        prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False)
+        tag_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label', tags, all=False)
+        country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries)
+
+        replace_link_ids(orig_links, asn_id, prefix_id)
+        replace_link_ids(tag_links, prefix_id, tag_id)
+        replace_link_ids(country_links, prefix_id, country_id)
+        replace_link_ids(dep_links, prefix_id, asn_id)
 
         # Push links to IYP
         logging.info('Pushing links to neo4j...')
         self.iyp.batch_add_links('ORIGINATE', orig_links)
         self.iyp.batch_add_links('CATEGORIZED', tag_links)
         self.iyp.batch_add_links('DEPENDS_ON', dep_links)
         self.iyp.batch_add_links('COUNTRY', country_links)
 
-        # Remove downloaded file
-        os.remove(local_filename)
 
     def unit_test(self):
         return super().unit_test(['ORIGINATE', 'CATEGORIZED', 'DEPENDS_ON', 'COUNTRY'])
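The heart of the refactor is a two-phase flow: the loop collects raw keys (ASN numbers, prefix strings, tag labels, country codes) into sets and stores those same keys in the link dicts, node IDs are then fetched in one batch per type, and replace_link_ids swaps the raw keys for node IDs in place. A toy run of that helper with made-up IDs (real IDs come from neo4j via batch_get_nodes_by_single_prop):

    def replace_link_ids(links: list, src_id: dict, dst_id: dict):
        for link in links:
            link['src_id'] = src_id[link['src_id']]
            link['dst_id'] = dst_id[link['dst_id']]

    # Hypothetical key -> node-ID maps, standing in for the neo4j lookups.
    asn_id = {2497: 101}
    prefix_id = {'192.0.2.0/24': 202}

    orig_links = [{'src_id': 2497, 'dst_id': '192.0.2.0/24', 'props': []}]
    replace_link_ids(orig_links, asn_id, prefix_id)
    print(orig_links)  # [{'src_id': 101, 'dst_id': 202, 'props': []}]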
