Skip to content

Commit

Permalink
Merge pull request #155 from m-appel/openintel-updates
Browse files Browse the repository at this point in the history
Update OpenIntelCrawler
  • Loading branch information
romain-fontugne authored Nov 29, 2024
2 parents 280243d + 6f66a54 commit f91c981
Showing 1 changed file with 151 additions and 19 deletions.
170 changes: 151 additions & 19 deletions iyp/crawlers/openintel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import logging
import os
import tempfile
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from ipaddress import IPv6Address
from itertools import pairwise

import arrow
import boto3
Expand Down Expand Up @@ -125,18 +127,40 @@ def get_parquet(self):
pd.read_parquet(tempFile.name,
engine='fastparquet',
columns=[
'query_type',
'query_name',
'response_type',
'response_name',
'ip4_address',
'ip6_address',
'ns_address'])
'ns_address',
'cname_name',
])
)

@staticmethod
def assemble_cname_chain(first_link: str, chain_links: dict):
    """Order the chain links and return a list representing the chain.

    The chain is built by starting at ``first_link`` and repeatedly following
    ``chain_links`` (a mapping of name -> name it is an alias of) until a name
    is reached that has no further link. That final name is included in the
    returned list.

    If the links contain a loop (e.g., ``a -> b -> a``), following them would
    never terminate. In this case a warning is logged and the chain assembled
    up to (but not including) the first repeated name is returned.

    A warning is also logged if not all entries of ``chain_links`` are part of
    the assembled chain.

    Raises:
        KeyError: If ``first_link`` is not a key of ``chain_links``.
    """
    chain = [first_link]
    # Names already placed in the chain; used to detect CNAME loops.
    visited = {first_link}
    current = chain_links[first_link]
    while current in chain_links:
        if current in visited:
            logging.warning('CNAME chain loop detected')
            logging.warning(f'Chain: {chain}')
            logging.warning(f'Chain links: {chain_links}')
            return chain
        chain.append(current)
        visited.add(current)
        current = chain_links[current]
    # The last name is only a link target, never a link source.
    chain.append(current)
    # A complete chain has exactly one more element than there are links.
    if len(chain) != len(chain_links) + 1:
        logging.warning('Unaccounted CNAME chain links')
        logging.warning(f'Chain: {chain}')
        logging.warning(f'Chain links: {chain_links}')
    return chain

def run(self):
"""Fetch the forward DNS data, populate a data frame, and process lines one by
one."""
attempt = 5
self.pandas_df_list = [] # List of Parquet file-specific Pandas DataFrames
self.pandas_df_list = list() # List of Parquet file-specific Pandas DataFrames

while len(self.pandas_df_list) == 0 and attempt > 0:
self.get_parquet()
Expand All @@ -147,22 +171,31 @@ def run(self):

# Select A, AAAA, NS, and CNAME records from the measurement data
df = pandas_df[
(
(pandas_df.query_type == 'A') |
(pandas_df.query_type == 'AAAA') |
(pandas_df.query_type == 'NS')

) &
(
(pandas_df.response_type == 'A') |
(pandas_df.response_type == 'AAAA') |
(pandas_df.response_type == 'NS')
(pandas_df.response_type == 'NS') |
(pandas_df.response_type == 'CNAME')
) &
# Filter out non-apex records
(~pandas_df.query_name.str.startswith('www.')) &
# Filter missing addresses (there is at least one...)
(
(pandas_df.ip4_address.notnull()) |
(pandas_df.ip6_address.notnull()) |
(pandas_df.ns_address.notnull())
(pandas_df.ns_address.notnull()) |
(pandas_df.cname_name.notnull())
)
][['query_name', 'response_type', 'ip4_address', 'ip6_address', 'ns_address']].drop_duplicates()
df.query_name = df.query_name.str[:-1] # Remove root '.'
df.ns_address = df.ns_address.map(lambda x: x[:-1] if x is not None else None) # Remove root '.'
].drop_duplicates()
# Remove root '.' from fields.
df.query_name = df.query_name.str[:-1]
df.response_name = df.response_name.str[:-1]
df.ns_address = df.ns_address.map(lambda x: x[:-1] if x is not None else None)
df.cname_name = df.cname_name.map(lambda x: x[:-1] if x is not None else None)

logging.info(f'Read {len(df)} unique records from {len(self.pandas_df_list)} Parquet file(s).')

Expand All @@ -185,22 +218,86 @@ def run(self):
continue
ipv6_addresses.add(ip_normalized)

# Handle CNAME entries.
# A query where the result is obtained via a CNAME is indicated by a
# response name that is different from the query name. This means there will be
# a CNAME response linking the initial query name to the cname name. However,
# the entry with the resolved IP only contains the last entry of a potential
# CNAME chain, so we need to check the CNAME responses as well.
# An example CNAME chain looks like this:
#
# query_type query_name response_type response_name ip4_address cname_name # noqa: W505
# ------------ ------------- --------------- --------------- ------------- --------------- # noqa: W505
# A example.org CNAME example.org a.example.org # noqa: W505
# A example.org CNAME a.example.org b.example.org # noqa: W505
# A example.org A b.example.org 192.0.2.1
#
# The beginning of the chain is the CNAME entry where query_name is equal to
# response_name.
#
# The dataset also contains CNAME chains that do not resolve to an IP (i.e., no
# response with type A/AAAA exists), so we need to filter these out.

# Get query names which contain CNAMEs and resolved to an IP.
cname_queries = (df
[
(df.response_type == 'A') |
(df.response_type == 'AAAA')
]
.query('query_name != response_name')
[[
'response_type',
'query_name'
]]
.groupby('response_type')
.aggregate(['unique']))
# Simplify access.
cname_queries = {'A': set(cname_queries.loc['A']['query_name']['unique']),
'AAAA': set(cname_queries.loc['AAAA']['query_name']['unique'])}

# Get the components of CNAME chains for queries that successfully resolved.
cnames = defaultdict(lambda: defaultdict(dict))
# There are cases where NS queries receive a CNAME response, which we want to
# ignore.
for row in df[(df.query_type.isin(['A', 'AAAA'])) & (df.response_type == 'CNAME')].itertuples():
if row.query_name not in cname_queries[row.query_type]:
# No need to build chains for entries that did not resolve.
continue
cname_dict = cnames[row.query_type]
if row.response_name in cname_dict[row.query_name]:
logging.warning(f'Duplicate CNAME chain entry "{row.response_name}" for query "{row.query_name}"')
# We keep the chain links as a dict and order them later since there is no
# guarantee that our dataframe rows are ordered.
cname_dict[row.query_name][row.response_name] = row.cname_name
# Also need to create HostName nodes for these.
host_names.add(row.cname_name)

# Assemble chains.
cname_chains = defaultdict(dict)
for query_type, chain_dict in cnames.items():
for query_name, chain_links in chain_dict.items():
cname_chains[query_type][query_name] = self.assemble_cname_chain(query_name, chain_links)

# Get/create all nodes:
domain_id = self.iyp.batch_get_nodes_by_single_prop('DomainName', 'name', domain_names, all=False)
host_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', host_names, all=False)
ns_id = self.iyp.batch_get_nodes_by_single_prop('HostName', 'name', name_servers, all=False)
self.iyp.batch_add_node_label(list(ns_id.values()), 'AuthoritativeNameServer')
ip4_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', set(
df[df.ip4_address.notnull()]['ip4_address']), all=False)
ip4_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip',
set(df[df.ip4_address.notnull()]['ip4_address']),
all=False)
ip6_id = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', ipv6_addresses, all=False)

logging.info(f'Got {len(domain_id)} domains, {len(ns_id)} nameservers, {len(host_id)} hosts, '
f'{len(ip4_id)} IPv4, {len(ip6_id)} IPv6')

# Compute links
res_links = []
mng_links = []
partof_links = []
res_links = list()
mng_links = list()
partof_links = list()
aliasof_links = list()
unique_alias = set()
unique_res = set()

# RESOLVES_TO and MANAGED_BY links
for row in df.itertuples():
Expand All @@ -215,7 +312,24 @@ def run(self):
elif row.response_type == 'A' and row.ip4_address:
host_qid = host_id[row.query_name]
ip_qid = ip4_id[row.ip4_address]
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
if (host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((host_qid, ip_qid))
if row.query_name != row.response_name:
# CNAME result
for left, right in pairwise(cname_chains[row.response_type][row.query_name]):
left_host_qid = host_id[left]
right_host_qid = host_id[right]
# First left is the initial hostname that is already added
# above.
if (right_host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((right_host_qid, ip_qid))
if (left_host_qid, right_host_qid) not in unique_alias:
aliasof_links.append({'src_id': left_host_qid,
'dst_id': right_host_qid,
'props': [self.reference]})
unique_alias.add((left_host_qid, right_host_qid))

# AAAA Record
elif row.response_type == 'AAAA' and row.ip6_address:
Expand All @@ -226,7 +340,24 @@ def run(self):
continue
host_qid = host_id[row.query_name]
ip_qid = ip6_id[ip_normalized]
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
if (host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((host_qid, ip_qid))
if row.query_name != row.response_name:
# CNAME result
for left, right in pairwise(cname_chains[row.response_type][row.query_name]):
left_host_qid = host_id[left]
right_host_qid = host_id[right]
# First left is the initial hostname that is already added
# above.
if (right_host_qid, ip_qid) not in unique_res:
res_links.append({'src_id': right_host_qid, 'dst_id': ip_qid, 'props': [self.reference]})
unique_res.add((right_host_qid, ip_qid))
if (left_host_qid, right_host_qid) not in unique_alias:
aliasof_links.append({'src_id': left_host_qid,
'dst_id': right_host_qid,
'props': [self.reference]})
unique_alias.add((left_host_qid, right_host_qid))

# PART_OF links between HostNames and DomainNames
for hd in host_names.intersection(domain_names):
Expand All @@ -238,12 +369,13 @@ def run(self):
self.iyp.batch_add_links('RESOLVES_TO', res_links)
self.iyp.batch_add_links('MANAGED_BY', mng_links)
self.iyp.batch_add_links('PART_OF', partof_links)
self.iyp.batch_add_links('ALIAS_OF', aliasof_links)

def unit_test(self):
# infra_ns only has RESOLVES_TO relationships.
# infra_ns only has RESOLVES_TO and ALIAS_OF relationships.
if self.reference['reference_name'] == 'openintel.infra_ns':
return super().unit_test(['RESOLVES_TO'])
return super().unit_test(['RESOLVES_TO', 'MANAGED_BY', 'PART_OF'])
return super().unit_test(['RESOLVES_TO', 'ALIAS_OF'])
return super().unit_test(['RESOLVES_TO', 'MANAGED_BY', 'PART_OF', 'ALIAS_OF'])


class DnsgraphCrawler(BaseCrawler):
Expand Down

0 comments on commit f91c981

Please sign in to comment.