Skip to content

Commit

Permalink
Update OpenINTEL crawler (#162)
Browse files Browse the repository at this point in the history
Better handling for CNAMEs. Previously we treated them as chains, but
they are more like trees.
  • Loading branch information
m-appel authored Dec 13, 2024
1 parent 72358a4 commit d21d96c
Showing 1 changed file with 148 additions and 76 deletions.
224 changes: 148 additions & 76 deletions iyp/crawlers/openintel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from ipaddress import IPv6Address
from itertools import pairwise

import arrow
import boto3
Expand Down Expand Up @@ -139,22 +138,32 @@ def get_parquet(self):
)

@staticmethod
def assemble_cname_chain(first_link: str, chain_links: dict):
    """Order the chain links and return a list representing the chain.

    Starting at first_link, follow the name -> target mapping in chain_links
    until a target is reached that has no outgoing link.

    If the links form a loop, stop following them as soon as a name is seen
    for the second time. The repeated name is still appended as the last
    element so that the link closing the loop is visible to the caller.

    Args:
        first_link (str): Name at which the chain starts. Must be a key of
            chain_links.
        chain_links (dict): Dictionary mapping a name to the single name it
            points to.

    Returns:
        list: Names of the chain in order, beginning with first_link.

    Raises:
        KeyError: If first_link is not a key of chain_links.
    """
    chain = [first_link]
    # Names that already have their outgoing link followed; needed to detect
    # loops, which would otherwise keep the walk running forever.
    visited = {first_link}
    target = chain_links[first_link]
    while target in chain_links and target not in visited:
        chain.append(target)
        visited.add(target)
        target = chain_links[target]
    chain.append(target)
    if target in visited:
        logging.warning(f'CNAME chain contains a loop: {chain}')
    # A chain that uses every link exactly once has one more name than links.
    if len(chain) != len(chain_links) + 1:
        logging.warning('Unaccounted CNAME chain links')
        logging.warning(f'Chain: {chain}')
        logging.warning(f'Chain links: {chain_links}')
    return chain
def recurse_chain(current_chain: list, chain_links: dict, records: dict, state: dict):
    """Recurse CNAME chains and populate state dictionary.

    This is a depth-first search that follows every possible chain from the
    tail of current_chain.

    If the current tail of the chain has A/AAAA records, its IPs are added to
    all names currently in the chain. As a consequence, the state dict only
    contains names that resolve to at least one IP, so if there is a branch
    that does not end in an IP, the corresponding names will not be in the
    state and can be pruned.

    Links that point back to a name already on the current chain (CNAME
    loops) are not followed, so the search always terminates.

    Args:
        current_chain (list): List of names that form the current chain. It is
            modified while searching, but has its original content again when
            the function returns.
        chain_links (dict): Dictionary mapping names to a set of names forming
            the chain links.
        records (dict): Dictionary mapping names to a set of IPs (A/AAAA
            records).
        state (dict): State dictionary mapping names to a set of IPs that will
            be populated by this function.
    """
    def walk():
        chain_tail = current_chain[-1]
        if chain_tail in records:
            ips = records[chain_tail]
            for name in current_chain:
                # setdefault works for plain dicts and defaultdicts alike.
                state.setdefault(name, set()).update(ips)
        # Use .get() so that a defaultdict passed as chain_links does not grow
        # a new empty entry for every leaf name.
        for link in chain_links.get(chain_tail, ()):
            if link in current_chain:
                # Following this link would close a loop and recurse forever.
                continue
            current_chain.append(link)
            walk()
            current_chain.pop()

    walk()

def run(self):
"""Fetch the forward DNS data, populate a data frame, and process lines one by
Expand Down Expand Up @@ -233,50 +242,64 @@ def run(self):
# A example.org A b.example.org 192.0.2.1
#
# The beginning of the chain is the CNAME entry where query_name is equal to
# response_name.
# response_name. Chains can also branch out so parts of a chain resolve to
# different IPs.
#
# The dataset also contains CNAME chains that do not resolve to an IP (i.e., no
# response with type A/AAAA exists), so we need to filter these out.

# Get query names which contain CNAMEs and resolved to an IP.
cname_queries = (df
[
(df.response_type == 'A') |
(df.response_type == 'AAAA')
]
.query('query_name != response_name')
[[
'response_type',
'query_name'
]]
.groupby('response_type')
.aggregate(['unique']))
# Simplify access.
cname_queries = {'A': set(cname_queries.loc['A']['query_name']['unique']),
'AAAA': set(cname_queries.loc['AAAA']['query_name']['unique'])}
cname_ip_records = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
for row in (
df
[
(df.response_type == 'A') |
(df.response_type == 'AAAA')
]
.query('query_name != response_name')
[[
'response_type',
'query_name',
'response_name',
'ip4_address',
'ip6_address'
]]
.drop_duplicates()
).itertuples():
# There are cases where a single query name has multiple CNAME chains that
# end in different IPs. To check if chains are valid, we need to know the
# last entry that resolves to an IP to identify broken chains.
if row.response_type == 'A':
ip = row.ip4_address
else:
ip = IPv6Address(row.ip6_address).compressed
cname_ip_records[row.response_type][row.query_name][row.response_name].add(ip)

# Get the components of CNAME chains for queries that successfully resolved.
cnames = defaultdict(lambda: defaultdict(dict))
cnames = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
# There are cases where NS queries receive a CNAME response, which we want to
# ignore.
for row in df[(df.query_type.isin(['A', 'AAAA'])) & (df.response_type == 'CNAME')].itertuples():
if row.query_name not in cname_queries[row.query_type]:
if row.query_name not in cname_ip_records[row.query_type]:
# No need to build chains for entries that did not resolve.
continue
cname_dict = cnames[row.query_type]
if row.response_name in cname_dict[row.query_name]:
logging.warning(f'Duplicate CNAME chain entry "{row.response_name}" for query "{row.query_name}"')
# We keep the chain links as a dict and order them later since there is no
# guarantee that our dataframe rows are ordered.
cname_dict[row.query_name][row.response_name] = row.cname_name
# Also need to create HostName nodes for these.
host_names.add(row.cname_name)
# Links can also branch, i.e., there are two CNAME records for one response
# name.
cname_dict[row.query_name][row.response_name].add(row.cname_name)

# Assemble chains.
cname_chains = defaultdict(dict)
cname_resolves_to = defaultdict(dict)
for query_type, chain_dict in cnames.items():
for query_name, chain_links in chain_dict.items():
cname_chains[query_type][query_name] = self.assemble_cname_chain(query_name, chain_links)
records = cname_ip_records[query_type][query_name]
state = defaultdict(set)
self.recurse_chain([query_name], chain_links, records, state)
cname_resolves_to[query_type][query_name] = state
# Also create HostName nodes for all names that resolved.
host_names.update(state.keys())

# Get/create all nodes:
domain_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domain_names, all=False)
Expand All @@ -298,6 +321,7 @@ def run(self):
aliasof_links = list()
unique_alias = set()
unique_res = set()
processed_cnames = set()

# RESOLVES_TO and MANAGED_BY links
for row in df.itertuples():
Expand All @@ -312,24 +336,52 @@ def run(self):
elif row.response_type == 'A' and row.ip4_address:
host_qid = host_id[row.query_name]
ip_qid = ip4_id[row.ip4_address]
if (host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((host_qid, ip_qid))
if row.query_name != row.response_name:
if row.query_name == row.response_name:
# A record
if (host_qid, ip_qid, row.response_type) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [
self.reference, {'record': row.response_type}]})
unique_res.add((host_qid, ip_qid, row.response_type))
# If the final CNAME has multiple A/AAAA records, we would enter
# this loop multiple times, so ignore for slight performance
# optimization.
elif (row.response_type, row.query_name) not in processed_cnames:
# CNAME result
for left, right in pairwise(cname_chains[row.response_type][row.query_name]):
left_host_qid = host_id[left]
right_host_qid = host_id[right]
# First left is the initial hostname that is already added
# above.
if (right_host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((right_host_qid, ip_qid))
if (left_host_qid, right_host_qid) not in unique_alias:
aliasof_links.append({'src_id': left_host_qid,
'dst_id': right_host_qid,
'props': [self.reference]})
unique_alias.add((left_host_qid, right_host_qid))
processed_cnames.add((row.response_type, row.query_name))
# Add RESOLVES_TO entries
resolved_names = cname_resolves_to[row.response_type][row.query_name]
records = cname_ip_records[row.response_type][row.query_name]
for hostname, ips in resolved_names.items():
host_qid = host_id[hostname]
for ip in ips:
# Keep track of the source for the RESOLVES_TO relationship.
# Either CNAME if it was added transitively or A/AAAA if it
# is the end of the chain.
# A name can have both, i.e., IPs coming from a transitive
# CNAME and direct A/AAAA records.
record = 'CNAME'
if hostname in records and ip in records[hostname]:
record = row.response_type
ip_qid = ip4_id[ip]
if (host_qid, ip_qid, record) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid,
'props': [self.reference, {'record': record}]})
unique_res.add((host_qid, ip_qid, record))
# Add ALIAS_OF links for resolvable parts of the chain.
for source, destinations in cnames[row.response_type][row.query_name].items():
if source not in resolved_names:
# Link is not resolvable so ignore.
continue
source_qid = host_id[source]
for destination in destinations:
if destination not in resolved_names:
continue
destination_qid = host_id[destination]
if (source_qid, destination_qid) not in unique_alias:
aliasof_links.append({'src_id': source_qid,
'dst_id': destination_qid,
'props': [self.reference]})
unique_alias.add((source_qid, destination_qid))

# AAAA Record
elif row.response_type == 'AAAA' and row.ip6_address:
Expand All @@ -340,24 +392,44 @@ def run(self):
continue
host_qid = host_id[row.query_name]
ip_qid = ip6_id[ip_normalized]
if (host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((host_qid, ip_qid))
if row.query_name != row.response_name:
if row.query_name == row.response_name:
# AAAA record
if (host_qid, ip_qid, row.response_type) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [
self.reference, {'record': row.response_type}]})
unique_res.add((host_qid, ip_qid, row.response_type))
elif (row.response_type, row.query_name) not in processed_cnames:
processed_cnames.add((row.response_type, row.query_name))
# CNAME result
for left, right in pairwise(cname_chains[row.response_type][row.query_name]):
left_host_qid = host_id[left]
right_host_qid = host_id[right]
# First left is the initial hostname that is already added
# above.
if (right_host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((right_host_qid, ip_qid))
if (left_host_qid, right_host_qid) not in unique_alias:
aliasof_links.append({'src_id': left_host_qid,
'dst_id': right_host_qid,
'props': [self.reference]})
unique_alias.add((left_host_qid, right_host_qid))
# Add RESOLVES_TO entries
resolved_names = cname_resolves_to[row.response_type][row.query_name]
records = cname_ip_records[row.response_type][row.query_name]
for hostname, ips in resolved_names.items():
host_qid = host_id[hostname]
for ip in ips:
record = 'CNAME'
if hostname in records and ip in records[hostname]:
record = row.response_type
ip_qid = ip6_id[ip]
if (host_qid, ip_qid, record) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid,
'props': [self.reference, {'record': record}]})
unique_res.add((host_qid, ip_qid, record))
# Add ALIAS_OF links for resolvable parts of the chain.
for source, destinations in cnames[row.response_type][row.query_name].items():
if source not in resolved_names:
# Link is not resolvable so ignore.
continue
source_qid = host_id[source]
for destination in destinations:
if destination not in resolved_names:
continue
destination_qid = host_id[destination]
if (source_qid, destination_qid) not in unique_alias:
aliasof_links.append({'src_id': source_qid,
'dst_id': destination_qid,
'props': [self.reference]})
unique_alias.add((source_qid, destination_qid))

# PART_OF links between HostNames and DomainNames
for hd in host_names.intersection(domain_names):
Expand Down

0 comments on commit d21d96c

Please sign in to comment.