diff --git a/iyp/crawlers/openintel/__init__.py b/iyp/crawlers/openintel/__init__.py index d476c87..ca04a05 100644 --- a/iyp/crawlers/openintel/__init__.py +++ b/iyp/crawlers/openintel/__init__.py @@ -8,7 +8,6 @@ from collections import defaultdict from datetime import datetime, timedelta, timezone from ipaddress import IPv6Address -from itertools import pairwise import arrow import boto3 @@ -139,22 +138,32 @@ def get_parquet(self): ) @staticmethod - def assemble_cname_chain(first_link: str, chain_links: dict): - """Order the chain links and return a list representing the chain.""" - accounted = 1 - chain = [first_link] - next = chain_links[first_link] - while next in chain_links: - chain.append(next) - accounted += 1 - next = chain_links[next] - chain.append(next) - accounted += 1 - if accounted != len(chain_links) + 1: - logging.warning('Unaccounted CNAME chain links') - logging.warning(f'Chain: {chain}') - logging.warning(f'Chain links: {chain_links}') - return chain + def recurse_chain(current_chain: list, chain_links: dict, records: dict, state: dict): + """Recurse CNAME chains and populate state dictionary. + + This is a depth-first search that just follows every possible chain from the + root. + If the current tail of the chain is an A/AAAA records, the IPs are added to all + names currently in the chain. As a consequence, the state dict only contains + names that resolve to at least one IP so if there is a branch that does not end + in an IP, the corresponding names will not be in the state and can be pruned. + + Args: + current_chain (list): List of names that form the current chain. + chain_links (dict): Dictionary mapping names to a set of names forming the + chain links. + records (dict): Dictionary mapping names to a set of IPs (A/AAAA records). + state (dict): State dictionary that will be populated by this function. + """ + chain_tail = current_chain[-1] + if chain_tail in records: + for link in current_chain: + state[link].update(records[chain_tail]) + if chain_tail in chain_links: + for link in chain_links[chain_tail]: + current_chain.append(link) + OpenIntelCrawler.recurse_chain(current_chain, chain_links, records, state) + current_chain.pop() def run(self): """Fetch the forward DNS data, populate a data frame, and process lines one by @@ -233,50 +242,64 @@ def run(self): # A example.org A b.example.org 192.0.2.1 # # The beginning of the chain is the CNAME entry where query_name is equal to - # response_name. + # response_name. Chains can also branch out so parts of a chain resolve to + # different IPs. # # The dataset also contains CNAME chains that do not resolve to an IP (i.e., no # response with type A/AAAA exists), so we need to filter these out. # Get query names which contain CNAMEs and resolved to an IP. - cname_queries = (df - [ - (df.response_type == 'A') | - (df.response_type == 'AAAA') - ] - .query('query_name != response_name') - [[ - 'response_type', - 'query_name' - ]] - .groupby('response_type') - .aggregate(['unique'])) - # Simplify access. - cname_queries = {'A': set(cname_queries.loc['A']['query_name']['unique']), - 'AAAA': set(cname_queries.loc['AAAA']['query_name']['unique'])} + cname_ip_records = defaultdict(lambda: defaultdict(lambda: defaultdict(set))) + for row in ( + df + [ + (df.response_type == 'A') | + (df.response_type == 'AAAA') + ] + .query('query_name != response_name') + [[ + 'response_type', + 'query_name', + 'response_name', + 'ip4_address', + 'ip6_address' + ]] + .drop_duplicates() + ).itertuples(): + # There are cases where a single query name has multiple CNAME chains that + # end in different IPs. To check if chains are valid, we need to know the + # last entry that resolves to an IP to identify broken chains. + if row.response_type == 'A': + ip = row.ip4_address + else: + ip = IPv6Address(row.ip6_address).compressed + cname_ip_records[row.response_type][row.query_name][row.response_name].add(ip) # Get the components of CNAME chains for queries that successfully resolved. - cnames = defaultdict(lambda: defaultdict(dict)) + cnames = defaultdict(lambda: defaultdict(lambda: defaultdict(set))) # There are cases where NS queries receive a CNAME response, which we want to # ignore. for row in df[(df.query_type.isin(['A', 'AAAA'])) & (df.response_type == 'CNAME')].itertuples(): - if row.query_name not in cname_queries[row.query_type]: + if row.query_name not in cname_ip_records[row.query_type]: # No need to build chains for entries that did not resolve. continue cname_dict = cnames[row.query_type] - if row.response_name in cname_dict[row.query_name]: - logging.warning(f'Duplicate CNAME chain entry "{row.response_name}" for query "{row.query_name}"') # We keep the chain links as a dict and order them later since there is no # guarantee that our dataframe rows are ordered. - cname_dict[row.query_name][row.response_name] = row.cname_name - # Also need to create HostName nodes for these. - host_names.add(row.cname_name) + # Links can also branch, i.e., there are two CNAME records for one response + # name. + cname_dict[row.query_name][row.response_name].add(row.cname_name) # Assemble chains. - cname_chains = defaultdict(dict) + cname_resolves_to = defaultdict(dict) for query_type, chain_dict in cnames.items(): for query_name, chain_links in chain_dict.items(): - cname_chains[query_type][query_name] = self.assemble_cname_chain(query_name, chain_links) + records = cname_ip_records[query_type][query_name] + state = defaultdict(set) + self.recurse_chain([query_name], chain_links, records, state) + cname_resolves_to[query_type][query_name] = state + # Also create HostName nodes for all names that resolved. + host_names.update(state.keys()) # Get/create all nodes: domain_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domain_names, all=False) @@ -298,6 +321,7 @@ def run(self): aliasof_links = list() unique_alias = set() unique_res = set() + processed_cnames = set() # RESOLVES_TO and MANAGED_BY links for row in df.itertuples(): @@ -312,24 +336,52 @@ def run(self): elif row.response_type == 'A' and row.ip4_address: host_qid = host_id[row.query_name] ip_qid = ip4_id[row.ip4_address] - if (host_qid, ip_qid) not in unique_res: - res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) - unique_res.add((host_qid, ip_qid)) - if row.query_name != row.response_name: + if row.query_name == row.response_name: + # A record + if (host_qid, ip_qid, row.response_type) not in unique_res: + res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [ + self.reference, {'record': row.response_type}]}) + unique_res.add((host_qid, ip_qid, row.response_type)) + # If the final CNAME has multiple A/AAAA records, we would enter + # this loop multiple times, so ignore for slight performance + # optimization. + elif (row.response_type, row.query_name) not in processed_cnames: # CNAME result - for left, right in pairwise(cname_chains[row.response_type][row.query_name]): - left_host_qid = host_id[left] - right_host_qid = host_id[right] - # First left is the initial hostname that is already added - # above. - if (right_host_qid, ip_qid) not in unique_res: - res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) - unique_res.add((right_host_qid, ip_qid)) - if (left_host_qid, right_host_qid) not in unique_alias: - aliasof_links.append({'src_id': left_host_qid, - 'dst_id': right_host_qid, - 'props': [self.reference]}) - unique_alias.add((left_host_qid, right_host_qid)) + processed_cnames.add((row.response_type, row.query_name)) + # Add RESOLVES_TO entries + resolved_names = cname_resolves_to[row.response_type][row.query_name] + records = cname_ip_records[row.response_type][row.query_name] + for hostname, ips in resolved_names.items(): + host_qid = host_id[hostname] + for ip in ips: + # Keep track of the source for the RESOLVES_TO relationship. + # Either CNAME if it was added transitively or A/AAAA if it + # is the end of the chain. + # A name can have both, i.e., IPs coming from a transitive + # CNAME and direct A/AAAA records. + record = 'CNAME' + if hostname in records and ip in records[hostname]: + record = row.response_type + ip_qid = ip4_id[ip] + if (host_qid, ip_qid, record) not in unique_res: + res_links.append({'src_id': host_qid, 'dst_id': ip_qid, + 'props': [self.reference, {'record': record}]}) + unique_res.add((host_qid, ip_qid, record)) + # Add ALIAS_OF links for resolvable parts of the chain. + for source, destinations in cnames[row.response_type][row.query_name].items(): + if source not in resolved_names: + # Link is not resolvable so ignore. + continue + source_qid = host_id[source] + for destination in destinations: + if destination not in resolved_names: + continue + destination_qid = host_id[destination] + if (source_qid, destination_qid) not in unique_alias: + aliasof_links.append({'src_id': source_qid, + 'dst_id': destination_qid, + 'props': [self.reference]}) + unique_alias.add((source_qid, destination_qid)) # AAAA Record elif row.response_type == 'AAAA' and row.ip6_address: @@ -340,24 +392,44 @@ def run(self): continue host_qid = host_id[row.query_name] ip_qid = ip6_id[ip_normalized] - if (host_qid, ip_qid) not in unique_res: - res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) - unique_res.add((host_qid, ip_qid)) - if row.query_name != row.response_name: + if row.query_name == row.response_name: + # AAAA record + if (host_qid, ip_qid, row.response_type) not in unique_res: + res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [ + self.reference, {'record': row.response_type}]}) + unique_res.add((host_qid, ip_qid, row.response_type)) + elif (row.response_type, row.query_name) not in processed_cnames: + processed_cnames.add((row.response_type, row.query_name)) # CNAME result - for left, right in pairwise(cname_chains[row.response_type][row.query_name]): - left_host_qid = host_id[left] - right_host_qid = host_id[right] - # First left is the initial hostname that is already added - # above. - if (right_host_qid, ip_qid) not in unique_res: - res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]}) - unique_res.add((right_host_qid, ip_qid)) - if (left_host_qid, right_host_qid) not in unique_alias: - aliasof_links.append({'src_id': left_host_qid, - 'dst_id': right_host_qid, - 'props': [self.reference]}) - unique_alias.add((left_host_qid, right_host_qid)) + # Add RESOLVES_TO entries + resolved_names = cname_resolves_to[row.response_type][row.query_name] + records = cname_ip_records[row.response_type][row.query_name] + for hostname, ips in resolved_names.items(): + host_qid = host_id[hostname] + for ip in ips: + record = 'CNAME' + if hostname in records and ip in records[hostname]: + record = row.response_type + ip_qid = ip6_id[ip] + if (host_qid, ip_qid, record) not in unique_res: + res_links.append({'src_id': host_qid, 'dst_id': ip_qid, + 'props': [self.reference, {'record': record}]}) + unique_res.add((host_qid, ip_qid, record)) + # Add ALIAS_OF links for resolvable parts of the chain. + for source, destinations in cnames[row.response_type][row.query_name].items(): + if source not in resolved_names: + # Link is not resolvable so ignore. + continue + source_qid = host_id[source] + for destination in destinations: + if destination not in resolved_names: + continue + destination_qid = host_id[destination] + if (source_qid, destination_qid) not in unique_alias: + aliasof_links.append({'src_id': source_qid, + 'dst_id': destination_qid, + 'props': [self.reference]}) + unique_alias.add((source_qid, destination_qid)) # PART_OF links between HostNames and DomainNames for hd in host_names.intersection(domain_names):