From 85176ceddf24c06cb5e40c40c008f2797ca7e3bb Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Mon, 26 Feb 2024 07:22:26 +0000
Subject: [PATCH] Add SimulaMet RIR-data crawler for rDNS data.

Co-authored-by: Raffaele Sommese
---
 ACKNOWLEDGMENTS.md                     |   9 ++
 iyp/crawlers/simulamet/README.md       |  23 ++++
 iyp/crawlers/simulamet/rirdata_rdns.py | 151 +++++++++++++++++++++++++
 requirements.txt                       |   1 +
 4 files changed, 184 insertions(+)
 create mode 100644 iyp/crawlers/simulamet/README.md
 create mode 100644 iyp/crawlers/simulamet/rirdata_rdns.py

diff --git a/ACKNOWLEDGMENTS.md b/ACKNOWLEDGMENTS.md
index 9aae2ef..5dcd6cc 100644
--- a/ACKNOWLEDGMENTS.md
+++ b/ACKNOWLEDGMENTS.md
@@ -144,6 +144,15 @@ Policy](https://www.peeringdb.com/aup).
 We use AS names, Atlas measurement information, and RPKI data from the
 [RIPE NCC](https://www.ripe.net/) and [RIPE Atlas](https://atlas.ripe.net/).
 
+## SimulaMet
+
+We use rDNS data from [RIR-data.org](https://rir-data.org/), a joint project of
+SimulaMet and the University of Twente.
+
+> Alfred Arouna, Ioana Livadariu, and Mattijs Jonker. "[Lowering the Barriers to Working
+> with Public RIR-Level Data.](https://dl.acm.org/doi/10.1145/3606464.3606473)"
+> Proceedings of the 2023 Workshop on Applied Networking Research (ANRW '23).
+
 ## Stanford
 
 We use the [Stanford ASdb dataset](https://asdb.stanford.edu/) provided by the [Stanford
diff --git a/iyp/crawlers/simulamet/README.md b/iyp/crawlers/simulamet/README.md
new file mode 100644
index 0000000..2e6f920
--- /dev/null
+++ b/iyp/crawlers/simulamet/README.md
@@ -0,0 +1,23 @@
+# rDNS RIR data -- https://rir-data.org/
+
+"Lowering the Barriers to Working with Public RIR-Level Data" is a joint project of
+SimulaMet and the University of Twente with the goal of making WHOIS, route object
+delegation, and reverse DNS (rDNS) zone files published by Regional Internet Registries
+(RIRs) more accessible.
+
+IYP imports the rDNS files in a simplified format to indicate which authoritative name
+servers are responsible for a prefix. We do not model PTR records and the corresponding
+hierarchy but instead add a simple MANAGED_BY link.
+
+## Graph representation
+
+```cypher
+(:Prefix {prefix: '103.2.57.0/24'})-[:MANAGED_BY {source: 'APNIC', ttl: 172800}]->(:AuthoritativeNameServer {name: 'dns0.iij.ad.jp'})
+```
+
+The `source` property indicates from which RIR the information was obtained, and the
+`ttl` property refers to the time-to-live of the associated SOA record.
+
+## Dependence
+
+This crawler does not depend on other crawlers.
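For illustration, the relationship documented in the README above could be queried along the following lines. This is a sketch that assumes only the node labels and properties shown in the graph representation; the prefix is the example value used there.

```cypher
// Sketch: list the authoritative name servers recorded for a prefix,
// together with the RIR that published the delegation and the SOA TTL.
MATCH (p:Prefix {prefix: '103.2.57.0/24'})-[r:MANAGED_BY]->(ns:AuthoritativeNameServer)
RETURN p.prefix, ns.name, r.source, r.ttl
```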
diff --git a/iyp/crawlers/simulamet/rirdata_rdns.py b/iyp/crawlers/simulamet/rirdata_rdns.py
new file mode 100644
index 0000000..0961467
--- /dev/null
+++ b/iyp/crawlers/simulamet/rirdata_rdns.py
@@ -0,0 +1,151 @@
+import argparse
+import logging
+import os
+import sys
+import time
+from datetime import datetime, timedelta, timezone
+
+import pyspark.sql.functions as psf
+from pyspark import SparkConf
+from pyspark.sql import SparkSession
+
+from iyp import BaseCrawler, DataNotAvailableError
+
+URL = 'https://rir-data.org/'
+ORG = 'SimulaMet'
+NAME = 'simulamet.rirdata_rdns'
+
+
+class Crawler(BaseCrawler):
+    def __init__(self, organization, url, name):
+        super().__init__(organization, url, name)
+        self.reference['reference_url_info'] = 'https://rir-data.org/#reverse-dns'
+
+    # Load data from S3 for the current day or, if not yet available, for previous days.
+    def load_data_for_current_or_previous_days(self, spark):
+        # Get the current date (UTC, truncated to midnight).
+        current_date = datetime.now(tz=timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
+
+        # Attempt to load data for the current date or up to 8 days back.
+        MAX_LOOKBACK_IN_DAYS = 8
+        PATH_FMT = 's3a://rir-data/rirs-rdns-formatted/type=enriched/year=%Y/month=%m/day=%d/'
+        for i in range(MAX_LOOKBACK_IN_DAYS + 1):
+            date_to_load = current_date - timedelta(days=i)
+            try:
+                # Generate the path for the given date.
+                path = date_to_load.strftime(PATH_FMT)
+
+                # Try to load the data.
+                data = spark.read.format('json').option('basePath',
+                                                         's3a://rir-data/rirs-rdns-formatted/type=enriched').load(path)
+                logging.info(f'Data loaded successfully for date {date_to_load}')
+                self.reference['reference_time_modification'] = date_to_load
+                return data
+            except Exception as e:
+                # Log the error and fall through to the previous day.
+                logging.info(e)
+                logging.info(f'Failed to load data for date {date_to_load}. Trying previous day...')
+                continue
+        # If data is still not available after looking back for 8 days, give up.
+        logging.error(f'Failed to load data for current date and up to {MAX_LOOKBACK_IN_DAYS} days back.')
+        raise DataNotAvailableError(f'Failed to load data for current date and up to {MAX_LOOKBACK_IN_DAYS} days back.')
+
+    def run(self):
+        # See https://rir-data.org/pyspark-local.html
+        # Create the Spark config.
+        sparkConf = SparkConf()
+        sparkConf.setMaster('local[1]')
+        sparkConf.setAppName('pyspark-{}-{}'.format(os.getuid(), int(time.time())))
+        # executors
+        sparkConf.set('spark.executor.instances', '1')
+        sparkConf.set('spark.executor.cores', '1')
+        sparkConf.set('spark.executor.memory', '4G')
+        sparkConf.set('spark.executor.memoryOverhead', '512M')
+        # driver
+        sparkConf.set('spark.driver.cores', '1')
+        sparkConf.set('spark.driver.memory', '2G')
+
+        # RIR-data.org Object Storage settings (anonymous, read-only S3 access).
+        sparkConf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.2')
+        sparkConf.set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
+        sparkConf.set('spark.hadoop.fs.s3a.aws.credentials.provider',
+                      'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')
+        sparkConf.set('spark.hadoop.fs.s3a.endpoint', 'https://data.rir-data.org')
+        sparkConf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'true')
+        sparkConf.set('spark.hadoop.fs.s3a.signing-algorithm', 'S3SignerType')
+        sparkConf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
+        sparkConf.set('spark.hadoop.fs.s3a.block.size', '16M')
+        sparkConf.set('spark.hadoop.fs.s3a.readahead.range', '1M')
+        sparkConf.set('spark.hadoop.fs.s3a.experimental.input.fadvise', 'normal')
+        sparkConf.set('spark.io.file.buffer.size', '67108864')
+        sparkConf.set('spark.buffer.size', '67108864')
+
+        # Initialize the Spark session.
+        spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
+        spark.sparkContext.setLogLevel('OFF')
+
+        logging.info('Started SparkSession')
+
+        rir_data_df = self.load_data_for_current_or_previous_days(spark)
+
+        # Flatten the data into one row per (prefix, authoritative name server) pair.
+        rir_data_df = (
+            rir_data_df.withColumn('prefix', psf.explode('prefixes'))
+                       .withColumn('auth_ns', psf.explode('rdns.rdatasets.NS'))
+                       .select('auth_ns', 'prefix', psf.col('rdns.ttl').name('ttl'), 'source')
+                       .where("auth_ns != '' and prefix != ''").distinct().toPandas()
+        )
+        # Remove trailing root "."
+        rir_data_df['auth_ns'] = rir_data_df['auth_ns'].str[:-1]
+
+        logging.info('Reading NSes')
+        # Get unique name servers.
+        ns_set = set(rir_data_df['auth_ns'].unique())
+        logging.info('Reading Prefixes')
+        prefix_set = set(rir_data_df['prefix'].unique())
+        spark.stop()
+
+        ns_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', ns_set, all=False)
+        self.iyp.batch_add_node_label(list(ns_id.values()), 'AuthoritativeNameServer')
+        prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefix_set, all=False)
+
+        logging.info('Computing relationships')
+        links_managed_by = []
+        for relationship in rir_data_df.itertuples():
+            links_managed_by.append({
+                'src_id': prefix_id[relationship.prefix],
+                'dst_id': ns_id[relationship.auth_ns],
+                'props': [self.reference, {'source': relationship.source, 'ttl': relationship.ttl}],
+            })
+
+        self.iyp.batch_add_links('MANAGED_BY', links_managed_by)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--unit-test', action='store_true')
+    args = parser.parse_args()
+
+    scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3]
+    FORMAT = '%(asctime)s %(levelname)s %(message)s'
+    logging.basicConfig(
+        format=FORMAT,
+        filename='log/' + scriptname + '.log',
+        level=logging.INFO,
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+
+    logging.info(f'Started: {sys.argv}')
+
+    crawler = Crawler(ORG, URL, NAME)
+    if args.unit_test:
+        crawler.unit_test(logging)
+    else:
+        crawler.run()
+        crawler.close()
+        logging.info(f'Finished: {sys.argv}')
+
+
+if __name__ == '__main__':
+    main()
+    sys.exit(0)
diff --git a/requirements.txt b/requirements.txt
index 5c08f08..00afa5c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,3 +21,4 @@ flake8
 pre-commit
 PyGithub
 clickhouse_driver
+pyspark
\ No newline at end of file
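After a successful run, the imported MANAGED_BY links can be explored directly in IYP. The following query is a sketch that assumes only the labels and relationship properties documented in the crawler's README; it ranks authoritative name servers by the number of reverse-DNS prefixes delegated to them.

```cypher
// Sketch: rank authoritative name servers by the number of prefixes
// whose reverse DNS they serve, broken down by publishing RIR.
MATCH (:Prefix)-[r:MANAGED_BY]->(ns:AuthoritativeNameServer)
RETURN ns.name AS name_server, r.source AS rir, count(*) AS prefixes
ORDER BY prefixes DESC
LIMIT 10
```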