Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update OpenINTEL crawler #162

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 148 additions & 76 deletions iyp/crawlers/openintel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from ipaddress import IPv6Address
from itertools import pairwise

import arrow
import boto3
Expand Down Expand Up @@ -139,22 +138,32 @@ def get_parquet(self):
)

@staticmethod
def assemble_cname_chain(first_link: str, chain_links: dict):
"""Order the chain links and return a list representing the chain."""
accounted = 1
chain = [first_link]
next = chain_links[first_link]
while next in chain_links:
chain.append(next)
accounted += 1
next = chain_links[next]
chain.append(next)
accounted += 1
if accounted != len(chain_links) + 1:
logging.warning('Unaccounted CNAME chain links')
logging.warning(f'Chain: {chain}')
logging.warning(f'Chain links: {chain_links}')
return chain
def recurse_chain(current_chain: list, chain_links: dict, records: dict, state: dict):
"""Recurse CNAME chains and populate state dictionary.

This is a depth-first search that just follows every possible chain from the
root.
If the current tail of the chain is an A/AAAA records, the IPs are added to all
names currently in the chain. As a consequence, the state dict only contains
names that resolve to at least one IP so if there is a branch that does not end
in an IP, the corresponding names will not be in the state and can be pruned.

Args:
current_chain (list): List of names that form the current chain.
chain_links (dict): Dictionary mapping names to a set of names forming the
chain links.
records (dict): Dictionary mapping names to a set of IPs (A/AAAA records).
state (dict): State dictionary that will be populated by this function.
"""
chain_tail = current_chain[-1]
if chain_tail in records:
for link in current_chain:
state[link].update(records[chain_tail])
if chain_tail in chain_links:
for link in chain_links[chain_tail]:
current_chain.append(link)
OpenIntelCrawler.recurse_chain(current_chain, chain_links, records, state)
current_chain.pop()

def run(self):
"""Fetch the forward DNS data, populate a data frame, and process lines one by
Expand Down Expand Up @@ -233,50 +242,64 @@ def run(self):
# A example.org A b.example.org 192.0.2.1
#
# The beginning of the chain is the CNAME entry where query_name is equal to
# response_name.
# response_name. Chains can also branch out so parts of a chain resolve to
# different IPs.
#
# The dataset also contains CNAME chains that do not resolve to an IP (i.e., no
# response with type A/AAAA exists), so we need to filter these out.

# Get query names which contain CNAMEs and resolved to an IP.
cname_queries = (df
[
(df.response_type == 'A') |
(df.response_type == 'AAAA')
]
.query('query_name != response_name')
[[
'response_type',
'query_name'
]]
.groupby('response_type')
.aggregate(['unique']))
# Simplify access.
cname_queries = {'A': set(cname_queries.loc['A']['query_name']['unique']),
'AAAA': set(cname_queries.loc['AAAA']['query_name']['unique'])}
cname_ip_records = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
for row in (
df
[
(df.response_type == 'A') |
(df.response_type == 'AAAA')
]
.query('query_name != response_name')
[[
'response_type',
'query_name',
'response_name',
'ip4_address',
'ip6_address'
]]
.drop_duplicates()
).itertuples():
# There are cases where a single query name has multiple CNAME chains that
# end in different IPs. To check if chains are valid, we need to know the
# last entry that resolves to an IP to identify broken chains.
if row.response_type == 'A':
ip = row.ip4_address
else:
ip = IPv6Address(row.ip6_address).compressed
cname_ip_records[row.response_type][row.query_name][row.response_name].add(ip)

# Get the components of CNAME chains for queries that successfully resolved.
cnames = defaultdict(lambda: defaultdict(dict))
cnames = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
# There are cases where NS queries receive a CNAME response, which we want to
# ignore.
for row in df[(df.query_type.isin(['A', 'AAAA'])) & (df.response_type == 'CNAME')].itertuples():
if row.query_name not in cname_queries[row.query_type]:
if row.query_name not in cname_ip_records[row.query_type]:
# No need to build chains for entries that did not resolve.
continue
cname_dict = cnames[row.query_type]
if row.response_name in cname_dict[row.query_name]:
logging.warning(f'Duplicate CNAME chain entry "{row.response_name}" for query "{row.query_name}"')
# We keep the chain links as a dict and order them later since there is no
# guarantee that our dataframe rows are ordered.
cname_dict[row.query_name][row.response_name] = row.cname_name
# Also need to create HostName nodes for these.
host_names.add(row.cname_name)
# Links can also branch, i.e., there are two CNAME records for one response
# name.
cname_dict[row.query_name][row.response_name].add(row.cname_name)

# Assemble chains.
cname_chains = defaultdict(dict)
cname_resolves_to = defaultdict(dict)
for query_type, chain_dict in cnames.items():
for query_name, chain_links in chain_dict.items():
cname_chains[query_type][query_name] = self.assemble_cname_chain(query_name, chain_links)
records = cname_ip_records[query_type][query_name]
state = defaultdict(set)
self.recurse_chain([query_name], chain_links, records, state)
cname_resolves_to[query_type][query_name] = state
# Also create HostName nodes for all names that resolved.
host_names.update(state.keys())

# Get/create all nodes:
domain_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domain_names, all=False)
Expand All @@ -298,6 +321,7 @@ def run(self):
aliasof_links = list()
unique_alias = set()
unique_res = set()
processed_cnames = set()

# RESOLVES_TO and MANAGED_BY links
for row in df.itertuples():
Expand All @@ -312,24 +336,52 @@ def run(self):
elif row.response_type == 'A' and row.ip4_address:
host_qid = host_id[row.query_name]
ip_qid = ip4_id[row.ip4_address]
if (host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((host_qid, ip_qid))
if row.query_name != row.response_name:
if row.query_name == row.response_name:
# A record
if (host_qid, ip_qid, row.response_type) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [
self.reference, {'record': row.response_type}]})
unique_res.add((host_qid, ip_qid, row.response_type))
# If the final CNAME has multiple A/AAAA records, we would enter
# this loop multiple times, so ignore for slight performance
# optimization.
elif (row.response_type, row.query_name) not in processed_cnames:
# CNAME result
for left, right in pairwise(cname_chains[row.response_type][row.query_name]):
left_host_qid = host_id[left]
right_host_qid = host_id[right]
# First left is the initial hostname that is already added
# above.
if (right_host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((right_host_qid, ip_qid))
if (left_host_qid, right_host_qid) not in unique_alias:
aliasof_links.append({'src_id': left_host_qid,
'dst_id': right_host_qid,
'props': [self.reference]})
unique_alias.add((left_host_qid, right_host_qid))
processed_cnames.add((row.response_type, row.query_name))
# Add RESOLVES_TO entries
resolved_names = cname_resolves_to[row.response_type][row.query_name]
records = cname_ip_records[row.response_type][row.query_name]
for hostname, ips in resolved_names.items():
host_qid = host_id[hostname]
for ip in ips:
# Keep track of the source for the RESOLVES_TO relationship.
# Either CNAME if it was added transitively or A/AAAA if it
# is the end of the chain.
# A name can have both, i.e., IPs coming from a transitive
# CNAME and direct A/AAAA records.
record = 'CNAME'
if hostname in records and ip in records[hostname]:
record = row.response_type
ip_qid = ip4_id[ip]
if (host_qid, ip_qid, record) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid,
'props': [self.reference, {'record': record}]})
unique_res.add((host_qid, ip_qid, record))
# Add ALIAS_OF links for resolvable parts of the chain.
for source, destinations in cnames[row.response_type][row.query_name].items():
if source not in resolved_names:
# Link is not resolvable so ignore.
continue
source_qid = host_id[source]
for destination in destinations:
if destination not in resolved_names:
continue
destination_qid = host_id[destination]
if (source_qid, destination_qid) not in unique_alias:
aliasof_links.append({'src_id': source_qid,
'dst_id': destination_qid,
'props': [self.reference]})
unique_alias.add((source_qid, destination_qid))

# AAAA Record
elif row.response_type == 'AAAA' and row.ip6_address:
Expand All @@ -340,24 +392,44 @@ def run(self):
continue
host_qid = host_id[row.query_name]
ip_qid = ip6_id[ip_normalized]
if (host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((host_qid, ip_qid))
if row.query_name != row.response_name:
if row.query_name == row.response_name:
# AAAA record
if (host_qid, ip_qid, row.response_type) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [
self.reference, {'record': row.response_type}]})
unique_res.add((host_qid, ip_qid, row.response_type))
elif (row.response_type, row.query_name) not in processed_cnames:
processed_cnames.add((row.response_type, row.query_name))
# CNAME result
for left, right in pairwise(cname_chains[row.response_type][row.query_name]):
left_host_qid = host_id[left]
right_host_qid = host_id[right]
# First left is the initial hostname that is already added
# above.
if (right_host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((right_host_qid, ip_qid))
if (left_host_qid, right_host_qid) not in unique_alias:
aliasof_links.append({'src_id': left_host_qid,
'dst_id': right_host_qid,
'props': [self.reference]})
unique_alias.add((left_host_qid, right_host_qid))
# Add RESOLVES_TO entries
resolved_names = cname_resolves_to[row.response_type][row.query_name]
records = cname_ip_records[row.response_type][row.query_name]
for hostname, ips in resolved_names.items():
host_qid = host_id[hostname]
for ip in ips:
record = 'CNAME'
if hostname in records and ip in records[hostname]:
record = row.response_type
ip_qid = ip6_id[ip]
if (host_qid, ip_qid, record) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid,
'props': [self.reference, {'record': record}]})
unique_res.add((host_qid, ip_qid, record))
# Add ALIAS_OF links for resolvable parts of the chain.
for source, destinations in cnames[row.response_type][row.query_name].items():
if source not in resolved_names:
# Link is not resolvable so ignore.
continue
source_qid = host_id[source]
for destination in destinations:
if destination not in resolved_names:
continue
destination_qid = host_id[destination]
if (source_qid, destination_qid) not in unique_alias:
aliasof_links.append({'src_id': source_qid,
'dst_id': destination_qid,
'props': [self.reference]})
unique_alias.add((source_qid, destination_qid))

# PART_OF links between HostNames and DomainNames
for hd in host_names.intersection(domain_names):
Expand Down
Loading