diff --git a/iyp/crawlers/openintel/__init__.py b/iyp/crawlers/openintel/__init__.py index 9b79de3..d476c87 100644 --- a/iyp/crawlers/openintel/__init__.py +++ b/iyp/crawlers/openintel/__init__.py @@ -5,8 +5,10 @@ import logging import os import tempfile +from collections import defaultdict from datetime import datetime, timedelta, timezone from ipaddress import IPv6Address +from itertools import pairwise import arrow import boto3 @@ -125,18 +127,40 @@ def get_parquet(self): pd.read_parquet(tempFile.name, engine='fastparquet', columns=[ + 'query_type', 'query_name', 'response_type', + 'response_name', 'ip4_address', 'ip6_address', - 'ns_address']) + 'ns_address', + 'cname_name', + ]) ) + @staticmethod + def assemble_cname_chain(first_link: str, chain_links: dict): + """Order the chain links and return a list representing the chain.""" + accounted = 1 + chain = [first_link] + next = chain_links[first_link] + while next in chain_links: + chain.append(next) + accounted += 1 + next = chain_links[next] + chain.append(next) + accounted += 1 + if accounted != len(chain_links) + 1: + logging.warning('Unaccounted CNAME chain links') + logging.warning(f'Chain: {chain}') + logging.warning(f'Chain links: {chain_links}') + return chain + def run(self): """Fetch the forward DNS data, populate a data frame, and process lines one by one.""" attempt = 5 - self.pandas_df_list = [] # List of Parquet file-specific Pandas DataFrames + self.pandas_df_list = list() # List of Parquet file-specific Pandas DataFrames while len(self.pandas_df_list) == 0 and attempt > 0: self.get_parquet() @@ -147,22 +171,31 @@ def run(self): # Select A, AAAA, and NS mappings from the measurement data df = pandas_df[ + ( + (pandas_df.query_type == 'A') | + (pandas_df.query_type == 'AAAA') | + (pandas_df.query_type == 'NS') + + ) & ( (pandas_df.response_type == 'A') | (pandas_df.response_type == 'AAAA') | - (pandas_df.response_type == 'NS') + (pandas_df.response_type == 'NS') | + (pandas_df.response_type == 'CNAME') ) & - # Filter out non-apex records - (~pandas_df.query_name.str.startswith('www.')) & # Filter missing addresses (there is at least one...) ( (pandas_df.ip4_address.notnull()) | (pandas_df.ip6_address.notnull()) | - (pandas_df.ns_address.notnull()) + (pandas_df.ns_address.notnull()) | + (pandas_df.cname_name.notnull()) ) - ][['query_name', 'response_type', 'ip4_address', 'ip6_address', 'ns_address']].drop_duplicates() - df.query_name = df.query_name.str[:-1] # Remove root '.' - df.ns_address = df.ns_address.map(lambda x: x[:-1] if x is not None else None) # Remove root '.' + ].drop_duplicates() + # Remove root '.' from fields. + df.query_name = df.query_name.str[:-1] + df.response_name = df.response_name.str[:-1] + df.ns_address = df.ns_address.map(lambda x: x[:-1] if x is not None else None) + df.cname_name = df.cname_name.map(lambda x: x[:-1] if x is not None else None) logging.info(f'Read {len(df)} unique records from {len(self.pandas_df_list)} Parquet file(s).') @@ -185,22 +218,86 @@ def run(self): continue ipv6_addresses.add(ip_normalized) + # Handle CNAME entries. + # A query where the result is obtained via a CNAME is indicated by a + # response name that is different from the query name. This means there will be + # a CNAME response linking the initial query name to the cname name. However, + # the entry with the resolved IP only contains the last entry of a potential + # CNAME chain, so we need to check the CNAME responses as well. + # An example CNAME chain looks like this: + # + # query_type query_name response_type response_name ip4_address cname_name # noqa: W505 + # ------------ ------------- --------------- --------------- ------------- --------------- # noqa: W505 + # A example.org CNAME example.org a.example.org # noqa: W505 + # A example.org CNAME a.example.org b.example.org # noqa: W505 + # A example.org A b.example.org 192.0.2.1 + # + # The beginning of the chain is the CNAME entry where query_name is equal to + # response_name. + # + # The dataset also contains CNAME chains that do not resolve to an IP (i.e., no + # response with type A/AAAA exists), so we need to filter these out. + + # Get query names which contain CNAMEs and resolved to an IP. + cname_queries = (df + [ + (df.response_type == 'A') | + (df.response_type == 'AAAA') + ] + .query('query_name != response_name') + [[ + 'response_type', + 'query_name' + ]] + .groupby('response_type') + .aggregate(['unique'])) + # Simplify access. + cname_queries = {'A': set(cname_queries.loc['A']['query_name']['unique']), + 'AAAA': set(cname_queries.loc['AAAA']['query_name']['unique'])} + + # Get the components of CNAME chains for queries that successfully resolved. + cnames = defaultdict(lambda: defaultdict(dict)) + # There are cases where NS queries receive a CNAME response, which we want to + # ignore. + for row in df[(df.query_type.isin(['A', 'AAAA'])) & (df.response_type == 'CNAME')].itertuples(): + if row.query_name not in cname_queries[row.query_type]: + # No need to build chains for entries that did not resolve. + continue + cname_dict = cnames[row.query_type] + if row.response_name in cname_dict[row.query_name]: + logging.warning(f'Duplicate CNAME chain entry "{row.response_name}" for query "{row.query_name}"') + # We keep the chain links as a dict and order them later since there is no + # guarantee that our dataframe rows are ordered. + cname_dict[row.query_name][row.response_name] = row.cname_name + # Also need to create HostName nodes for these. + host_names.add(row.cname_name) + + # Assemble chains. + cname_chains = defaultdict(dict) + for query_type, chain_dict in cnames.items(): + for query_name, chain_links in chain_dict.items(): + cname_chains[query_type][query_name] = self.assemble_cname_chain(query_name, chain_links) + # Get/create all nodes: domain_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domain_names, all=False) host_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', host_names, all=False) ns_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', name_servers, all=False) self.iyp.batch_add_node_label(list(ns_id.values()), 'AuthoritativeNameServer') - ip4_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', set( - df[df.ip4_address.notnull()]['ip4_address']), all=False) + ip4_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', + set(df[df.ip4_address.notnull()]['ip4_address']), + all=False) ip6_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ipv6_addresses, all=False) logging.info(f'Got {len(domain_id)} domains, {len(ns_id)} nameservers, {len(host_id)} hosts, ' f'{len(ip4_id)} IPv4, {len(ip6_id)} IPv6') # Compute links - res_links = [] - mng_links = [] - partof_links = [] + res_links = list() + mng_links = list() + partof_links = list() + aliasof_links = list() + unique_alias = set() + unique_res = set() # RESOLVES_TO and MANAGED_BY links for row in df.itertuples(): @@ -215,7 +312,24 @@ def run(self): elif row.response_type == 'A' and row.ip4_address: host_qid = host_id[row.query_name] ip_qid = ip4_id[row.ip4_address] - res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) + if (host_qid, ip_qid) not in unique_res: + res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) + unique_res.add((host_qid, ip_qid)) + if row.query_name != row.response_name: + # CNAME result + for left, right in pairwise(cname_chains[row.response_type][row.query_name]): + left_host_qid = host_id[left] + right_host_qid = host_id[right] + # First left is the initial hostname that is already added + # above. + if (right_host_qid, ip_qid) not in unique_res: + res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) + unique_res.add((right_host_qid, ip_qid)) + if (left_host_qid, right_host_qid) not in unique_alias: + aliasof_links.append({'src_id': left_host_qid, + 'dst_id': right_host_qid, + 'props': [self.reference]}) + unique_alias.add((left_host_qid, right_host_qid)) # AAAA Record elif row.response_type == 'AAAA' and row.ip6_address: @@ -226,7 +340,24 @@ def run(self): continue host_qid = host_id[row.query_name] ip_qid = ip6_id[ip_normalized] - res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) + if (host_qid, ip_qid) not in unique_res: + res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) + unique_res.add((host_qid, ip_qid)) + if row.query_name != row.response_name: + # CNAME result + for left, right in pairwise(cname_chains[row.response_type][row.query_name]): + left_host_qid = host_id[left] + right_host_qid = host_id[right] + # First left is the initial hostname that is already added + # above. + if (right_host_qid, ip_qid) not in unique_res: + res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) + unique_res.add((right_host_qid, ip_qid)) + if (left_host_qid, right_host_qid) not in unique_alias: + aliasof_links.append({'src_id': left_host_qid, + 'dst_id': right_host_qid, + 'props': [self.reference]}) + unique_alias.add((left_host_qid, right_host_qid)) # PART_OF links between HostNames and DomainNames for hd in host_names.intersection(domain_names): @@ -238,12 +369,13 @@ def run(self): self.iyp.batch_add_links('RESOLVES_TO', res_links) self.iyp.batch_add_links('MANAGED_BY', mng_links) self.iyp.batch_add_links('PART_OF', partof_links) + self.iyp.batch_add_links('ALIAS_OF', aliasof_links) def unit_test(self): - # infra_ns only has RESOLVES_TO relationships. + # infra_ns only has RESOLVES_TO and ALIAS_OF relationships. if self.reference['reference_name'] == 'openintel.infra_ns': - return super().unit_test(['RESOLVES_TO']) - return super().unit_test(['RESOLVES_TO', 'MANAGED_BY', 'PART_OF']) + return super().unit_test(['RESOLVES_TO', 'ALIAS_OF']) + return super().unit_test(['RESOLVES_TO', 'MANAGED_BY', 'PART_OF', 'ALIAS_OF']) class DnsgraphCrawler(BaseCrawler):