From c67213184f42e21d6a7d62d59a593a2a24c43c9e Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 7 Feb 2024 09:06:10 +0000
Subject: [PATCH] Add OpenINTEL DNS dependency crawler

Integrate the new crawler with the existing OpenINTEL module and remove
unused code.

Co-authored-by: Raffaele Sommese
---
 config.json.example                         |   2 +
 iyp/crawlers/openintel/__init__.py          | 145 ++++++++++++++++----
 iyp/crawlers/openintel/dns_dependency_jp.py |  45 ++++++
 iyp/crawlers/openintel/dns_dependency_nl.py |  45 ++++++
 4 files changed, 213 insertions(+), 24 deletions(-)
 create mode 100644 iyp/crawlers/openintel/dns_dependency_jp.py
 create mode 100644 iyp/crawlers/openintel/dns_dependency_nl.py

diff --git a/config.json.example b/config.json.example
index 81134ca..a281435 100644
--- a/config.json.example
+++ b/config.json.example
@@ -81,6 +81,8 @@
         "iyp.crawlers.alice_lg.linx",
         "iyp.crawlers.alice_lg.megaport",
         "iyp.crawlers.alice_lg.netnod",
+        "iyp.crawlers.openintel.dns_dependency_nl",
+        "iyp.crawlers.openintel.dns_dependency_jp",
         "iyp.crawlers.cloudflare.dns_top_locations",
         "iyp.crawlers.cloudflare.dns_top_ases"
     ],
diff --git a/iyp/crawlers/openintel/__init__.py b/iyp/crawlers/openintel/__init__.py
index d255205..a8a6dbc 100644
--- a/iyp/crawlers/openintel/__init__.py
+++ b/iyp/crawlers/openintel/__init__.py
@@ -1,12 +1,12 @@
 # Simple Python script to fetch domain name to IP address mappings from OpenINTEL data
-# Based on code from Mattijs Jonker
+# OpenIntelCrawler is based on code from Mattijs Jonker

 import argparse
-import datetime
 import json
 import logging
 import os
 import tempfile
+from datetime import datetime, timedelta, timezone
 from ipaddress import IPv6Address

 import arrow
@@ -15,15 +15,11 @@
 import pandas as pd
 import requests

-from iyp import BaseCrawler
+from iyp import BaseCrawler, RequestStatusError

 TMP_DIR = './tmp'
 os.makedirs(TMP_DIR, exist_ok=True)

-URL = 'https://data.openintel.nl/data/'
-ORG = 'OpenINTEL'
-NAME = 'openintel.*'
-
 # credentials
 OPENINTEL_ACCESS_KEY = ''
 OPENINTEL_SECRET_KEY = ''
@@ -36,7 +32,7 @@ def valid_date(s):
     try:
-        return datetime.datetime.strptime(s, '%Y-%m-%d')
+        return datetime.strptime(s, '%Y-%m-%d')
     except ValueError:
         msg = 'not a valid ISO 8601 date: {0!r}'.format(s)
         raise argparse.ArgumentTypeError(msg)
@@ -78,22 +74,24 @@ def get_parquet(self):
         # check on the website if yesterday's data is available
         yesterday = arrow.utcnow().shift(days=-1)
-        url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day)
-        try:
-            req = requests.head(url)
-
-            attempt = 3
-            while req.status_code != 200 and attempt > 0:
-                print(req.status_code)
-                attempt -= 1
-                yesterday = yesterday.shift(days=-1)
-                url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day)
-                req = requests.head(url)
-
-        except requests.exceptions.ConnectionError:
-            logging.warning("Cannot reach OpenINTEL website, try yesterday's data")
-            yesterday = arrow.utcnow().shift(days=-1)
-            url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day)
+        # FIXME Check at the proper place. Remove flake8 exception afterwards.
+        # flake8: noqa
+        # url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day)
+        # try:
+        #     req = requests.head(url)
+
+        #     attempt = 3
+        #     while req.status_code != 200 and attempt > 0:
+        #         print(req.status_code)
+        #         attempt -= 1
+        #         yesterday = yesterday.shift(days=-1)
+        #         url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day)
+        #         req = requests.head(url)
+
+        # except requests.exceptions.ConnectionError:
+        #     logging.warning("Cannot reach OpenINTEL website, try yesterday's data")
+        #     yesterday = arrow.utcnow().shift(days=-1)
+        #     url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day)

         logging.warning(f'Fetching data for {yesterday}')

@@ -245,3 +243,102 @@ def run(self):
         self.iyp.batch_add_links('RESOLVES_TO', res_links)
         self.iyp.batch_add_links('MANAGED_BY', mng_links)
         self.iyp.batch_add_links('PART_OF', partof_links)
+
+
+class DnsDependencyCrawler(BaseCrawler):
+
+    def __init__(self, organization, url, name):
+        super().__init__(organization, url, name)
+
+    def run(self):
+        # Extract current date for partitioning
+        logging.info('Probing available data')
+        max_lookback_in_weeks = 1
+        for lookback in range(0, max_lookback_in_weeks + 1):
+            current_date = datetime.now(tz=timezone.utc) - timedelta(weeks=lookback)
+            year = current_date.strftime('%Y')
+            week = current_date.strftime('%U')
+            base_url = f'{self.reference["reference_url"]}/year={year}/week={week}'
+            probe_url = f'{base_url}/domain_nodes.json.gz'
+            if requests.head(probe_url).ok:
+                logging.info(f'Using year={year}/week={week} ({current_date.strftime("%Y-%m-%d")})')
+                break
+        else:
+            logging.error('Failed to find data within the specified lookback interval.')
+            raise RequestStatusError('Failed to find data within the specified lookback interval.')
+
+        logging.info('Reading domain names')
+        domains = pd.read_json(f'{base_url}/domain_nodes.json.gz', lines=True)
+        logging.info('Reading host names')
+        hosts = pd.read_json(f'{base_url}/host_nodes.json.gz', lines=True)
+        logging.info('Reading IPs')
+        ips = pd.read_json(f'{base_url}/ip_nodes.json.gz', lines=True)
+        logging.info('Reading connections')
+        connections = pd.read_json(f'{base_url}/connections.json.gz', lines=True)
+
+        unique_domain_names = set(domains['name'])
+        unique_host_names = set(hosts['name'])
+        unique_ips = set(ips['address'])
+        logging.info(f'Pushing/getting {len(unique_domain_names)} DomainName {len(unique_host_names)} HostName '
+                     f'{len(unique_ips)} IP nodes...')
+        domains_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', unique_domain_names)
+        hosts_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', unique_host_names)
+        ips_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', unique_ips)
+
+        links_parent = list()
+        links_part_of = list()
+        links_alias_of = list()
+        links_managed_by = list()
+        links_resolves_to = list()
+
+        logging.info('Computing relationships...')
+        start_ts = datetime.now().timestamp()
+        for index, connection in connections.iterrows():
+            if connection['relation_name'] == 'PARENT':
+                links_parent.append({
+                    'src_id': domains_id[connection['from_nodeKey']],
+                    'dst_id': domains_id[connection['to_nodeKey']],
+                    'props': [self.reference, connection['properties']],
+                })
+            elif connection['relation_name'] == 'MANAGED_BY':
+                links_managed_by.append({
+                    'src_id': domains_id[connection['from_nodeKey']],
+                    'dst_id': hosts_id[connection['to_nodeKey']],
+                    'props': [self.reference, connection['properties']],
+                })
+            elif connection['relation_name'] == 'PART_OF':
+                links_part_of.append({
+                    'src_id': hosts_id[connection['from_nodeKey']],
+                    'dst_id': domains_id[connection['to_nodeKey']],
+                    'props': [self.reference, connection['properties']],
+                })
+            elif connection['relation_name'] == 'ALIAS_OF':
+                links_alias_of.append({
+                    'src_id': hosts_id[connection['from_nodeKey']],
+                    'dst_id': hosts_id[connection['to_nodeKey']],
+                    'props': [self.reference, connection['properties']],
+                })
+            elif connection['relation_name'] == 'RESOLVES_TO':
+                links_resolves_to.append({
+                    'src_id': hosts_id[connection['from_nodeKey']],
+                    'dst_id': ips_id[connection['to_nodeKey']],
+                    'props': [self.reference, connection['properties']],
+                })
+            else:
+                logging.error(f'Unknown relationship type: {connection["relation_name"]}')
+        stop_ts = datetime.now().timestamp()
+        logging.info(f'{stop_ts - start_ts:.2f}s elapsed')
+
+        # Push all links to IYP
+        logging.info(f'Pushing {len(links_parent)} PARENT {len(links_part_of)} PART_OF {len(links_alias_of)} ALIAS_OF '
+                     f'{len(links_managed_by)} MANAGED_BY {len(links_resolves_to)} RESOLVES_TO relationships...')
+        self.iyp.batch_add_links('PARENT', links_parent)
+        self.iyp.batch_add_links('PART_OF', links_part_of)
+        self.iyp.batch_add_links('ALIAS_OF', links_alias_of)
+        self.iyp.batch_add_links('MANAGED_BY', links_managed_by)
+        self.iyp.batch_add_links('RESOLVES_TO', links_resolves_to)
+
+        # Push the Authoritative NS label
+        ns_id = [link['dst_id'] for link in links_managed_by]
+        logging.info(f'Adding AuthoritativeNameServer label to {len(ns_id)} nodes')
+        self.iyp.batch_add_node_label(ns_id, 'AuthoritativeNameServer')
diff --git a/iyp/crawlers/openintel/dns_dependency_jp.py b/iyp/crawlers/openintel/dns_dependency_jp.py
new file mode 100644
index 0000000..3a3993f
--- /dev/null
+++ b/iyp/crawlers/openintel/dns_dependency_jp.py
@@ -0,0 +1,45 @@
+import argparse
+import logging
+import os
+import sys
+
+from iyp.crawlers.openintel import DnsDependencyCrawler
+
+URL = 'https://storage.dacs.utwente.nl/sommeser-dnsdep/JP'
+ORG = 'OpenINTEL'
+NAME = 'openintel.dns_dependency_jp'
+
+
+class Crawler(DnsDependencyCrawler):
+    def __init__(self, organization, url, name):
+        super().__init__(organization, url, name)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--unit-test', action='store_true')
+    args = parser.parse_args()
+
+    scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3]
+    FORMAT = '%(asctime)s %(levelname)s %(message)s'
+    logging.basicConfig(
+        format=FORMAT,
+        filename='log/' + scriptname + '.log',
+        level=logging.INFO,
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+
+    logging.info(f'Started: {sys.argv}')
+
+    crawler = Crawler(ORG, URL, NAME)
+    if args.unit_test:
+        crawler.unit_test(logging)
+    else:
+        crawler.run()
+        crawler.close()
+    logging.info(f'Finished: {sys.argv}')
+
+
+if __name__ == '__main__':
+    main()
+    sys.exit(0)
diff --git a/iyp/crawlers/openintel/dns_dependency_nl.py b/iyp/crawlers/openintel/dns_dependency_nl.py
new file mode 100644
index 0000000..04c6729
--- /dev/null
+++ b/iyp/crawlers/openintel/dns_dependency_nl.py
@@ -0,0 +1,45 @@
+import argparse
+import logging
+import os
+import sys
+
+from iyp.crawlers.openintel import DnsDependencyCrawler
+
+URL = 'https://storage.dacs.utwente.nl/sommeser-dnsdep/NL'
+ORG = 'OpenINTEL'
+NAME = 'openintel.dns_dependency_nl'
+
+
+class Crawler(DnsDependencyCrawler):
+    def __init__(self, organization, url, name):
+        super().__init__(organization, url, name)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--unit-test', action='store_true')
+    args = parser.parse_args()
+
+    scriptname = os.path.basename(sys.argv[0]).replace('/', '_')[0:-3]
+    FORMAT = '%(asctime)s %(levelname)s %(message)s'
+    logging.basicConfig(
+        format=FORMAT,
+        filename='log/' + scriptname + '.log',
+        level=logging.INFO,
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+
+    logging.info(f'Started: {sys.argv}')
+
+    crawler = Crawler(ORG, URL, NAME)
+    if args.unit_test:
+        crawler.unit_test(logging)
+    else:
+        crawler.run()
+        crawler.close()
+    logging.info(f'Finished: {sys.argv}')
+
+
+if __name__ == '__main__':
+    main()
+    sys.exit(0)
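
Note on the data layout the patch assumes: DnsDependencyCrawler.run() derives
everything from four gzipped JSON Lines files published per year=YYYY/week=WW
partition (domain_nodes, host_nodes, ip_nodes, connections). The standalone
sketch below is not part of the patch; it only mirrors the HEAD-probing logic
and shows how the connections file drives the relationship dispatch. The base
URL is borrowed from dns_dependency_nl.py, and find_latest_partition is an
illustrative helper name, not an existing function.

    # Sketch: probe for the newest partition, then peek at connections.json.gz.
    from datetime import datetime, timedelta, timezone

    import pandas as pd
    import requests

    # Assumption: same publication point as the NL crawler above.
    BASE_URL = 'https://storage.dacs.utwente.nl/sommeser-dnsdep/NL'


    def find_latest_partition(max_lookback_in_weeks=1):
        """Return the newest year=YYYY/week=WW partition URL that answers a HEAD probe."""
        for lookback in range(max_lookback_in_weeks + 1):
            date = datetime.now(tz=timezone.utc) - timedelta(weeks=lookback)
            base = f'{BASE_URL}/year={date.strftime("%Y")}/week={date.strftime("%U")}'
            # Like the crawler, probe domain_nodes.json.gz to test that the
            # partition exists before downloading anything.
            if requests.head(f'{base}/domain_nodes.json.gz').ok:
                return base
        raise RuntimeError('no data found within the lookback interval')


    if __name__ == '__main__':
        base = find_latest_partition()
        # Each file is gzipped JSON Lines; pandas infers the compression from
        # the .gz extension.
        connections = pd.read_json(f'{base}/connections.json.gz', lines=True)
        # relation_name is what run() dispatches on: PARENT, MANAGED_BY,
        # PART_OF, ALIAS_OF, or RESOLVES_TO.
        print(connections['relation_name'].value_counts())

Probing with a cheap HEAD request before reading is why run() can simply walk
backwards week by week: a partition that is not yet published fails the probe
without a full download, and only then does the crawler fall back to the
previous week.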