diff --git a/config.json.example b/config.json.example
index e2ad6f8..57348e7 100644
--- a/config.json.example
+++ b/config.json.example
@@ -69,9 +69,16 @@
             "iyp.crawlers.pch.daily_routing_snapshots_v6",
             "iyp.crawlers.emileaben.as_names",
             "iyp.crawlers.ripe.atlas_probes",
+            "iyp.crawlers.iana.root_zone",
+            "iyp.crawlers.alice_lg.amsix",
+            "iyp.crawlers.alice_lg.bcix",
+            "iyp.crawlers.alice_lg.decix",
+            "iyp.crawlers.alice_lg.ixbr",
+            "iyp.crawlers.alice_lg.linx",
+            "iyp.crawlers.alice_lg.megaport",
+            "iyp.crawlers.alice_lg.netnod",
             "iyp.crawlers.cloudflare.dns_top_locations",
-            "iyp.crawlers.cloudflare.dns_top_ases",
-            "iyp.crawlers.iana.root_zone"
+            "iyp.crawlers.cloudflare.dns_top_ases"
         ],
         "post": [
diff --git a/iyp/crawlers/alice_lg/README.md b/iyp/crawlers/alice_lg/README.md
new file mode 100644
index 0000000..a7b74af
--- /dev/null
+++ b/iyp/crawlers/alice_lg/README.md
@@ -0,0 +1,47 @@
+# Alice-LG -- https://github.com/alice-lg/alice-lg
+
+Alice-LG is a BGP looking glass that gets its data from external APIs.
+
+It is used by some large IXPs (e.g., DE-CIX, LINX, AMS-IX), and IYP imports membership
+information by reading the route server neighbors.
+
+The crawler *can* also import the received routes of all neighbors; however, testing has
+shown that this takes an unreasonable amount of time for most IXPs due to the tiny
+pagination size (250 routes per page). Therefore, this functionality is disabled by default.
+
+List of supported IXPs:
+
+- AMS-IX (`amsix.py`)
+- BCIX (`bcix.py`)
+- DE-CIX (`decix.py`)
+- IX.br (`ixbr.py`)
+- LINX (`linx.py`)
+- Megaport (`megaport.py`)
+- Netnod (`netnod.py`)
+
+## Graph representation
+
+```Cypher
+(:AS {asn: 2497})-[:MEMBER_OF {address: '80.81.193.136', routeserver_id: 'rs1_fra_ipv4'}]->(:IXP {name: 'DE-CIX Frankfurt'})
+// Routes are not crawled by default
+(:AS {asn: 3333})-[:ORIGINATE {neighbor_id: 'pb_0280_as20562', routeserver_id: 'rs01-bcix-v4'}]->(:Prefix {prefix: '193.0.0.0/21'})
+```
+
+There can be multiple relationships between the same pair of nodes. However, these
+contain different information, e.g., a member is present with multiple interfaces
+(`address`) or the information is seen by different route servers (`routeserver_id`).
+Similarly, a route can be seen via multiple neighbors (`neighbor_id`) or different route
+servers (`routeserver_id`).
+
+## Dependence
+
+This crawler requires peering LAN information to map the neighbor IP to an IXP.
+Therefore, it should be run after crawlers that create
+
+```Cypher
+(:Prefix)-[:MANAGED_BY]->(:IXP)
+```
+
+relationships:
+
+- `iyp.crawlers.peeringdb.ix`
diff --git a/iyp/crawlers/alice_lg/__init__.py b/iyp/crawlers/alice_lg/__init__.py
new file mode 100644
index 0000000..1f88a24
--- /dev/null
+++ b/iyp/crawlers/alice_lg/__init__.py
@@ -0,0 +1,402 @@
+import ipaddress
+import logging
+import os
+import sys
+from collections import defaultdict
+from concurrent.futures import as_completed
+from datetime import datetime
+from json import JSONDecodeError
+from typing import Iterable, Tuple
+
+import flatdict
+import radix
+from requests.adapters import HTTPAdapter, Response
+from requests_futures.sessions import FuturesSession
+from urllib3.util.retry import Retry
+
+from iyp import BaseCrawler, CacheHandler
+
+# Alice-LG Rest API
+#
+# The API provides endpoints for getting
+# information from the routeservers / alice datasources.
+# +# Endpoints: +# +# Config +# Show /api/v1/config +# +# Routeservers +# List /api/v1/routeservers +# Status /api/v1/routeservers/:id/status +# Neighbors /api/v1/routeservers/:id/neighbors +# Routes /api/v1/routeservers/:id/neighbors/:neighborId/routes +# /api/v1/routeservers/:id/neighbors/:neighborId/routes/received +# /api/v1/routeservers/:id/neighbors/:neighborId/routes/filtered +# /api/v1/routeservers/:id/neighbors/:neighborId/routes/not-exported +# +# Querying +# LookupPrefix /api/v1/lookup/prefix?q= +# LookupNeighbor /api/v1/lookup/neighbor?asn=1235 + +# Model +# /config +# AS of peering LAN -> Not linkable to IXP without manual relationship +# Could model the looking glass, but not interesting I think +# /routeservers +# Model route server? (:RouteServer)-[:ROUTESERVER]->(:IXP) +# group specifies to which IXP the route server belongs. May or may not match name +# of IXP node. + +# Find IXP via neighbor IP -> peeringLAN lookup +# /routeservers/:id/neighbors +# (:AS)-[:MEMBER_OF]->(:IXP) +# Get IXP peering LANs: +# MATCH (p:Prefix)-[:MANAGED_BY]->(i:IXP) +# RETURN p.prefix AS peering_lan, ID(i) AS ixp_qid +# neighbors -> list of neighbors +# neighbor['address'] -> map to prefix +# /routeservers/:id/neighbors/:neighborId/routes/received +# (:AS)-[:ORIGINATE]->(:Prefix) +# received['imported'] -> list of routes +# route['network'] -> prefix +# route['bgp']['as_path'][-1] -> originating ASN + + +class Crawler(BaseCrawler): + """Import IXP members and optionally prefix announcements based on routes received + via members from Alice-LG-based looking glasses.""" + # In principle, the fetching process can be sped up by performing parallel queries. + # However, some tests showed that many looking glasses perform poorly when queried + # in parallel, which is why I leave the functionality in the code, but set the + # default values to not query in parallel. + # Similarly, querying the received routes is infeasible for large IXPs since the + # queries just take too long, which is why the functionality is disabled by default. + + def __init__(self, + organization: str, + url: str, + name: str, + parallel_downloads: int = 1, + fetch_routes: bool = False, + fetch_routes_batch_size: int = 1) -> None: + super().__init__(organization, url, name) + + # URLs to the API + url = url.rstrip('/') + self.urls = { + 'routeservers': f'{url}/routeservers', + 'neighbors': url + '/routeservers/{rs}/neighbors', + 'routes': url + '/routeservers/{rs}/neighbors/{neighbor}/routes/received' + } + cache_file_prefix = f'CACHED.{datetime.now().strftime("%Y%m%d")}.' + self.cache_handler = CacheHandler(self.get_tmp_dir(), cache_file_prefix) + self.workers = parallel_downloads + # List of route server dicts. + self.routeservers = list() + # List of neighbor dicts. Each dict contains information about the route server, + # so we do not keep track of that separately. + self.neighbors = list() + # Dict mapping (routeserver_id, neighbor_id) tuple to a list of route dicts. + self.routes = dict() + # If routes should be fetched or not. 
+ self.fetch_routes = fetch_routes + self.fetch_routes_batch_size = fetch_routes_batch_size + self.__initialize_session() + + def __initialize_session(self) -> None: + self.session = FuturesSession(max_workers=self.workers) + retry = Retry( + backoff_factor=0.1, + status_forcelist=(429, 500, 502, 503, 504), + respect_retry_after_header=True + ) + adapter = HTTPAdapter(max_retries=retry) + self.session.mount('http://', adapter) + self.session.mount('https://', adapter) + + @staticmethod + def decode_json(resp: Response, *args, **kwargs) -> None: + """Process json in background.""" + logging.debug(f'Processing response: {resp.url} Status: {resp.ok}') + + try: + resp.data = resp.json() + except JSONDecodeError as e: + print(f'Failed to retrieve data for {resp.url}', file=sys.stderr) + print(f'Error while reading json data: {e}', file=sys.stderr) + logging.error(f'Error while reading json data: {e}') + logging.error(resp.status_code) + logging.error(resp.headers) + logging.error(resp.text) + resp.data = dict() + + def fetch_urls(self, urls: list, additional_data=list()) -> Iterable: + """Fetch the specified URLs in parallel and yield the status, result, and + optionally an additional data item. + + Since results are yielded in order of completion, which might differ from the + order of URLs in the list, the additional_data list can be used to pass + arbitrary objects (e.g., an identifier) that will be returned together with the + corresponding result. For this purpose the additional_data list, if specified, + must have the same length as the urls list. + """ + if additional_data and len(additional_data) != len(urls): + raise ValueError('Additional data list must have the same length as URL list.') + if not additional_data: + # Create empty data list. + additional_data = [None] * len(urls) + queries = list() + for idx, url in enumerate(urls): + future = self.session.get(url, + hooks={'response': self.decode_json}, + timeout=60) + future.additional_data = additional_data[idx] + queries.append(future) + for future in as_completed(queries): + try: + resp = future.result() + yield resp.ok, resp.data, future.additional_data + except Exception as e: + logging.error(f'Failed to retrieve data for {future}') + logging.error(e) + print(f'Failed to retrieve data for {future}', file=sys.stderr) + print(e, file=sys.stderr) + return False, dict(), None + + def fetch_url(self, url: str) -> Tuple[bool, dict]: + """Helper function for single URL.""" + for status, resp, _ in self.fetch_urls([url]): + return status, resp + return False, dict() + + def __fetch_routeservers(self) -> None: + """Fetch the list of routeservers or load from cache.""" + routeserver_object_name = 'routeservers' + if self.cache_handler.cached_object_exists(routeserver_object_name): + logging.info('Using cached route server information.') + self.routeservers = self.cache_handler.load_cached_object(routeserver_object_name) + else: + print(f'Fetching route servers from {self.urls["routeservers"]}') + logging.info(f'Fetching route servers from {self.urls["routeservers"]}') + is_ok, routeservers_root = self.fetch_url(self.urls['routeservers']) + if not is_ok: + raise Exception('Failed to fetch route servers.') + self.routeservers = routeservers_root['routeservers'] + self.cache_handler.save_cached_object(routeserver_object_name, self.routeservers) + + def __fetch_neighbors(self) -> None: + """Fetch neighbor information in parallel or load from cache.""" + neighbor_object_name = 'neighbors' + if 
self.cache_handler.cached_object_exists(neighbor_object_name):
+            logging.info('Using cached neighbor information.')
+            self.neighbors = self.cache_handler.load_cached_object(neighbor_object_name)
+        else:
+            print(f'Fetching neighbor information from {len(self.routeservers)} route servers.')
+            logging.info(f'Fetching neighbor information from {len(self.routeservers)} route servers.')
+            neighbor_urls = [self.urls['neighbors'].format(rs=rs['id']) for rs in self.routeservers]
+            failed_routeservers = list()
+            for is_ok, neighbor_list_root, routeserver_id in self.fetch_urls(neighbor_urls,
+                                                                             additional_data=self.routeservers):
+                if not is_ok:
+                    failed_routeservers.append(routeserver_id)
+                    continue
+                # Spelling of neighbors/neighbours field is not consistent...
+                if 'neighbors' in neighbor_list_root:
+                    neighbor_list = neighbor_list_root['neighbors']
+                elif 'neighbours' in neighbor_list_root:
+                    neighbor_list = neighbor_list_root['neighbours']
+                else:
+                    logging.error(f'Missing "neighbors"/"neighbours" field in reply: {neighbor_list_root}')
+                    print(f'Missing "neighbors"/"neighbours" field in reply: {neighbor_list_root}', file=sys.stderr)
+                    continue
+                self.neighbors += neighbor_list
+            self.cache_handler.save_cached_object(neighbor_object_name, self.neighbors)
+            if failed_routeservers:
+                logging.warning(f'Failed to get neighbor information for {len(failed_routeservers)} routeservers: '
+                                f'{failed_routeservers}')
+
+    def __fetch_routes(self) -> None:
+        """Fetch received route information or load from cache."""
+        routes_object_name_prefix = 'routes.'
+        cached_route_objects = 0
+        fetch_required = list()
+        for neighbor in self.neighbors:
+            if neighbor['routes_received'] == 0:
+                # No query required.
+                continue
+            neighbor_id = neighbor['id']
+            routeserver_id = neighbor['routeserver_id']
+            key = (routeserver_id, neighbor_id)
+            object_name = f'{routes_object_name_prefix}{routeserver_id}.{neighbor_id}'
+            if self.cache_handler.cached_object_exists(object_name):
+                cached_route_objects += 1
+                self.routes[key] = self.cache_handler.load_cached_object(object_name)
+            else:
+                fetch_required.append(key)
+
+        total_route_objects = cached_route_objects + len(fetch_required)
+        logging.info(f'{cached_route_objects}/{total_route_objects} route objects in cache. Fetching '
+                     f'{len(fetch_required)}')
+
+        if fetch_required:
+            for i in range(0, len(fetch_required), self.fetch_routes_batch_size):
+                logging.debug(f'Batch {i}')
+                batch = fetch_required[i: i + self.fetch_routes_batch_size]
+                # Fetch in two rounds. First round gets the initial page for all
+                # neighbors and generates the remaining URLs based on that. Second round
+                # fetches the remaining pages.
+                urls = list()
+                additional_data = list()
+                for routeserver_id, neighbor_id in batch:
+                    url = self.urls['routes'].format(rs=routeserver_id, neighbor=neighbor_id)
+                    urls.append(url)
+                    additional_data.append((routeserver_id, neighbor_id))
+                next_urls = list()
+                next_additional_data = list()
+                failed_neighbors = list()
+                for ok, data, key in self.fetch_urls(urls, additional_data):
+                    if not ok:
+                        failed_neighbors.append(key)
+                        continue
+                    self.routes[key] = data['imported']
+                    if data['pagination']['total_pages'] > 1:
+                        base_url = self.urls['routes'].format(rs=key[0], neighbor=key[1])
+                        # Alice LG pagination is zero indexed, i.e., first page is 0.
+                        for page in range(1, data['pagination']['total_pages']):
+                            next_urls.append(f'{base_url}?page={page}')
+                            next_additional_data.append(key)
+                logging.debug('First round done.')
+                logging.debug(f'Fetching {len(next_urls)} additional pages.')
+                failed_pages = defaultdict(int)
+                for ok, data, key in self.fetch_urls(next_urls, next_additional_data):
+                    if not ok:
+                        failed_pages[key] += 1
+                        continue
+                    self.routes[key] += data['imported']
+                logging.debug('Second round done.')
+                logging.debug('Caching.')
+                for routeserver_id, neighbor_id in batch:
+                    key = (routeserver_id, neighbor_id)
+                    if key not in self.routes:
+                        continue
+                    object_name = f'{routes_object_name_prefix}{routeserver_id}.{neighbor_id}'
+                    self.cache_handler.save_cached_object(object_name, self.routes[key])
+
+                if failed_neighbors:
+                    logging.warning(f'Failed to fetch routes for {len(failed_neighbors)} neighbors: {failed_neighbors}')
+                if failed_pages:
+                    logging.warning(
+                        f'Failed to fetch {sum(failed_pages.values())} pages for {len(failed_pages)} neighbors:')
+                    for key, count in failed_pages.items():
+                        logging.warning(f' {key}: {count}')
+
+    def fetch(self) -> None:
+        tmp_dir = self.get_tmp_dir()
+        if not os.path.exists(tmp_dir):
+            logging.info(f'Creating tmp dir: {tmp_dir}')
+            self.create_tmp_dir()
+
+        self.__fetch_routeservers()
+        self.__fetch_neighbors()
+        if self.fetch_routes:
+            self.__fetch_routes()
+
+    def __get_peering_lans(self) -> radix.Radix:
+        """Get IXP peering LANs from IYP and return a radix tree containing the QID of
+        the IXP node in the data['ixp_qid'] field of each tree node."""
+        query = """MATCH (p:Prefix)-[:MANAGED_BY]->(i:IXP)
+                   RETURN p.prefix AS peering_lan, ID(i) AS ixp_qid"""
+        peering_lans = radix.Radix()
+        for res in self.iyp.tx.run(query):
+            n = peering_lans.add(res['peering_lan'])
+            n.data['ixp_qid'] = res['ixp_qid']
+        logging.info(f'Fetched {len(peering_lans.nodes())} peering LANs')
+        return peering_lans
+
+    def run(self) -> None:
+        self.fetch()
+
+        peering_lans = self.__get_peering_lans()
+
+        # Compute MEMBER_OF relationships from neighbor data.
+        asns = set()
+        member_of_rels = list()
+        logging.info('Iterating neighbors.')
+        for neighbor in self.neighbors:
+            member_ip = neighbor['address']
+            n = peering_lans.search_best(member_ip)
+            if n is None:
+                logging.warning(f'Failed to map member IP to peering LAN: {member_ip}')
+                continue
+            member_asn = neighbor['asn']
+            if not member_asn or not isinstance(member_asn, int):
+                logging.warning(f'Malformed member ASN: "{member_asn}"')
+                continue
+            asns.add(member_asn)
+
+            # This is a bit silly, but for some neighbors that are in a weird state,
+            # there can be an empty dict in details:route_changes, which will remain as
+            # a FlatDict. Since neo4j does not like maps as properties, remove any empty
+            # dicts left in the property.
+            flattened_neighbor = dict(flatdict.FlatDict(neighbor))
+            if ('details:route_changes' in flattened_neighbor
+                    and isinstance(flattened_neighbor['details:route_changes'], flatdict.FlatDict)):
+                flattened_neighbor.pop('details:route_changes')
+            self.reference['reference_url'] = self.urls['neighbors'].format(rs=neighbor['routeserver_id'])
+            member_of_rels.append({'src_id': member_asn,  # Translate to QID later.
+                                   'dst_id': n.data['ixp_qid'],
+                                   'props': [flattened_neighbor, self.reference.copy()]})
+
+        # Compute ORIGINATE relationships from received routes.
+        prefixes = set()
+        originate_rels = list()
+        if self.fetch_routes:
+            logging.info('Iterating routes.')
+            for (routeserver_id, neighbor_id), routes in self.routes.items():
+                self.reference['reference_url'] = self.urls['routes'].format(rs=routeserver_id, neighbor=neighbor_id)
+                for route in routes:
+                    prefix = ipaddress.ip_network(route['network']).compressed
+                    origin_asn = route['bgp']['as_path'][-1]
+                    prefixes.add(prefix)
+                    asns.add(origin_asn)
+                    # The route object contains lists of lists, so FlatterDict is
+                    # required. Similar to above, there can be empty dicts inside the
+                    # object, but for different keys, which is why we just iterate
+                    # over all of them.
+                    flattened_route = dict(flatdict.FlatterDict(route))
+                    to_delete = list()
+                    for k, v in flattened_route.items():
+                        if isinstance(v, flatdict.FlatterDict):
+                            to_delete.append(k)
+                    for k in to_delete:
+                        flattened_route.pop(k)
+                    flattened_route['routeserver_id'] = routeserver_id
+                    originate_rels.append({'src_id': origin_asn,  # Translate to QIDs later.
+                                           'dst_id': prefix,
+                                           'props': [flattened_route, self.reference.copy()]})
+
+        # Get/create nodes.
+        logging.info(f'Getting {len(asns)} AS nodes.')
+        asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns, all=False)
+        prefix_id = dict()
+        if prefixes:
+            logging.info(f'Getting {len(prefixes)} Prefix nodes.')
+            prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False)
+
+        # Translate raw values to QID.
+        for relationship in member_of_rels:
+            asn = relationship['src_id']
+            relationship['src_id'] = asn_id[asn]
+        for relationship in originate_rels:
+            asn = relationship['src_id']
+            prefix = relationship['dst_id']
+            relationship['src_id'] = asn_id[asn]
+            relationship['dst_id'] = prefix_id[prefix]
+
+        # Push relationships.
+ logging.info(f'Pushing {len(member_of_rels)} MEMBER_OF relationships.') + self.iyp.batch_add_links('MEMBER_OF', member_of_rels) + if originate_rels: + logging.info(f'Pushing {len(originate_rels)} ORIGINATE relationships.') + self.iyp.batch_add_links('ORIGINATE', originate_rels) diff --git a/iyp/crawlers/alice_lg/amsix.py b/iyp/crawlers/alice_lg/amsix.py index c1e3da8..7328645 100644 --- a/iyp/crawlers/alice_lg/amsix.py +++ b/iyp/crawlers/alice_lg/amsix.py @@ -1,25 +1,40 @@ +import argparse import logging +import os import sys from iyp.crawlers.alice_lg import Crawler +ORG = 'Alice-LG' URL = 'https://lg.ams-ix.net/api/v1/' +NAME = 'alice_lg.amsix' -# Main program -if __name__ == '__main__': - scriptname = sys.argv[0].replace('/', '_')[0:-3] - FORMAT = '%(asctime)s %(processName)s %(message)s' +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--unit-test', action='store_true') + args = parser.parse_args() + + scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3] + FORMAT = '%(asctime)s %(levelname)s %(message)s' logging.basicConfig( format=FORMAT, filename='log/' + scriptname + '.log', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S' ) - logging.info('Started: %s' % sys.argv) - crawler = Crawler(URL) - if len(sys.argv) > 1 and sys.argv[1] == 'unit_test': + logging.info(f'Started: {sys.argv}') + + crawler = Crawler(ORG, URL, NAME) + if args.unit_test: crawler.unit_test(logging) else: crawler.run() + crawler.close() + logging.info(f'Finished: {sys.argv}') + + +if __name__ == '__main__': + main() + sys.exit(0) diff --git a/iyp/crawlers/alice_lg/bcix.py b/iyp/crawlers/alice_lg/bcix.py new file mode 100644 index 0000000..66bc993 --- /dev/null +++ b/iyp/crawlers/alice_lg/bcix.py @@ -0,0 +1,40 @@ +import argparse +import logging +import os +import sys + +from iyp.crawlers.alice_lg import Crawler + +ORG = 'Alice-LG' +URL = 'https://lg.bcix.de/api/v1/' +NAME = 'alice_lg.bcix' + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--unit-test', action='store_true') + args = parser.parse_args() + + scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3] + FORMAT = '%(asctime)s %(levelname)s %(message)s' + logging.basicConfig( + format=FORMAT, + filename='log/' + scriptname + '.log', + level=logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S' + ) + + logging.info(f'Started: {sys.argv}') + + crawler = Crawler(ORG, URL, NAME) + if args.unit_test: + crawler.unit_test(logging) + else: + crawler.run() + crawler.close() + logging.info(f'Finished: {sys.argv}') + + +if __name__ == '__main__': + main() + sys.exit(0) diff --git a/iyp/crawlers/alice_lg/decix.py b/iyp/crawlers/alice_lg/decix.py index b0f1cbe..8a3aa25 100644 --- a/iyp/crawlers/alice_lg/decix.py +++ b/iyp/crawlers/alice_lg/decix.py @@ -1,25 +1,40 @@ +import argparse import logging +import os import sys from iyp.crawlers.alice_lg import Crawler +ORG = 'Alice-LG' URL = 'https://lg.de-cix.net/api/v1/' +NAME = 'alice_lg.decix' -# Main program -if __name__ == '__main__': - scriptname = sys.argv[0].replace('/', '_')[0:-3] - FORMAT = '%(asctime)s %(processName)s %(message)s' +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--unit-test', action='store_true') + args = parser.parse_args() + + scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3] + FORMAT = '%(asctime)s %(levelname)s %(message)s' logging.basicConfig( format=FORMAT, filename='log/' + scriptname + '.log', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S' ) - 
logging.info('Started: %s' % sys.argv) - crawler = Crawler(URL) - if len(sys.argv) > 1 and sys.argv[1] == 'unit_test': + logging.info(f'Started: {sys.argv}') + + crawler = Crawler(ORG, URL, NAME) + if args.unit_test: crawler.unit_test(logging) else: crawler.run() + crawler.close() + logging.info(f'Finished: {sys.argv}') + + +if __name__ == '__main__': + main() + sys.exit(0) diff --git a/iyp/crawlers/alice_lg/ecix.py b/iyp/crawlers/alice_lg/ecix.py deleted file mode 100644 index 970cc13..0000000 --- a/iyp/crawlers/alice_lg/ecix.py +++ /dev/null @@ -1,25 +0,0 @@ -import logging -import sys - -from iyp.crawlers.alice_lg import Crawler - -URL = 'https://lg.megaport.com/api/v1/' - -# Main program -if __name__ == '__main__': - - scriptname = sys.argv[0].replace('/', '_')[0:-3] - FORMAT = '%(asctime)s %(processName)s %(message)s' - logging.basicConfig( - format=FORMAT, - filename='log/' + scriptname + '.log', - level=logging.INFO, - datefmt='%Y-%m-%d %H:%M:%S' - ) - logging.info('Started: %s' % sys.argv) - - crawler = Crawler(URL) - if len(sys.argv) > 1 and sys.argv[1] == 'unit_test': - crawler.unit_test(logging) - else: - crawler.run() diff --git a/iyp/crawlers/alice_lg/ixbr.py b/iyp/crawlers/alice_lg/ixbr.py new file mode 100644 index 0000000..548701a --- /dev/null +++ b/iyp/crawlers/alice_lg/ixbr.py @@ -0,0 +1,40 @@ +import argparse +import logging +import os +import sys + +from iyp.crawlers.alice_lg import Crawler + +ORG = 'Alice-LG' +URL = 'https://lg.ix.br/api/v1/' +NAME = 'alice_lg.ixbr' + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--unit-test', action='store_true') + args = parser.parse_args() + + scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3] + FORMAT = '%(asctime)s %(levelname)s %(message)s' + logging.basicConfig( + format=FORMAT, + filename='log/' + scriptname + '.log', + level=logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S' + ) + + logging.info(f'Started: {sys.argv}') + + crawler = Crawler(ORG, URL, NAME) + if args.unit_test: + crawler.unit_test(logging) + else: + crawler.run() + crawler.close() + logging.info(f'Finished: {sys.argv}') + + +if __name__ == '__main__': + main() + sys.exit(0) diff --git a/iyp/crawlers/alice_lg/linx.py b/iyp/crawlers/alice_lg/linx.py index d4058e7..ef67bd2 100644 --- a/iyp/crawlers/alice_lg/linx.py +++ b/iyp/crawlers/alice_lg/linx.py @@ -1,25 +1,40 @@ +import argparse import logging +import os import sys from iyp.crawlers.alice_lg import Crawler +ORG = 'Alice-LG' URL = 'https://alice-rs.linx.net/api/v1/' +NAME = 'alice_lg.linx' -# Main program -if __name__ == '__main__': - scriptname = sys.argv[0].replace('/', '_')[0:-3] - FORMAT = '%(asctime)s %(processName)s %(message)s' +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--unit-test', action='store_true') + args = parser.parse_args() + + scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3] + FORMAT = '%(asctime)s %(levelname)s %(message)s' logging.basicConfig( format=FORMAT, filename='log/' + scriptname + '.log', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S' ) - logging.info('Started: %s' % sys.argv) - crawler = Crawler(URL) - if len(sys.argv) > 1 and sys.argv[1] == 'unit_test': + logging.info(f'Started: {sys.argv}') + + crawler = Crawler(ORG, URL, NAME) + if args.unit_test: crawler.unit_test(logging) else: crawler.run() + crawler.close() + logging.info(f'Finished: {sys.argv}') + + +if __name__ == '__main__': + main() + sys.exit(0) diff --git a/iyp/crawlers/alice_lg/megaport.py 
b/iyp/crawlers/alice_lg/megaport.py new file mode 100644 index 0000000..8080f45 --- /dev/null +++ b/iyp/crawlers/alice_lg/megaport.py @@ -0,0 +1,40 @@ +import argparse +import logging +import os +import sys + +from iyp.crawlers.alice_lg import Crawler + +ORG = 'Alice-LG' +URL = 'https://lg.megaport.com/api/v1/' +NAME = 'alice_lg.megaport' + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--unit-test', action='store_true') + args = parser.parse_args() + + scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3] + FORMAT = '%(asctime)s %(levelname)s %(message)s' + logging.basicConfig( + format=FORMAT, + filename='log/' + scriptname + '.log', + level=logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S' + ) + + logging.info(f'Started: {sys.argv}') + + crawler = Crawler(ORG, URL, NAME) + if args.unit_test: + crawler.unit_test(logging) + else: + crawler.run() + crawler.close() + logging.info(f'Finished: {sys.argv}') + + +if __name__ == '__main__': + main() + sys.exit(0) diff --git a/iyp/crawlers/alice_lg/netnod.py b/iyp/crawlers/alice_lg/netnod.py new file mode 100644 index 0000000..8ba89f4 --- /dev/null +++ b/iyp/crawlers/alice_lg/netnod.py @@ -0,0 +1,40 @@ +import argparse +import logging +import os +import sys + +from iyp.crawlers.alice_lg import Crawler + +ORG = 'Alice-LG' +URL = 'https://lg.netnod.se/api/v1/' +NAME = 'alice_lg.netnod' + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--unit-test', action='store_true') + args = parser.parse_args() + + scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3] + FORMAT = '%(asctime)s %(levelname)s %(message)s' + logging.basicConfig( + format=FORMAT, + filename='log/' + scriptname + '.log', + level=logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S' + ) + + logging.info(f'Started: {sys.argv}') + + crawler = Crawler(ORG, URL, NAME) + if args.unit_test: + crawler.unit_test(logging) + else: + crawler.run() + crawler.close() + logging.info(f'Finished: {sys.argv}') + + +if __name__ == '__main__': + main() + sys.exit(0) diff --git a/iyp/crawlers/alice_lg/to_delete.py b/iyp/crawlers/alice_lg/to_delete.py deleted file mode 100644 index ee1c2dc..0000000 --- a/iyp/crawlers/alice_lg/to_delete.py +++ /dev/null @@ -1,217 +0,0 @@ -import json -import logging -import sys - -import requests -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry - -from iyp.tools.ip2plan import ip2plan -from iyp.wiki.wikihandy import Wikihandy - -# TODO do prefix lookups before updating wiki -# (e.g. https://lg.de-cix.net/api/v1/lookup/prefix?q=217.115.0.0) -# so we can add all information at once. -# TODO keep track of which prefixes have been added and skip the lookup for -# already seen prefixes -# TODO keep track of prefixes per routeserver so we might not even have to do -# the neighbor query if we already saw all exported prefixes (e.g. google) - - -class Crawler(object): - def __init__(self, url): - """Initialize wikihandy and http session. - - url is the API endpoint (e.g. 
https://lg.de-cix.net/api/v1/) - """ - - # URLs to the API - self.urls = { - 'config': url + '/config', - 'routeservers': url + '/routeservers', - 'routes': url + '/routeservers/{rs}/neighbors/{neighbor}/routes/received', - 'neighbors': url + '/routeservers/{rs}/neighbors' - } - - # Session object to fetch peeringdb data - retries = Retry(total=10, - backoff_factor=0.1, - status_forcelist=[104, 500, 502, 503, 504]) - - self.http_session = requests.Session() - self.http_session.mount('https://', HTTPAdapter(max_retries=retries)) - - # Helper for wiki access - self.wh = Wikihandy() - - self.rs_config = self.fetch(self.urls['config']) - self.rs_asn_qid = self.wh.asn2qid(self.rs_config['asn'], create=True) - - self.ip2plan = ip2plan(self.wh) - - def fetch(self, url): - try: - req = self.http_session.get(url) - except requests.exceptions.RetryError as e: - logging.error(f'Error could not fetch: {url}') - logging.error(e) - return None - - if req.status_code != 200: - return None - return json.loads(req.text) - - def run(self): - """Fetch data from API and push to wikibase.""" - - routeservers = self.fetch(self.urls['routeservers'])['routeservers'] - - # For each routeserver - for rs in routeservers: - sys.stderr.write(f'Processing route server {rs["name"]}\n') - - # route server neighbors - self.url_neighbor = self.urls['neighbors'].format(rs=rs['id']) - neighbors = self.fetch(self.url_neighbor)['neighbours'] - - # find corresponding IXP - self.org_qid = None - self.ix_qid = None - - # Find the peering LAN using neighbors IP addresses - for neighbor in neighbors: - peering_lan = self.ip2plan(neighbor['address']) - if peering_lan is not None: - self.org_qid = peering_lan['org_qid'] - self.ix_qid = peering_lan['ix_qid'] - break - - if self.org_qid is None: - logging.error(f'Could not find the IXP/organization corresponding to routeserver {rs}.') - logging.error("Is the IXP's peering LAN registered in IYP or PeeringDB?") - continue - - # Register/update route server - self.reference = [ - (self.wh.get_pid('source'), self.org_qid), - (self.wh.get_pid('reference URL'), self.urls['routeservers']), - (self.wh.get_pid('point in time'), self.wh.today()), - ] - self.update_rs(rs) - - self.reference_neighbor = [ - (self.wh.get_pid('source'), self.org_qid), - (self.wh.get_pid('reference URL'), self.url_neighbor), - (self.wh.get_pid('point in time'), self.wh.today()), - ] - self.qualifier_neighbor = [ - (self.wh.get_pid('managed by'), self.rs_qid), - ] - - for neighbor in neighbors: - - sys.stderr.write(f'Processing neighbor {neighbor["id"]}\n') - self.update_neighbor(neighbor) - - self.url_route = self.urls['routes'].format(rs=rs['id'], neighbor=neighbor['id']) - self.reference_route = [ - (self.wh.get_pid('source'), self.org_qid), - (self.wh.get_pid('reference URL'), self.url_route), - (self.wh.get_pid('point in time'), self.wh.today()), - ] - - asn_qid = self.wh.asn2qid(neighbor['asn'], create=True) - self.qualifier_route = [ - (self.wh.get_pid('imported from'), asn_qid) - ] - - routes = self.fetch(self.url_route) - if routes is None: - continue - nb_pages = routes['pagination']['total_pages'] - # Imported routes - for p in range(nb_pages): - # fetch all subsequent pages - if p != 0: - routes = self.fetch(self.url_route + f'?page={p}') - - for i, route in enumerate(routes['imported']): - self.update_route(route, 'imported') - sys.stderr.write(f'\rProcessing page {p+1}/{nb_pages} {i+1}/{len(routes["imported"])} routes') - - sys.stderr.write('\n') - - def update_route(self, route, status): - """Update 
route data.""" - - asn_qid = self.wh.asn2qid(route['bgp']['as_path'][-1], create=True) - # Properties - statements = [ - [self.wh.get_pid('appeared in'), self.rs_qid, - self.reference_route, self.qualifier_route]] - statements.append([self.wh.get_pid('originated by'), asn_qid, self.reference_route]) - prefix_qid = self.wh.prefix2qid(route['network'], create=True) - self.wh.upsert_statements('update from route server API', prefix_qid, statements) - - def update_neighbor(self, neighbor): - """Update AS neighbor data.""" - - # Properties - statements = [[self.wh.get_pid('external ID'), neighbor['id'], - self.reference_neighbor, self.qualifier_neighbor]] - asn_qid = self.wh.asn2qid(neighbor['asn'], create=True) - self.wh.upsert_statements('update from route server API', asn_qid, statements) - - def update_rs(self, rs): - """Update route server data or create if it's not already there.""" - - reference_config = [ - (self.wh.get_pid('source'), self.org_qid), - (self.wh.get_pid('reference URL'), self.urls['routeservers']), - (self.wh.get_pid('point in time'), self.wh.today()), - ] - - # Properties - statements = [] - - # set ASN - statements.append([self.wh.get_pid('autonomous system number'), str(self.rs_config['asn']), reference_config]) - - # set org - statements.append([self.wh.get_pid('managed by'), self.org_qid, self.reference]) - - # set IXP - statements.append([self.wh.get_pid('part of'), self.ix_qid, self.reference]) - - # set external id - statements.append([self.wh.get_pid('external ID'), rs['id'], self.reference]) - - # Commit to wikibase - self.rs_qid = self.wh.get_qid(rs['name'], - create={ - 'summary': 'add route server from Alice API', - 'description': f"Route server in {rs['group']}", - 'statements': [[self.wh.get_pid('instance of'), self.wh.get_qid('route server')]] - }) - self.wh.upsert_statements('update from route server API', self.rs_qid, statements) - - -# Main program -if __name__ == '__main__': - URL = 'https://lg.de-cix.net/api/v1/' - - scriptname = sys.argv[0].replace('/', '_')[0:-3] - FORMAT = '%(asctime)s %(processName)s %(message)s' - logging.basicConfig( - format=FORMAT, - filename='log/' + scriptname + '.log', - level=logging.INFO, - datefmt='%Y-%m-%d %H:%M:%S' - ) - logging.info('Started: %s' % sys.argv) - - crawler = Crawler(URL) - if len(sys.argv) > 1 and sys.argv[1] == 'unit_test': - crawler.unit_test(logging) - else: - crawler.run()
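For quick manual verification of the data source, the sketch below (not part of this change set) performs the two API calls the new crawler builds on: listing route servers and fetching one route server's neighbors. The base URL is the DE-CIX instance from `decix.py`; the response layout and field names (`routeservers`, `id`, `asn`, `address`, `routeserver_id`, and the `neighbors`/`neighbours` spelling variants) follow the module comment and `__fetch_neighbors()`, so treat them as assumptions about the deployed Alice-LG version.

```python
# Minimal, standalone sketch -- no retries, caching, or pagination handling.
import requests

API = 'https://lg.de-cix.net/api/v1'  # public instance used by decix.py

# 1. List the route servers exposed by the looking glass.
routeservers = requests.get(f'{API}/routeservers', timeout=60).json()['routeservers']

# 2. Fetch the neighbors of the first route server.
rs_id = routeservers[0]['id']
reply = requests.get(f'{API}/routeservers/{rs_id}/neighbors', timeout=60).json()
# Field spelling differs between deployments; the crawler handles both.
neighbors = reply.get('neighbors', reply.get('neighbours', []))

# 3. Each neighbor is one prospective MEMBER_OF relationship:
#    (:AS {asn: ...})-[:MEMBER_OF {address, routeserver_id}]->(:IXP),
#    with the IXP resolved via the peering LAN containing `address`.
for neighbor in neighbors[:5]:
    print(neighbor['asn'], neighbor['address'], neighbor['routeserver_id'])
```

Resolving `address` to an IXP still requires the `(:Prefix)-[:MANAGED_BY]->(:IXP)` relationships listed in the README, which is why these crawlers depend on `iyp.crawlers.peeringdb.ix`.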