diff --git a/ACKNOWLEDGMENTS.md b/ACKNOWLEDGMENTS.md
index cee4faf..9c93517 100644
--- a/ACKNOWLEDGMENTS.md
+++ b/ACKNOWLEDGMENTS.md
@@ -111,6 +111,15 @@
 We use the [extended allocation and assignment reports](https://www.nro.net/about/rirs/statistics/)
 provided by the [Number Resource Organization](https://www.nro.net/).
 
+## Open Observatory of Network Interference
+
+We use [Internet censorship measurements](https://explorer.ooni.org/) provided by the
+[Open Observatory of Network Interference](https://ooni.org/).
+
+This data is licensed under [CC BY-NC-SA
+4.0](https://creativecommons.org/licenses/by-nc-sa/4.0/). The data is aggregated for
+display in the graph.
+
 ## OpenINTEL
 
 We use several datasets from [OpenINTEL](https://www.openintel.nl/), a joint project of
diff --git a/documentation/data-sources.md b/documentation/data-sources.md
index 8e8c889..8a7dfea 100644
--- a/documentation/data-sources.md
+++ b/documentation/data-sources.md
@@ -22,11 +22,11 @@
 | Cisco | Umbrella Popularity List | https://s3-us-west-1.amazonaws.com/umbrella-static/index.html |
 | Citizen Lab | URL testing lists | https://github.com/citizenlab/test-lists |
 | Cloudflare | Cloudflare Radar API endpoints radar/dns/top/ases, radar/dns/top/locations, radar/ranking/top, radar/datasets | https://radar.cloudflare.com |
-| | |
 | Emile Aben | AS names | https://github.com/emileaben/asnames |
 | IHR | Country Dependency, AS Hegemony, ROV | https://ihr.iijlab.net |
 | Internet Intelligence Lab | AS to Organization Mapping | https://github.com/InetIntel/Dataset-AS-to-Organization-Mapping |
 | NRO | Extended allocation and assignment reports | https://www.nro.net/about/rirs/statistics |
+| OONI | Internet censorship measurements | https://ooni.org/ |
 | OpenINTEL | tranco1m, umbrella1m, ns | https://data.openintel.nl/data |
 | | DNS Dependency Graph | https://dnsgraph.dacs.utwente.nl |
 | Packet Clearing House | Daily routing snapshots | https://www.pch.net/resources/Routing_Data |
diff --git a/documentation/node-types.md b/documentation/node-types.md
index 84cd037..1306c57 100644
--- a/documentation/node-types.md
+++ b/documentation/node-types.md
@@ -25,6 +25,7 @@
 | PeeringdbOrgID | Unique identifier for an Organization as assigned by PeeringDB. |
 | Prefix | An IPv4 or IPv6 prefix uniquely identified by the **prefix** property. The **af** property (address family) provides the IP version of the prefix.|
 | Ranking | Represent a specific ranking of Internet resources (e.g. CAIDA's ASRank or Tranco ranking). The rank value for each resource is given by the RANK relationship.|
+| Resolver | An additional label added to IP nodes that are DNS resolvers. |
 | Tag | The output of a classification. A tag can be the result of a manual or automated classification. Uniquely identified by the **label** property.|
 | URL | The full URL for an Internet resource, uniquely identified by the **url** property. |
 
diff --git a/documentation/relationship-types.md b/documentation/relationship-types.md
index e86e4a9..8567389 100644
--- a/documentation/relationship-types.md
+++ b/documentation/relationship-types.md
@@ -8,6 +8,7 @@
 | ASSIGNED | Represent the allocation by a RIR of a network resource (AS, Prefix) to a resource holder (see OpaqueID). Or represent the assigned IP address of an AtlasProbe. |
 | AVAILABLE | Relate ASes and Prefixes to RIRs (in the form of an OpaqueID) meaning that the resource is not allocated and available at the related RIR. |
 | CATEGORIZED | Relate a network resource (AS, Prefix, URL) to a Tag, meaning that the resource has been classified accordingly to the Tag. The **reference_name** property provide the name of the original dataset/classifier. |
+| CENSORED | Relate ASes to an OONI censorship test in the form of a Tag or a network resource (IP, URL), meaning that there exists a censorship test result from a probe in this AS to the connected node. The **reference_name** property provides the name of the test.|
 | COUNTRY | Relate any node to its corresponding country. This relation may have different meaning depending on the original dataset (e.g. geo-location or registration). |
 | DEPENDS_ON | Relate an AS or Prefix to an AS, meaning the reachability of the AS/Prefix depends on a certain AS. |
 | EXTERNAL_ID | Relate a node to an identifier commonly used by an organization. For example, PeeringDB assigns unique identifiers to IXPs (see PeeringdbIXID). |
diff --git a/iyp/crawlers/ooni/README.md b/iyp/crawlers/ooni/README.md
index 701422f..eae9bd7 100644
--- a/iyp/crawlers/ooni/README.md
+++ b/iyp/crawlers/ooni/README.md
@@ -1,73 +1,301 @@
-# IYP OONI Implementation Tracker
-This Crawler pulls the censorship data provided by the [Open
-Observatory of Network Interference (OONI)](https://ooni.org/) into
-the IYP. OONI runs a number of tests on devices provided by
-volunteers, each test has their own crawler and they are specified
-below.
-
-As for the implementation:
-
-The OoniCrawler baseclass, which extends the BaseCrawler, is defined
-in the init.py. Each crawler then extends the base class with their
-unique attributes. Common among all crawlers are the attributes reference, repo,
-dataset, all_asns, all_countries, all_results, all_percentages,
-all_dns_resolvers and unique_links.
-
-- reference and repo are set to OONI to identify the crawler.
-- dataset needs to be set to the dataset that specific crawler is pulling, e.g. whatsapp.
-- all_asns tracks all asns in the dataset and is added to by the
-  process_one_line() function
-- all_countries tracks all countries in the dataset and is added to by the
-process_one_line() function
-- all_results contains all results the process_one_line() function
-  produces, but as there are crawler-specific attributes, the
-  process_one_line() function is extended in each crawler and also
-  modifies this variable. To do that, we first run the base function
-  and then acess the last result in the extended crawler class.
-  Therefore, if we choose not to proceed with a given result in the
-  process_one_line() class for any reason, e.g. invalid parameters,
-  one has to be careful to pop() the last result in all_results or it
-  will contain an invalid result.
-- all_percentages is calculated by each crawler-specific
-  calculate_percentages() function, which highly depend on the OONI
-  test implementation. See each tests' github page for that
-  implementation.
-- all_dns_resolvers is handled in the base OoniCrawler class to track
-  dns resolvers and add them to the IYP. No changes need be made in
-  extended crawlers.
-- unique_links is a dictionary of currently the following sets:
-  'COUNTRY': set(),
-  'CENSORED': set(),
-  'RESOLVES_TO': set(),
-  'PART_OF': set(),
-  'CATEGORIZED': set(),
-  if you are adding a new link, make sure to add it to this
-  dictionary. This is done to prevent link duplication stemming from
-  the same crawler, e.g. if multiple result files add the same PART_OF
-  relationship, the link would be duplicated if we do not track
-  existing links. Whenever you create a link in the extended
-  batch_add_to_iyp() class, make sure you add it to the corresponding
-  unique_links set, and before you create a link, check the set for
-  the existence of the link.
-
-Functions:
-
-- Each run starts by calling the download_and_extract() function of the grabber class.
-  This function is shared amongst all OONI crawlers, and takes the
-  repo, a directory and the dataset as the input. If implementing a
-  new crawler, only set the dataset correctly to the same name OONI
-  uses and you do not need to interact with this class.
-- Then, each line in the downloaded and extracted results files is
-  processed in process_one_line(). This needs to be done in both the
-  base and the extended class, as there are test specific attributes
-  the extended class needs to process. See above, all_results(), and
-  comments in the init.py code for implementation specifics.
-- calculate_percentages() calculates the link percentages based on
-  test-specific attributes. This is entirely done in the extended
-  crawler and needs to be implemented by you if you're adding a new
-  crawler.
-- Finally, batch_add_to_iyp() is called to add the results to the IYP.
+# OONI -- https://ooni.org/
+
+The [Open Observatory of Network Interference](https://ooni.org/) (OONI) is a non-profit
+free software project that aims to empower decentralized efforts in documenting internet
+censorship around the world. OONI runs a number of tests from devices provided by
+volunteers, and we import a subset of these into IYP.
+
+Since most of these crawlers create the same graph representation, we first briefly
+describe the function of all tests and link to their detailed test specification. Then
+we give one combined description of the graph representation at the end.
+
+## Crawlers
+
+### Facebook Messenger (facebookmessenger.py)
+
+Specification:
+[ts-019-facebook-messenger.md](https://github.com/ooni/spec/blob/master/nettests/ts-019-facebook-messenger.md)
+
+This test verifies if a set of Facebook Messenger endpoints resolve to consistent IPs
+and if it is possible to establish a TCP connection to them on port 443.
+
+### Header Field Manipulation Test (httpheaderfieldmanipulation.py)
+
+Specification:
+[ts-006-header-field-manipulation.md](https://github.com/ooni/spec/blob/master/nettests/ts-006-header-field-manipulation.md)
+
+This test performs HTTP requests with request headers that vary capitalization towards a
+backend. If the headers reported by the server differ from the ones that were sent, then
+tampering is detected.
+
+### Signal (osignal.py)
+
+Specification:
+[ts-029-signal.md](https://github.com/ooni/spec/blob/master/nettests/ts-029-signal.md)
+
+This test checks if it is possible to establish a TLS connection with the Signal server
+backend and perform an HTTP GET request.
+
+### Psiphon (psiphon.py)
+
+Specification:
+[ts-015-psiphon.md](https://github.com/ooni/spec/blob/master/nettests/ts-015-psiphon.md)
+
+This test creates a Psiphon tunnel and then uses it to fetch the
+https://www.google.com/humans.txt webpage.
+
+### RiseupVPN (riseupvpn.py)
+
+Specification:
+[ts-026-riseupvpn.md](https://github.com/ooni/spec/blob/master/nettests/ts-026-riseupvpn.md)
+
+This test checks if a LEAP-platform-based VPN service like RiseupVPN is working as
+expected. It first performs an HTTP GET request to the RiseupVPN API service, followed
+by a TCP connection to the VPN gateways.
+
+### STUN reachability (stunreachability.py)
+
+Specification:
+[ts-025-stun-reachability.md](https://github.com/ooni/spec/blob/master/nettests/ts-025-stun-reachability.md)
+
+For each STUN input URL, this test sends a binding request to the given URL's endpoint
+and receives the corresponding response. If a valid response is received, then the test
+is successful, otherwise it fails.
+
+### Telegram (telegram.py)
+
+Specification:
+[ts-020-telegram.md](https://github.com/ooni/spec/blob/master/nettests/ts-020-telegram.md)
+
+This test checks if two services are working as they should:
+
+1. The Telegram access points (the addresses used by the Telegram desktop client)
+1. The Telegram web version
+
+### Tor (tor.py)
+
+Specification:
+[ts-023-tor.md](https://github.com/ooni/spec/blob/master/nettests/ts-023-tor.md)
+
+This test loops through the list of measurement targets. The measurement action depends
+on the target type:
+
+- for dir_port targets, the test will GET the /tor/status-vote/current/consensus.z
+  resource using the HTTP protocol;
+- for or_port and or_port_dirauth targets, the test will connect to the address and
+  perform a TLS handshake;
+- for obfs4 targets, the test will connect to the address and perform an OBFS4
+  handshake;
+- otherwise, the test will TCP connect to the address.
+
+### Tor using snowflake (torsf.py)
+
+Specification:
+[ts-030-torsf.md](https://github.com/ooni/spec/blob/master/nettests/ts-030-torsf.md)
+
+This test detects if tor bootstraps using the Snowflake pluggable transport (PT) within
+a reasonable timeout.
+
+### Vanilla Tor (vanillator.py)
+
+Specification:
+[ts-016-vanilla-tor.md](https://github.com/ooni/spec/blob/master/nettests/ts-016-vanilla-tor.md)
+
+This test runs the Tor executable and collects logs. The bootstrap will either succeed
+or eventually time out.
+
+### Web Connectivity (webconnectivity.py)
+
+Specification:
+[ts-017-web-connectivity.md](https://github.com/ooni/spec/blob/master/nettests/ts-017-web-connectivity.md)
+
+This test checks if a website is censored using a sequence of steps. For more details,
+please check the specification.
+
+### WhatsApp (whatsapp.py)
+
+Specification:
+[ts-018-whatsapp.md](https://github.com/ooni/spec/blob/master/nettests/ts-018-whatsapp.md)
+
+This test checks if three services are working as they should:
+
+1. The WhatsApp endpoints used by the WhatsApp mobile app;
+1. The registration service, i.e. the service used to register a new account;
+1. The WhatsApp web interface.
+
+## Graph Representation
+
+All crawlers create `CENSORED` relationships from `AS` nodes to either a `Tag`, `URL` or
+`IP` node, indicating that there exists a censorship test result from at least one probe
+in this AS.
+
+We aggregate test results on an AS-country basis, i.e., if an AS contains probes from
+multiple countries, we create one `CENSORED` relationship per country. This results in
+multiple `CENSORED` relationships between the same AS and target, which can be
+distinguished by using the `country_code` property of the relationship.
+
+The result categories differ per test and are described in more detail below. However,
+all relationships contain the following two properties:
+
+- `total_count`: The total number of aggregated test results. Note that this is
+  different from the `count_total` field present for some crawlers.
+- `country_code`: The country code of the country for which results were aggregated.
+
+For each result category we create two properties:
+
+- `count_*`: The number of results in this category
+- `percentage_*`: The relative size in percent of this category
+
+For many tests the result is derived from a combination of fields. In order to aggregate
+the results we group them into categories and choose a name that should be recognizable
+when looking at the OONI documentation as well.
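+
+For example, the aggregated properties can be queried directly. The following sketch
+uses the `OONI Signal Test` tag created by the Signal crawler and its `ok`/`blocked`
+categories (both described below); the property names follow the `count_*` and
+`percentage_*` scheme above:
+
+```Cypher
+MATCH (a:AS)-[c:CENSORED]->(:Tag {label: 'OONI Signal Test'})
+WHERE c.country_code = 'JP' AND c.count_blocked > 0
+RETURN a.asn, c.total_count, c.count_blocked, c.percentage_blocked
+```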
+
+### `(:AS)-[:CENSORED]->(:Tag)` Crawlers
+
+As mentioned above, most crawlers create `(:AS)-[:CENSORED]->(:Tag)` relationships. The
+`Tag` node represents a specific OONI test (e.g.,
+[WhatsApp](https://github.com/ooni/spec/blob/master/nettests/ts-018-whatsapp.md)) and
+the `CENSORED` relationship represents aggregated results. For brevity we only discuss
+the result categories for each crawler here.
+
+If a result category is binary, it has a counterpart prefixed with `no_*` indicating a
+negative result.
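+
+For illustration, an aggregated Signal test result for one AS-country pair could look
+like this (the counts are made-up example values):
+
+```Cypher
+(:AS {asn: 2497})-[:CENSORED {country_code: 'JP', total_count: 50, count_ok: 48, count_blocked: 2, percentage_ok: 96.0, percentage_blocked: 4.0}]->(:Tag {label: 'OONI Signal Test'})
+```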
+
+#### facebookmessenger.py
+
+- `unblocked`: No blocking
+- `dns_blocking`: Endpoints are DNS blocked
+- `tcp_blocking`: Endpoints are TCP blocked
+- `both_blocked`: Endpoints are blocked via both DNS & TCP
+
+#### httpheaderfieldmanipulation.py
+
+This test performs multiple measurements at once, which is why we introduce a meta
+category.
+
+- `[no_]total`: Meta category indicating that any of the following results was positive
+- `[no_]request_line_capitalization`: Request line was manipulated
+- `[no_]header_name_capitalization`: Header field names were manipulated
+- `[no_]header_field_value`: Header field values were manipulated
+- `[no_]header_field_number`: Number of headers was manipulated
+
+#### httpinvalidrequestline.py
+
+- `[no_]tampering`: Tampering detected
+
+#### osignal.py
+
+- `ok`: Connection succeeded
+- `blocked`: Connection failed
+
+#### psiphon.py
+
+- `bootstrapping_error`: Error in bootstrapping Psiphon
+- `usage_error`: Error in using Psiphon
+- `working`: Bootstrap worked
+- `invalid`: Invalid (should not happen)
+
+#### riseupvpn.py
+
+- `ok`: VPN API is functional and reachable
+- `failure`: Connection to VPN API failed
+
+#### telegram.py
+
+- `total_[ok|blocked]`: Meta category indicating that any of the following results was
+  blocked (`total_blocked`) or all are ok (`total_ok`)
+- `web_[ok|blocked|none]`: Telegram web version is blocked. `web_none` should not really
+  happen but is kept for completeness.
+- `http_[ok|blocked]`: Telegram access point blocked at HTTP level
+- `tcp_[ok|blocked]`: Telegram access point blocked at TCP level
+
+#### torsf.py
+
+- `ok`: Bootstrap succeeded
+- `failure`: Bootstrap failed
+
+#### vanillator.py
+
+- `ok`: Bootstrap succeeded
+- `failure`: Bootstrap failed
+
+#### whatsapp.py
+
+- `total_[ok|blocked]`: Meta category indicating that any of the following results was
+  blocked (`total_blocked`) or all are ok (`total_ok`)
+- `endpoint_[ok|blocked]`: Failed to connect to any endpoint
+- `registration_server_[ok|blocked]`: Cannot connect to registration service
+- `web_[ok|blocked]`: WhatsApp web is blocked
+
+### stunreachability.py
+
+This crawler connects `AS` with `URL` nodes and also adds hostnames and the IPs they
+resolve to for the URL if available. The URL will be connected to the hostname by the
+[`url2hostname`](../../post/url2hostname.py) postprocessing script.
+
+```Cypher
+(:AS {asn: 2497})-[:CENSORED {country_code: 'JP'}]->(:URL {url: 'stun://stun.l.google.com:19302'})
+(:HostName {name: 'stun.l.google.com'})-[:RESOLVES_TO]->(:IP {ip: '198.18.5.122'})
+```
+
+Result categories:
+
+- `ok`: STUN is working
+- `failure`: STUN is not working
+
+### tor.py
+
+This crawler connects `AS` with `IP` nodes and tags IPs as Tor directories or bridges.
+
+```Cypher
+(:AS {asn: 2497})-[:CENSORED {country_code: 'JP'}]->(:IP {ip: '192.95.36.142'})-[:CATEGORIZED]->(:Tag {label: 'OONI Probe Tor Tag obfs4'})
+```
+
+Result categories:
+
+- `ok`: Target reachable
+- `failure`: Target not reachable
+
+Tag names:
+
+- `OONI Probe Tor Tag dir_port`
+- `OONI Probe Tor Tag obfs4`
+- `OONI Probe Tor Tag or_port`
+- `OONI Probe Tor Tag or_port_dirauth`
+
+### webconnectivity.py
+
+This crawler connects `AS` with `URL` nodes and also adds hostnames and the IPs they
+resolve to for the URL if available. The URL will be connected to the hostname by the
+[`url2hostname`](../../post/url2hostname.py) postprocessing script.
+
+Since this test sometimes targets URLs which contain an IP instead of a normal hostname,
+it also adds a `PART_OF` relationship between `IP` and `URL` nodes in rare cases.
+
+```Cypher
+(:AS {asn: 2497})-[:CENSORED {country_code: 'JP'}]->(:URL {url: 'https://www.reddit.com/'})
+(:HostName {name: 'www.reddit.com'})-[:RESOLVES_TO]->(:IP {ip: '199.232.73.140'})
+(:IP {ip: '180.215.14.121'})-[:PART_OF]->(:URL {url: 'http://180.215.14.121/'})
+```
+
+Result categories:
+
+- `ok`: Website reachable
+- `confirmed`: Confirmed censorship by some form of blocking
+- `failure`: Failed to reach website, but could be caused by normal connectivity issues
+- `anomaly`: Default if no other case matches
+
+The webconnectivity crawler is also responsible for adding AS-to-country mapping and
+`Resolver` nodes to the graph. Since this information is based on probes, it does not
+make sense to add it from multiple crawlers. In addition, the webconnectivity test is
+executed the most.
+
+```Cypher
+(:AS {asn: 2497})-[:COUNTRY {reference_name: 'ooni.webconnectivity'}]->(:Country)
+(:IP & Resolver {ip: '210.138.77.93'})
+```
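+
+As an example query (a sketch based on the result categories above), URLs with confirmed
+blocking observed from probes in a given country can be listed like this:
+
+```Cypher
+MATCH (a:AS)-[c:CENSORED]->(u:URL)
+WHERE c.country_code = 'JP' AND c.count_confirmed > 0
+RETURN a.asn, u.url, c.percentage_confirmed
+```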
+
+## Implemented Tests

 | Test Name                                 | Implementation Tracker       | GitHub URL                                                                                                      |
 |------------------------------------------|------------------------------|---------------------------------------------------------------------------------------------------------------|
diff --git a/iyp/crawlers/ooni/__init__.py b/iyp/crawlers/ooni/__init__.py
index bc1f3dc..dbaa8a7 100644
--- a/iyp/crawlers/ooni/__init__.py
+++ b/iyp/crawlers/ooni/__init__.py
@@ -16,10 +16,11 @@ def __init__(self, organization, url, name, dataset):
         self.reference['reference_url_info'] = 'https://ooni.org/post/mining-ooni-data'
         self.repo = 'ooni-data-eu-fra'
         self.dataset = dataset
+        self.categories = list()
         self.all_asns = set()
         self.all_countries = set()
         self.all_results = list()
-        self.all_percentages = list()
+        self.all_percentages = dict()
         self.all_dns_resolvers = set()
         self.unique_links = {
             'COUNTRY': set(),
@@ -50,52 +51,65 @@ def run(self):
                 data = json.loads(line)
                 self.process_one_line(data)
         logging.info('Calculating percentages...')
-        self.calculate_percentages()
+        self.aggregate_results()
         logging.info('Adding entries to IYP...')
         self.batch_add_to_iyp()
         logging.info('Done.')
 
     def process_one_line(self, one_line):
-        """Process a single line from the jsonl file and store the results locally."""
+        """Process a single line from the jsonl file and store the results locally.
+
+        Return True if an error occurred and no result was added, i.e., the extended
+        class should not continue to process this line.
+        """
+
+        # No test result. Can happen sometimes.
+        if not one_line.get('test_keys'):
+            return True
 
         # Extract the ASN, throw an exception if malformed
-        probe_asn = (
-            int(one_line['probe_asn'][2:])
-            if one_line.get('probe_asn', '').startswith('AS')
-            else (_ for _ in ()).throw(Exception('Invalid ASN'))
-        )
+        try:
+            probe_asn = int(one_line['probe_asn'].removeprefix('AS'))
+        except ValueError as e:
+            logging.error(f'Invalid probe ASN: {one_line["probe_asn"]}')
+            raise e
 
         # Add the DNS resolver to the set, unless its not a valid IP address
         try:
-            self.all_dns_resolvers.add(
-                ipaddress.ip_address(one_line.get('resolver_ip'))
-            )
+            resolver_ip = ipaddress.ip_address(one_line.get('resolver_ip'))
+            if resolver_ip.is_global:
+                self.all_dns_resolvers.add(resolver_ip.compressed)
         except ValueError:
             pass
+
         probe_cc = one_line.get('probe_cc')
 
-        # Append the results to the list
+        if probe_asn == 0:
+            # Ignore result if probe ASN is hidden.
+            return True
+
         self.all_asns.add(probe_asn)
-        self.all_countries.add(probe_cc)
-        self.all_results.append((probe_asn, probe_cc, None, None))
+        if probe_cc != 'ZZ':
+            # Do not create country nodes for ZZ country.
+            self.all_countries.add(probe_cc)
+
+        # Append the results to the list.
+        self.all_results.append((probe_asn, probe_cc))
 
         """The base function adds a skeleton to the all_results list, which includes the
-        probe_asn and the probe_cc, as well as 2 dummy entries.
-
-        Each extended crawler then modifies this entry
-        by calling self.all_results[-1][:2] to access the latest entry
-        in the all_list and modify the non-populated variables. Adding
-        further variables (e.g. more than 4) is also possible, as well
-        as adding less, in that case only modify variable 3.
-        Attention: if you are discarding a result in the extended
-        class, you need to make sure you specifically pop() the entry
-        created here, in the base class, or you WILL end up with
-        misformed entries that only contain the probe_asn and
+        probe_asn and the probe_cc.
+
+        Each extended crawler then modifies this entry by calling self.all_results[-1]
+        to access the last result and add its specific variables.
+        Attention: if you are discarding a result in the extended class, you need to
+        make sure you specifically pop() the entry created here, in the base class, or
+        you WILL end up with malformed entries that only contain the probe_asn and
         probe_cc, and mess up your data.
""" + return False def batch_add_to_iyp(self): """Add the results to the IYP.""" - country_links = [] + country_links = list() # First, add the nodes and store their IDs directly as returned dictionaries self.node_ids = { @@ -109,13 +123,15 @@ def batch_add_to_iyp(self): 'IP', 'ip', self.all_dns_resolvers, all=False ), } - # to avoid duplication of country links, we only add them from + # To avoid duplication of country links, we only add them from # the webconnectivity dataset if self.dataset == 'webconnectivity': for entry in self.all_results: asn, country = entry[:2] - asn_id = self.node_ids['asn'].get(asn) - country_id = self.node_ids['country'].get(country) + if country == 'ZZ': + continue + asn_id = self.node_ids['asn'][asn] + country_id = self.node_ids['country'][country] # Check if the COUNTRY link is unique if (asn_id, country_id) not in self.unique_links['COUNTRY']: @@ -133,3 +149,60 @@ def batch_add_to_iyp(self): self.iyp.batch_add_node_label( list(self.node_ids['dns_resolver'].values()), 'Resolver' ) + + def aggregate_results(self): + """Populate the self.all_percentages dict by aggregating results and calculating + percentages.""" + raise NotImplementedError() + + def make_result_dict(self, counts: dict, total_count: int = None): + """Create a result dict containing the counts, total count, and percentages. + + Ensure that entries for all categories defined in self.categories exist. If not + specified, total_count is the sum of all counts. + """ + if total_count is None: + total_count = sum(counts.values()) + + for category in self.categories: + # Ensure entry for each category exists. + counts[category] = counts.get(category, 0) + + percentages = { + category: ( + (counts[category] / total_count) * 100 if total_count > 0 else 0 + ) + for category in self.categories + } + + return { + 'total_count': total_count, + 'category_counts': dict(counts), + 'percentages': percentages, + } + + +def process_dns_queries(queries_list: list): + host_ip_set = set() + if not queries_list: + return host_ip_set + for query in queries_list: + if query['query_type'] not in {'A', 'AAAA'} or query['failure']: + continue + hostname = query['hostname'] + for answer in query['answers']: + try: + if answer['answer_type'] == 'A': + ip = ipaddress.ip_address(answer['ipv4']) + elif answer['answer_type'] == 'AAAA': + ip = ipaddress.ip_address(answer['ipv6']) + else: + # CNAME etc. + continue + except ValueError: + # In rare cases the answer IP is scrubbed and thus invalid. 
+ continue + if not ip.is_global: + continue + host_ip_set.add((hostname, ip.compressed)) + return host_ip_set diff --git a/iyp/crawlers/ooni/facebookmessenger.py b/iyp/crawlers/ooni/facebookmessenger.py index c890149..3afdaba 100644 --- a/iyp/crawlers/ooni/facebookmessenger.py +++ b/iyp/crawlers/ooni/facebookmessenger.py @@ -17,74 +17,46 @@ class Crawler(OoniCrawler): def __init__(self, organization, url, name): super().__init__(organization, url, name, 'facebookmessenger') + self.categories = ['unblocked', 'dns_blocking', 'tcp_blocking', 'both_blocked'] # Process a single line from the jsonl file and store the results locally def process_one_line(self, one_line): - super().process_one_line(one_line) - result_dns = one_line.get('test_keys', {}).get('facebook_dns_blocking') - result_tcp = one_line.get('test_keys', {}).get('facebook_tcp_blocking') + if super().process_one_line(one_line): + return + result_dns = one_line['test_keys']['facebook_dns_blocking'] + result_tcp = one_line['test_keys']['facebook_tcp_blocking'] + if result_dns is None or result_tcp is None: + self.all_results.pop() + return # Using the last result from the base class, add our unique variables - self.all_results[-1] = self.all_results[-1][:2] + (result_dns, result_tcp) + self.all_results[-1] = self.all_results[-1] + (result_dns, result_tcp) def batch_add_to_iyp(self): super().batch_add_to_iyp() facebookmessenger_id = self.iyp.get_node('Tag', {'label': label}, create=True) - censored_links = [] - - # Accumulate properties for each ASN-country pair - link_properties = defaultdict(lambda: defaultdict(int)) - - for asn, country, result_dns, result_tcp in self.all_results: - asn_id = self.node_ids['asn'].get(asn) - country_id = self.node_ids['country'].get(country) - - if asn_id and country_id: - props = self.reference.copy() - if (asn, country) in self.all_percentages: - percentages = self.all_percentages[(asn, country)].get( - 'percentages', {} - ) - counts = self.all_percentages[(asn, country)].get( - 'category_counts', {} - ) - total_count = self.all_percentages[(asn, country)].get( - 'total_count', 0 - ) - - for category in [ - 'unblocked', - 'dns_blocking', - 'tcp_blocking', - 'both_blocked', - ]: - props[f'percentage_{category}'] = percentages.get(category, 0) - props[f'count_{category}'] = counts.get(category, 0) - props['total_count'] = total_count - - # Accumulate properties - link_properties[(asn_id, facebookmessenger_id)] = props - - # Create links only once per ASN-country pair - for (asn_id, facebookmessenger_id), props in link_properties.items(): - if (asn_id, facebookmessenger_id) not in self.unique_links['CENSORED']: - self.unique_links['CENSORED'].add((asn_id, facebookmessenger_id)) - censored_links.append( - {'src_id': asn_id, 'dst_id': facebookmessenger_id, 'props': [props]} - ) - - # Batch add the links (this is faster than adding them one by one) + censored_links = list() + + # Create one link per ASN-country pair. 
+ for (asn, country), result_dict in self.all_percentages.items(): + asn_id = self.node_ids['asn'][asn] + props = dict() + for category in self.categories: + props[f'percentage_{category}'] = result_dict['percentages'][category] + props[f'count_{category}'] = result_dict['category_counts'][category] + props['total_count'] = result_dict['total_count'] + props['country_code'] = country + censored_links.append( + {'src_id': asn_id, 'dst_id': facebookmessenger_id, 'props': [props, self.reference]} + ) + self.iyp.batch_add_links('CENSORED', censored_links) - def calculate_percentages(self): + def aggregate_results(self): target_dict = defaultdict(lambda: defaultdict(int)) - # Initialize counts for all categories - categories = ['unblocked', 'dns_blocking', 'tcp_blocking', 'both_blocked'] - - # Populate the target_dict with counts for entry in self.all_results: asn, country, result_dns, result_tcp = entry if not result_dns and not result_tcp: @@ -96,26 +68,8 @@ def calculate_percentages(self): elif result_dns and result_tcp: target_dict[(asn, country)]['both_blocked'] += 1 - self.all_percentages = {} - for (asn, country), counts in target_dict.items(): - total_count = sum(counts.values()) - for category in categories: - counts[category] = counts.get(category, 0) - - percentages = { - category: ( - (counts[category] / total_count) * 100 if total_count > 0 else 0 - ) - for category in categories - } - - result_dict = { - 'total_count': total_count, - 'category_counts': dict(counts), - 'percentages': percentages, - } - self.all_percentages[(asn, country)] = result_dict + self.all_percentages[(asn, country)] = self.make_result_dict(counts) def unit_test(self): return super().unit_test(['CENSORED']) diff --git a/iyp/crawlers/ooni/httpheaderfieldmanipulation.py b/iyp/crawlers/ooni/httpheaderfieldmanipulation.py index 4640aa7..c7f79e9 100644 --- a/iyp/crawlers/ooni/httpheaderfieldmanipulation.py +++ b/iyp/crawlers/ooni/httpheaderfieldmanipulation.py @@ -17,37 +17,53 @@ class Crawler(OoniCrawler): def __init__(self, organization, url, name): super().__init__(organization, url, name, 'httpheaderfieldmanipulation') + self.categories = [ + 'total', + 'no_total', + 'request_line_capitalization', + 'no_request_line_capitalization', + 'header_name_capitalization', + 'no_header_name_capitalization', + 'header_field_value', + 'no_header_field_value', + 'header_field_number', + 'no_header_field_number', + ] def process_one_line(self, one_line): """Process a single line from the jsonl file and store the results locally.""" - super().process_one_line(one_line) + if super().process_one_line(one_line): + return - test_keys = one_line.get('test_keys', {}).get('tampering', {}) + test_keys = one_line['test_keys']['tampering'] - total = 'total' if test_keys.get('total', False) else 'no_total' + # "total" is true if an invalid response was received from the backend server, + # i.e., any tampering occurred, but also if there was no valid response. + # In this case, the individual fields are all false, but total is true, which + # can seem confusing. 
+ total = 'total' if test_keys['total'] else 'no_total' request_line_capitalization = ( 'request_line_capitalization' - if test_keys.get('request_line_capitalization', False) - else 'no_request_line_capitalization' - ) + if test_keys['request_line_capitalization'] + else 'no_request_line_capitalization') header_name_capitalization = ( 'header_name_capitalization' - if test_keys.get('header_name_capitalization', False) + if test_keys['header_name_capitalization'] else 'no_header_name_capitalization' ) header_field_value = ( 'header_field_value' - if test_keys.get('header_field_value', False) + if test_keys['header_field_value'] else 'no_header_field_value' ) header_field_number = ( 'header_field_number' - if test_keys.get('header_field_number', False) + if test_keys['header_field_number'] else 'no_header_field_number' ) # Using the last result from the base class, add our unique variables - self.all_results[-1] = self.all_results[-1][:2] + ( + self.all_results[-1] = self.all_results[-1] + ( total, request_line_capitalization, header_name_capitalization, @@ -55,90 +71,31 @@ def process_one_line(self, one_line): header_field_number, ) - if len(self.all_results[-1]) != 7: - self.all_results.pop() - def batch_add_to_iyp(self): super().batch_add_to_iyp() httpheader_id = self.iyp.get_node('Tag', {'label': label}, create=True) - censored_links = [] - - # Accumulate properties for each ASN-country pair - link_properties = defaultdict(lambda: defaultdict(int)) + censored_links = list() + + # Create one link per ASN-country pair. + for (asn, country), result_dict in self.all_percentages.items(): + asn_id = self.node_ids['asn'][asn] + props = dict() + for category in self.categories: + props[f'percentage_{category}'] = result_dict['percentages'][category] + props[f'count_{category}'] = result_dict['category_counts'][category] + props['total_count'] = result_dict['total_count'] + props['country_code'] = country + censored_links.append( + {'src_id': asn_id, 'dst_id': httpheader_id, 'props': [props, self.reference]} + ) - for ( - asn, - country, - total, - request_line_capitalization, - header_name_capitalization, - header_field_value, - header_field_number, - ) in self.all_results: - asn_id = self.node_ids['asn'].get(asn) - country_id = self.node_ids['country'].get(country) - - if asn_id and country_id: - props = self.reference.copy() - if (asn, country) in self.all_percentages: - percentages = self.all_percentages[(asn, country)].get( - 'percentages', {} - ) - counts = self.all_percentages[(asn, country)].get( - 'category_counts', {} - ) - total_count = self.all_percentages[(asn, country)].get( - 'total_count', 0 - ) - - for category in [ - 'total', - 'no_total', - 'request_line_capitalization', - 'no_request_line_capitalization', - 'header_name_capitalization', - 'no_header_name_capitalization', - 'header_field_value', - 'no_header_field_value', - 'header_field_number', - 'no_header_field_number', - ]: - props[f'percentage_{category}'] = percentages.get(category, 0) - props[f'count_{category}'] = counts.get(category, 0) - props['total_count'] = total_count - - # Accumulate properties - link_properties[(asn_id, httpheader_id)] = props - - for (asn_id, httpheader_id), props in link_properties.items(): - if (asn_id, httpheader_id) not in self.unique_links['CENSORED']: - self.unique_links['CENSORED'].add((asn_id, httpheader_id)) - censored_links.append( - {'src_id': asn_id, 'dst_id': httpheader_id, 'props': [props]} - ) - - # Batch add the links (this is faster than adding them one by one) 
self.iyp.batch_add_links('CENSORED', censored_links) - def calculate_percentages(self): + def aggregate_results(self): target_dict = defaultdict(lambda: defaultdict(int)) - # Initialize counts for all categories - categories = [ - 'total', - 'no_total', - 'request_line_capitalization', - 'no_request_line_capitalization', - 'header_name_capitalization', - 'no_header_name_capitalization', - 'header_field_value', - 'no_header_field_value', - 'header_field_number', - 'no_header_field_number', - ] - # Populate the target_dict with counts for entry in self.all_results: ( @@ -156,26 +113,11 @@ def calculate_percentages(self): target_dict[(asn, country)][header_field_value] += 1 target_dict[(asn, country)][header_field_number] += 1 - self.all_percentages = {} - for (asn, country), counts in target_dict.items(): - total_count = sum(counts.values()) - for category in categories: - counts[category] = counts.get(category, 0) - - percentages = { - category: ( - (counts[category] / total_count) * 100 if total_count > 0 else 0 - ) - for category in categories - } - - result_dict = { - 'total_count': total_count, - 'category_counts': dict(counts), - 'percentages': percentages, - } - self.all_percentages[(asn, country)] = result_dict + # This test tests multiple things with one result, i.e., the categories are + # not disjunct so we have to use our own total count. + total_count = counts['total'] + counts['no_total'] + self.all_percentages[(asn, country)] = self.make_result_dict(counts, total_count) def unit_test(self): return super().unit_test(['CENSORED']) diff --git a/iyp/crawlers/ooni/httpinvalidrequestline.py b/iyp/crawlers/ooni/httpinvalidrequestline.py index 1c9dec8..7f2f229 100644 --- a/iyp/crawlers/ooni/httpinvalidrequestline.py +++ b/iyp/crawlers/ooni/httpinvalidrequestline.py @@ -17,99 +17,50 @@ class Crawler(OoniCrawler): def __init__(self, organization, url, name): super().__init__(organization, url, name, 'httpinvalidrequestline') + self.categories = ['tampering', 'no_tampering'] def process_one_line(self, one_line): """Process a single line from the jsonl file and store the results locally.""" - super().process_one_line(one_line) + if super().process_one_line(one_line): + return - tampering = one_line.get('test_keys', {}).get('tampering', False) + tampering = 'tampering' if one_line['test_keys']['tampering'] else 'no_tampering' # Using the last result from the base class, add our unique variables - self.all_results[-1] = self.all_results[-1][:2] + (tampering,) + self.all_results[-1] = self.all_results[-1] + (tampering,) def batch_add_to_iyp(self): super().batch_add_to_iyp() httpinvalidrequestline_id = self.iyp.get_node('Tag', {'label': label}, create=True) - censored_links = [] - - # Accumulate properties for each ASN-country pair - link_properties = defaultdict(lambda: defaultdict(int)) - - for asn, country, tampering in self.all_results: - asn_id = self.node_ids['asn'].get(asn) - country_id = self.node_ids['country'].get(country) - - if asn_id and country_id: - props = self.reference.copy() - if (asn, country) in self.all_percentages: - percentages = self.all_percentages[(asn, country)].get( - 'percentages', {} - ) - counts = self.all_percentages[(asn, country)].get( - 'category_counts', {} - ) - total_count = self.all_percentages[(asn, country)].get( - 'total_count', 0 - ) - props['percentage_no_tampering'] = percentages.get( - 'no_tampering', 0 - ) - props['count_no_tampering'] = counts.get('no_tampering', 0) - props['percentage_tampering'] = percentages.get('tampering', 0) - 
props['count_tampering'] = counts.get('tampering', 0) - props['total_count'] = total_count - - # Accumulate properties - link_properties[(asn_id, httpinvalidrequestline_id)] = props - - for (asn_id, httpinvalidrequestline_id), props in link_properties.items(): - if (asn_id, httpinvalidrequestline_id) not in self.unique_links['CENSORED']: - self.unique_links['CENSORED'].add((asn_id, httpinvalidrequestline_id)) - censored_links.append( - { - 'src_id': asn_id, - 'dst_id': httpinvalidrequestline_id, - 'props': [props], - } - ) - - # Batch add the links (this is faster than adding them one by one) + censored_links = list() + + # Create one link per ASN-country pair. + for (asn, country), result_dict in self.all_percentages.items(): + asn_id = self.node_ids['asn'][asn] + props = dict() + for category in self.categories: + props[f'percentage_{category}'] = result_dict['percentages'][category] + props[f'count_{category}'] = result_dict['category_counts'][category] + props['total_count'] = result_dict['total_count'] + props['country_code'] = country + censored_links.append( + {'src_id': asn_id, 'dst_id': httpinvalidrequestline_id, 'props': [props, self.reference]} + ) + self.iyp.batch_add_links('CENSORED', censored_links) - def calculate_percentages(self): + def aggregate_results(self): target_dict = defaultdict(lambda: defaultdict(int)) - # Initialize counts for all categories - categories = ['tampering', 'no_tampering'] - # Populate the target_dict with counts for entry in self.all_results: asn, country, tampering = entry - target_dict[(asn, country)]['tampering'] += 1 if tampering else 0 - target_dict[(asn, country)]['no_tampering'] += 1 if not tampering else 0 - - self.all_percentages = {} + target_dict[(asn, country)][tampering] += 1 for (asn, country), counts in target_dict.items(): - total_count = counts['tampering'] + counts['no_tampering'] - for category in categories: - counts[category] = counts.get(category, 0) - - percentages = { - category: ( - (counts[category] / total_count) * 100 if total_count > 0 else 0 - ) - for category in categories - } - - result_dict = { - 'total_count': total_count, - 'category_counts': dict(counts), - 'percentages': percentages, - } - self.all_percentages[(asn, country)] = result_dict + self.all_percentages[(asn, country)] = self.make_result_dict(counts) def unit_test(self): return super().unit_test(['CENSORED']) diff --git a/iyp/crawlers/ooni/osignal.py b/iyp/crawlers/ooni/osignal.py index 418efd4..eeb80c7 100644 --- a/iyp/crawlers/ooni/osignal.py +++ b/iyp/crawlers/ooni/osignal.py @@ -8,6 +8,8 @@ ORG = 'OONI' URL = 's3://ooni-data-eu-fra/raw/' +# This crawler is not called 'signal' to prevent name collision with Python's built-in +# module. 
NAME = 'ooni.osignal' label = 'OONI Signal Test' @@ -17,95 +19,52 @@ class Crawler(OoniCrawler): def __init__(self, organization, url, name): super().__init__(organization, url, name, 'signal') + self.categories = ['ok', 'blocked'] def process_one_line(self, one_line): """Process a single line from the jsonl file and store the results locally.""" - super().process_one_line(one_line) - - result = one_line.get('test_keys', {}).get('signal_backend_status', '').lower() - - # Normalize result to be either "OK" or "Failure" - result = 'OK' if result == 'ok' else 'Failure' + if super().process_one_line(one_line): + return + signal_backend_status = one_line['test_keys']['signal_backend_status'] + if signal_backend_status is None: + self.all_results.pop() + return # Using the last result from the base class, add our unique variables - self.all_results[-1] = self.all_results[-1][:2] + (result,) + self.all_results[-1] = self.all_results[-1] + (signal_backend_status,) def batch_add_to_iyp(self): super().batch_add_to_iyp() signal_id = self.iyp.get_node('Tag', {'label': label}, create=True) - censored_links = [] - - # Accumulate properties for each ASN-country pair - link_properties = defaultdict(lambda: defaultdict(int)) - - for asn, country, result in self.all_results: - asn_id = self.node_ids['asn'].get(asn) - country_id = self.node_ids['country'].get(country) - - if asn_id and country_id: - props = self.reference.copy() - if (asn, country) in self.all_percentages: - percentages = self.all_percentages[(asn, country)].get( - 'percentages', {} - ) - counts = self.all_percentages[(asn, country)].get( - 'category_counts', {} - ) - total_count = self.all_percentages[(asn, country)].get( - 'total_count', 0 - ) - - for category in ['OK', 'Failure']: - props[f'percentage_{category}'] = percentages.get(category, 0) - props[f'count_{category}'] = counts.get(category, 0) - props['total_count'] = total_count - - # Accumulate properties - link_properties[(asn_id, signal_id)] = props - - for (asn_id, signal_id), props in link_properties.items(): - if (asn_id, signal_id) not in self.unique_links['CENSORED']: - self.unique_links['CENSORED'].add((asn_id, signal_id)) - censored_links.append( - {'src_id': asn_id, 'dst_id': signal_id, 'props': [props]} - ) - - # Batch add the links (this is faster than adding them one by one) + censored_links = list() + + # Create one link per ASN-country pair. 
+ for (asn, country), result_dict in self.all_percentages.items(): + asn_id = self.node_ids['asn'][asn] + props = dict() + for category in self.categories: + props[f'percentage_{category}'] = result_dict['percentages'][category] + props[f'count_{category}'] = result_dict['category_counts'][category] + props['total_count'] = result_dict['total_count'] + props['country_code'] = country + censored_links.append( + {'src_id': asn_id, 'dst_id': signal_id, 'props': [props, self.reference]} + ) + self.iyp.batch_add_links('CENSORED', censored_links) - def calculate_percentages(self): + def aggregate_results(self): target_dict = defaultdict(lambda: defaultdict(int)) - # Initialize counts for all categories - categories = ['OK', 'Failure'] - # Populate the target_dict with counts for entry in self.all_results: asn, country, result = entry target_dict[(asn, country)][result] += 1 - self.all_percentages = {} - for (asn, country), counts in target_dict.items(): - total_count = sum(counts.values()) - for category in categories: - counts[category] = counts.get(category, 0) - - percentages = { - category: ( - (counts[category] / total_count) * 100 if total_count > 0 else 0 - ) - for category in categories - } - - result_dict = { - 'total_count': total_count, - 'category_counts': dict(counts), - 'percentages': percentages, - } - self.all_percentages[(asn, country)] = result_dict + self.all_percentages[(asn, country)] = self.make_result_dict(counts) def unit_test(self): return super().unit_test(['CENSORED']) diff --git a/iyp/crawlers/ooni/psiphon.py b/iyp/crawlers/ooni/psiphon.py index 265ba7f..0a20fb2 100644 --- a/iyp/crawlers/ooni/psiphon.py +++ b/iyp/crawlers/ooni/psiphon.py @@ -17,14 +17,24 @@ class Crawler(OoniCrawler): def __init__(self, organization, url, name): super().__init__(organization, url, name, 'psiphon') + self.categories = [ + 'bootstrapping_error', + 'usage_error', + 'working', + 'invalid', + ] def process_one_line(self, one_line): """Process a single line from the jsonl file and store the results locally.""" - super().process_one_line(one_line) + if super().process_one_line(one_line): + return + + if 'bootstrap_time' not in one_line['test_keys']: + self.all_results.pop() + return - test_keys = one_line.get('test_keys', {}) - bootstrap_time = test_keys.get('bootstrap_time', 0) - failure = test_keys.get('failure') + bootstrap_time = one_line['test_keys']['bootstrap_time'] + failure = one_line['test_keys']['failure'] if bootstrap_time == 0 and failure is not None: result = 'bootstrapping_error' @@ -36,94 +46,40 @@ def process_one_line(self, one_line): result = 'invalid' # Using the last result from the base class, add our unique variables - self.all_results[-1] = self.all_results[-1][:2] + (result,) + self.all_results[-1] = self.all_results[-1] + (result,) def batch_add_to_iyp(self): super().batch_add_to_iyp() psiphon_id = self.iyp.get_node('Tag', {'label': label}, create=True) - censored_links = [] - - # Accumulate properties for each ASN-country pair - link_properties = defaultdict(lambda: defaultdict(int)) - - for asn, country, result in self.all_results: - asn_id = self.node_ids['asn'].get(asn) - country_id = self.node_ids['country'].get(country) - - if asn_id and country_id: - props = self.reference.copy() - if (asn, country) in self.all_percentages: - percentages = self.all_percentages[(asn, country)].get( - 'percentages', {} - ) - counts = self.all_percentages[(asn, country)].get( - 'category_counts', {} - ) - total_count = self.all_percentages[(asn, country)].get( - 'total_count', 0 
- ) - - for category in [ - 'bootstrapping_error', - 'usage_error', - 'working', - 'invalid', - ]: - props[f'percentage_{category}'] = percentages.get(category, 0) - props[f'count_{category}'] = counts.get(category, 0) - props['total_count'] = total_count - - # Accumulate properties - link_properties[(asn_id, psiphon_id)] = props - - for (asn_id, psiphon_id), props in link_properties.items(): - if (asn_id, psiphon_id) not in self.unique_links['CENSORED']: - self.unique_links['CENSORED'].add((asn_id, psiphon_id)) - censored_links.append( - {'src_id': asn_id, 'dst_id': psiphon_id, 'props': [props]} - ) - - # Batch add the links (this is faster than adding them one by one) + censored_links = list() + + # Create one link per ASN-country pair. + for (asn, country), result_dict in self.all_percentages.items(): + asn_id = self.node_ids['asn'][asn] + props = dict() + for category in self.categories: + props[f'percentage_{category}'] = result_dict['percentages'][category] + props[f'count_{category}'] = result_dict['category_counts'][category] + props['total_count'] = result_dict['total_count'] + props['country_code'] = country + censored_links.append( + {'src_id': asn_id, 'dst_id': psiphon_id, 'props': [props, self.reference]} + ) + self.iyp.batch_add_links('CENSORED', censored_links) - def calculate_percentages(self): + def aggregate_results(self): target_dict = defaultdict(lambda: defaultdict(int)) - # Initialize counts for all categories - categories = [ - 'bootstrapping_error', - 'usage_error', - 'working', - 'invalid', - ] - # Populate the target_dict with counts for entry in self.all_results: asn, country, result = entry target_dict[(asn, country)][result] += 1 - self.all_percentages = {} - for (asn, country), counts in target_dict.items(): - total_count = sum(counts.values()) - for category in categories: - counts[category] = counts.get(category, 0) - - percentages = { - category: ( - (counts[category] / total_count) * 100 if total_count > 0 else 0 - ) - for category in categories - } - - result_dict = { - 'total_count': total_count, - 'category_counts': dict(counts), - 'percentages': percentages, - } - self.all_percentages[(asn, country)] = result_dict + self.all_percentages[(asn, country)] = self.make_result_dict(counts) def unit_test(self): return super().unit_test(['CENSORED']) diff --git a/iyp/crawlers/ooni/riseupvpn.py b/iyp/crawlers/ooni/riseupvpn.py index 23d3e1a..1f51049 100644 --- a/iyp/crawlers/ooni/riseupvpn.py +++ b/iyp/crawlers/ooni/riseupvpn.py @@ -17,100 +17,56 @@ class Crawler(OoniCrawler): def __init__(self, organization, url, name): super().__init__(organization, url, name, 'riseupvpn') + self.categories = ['ok', 'failure'] def process_one_line(self, one_line): """Process a single line from the jsonl file and store the results locally.""" - super().process_one_line(one_line) + if super().process_one_line(one_line): + return - test_keys = one_line.get('test_keys', {}) - api_failures = test_keys.get('api_failures', []) - ca_cert_status = test_keys.get('ca_cert_status', False) + api_failures = one_line['test_keys'].get('api_failures') + ca_cert_status = one_line['test_keys']['ca_cert_status'] if not api_failures and ca_cert_status: - result = 'working' + result = 'ok' else: - result = 'not_working' + result = 'failure' # Using the last result from the base class, add our unique variables - self.all_results[-1] = self.all_results[-1][:2] + (result,) + self.all_results[-1] = self.all_results[-1] + (result,) def batch_add_to_iyp(self): super().batch_add_to_iyp() riseupvpn_id = 
self.iyp.get_node('Tag', {'label': label}, create=True) - censored_links = [] - - # Accumulate properties for each ASN-country pair - link_properties = defaultdict(lambda: defaultdict(int)) - - for asn, country, result in self.all_results: - asn_id = self.node_ids['asn'].get(asn) - country_id = self.node_ids['country'].get(country) - - if asn_id and country_id: - props = self.reference.copy() - if (asn, country) in self.all_percentages: - percentages = self.all_percentages[(asn, country)].get( - 'percentages', {} - ) - counts = self.all_percentages[(asn, country)].get( - 'category_counts', {} - ) - total_count = self.all_percentages[(asn, country)].get( - 'total_count', 0 - ) - - for category in ['working', 'not_working']: - props[f'percentage_{category}'] = percentages.get(category, 0) - props[f'count_{category}'] = counts.get(category, 0) - props['total_count'] = total_count - - # Accumulate properties - link_properties[(asn_id, riseupvpn_id)] = props - - # Create links only once per ASN-country pair - for (asn_id, riseupvpn_id), props in link_properties.items(): - if (asn_id, riseupvpn_id) not in self.unique_links['CENSORED']: - self.unique_links['CENSORED'].add((asn_id, riseupvpn_id)) - censored_links.append( - {'src_id': asn_id, 'dst_id': riseupvpn_id, 'props': [props]} - ) - - # Batch add the links (this is faster than adding them one by one) + censored_links = list() + + # Create one link per ASN-country pair. + for (asn, country), result_dict in self.all_percentages.items(): + asn_id = self.node_ids['asn'][asn] + props = dict() + for category in self.categories: + props[f'percentage_{category}'] = result_dict['percentages'][category] + props[f'count_{category}'] = result_dict['category_counts'][category] + props['total_count'] = result_dict['total_count'] + props['country_code'] = country + censored_links.append( + {'src_id': asn_id, 'dst_id': riseupvpn_id, 'props': [props, self.reference]} + ) + self.iyp.batch_add_links('CENSORED', censored_links) - def calculate_percentages(self): + def aggregate_results(self): target_dict = defaultdict(lambda: defaultdict(int)) - # Initialize counts for all categories - categories = ['working', 'not_working'] - # Populate the target_dict with counts for entry in self.all_results: asn, country, result = entry target_dict[(asn, country)][result] += 1 - self.all_percentages = {} - for (asn, country), counts in target_dict.items(): - total_count = sum(counts.values()) - for category in categories: - counts[category] = counts.get(category, 0) - - percentages = { - category: ( - (counts[category] / total_count) * 100 if total_count > 0 else 0 - ) - for category in categories - } - - result_dict = { - 'total_count': total_count, - 'category_counts': dict(counts), - 'percentages': percentages, - } - self.all_percentages[(asn, country)] = result_dict + self.all_percentages[(asn, country)] = self.make_result_dict(counts) def unit_test(self): return super().unit_test(['CENSORED']) diff --git a/iyp/crawlers/ooni/stunreachability.py b/iyp/crawlers/ooni/stunreachability.py index 4e52bd7..bfd8b76 100644 --- a/iyp/crawlers/ooni/stunreachability.py +++ b/iyp/crawlers/ooni/stunreachability.py @@ -7,7 +7,7 @@ import tldextract -from iyp.crawlers.ooni import OoniCrawler +from iyp.crawlers.ooni import OoniCrawler, process_dns_queries ORG = 'OONI' URL = 's3://ooni-data-eu-fra/raw/' @@ -18,75 +18,66 @@ class Crawler(OoniCrawler): def __init__(self, organization, url, name): super().__init__(organization, url, name, 'stunreachability') - self.all_ips = set() 
self.all_urls = set() - self.all_hostnames = set() + self.all_hostname_ips = set() + self.categories = ['ok', 'failure'] def process_one_line(self, one_line): """Process a single line from the jsonl file and store the results locally.""" - super().process_one_line(one_line) - - stun_endpoint = one_line.get('input') - test_keys = one_line.get('test_keys', {}) - failure = test_keys.get('failure') - result = 'Success' if failure is None else 'Failure' - - if stun_endpoint: - # Extract the hostname from the STUN endpoint URL if it's not an IP address - hostname = None - stun_url = stun_endpoint.split('//')[-1] - stun_ip_port = stun_url.split(':') - stun_ip = stun_ip_port[0] - - try: - ipaddress.ip_address(stun_ip) - except ValueError: - hostname = tldextract.extract(stun_url).fqdn - - # Handle "queries" section to get IP addresses and map them to the hostname - ip_addresses = [] - for query in test_keys.get('queries', []): - if query and query.get('answers'): - for answer in query.get('answers', []): - if 'ipv4' in answer: - ip_addresses.append(answer['ipv4']) - elif 'ipv6' in answer: - ip_addresses.append(answer['ipv6']) - - self.all_ips.update(ip_addresses) - - # Ensure the stun_endpoint is valid before updating the results - if stun_endpoint: - # Append unique variables to corresponding sets - if hostname: - self.all_hostnames.add(hostname) - self.all_urls.add(stun_endpoint) - - # Using the last result from the base class, add our unique variables - self.all_results[-1] = self.all_results[-1][:2] + ( - stun_endpoint, - result, - hostname, - ip_addresses, - ) - - # Ensure the entry has 6 elements, otherwise remove it - if len(self.all_results[-1]) != 6 or not stun_endpoint or not result: + if super().process_one_line(one_line): + return + if not one_line['input']: + # If no input is provided, the test fails. 
self.all_results.pop() + return + + stun_url = one_line['input'] + failure = one_line['test_keys']['failure'] + result = 'ok' if failure is None else 'failure' + + # Extract the hostname from the STUN endpoint URL if it's not an IP address + stun_hostname = None + stun_endpoint = stun_url.split('//')[-1] + stun_ip_port = stun_endpoint.split(':') + stun_ip = stun_ip_port[0] + + try: + stun_ip = ipaddress.ip_address(stun_ip) + except ValueError: + stun_hostname = tldextract.extract(stun_endpoint).fqdn + + # Handle "queries" section to get IP addresses and map them to the hostname + if stun_hostname: + host_ip_set = process_dns_queries(one_line['test_keys']['queries']) + for hostname, ip in host_ip_set: + if hostname != stun_hostname: + logging.warning(f'STUN hostname is "{stun_hostname}" but requested "{hostname}"') + continue + self.all_hostname_ips.add((hostname, ip)) + elif one_line['test_keys']['queries']: + logging.warning(f'STUN hostname is IP "{stun_ip}" but DNS queries were made?') + logging.warning(one_line['test_keys']['queries']) + + self.all_urls.add(stun_url) + + # Using the last result from the base class, add our unique variables + self.all_results[-1] = self.all_results[-1] + ( + stun_url, + result, + ) def batch_add_to_iyp(self): super().batch_add_to_iyp() - stun_links = [] - resolves_to_links = [] + censored_links = list() + resolves_to_links = list() + hostnames = set() + ips = set() - # Fetch all IP nodes in one batch - if self.all_ips: - ip_id_map = self.iyp.batch_get_nodes_by_single_prop( - 'IP', 'ip', list(self.all_ips), all=False - ) - else: - ip_id_map = {} + for hostname, ip in self.all_hostname_ips: + hostnames.add(hostname) + ips.add(ip) + resolves_to_links.append({'src_id': hostname, 'dst_id': ip, 'props': [self.reference]}) self.node_ids.update( { @@ -94,108 +85,45 @@ def batch_add_to_iyp(self): 'URL', 'url', self.all_urls, all=False ), 'hostname': self.iyp.batch_get_nodes_by_single_prop( - 'HostName', 'name', self.all_hostnames, all=False + 'HostName', 'name', hostnames, all=False + ), + 'ip': self.iyp.batch_get_nodes_by_single_prop( + 'IP', 'ip', ips, all=False ), } ) - # Accumulate properties for each ASN-country pair - link_properties = defaultdict(lambda: defaultdict(int)) + # Replace hostname and IPs in RESOLVES_TO links with node IDs. 
+        for link in resolves_to_links:
+            link['src_id'] = self.node_ids['hostname'][link['src_id']]
+            link['dst_id'] = self.node_ids['ip'][link['dst_id']]
+
+        for (asn, country, stun_url), result_dict in self.all_percentages.items():
+            asn_id = self.node_ids['asn'][asn]
+            url_id = self.node_ids['url'][stun_url]
+            props = dict()
+            for category in self.categories:
+                props[f'percentage_{category}'] = result_dict['percentages'][category]
+                props[f'count_{category}'] = result_dict['category_counts'][category]
+            props['total_count'] = result_dict['total_count']
+            props['country_code'] = country
+            censored_links.append(
+                {'src_id': asn_id, 'dst_id': url_id, 'props': [props, self.reference]}
+            )

-        # Ensure all IDs are present and process results
-        for (
-            asn,
-            country,
-            stun_endpoint,
-            result,
-            hostname,
-            ip_addresses,
-        ) in self.all_results:
-            asn_id = self.node_ids['asn'].get(asn)
-            url_id = self.node_ids['url'].get(stun_endpoint)
-            hostname_id = self.node_ids['hostname'].get(hostname)
-
-            if asn_id and url_id:
-                props = self.reference.copy()
-                if (asn, country, stun_endpoint) in self.all_percentages:
-                    percentages = self.all_percentages[
-                        (asn, country, stun_endpoint)
-                    ].get('percentages', {})
-                    counts = self.all_percentages[(asn, country, stun_endpoint)].get(
-                        'category_counts', {}
-                    )
-                    total_count = self.all_percentages[
-                        (asn, country, stun_endpoint)
-                    ].get('total_count', 0)
-
-                    for category in ['Success', 'Failure']:
-                        props[f'percentage_{category}'] = percentages.get(category, 0)
-                        props[f'count_{category}'] = counts.get(category, 0)
-                    props['total_count'] = total_count
-
-                # Accumulate properties
-                link_properties[(asn_id, url_id)] = props
-
-            if result == 'Success' and hostname_id:
-                for ip in ip_addresses:
-                    ip_id = ip_id_map.get(ip)
-                    if ip_id:
-                        if (hostname_id, ip_id) not in self.unique_links['RESOLVES_TO']:
-                            self.unique_links['RESOLVES_TO'].add((hostname_id, ip_id))
-                            resolves_to_links.append(
-                                {
-                                    'src_id': hostname_id,
-                                    'dst_id': ip_id,
-                                    'props': [self.reference],
-                                }
-                            )
-
-        for (asn_id, url_id), props in link_properties.items():
-            if (asn_id, url_id) not in self.unique_links['CENSORED']:
-                self.unique_links['CENSORED'].add((asn_id, url_id))
-                stun_links.append(
-                    {'src_id': asn_id, 'dst_id': url_id, 'props': [props]}
-                )
-
-        # Batch add the links (this is faster than adding them one by one)
-        self.iyp.batch_add_links('CENSORED', stun_links)
+        self.iyp.batch_add_links('CENSORED', censored_links)
         self.iyp.batch_add_links('RESOLVES_TO', resolves_to_links)

-    def calculate_percentages(self):
+    def aggregate_results(self):
         target_dict = defaultdict(lambda: defaultdict(int))

         # Populate the target_dict with counts
         for entry in self.all_results:
-            if len(entry) != 6:
-                continue
-            asn, country, stun_endpoint, result, hostname, ip_addresses = entry
-            target_dict[(asn, country, stun_endpoint)][result] += 1
-
-        self.all_percentages = {}
+            asn, country, stun_url, result = entry
+            target_dict[(asn, country, stun_url)][result] += 1

-        # Define all possible result categories to ensure they are included
-        possible_results = ['Success', 'Failure']
-
-        for (asn, country, stun_endpoint), counts in target_dict.items():
-            total_count = sum(counts.values())
-
-            # Initialize counts for all possible results to ensure they are included
-            for result in possible_results:
-                counts[result] = counts.get(result, 0)
-
-            percentages = {
-                category: (
-                    (counts[category] / total_count) * 100 if total_count > 0 else 0
-                )
-                for category in possible_results
-            }
-
-            result_dict = {
-                'total_count': total_count,
-                'category_counts': dict(counts),
-                'percentages': percentages,
-            }
-            self.all_percentages[(asn, country, stun_endpoint)] = result_dict
+        for (asn, country, stun_url), counts in target_dict.items():
+            self.all_percentages[(asn, country, stun_url)] = self.make_result_dict(counts)

     def unit_test(self):
         return super().unit_test(['CENSORED', 'RESOLVES_TO'])
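Every crawler here starts `process_one_line()` with `if super().process_one_line(one_line): return` and then grows `self.all_results[-1]` with test-specific fields. The base class itself is not part of this diff; a minimal sketch of the assumed contract (the actual implementation lives in `iyp/crawlers/ooni/__init__.py`):

    # Hypothetical sketch, not the actual base class.
    class OoniCrawler(BaseCrawler):  # BaseCrawler as in the IYP package
        def process_one_line(self, one_line):
            # Return True to signal "skip this line"; otherwise append the
            # (asn, country) base tuple that extending crawlers grow with
            # test-specific fields.
            probe_asn = one_line.get('probe_asn')  # e.g., 'AS64496' (assumed format)
            probe_cc = one_line.get('probe_cc')    # e.g., 'DE'
            if not probe_asn or not probe_cc:
                return True
            self.all_results.append((int(probe_asn[2:]), probe_cc))
            return False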
diff --git a/iyp/crawlers/ooni/telegram.py b/iyp/crawlers/ooni/telegram.py
index 3fca027..8e2b4c5 100644
--- a/iyp/crawlers/ooni/telegram.py
+++ b/iyp/crawlers/ooni/telegram.py
@@ -17,131 +17,88 @@ class Crawler(OoniCrawler):

     def __init__(self, organization, url, name):
         super().__init__(organization, url, name, 'telegram')
+        # 'total_blocked' and 'total_ok' are meta categories that indicate
+        # whether any of the three main categories is blocked.
+        self.categories = [
+            'total_blocked',
+            'total_ok',
+            'web_blocked',
+            'web_none',
+            'web_ok',
+            'http_blocked',
+            'http_ok',
+            'tcp_blocked',
+            'tcp_ok',
+        ]

     def process_one_line(self, one_line):
         """Process a single line from the jsonl file and store the results locally."""
-        super().process_one_line(one_line)
+        if super().process_one_line(one_line):
+            return

-        telegram_http_blocking = one_line.get('test_keys', {}).get(
-            'telegram_http_blocking', False
-        )
-        telegram_tcp_blocking = one_line.get('test_keys', {}).get(
-            'telegram_tcp_blocking', False
-        )
-        telegram_web_status = (
-            one_line.get('test_keys', {}).get('telegram_web_status', 'null').lower()
-        )
+        telegram_http_blocking = one_line['test_keys']['telegram_http_blocking']
+        telegram_tcp_blocking = one_line['test_keys']['telegram_tcp_blocking']
+        telegram_web_status = one_line['test_keys']['telegram_web_status']

         # Normalize result
         if telegram_web_status == 'blocked':
             result_web = 'web_blocked'
         elif telegram_web_status == 'ok':
-            result_web = 'unblocked'
+            result_web = 'web_ok'
         else:
-            result_web = 'unblocked'
+            result_web = 'web_none'
+
+        result_http = 'http_blocked' if telegram_http_blocking else 'http_ok'
+        result_tcp = 'tcp_blocked' if telegram_tcp_blocking else 'tcp_ok'

-        result_http = 'http_blocked' if telegram_http_blocking else 'unblocked'
-        result_tcp = 'tcp_blocked' if telegram_tcp_blocking else 'unblocked'
+        total = 'total_ok'
+        if result_web == 'web_blocked' or result_http == 'http_blocked' or result_tcp == 'tcp_blocked':
+            total = 'total_blocked'

         # Using the last result from the base class, add our unique variables
-        self.all_results[-1] = self.all_results[-1][:2] + (
+        self.all_results[-1] = self.all_results[-1] + (
+            total,
             result_web,
             result_http,
             result_tcp,
         )

-        if len(self.all_results[-1]) != 5:
-            self.all_results.pop()
-
     def batch_add_to_iyp(self):
         super().batch_add_to_iyp()

         telegram_id = self.iyp.get_node('Tag', {'label': label}, create=True)
-        censored_links = []
-
-        # Accumulate properties for each ASN-country pair
-        link_properties = defaultdict(lambda: defaultdict(int))
-
-        # Ensure all IDs are present and process results
-        for asn, country, result_web, result_http, result_tcp in self.all_results:
-            asn_id = self.node_ids['asn'].get(asn)
-            country_id = self.node_ids['country'].get(country)
-
-            if asn_id and country_id:
-                props = self.reference.copy()
-                if (asn, country) in self.all_percentages:
-                    percentages = self.all_percentages[(asn, country)].get(
-                        'percentages', {}
-                    )
-                    counts = self.all_percentages[(asn, country)].get(
-                        'category_counts', {}
-                    )
-                    total_count = self.all_percentages[(asn, country)].get(
-                        'total_count', 0
-                    )
-
-                    for category in [
-                        'unblocked',
-                        'web_blocked',
-                        'http_blocked',
-                        'tcp_blocked',
-                    ]:
-                        props[f'percentage_{category}'] = percentages.get(category, 0)
-                        props[f'count_{category}'] = counts.get(category, 0)
-                    props['total_count'] = total_count
-
-                # Accumulate properties
-                link_properties[(asn_id, telegram_id)] = props
-
-        for (asn_id, telegram_id), props in link_properties.items():
-            if (asn_id, telegram_id) not in self.unique_links['CENSORED']:
-                self.unique_links['CENSORED'].add((asn_id, telegram_id))
-                censored_links.append(
-                    {'src_id': asn_id, 'dst_id': telegram_id, 'props': [props]}
-                )
-
-        # Batch add the links (this is faster than adding them one by one)
+        censored_links = list()
+
+        # Create one link per ASN-country pair.
+        for (asn, country), result_dict in self.all_percentages.items():
+            asn_id = self.node_ids['asn'][asn]
+            props = dict()
+            for category in self.categories:
+                props[f'percentage_{category}'] = result_dict['percentages'][category]
+                props[f'count_{category}'] = result_dict['category_counts'][category]
+            props['total_count'] = result_dict['total_count']
+            props['country_code'] = country
+            censored_links.append(
+                {'src_id': asn_id, 'dst_id': telegram_id, 'props': [props, self.reference]}
+            )
+
         self.iyp.batch_add_links('CENSORED', censored_links)

-    def calculate_percentages(self):
+    def aggregate_results(self):
         target_dict = defaultdict(lambda: defaultdict(int))

-        # Initialize counts for all categories
-        categories = [
-            'unblocked',
-            'web_blocked',
-            'http_blocked',
-            'tcp_blocked',
-        ]
-
         # Populate the target_dict with counts
         for entry in self.all_results:
-            asn, country, result_web, result_http, result_tcp = entry
+            asn, country, total, result_web, result_http, result_tcp = entry
+            target_dict[(asn, country)][total] += 1
             target_dict[(asn, country)][result_web] += 1
             target_dict[(asn, country)][result_http] += 1
             target_dict[(asn, country)][result_tcp] += 1

-        self.all_percentages = {}
-
         for (asn, country), counts in target_dict.items():
-            total_count = sum(counts.values())
-            for category in categories:
-                counts[category] = counts.get(category, 0)
-
-            percentages = {
-                category: (
-                    (counts[category] / total_count) * 100 if total_count > 0 else 0
-                )
-                for category in categories
-            }
-
-            result_dict = {
-                'total_count': total_count,
-                'category_counts': dict(counts),
-                'percentages': percentages,
-            }
-            self.all_percentages[(asn, country)] = result_dict
+            total_count = counts['total_ok'] + counts['total_blocked']
+            self.all_percentages[(asn, country)] = self.make_result_dict(counts, total_count)

     def unit_test(self):
         return super().unit_test(['CENSORED'])
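Both call forms of `make_result_dict()` appear in these crawlers: `make_result_dict(counts)` and, where one measurement is counted in several categories at once (telegram, whatsapp), `make_result_dict(counts, total_count)`. The helper itself is not shown in this diff; judging from the removed per-crawler percentage code, a plausible shape is:

    # Assumed sketch of the shared helper on OoniCrawler, mirroring the
    # removed per-crawler code; the real one lives in iyp/crawlers/ooni/__init__.py.
    def make_result_dict(self, counts, total_count=None):
        if total_count is None:
            total_count = sum(counts.values())
        # Ensure every category is present, even with a zero count.
        for category in self.categories:
            counts[category] = counts.get(category, 0)
        percentages = {
            category: (counts[category] / total_count) * 100 if total_count > 0 else 0
            for category in self.categories
        }
        return {
            'total_count': total_count,
            'category_counts': dict(counts),
            'percentages': percentages,
        }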
diff --git a/iyp/crawlers/ooni/tor.py b/iyp/crawlers/ooni/tor.py
index 15e94eb..4dd80cf 100644
--- a/iyp/crawlers/ooni/tor.py
+++ b/iyp/crawlers/ooni/tor.py
@@ -16,134 +16,95 @@ class Crawler(OoniCrawler):

     def __init__(self, organization, url, name):
         super().__init__(organization, url, name, 'tor')
-        self.all_ips = set()
-        self.all_tags = {'or_port_dirauth', 'dir_port', 'obfs4', 'or_port'}
+        self.all_ip_tags = set()
+        # Prepend "OONI Probe Tor Tag" to all tag labels
+        self.all_tags = {tag: f'OONI Probe Tor Tag {tag}'
+                         for tag in ['or_port_dirauth', 'dir_port', 'obfs4', 'or_port']}
+        self.categories = ['ok', 'failure']

     def process_one_line(self, one_line):
         """Process a single line of the JSONL file."""
-        super().process_one_line(one_line)
-
-        test_keys = one_line.get('test_keys', {})
-
-        if not test_keys:
-            self.all_results.pop()
+        if super().process_one_line(one_line):
             return
+        test_keys = one_line['test_keys']
+
         # Check each target in the test_keys
         first_target = True
-        targets = test_keys.get('targets', {})
-        for _, target_data in targets.items():
+        for target_data in test_keys['targets'].values():
+            # Technically the target_address can be domain:port, but in
+            # practice it appears to always be an IP address.
             ip = ipaddress.ip_address(
-                target_data.get('target_address').rsplit(':', 1)[0].strip('[]')
-            )
-            self.all_ips.add(ip)
-            result = target_data.get('failure')
-            target_protocol = target_data.get('target_protocol')
+                target_data['target_address'].rsplit(':', 1)[0].strip('[]')
+            ).compressed
+
+            result = 'failure' if target_data['failure'] else 'ok'
+
+            target_protocol = target_data['target_protocol']
             if target_protocol not in self.all_tags:
                 continue
+
+            self.all_ip_tags.add((ip, self.all_tags[target_protocol]))
             if first_target:
-                self.all_results[-1] = self.all_results[-1][:2] + (
-                    ip,
-                    target_protocol,
-                    result,
-                )
+                self.all_results[-1] = self.all_results[-1] + (ip, result)
                 first_target = False
             else:
-                new_entry = self.all_results[-1][:2] + (ip, target_protocol, result)
+                # One test contains results for multiple targets, so copy the
+                # (asn, country) part and append new data.
+                new_entry = self.all_results[-1][:2] + (ip, result)
                 self.all_results.append(new_entry)

-        if not first_target and len(self.all_results[-1]) != 5:
-            self.all_results.pop()
-
     def batch_add_to_iyp(self):
         super().batch_add_to_iyp()

-        # Prepend "OONI Probe Tor Tag" to all tag labels
-        prepended_tags = {f'OONI Probe Tor Tag {tag}' for tag in self.all_tags}
+        censored_links = list()
+        categorized_links = list()
+        ips = set()
+
+        for ip, tag in self.all_ip_tags:
+            ips.add(ip)
+            categorized_links.append({'src_id': ip, 'dst_id': tag, 'props': [self.reference]})
+
         self.node_ids.update(
             {
                 'ip': self.iyp.batch_get_nodes_by_single_prop(
-                    'IP', 'ip', [str(ip) for ip in self.all_ips]
+                    'IP', 'ip', ips, all=False
                 ),
                 'tag': self.iyp.batch_get_nodes_by_single_prop(
-                    'Tag', 'label', prepended_tags
+                    'Tag', 'label', set(self.all_tags.values()), all=False
                 ),
             }
         )

-        censored_links = []
-        categorized_links = []
-
-        link_properties = defaultdict(lambda: defaultdict(int))
-
-        for asn, country, ip, tor_type, _ in self.all_results:
-            asn_id = self.node_ids['asn'].get(asn)
-            ip_id = self.node_ids['ip'].get(str(ip))
-            tag_id = self.node_ids['tag'].get(f'OONI Probe Tor Tag {tor_type}')
-
-            if asn_id and ip_id:
-                props = self.reference.copy()
-                if (asn, ip) in self.all_percentages:
-                    percentages = self.all_percentages[(asn, ip)].get('percentages', {})
-                    counts = self.all_percentages[(asn, ip)].get('category_counts', {})
-                    total_count = self.all_percentages[(asn, ip)].get('total_count', 0)
-
-                    for category in ['Failure', 'Success']:
-                        props[f'percentage_{category}'] = percentages.get(category, 0)
-                        props[f'count_{category}'] = counts.get(category, 0)
-                    props['total_count'] = total_count
-                link_properties[(asn_id, ip_id)] = props
-
-            if (
-                ip_id
-                and tag_id
-                and (ip_id, tag_id) not in self.unique_links['CATEGORIZED']
-            ):
-                self.unique_links['CATEGORIZED'].add((ip_id, tag_id))
-                categorized_links.append(
-                    {'src_id': ip_id, 'dst_id': tag_id, 'props': [self.reference]}
-                )
-
-        for (asn_id, ip_id), props in link_properties.items():
-            if (asn_id, ip_id) not in self.unique_links['CENSORED']:
-                self.unique_links['CENSORED'].add((asn_id, ip_id))
-                censored_links.append(
-                    {'src_id': asn_id, 'dst_id': ip_id, 'props': [props]}
-                )
+        # Replace IP and tag in CATEGORIZED links with node IDs.
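+        # As in stun_reachability.py, the buffered links still hold raw values
+        # and are rewritten to node IDs after the batch node lookup.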
+        for link in categorized_links:
+            link['src_id'] = self.node_ids['ip'][link['src_id']]
+            link['dst_id'] = self.node_ids['tag'][link['dst_id']]
+
+        for (asn, country, ip), result_dict in self.all_percentages.items():
+            asn_id = self.node_ids['asn'][asn]
+            ip_id = self.node_ids['ip'][ip]
+            props = dict()
+            for category in self.categories:
+                props[f'percentage_{category}'] = result_dict['percentages'][category]
+                props[f'count_{category}'] = result_dict['category_counts'][category]
+            props['total_count'] = result_dict['total_count']
+            props['country_code'] = country
+            censored_links.append(
+                {'src_id': asn_id, 'dst_id': ip_id, 'props': [props, self.reference]}
+            )

         self.iyp.batch_add_links('CENSORED', censored_links)
         self.iyp.batch_add_links('CATEGORIZED', categorized_links)

-    def calculate_percentages(self):
+    def aggregate_results(self):
         target_dict = defaultdict(lambda: defaultdict(int))
-        categories = ['Failure', 'Success']

         for entry in self.all_results:
-            asn, country, ip, tor_type, result = entry
-            if result is not None:
-                target_dict[(asn, ip)]['Failure'] += 1
-            else:
-                target_dict[(asn, ip)]['Success'] += 1
-
-        self.all_percentages = {}
+            asn, country, ip, result = entry
+            target_dict[(asn, country, ip)][result] += 1

-        for (asn, ip), counts in target_dict.items():
-            total_count = sum(counts.values())
-            for category in categories:
-                counts[category] = counts.get(category, 0)
-
-            percentages = {
-                category: (
-                    (counts[category] / total_count) * 100 if total_count > 0 else 0
-                )
-                for category in categories
-            }
-
-            result_dict = {
-                'total_count': total_count,
-                'category_counts': dict(counts),
-                'percentages': percentages,
-            }
-            self.all_percentages[(asn, ip)] = result_dict
+        for (asn, country, ip), counts in target_dict.items():
+            self.all_percentages[(asn, country, ip)] = self.make_result_dict(counts)

     def unit_test(self):
         return super().unit_test(['CENSORED', 'CATEGORIZED'])
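tor.py now stores `ipaddress.ip_address(...).compressed` strings instead of `ipaddress` objects; canonicalizing the textual form matters because the string is used as the unique key of IP nodes. A quick illustration:

    import ipaddress

    # Two spellings of the same IPv6 address normalize to one node key.
    a = ipaddress.ip_address('2001:0db8:0000:0000:0000:0000:0000:0001')
    b = ipaddress.ip_address('2001:db8::1')
    assert a.compressed == b.compressed == '2001:db8::1'

    # Bracketed address:port strings, as found in target_address, parse
    # cleanly after stripping the port and brackets:
    target_address = '[2001:db8::1]:443'
    ip = ipaddress.ip_address(target_address.rsplit(':', 1)[0].strip('[]')).compressed
    print(ip)  # 2001:db8::1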
diff --git a/iyp/crawlers/ooni/torsf.py b/iyp/crawlers/ooni/torsf.py
index 3d1bafa..0fd2985 100644
--- a/iyp/crawlers/ooni/torsf.py
+++ b/iyp/crawlers/ooni/torsf.py
@@ -17,94 +17,52 @@ class Crawler(OoniCrawler):

     def __init__(self, organization, url, name):
         super().__init__(organization, url, name, 'torsf')
+        self.categories = ['ok', 'failure']

     def process_one_line(self, one_line):
         """Process a single line from the JSONL file."""
-        super().process_one_line(one_line)
-        test_keys = one_line.get('test_keys', {})
-        success = test_keys.get('success', False)
-
-        # Normalize result to be either "OK" or "Failure"
-        result = 'OK' if success else 'Failure'
+        if super().process_one_line(one_line):
+            return
+        if 'success' not in one_line['test_keys']:
+            self.all_results.pop()
+            return
+        result = 'ok' if one_line['test_keys']['success'] else 'failure'

         # Update the last entry in all_results with the new test-specific data
-        self.all_results[-1] = self.all_results[-1][:2] + (result,)
+        self.all_results[-1] = self.all_results[-1] + (result,)

     def batch_add_to_iyp(self):
         super().batch_add_to_iyp()

         torsf_id = self.iyp.get_node('Tag', {'label': label}, create=True)
-        censored_links = []
-
-        link_properties = defaultdict(lambda: defaultdict(int))
-
-        # Ensure all IDs are present and process results
-        for asn, country, result in self.all_results:
-            asn_id = self.node_ids['asn'].get(asn)
-            country_id = self.node_ids['country'].get(country)
-
-            if asn_id and country_id:
-                props = self.reference.copy()
-                if (asn, country) in self.all_percentages:
-                    percentages = self.all_percentages[(asn, country)].get(
-                        'percentages', {}
-                    )
-                    counts = self.all_percentages[(asn, country)].get(
-                        'category_counts', {}
-                    )
-                    total_count = self.all_percentages[(asn, country)].get(
-                        'total_count', 0
-                    )
-
-                    for category in ['OK', 'Failure']:
-                        props[f'percentage_{category}'] = percentages.get(category, 0)
-                        props[f'count_{category}'] = counts.get(category, 0)
-                    props['total_count'] = total_count
-
-                link_properties[(asn_id, torsf_id)] = props
-
-        for (asn_id, torsf_id), props in link_properties.items():
-            if (asn_id, torsf_id) not in self.unique_links['CENSORED']:
-                self.unique_links['CENSORED'].add((asn_id, torsf_id))
-                censored_links.append(
-                    {'src_id': asn_id, 'dst_id': torsf_id, 'props': [props]}
-                )
-
-        # Batch add the links (this is faster than adding them one by one)
+        censored_links = list()
+
+        # Create one link per ASN-country pair.
+        for (asn, country), result_dict in self.all_percentages.items():
+            asn_id = self.node_ids['asn'][asn]
+            props = dict()
+            for category in self.categories:
+                props[f'percentage_{category}'] = result_dict['percentages'][category]
+                props[f'count_{category}'] = result_dict['category_counts'][category]
+            props['total_count'] = result_dict['total_count']
+            props['country_code'] = country
+            censored_links.append(
+                {'src_id': asn_id, 'dst_id': torsf_id, 'props': [props, self.reference]}
+            )
+
         self.iyp.batch_add_links('CENSORED', censored_links)

-    def calculate_percentages(self):
+    def aggregate_results(self):
         target_dict = defaultdict(lambda: defaultdict(int))

-        # Initialize counts for all categories
-        categories = ['OK', 'Failure']
-
         # Populate the target_dict with counts
         for entry in self.all_results:
             asn, country, result = entry
             target_dict[(asn, country)][result] += 1

-        self.all_percentages = {}
-
         for (asn, country), counts in target_dict.items():
-            total_count = sum(counts.values())
-            for category in categories:
-                counts[category] = counts.get(category, 0)
-
-            percentages = {
-                category: (
-                    (counts[category] / total_count) * 100 if total_count > 0 else 0
-                )
-                for category in categories
-            }
-
-            result_dict = {
-                'total_count': total_count,
-                'category_counts': dict(counts),
-                'percentages': percentages,
-            }
-            self.all_percentages[(asn, country)] = result_dict
+            self.all_percentages[(asn, country)] = self.make_result_dict(counts)

     def unit_test(self):
         return super().unit_test(['CENSORED'])
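All `aggregate_results()` implementations share the same counting pattern: a nested defaultdict keyed by probe group, with per-category counters defaulting to zero. For example:

    from collections import defaultdict

    # Hypothetical measurements: (asn, country, result); ASNs from the
    # documentation range.
    target_dict = defaultdict(lambda: defaultdict(int))
    measurements = [(64496, 'DE', 'ok'), (64496, 'DE', 'failure'), (64497, 'FR', 'ok')]
    for asn, country, result in measurements:
        target_dict[(asn, country)][result] += 1

    print(dict(target_dict[(64496, 'DE')]))  # {'ok': 1, 'failure': 1}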
diff --git a/iyp/crawlers/ooni/vanillator.py b/iyp/crawlers/ooni/vanillator.py
index 6e46f41..a141953 100644
--- a/iyp/crawlers/ooni/vanillator.py
+++ b/iyp/crawlers/ooni/vanillator.py
@@ -17,94 +17,49 @@ class Crawler(OoniCrawler):

     def __init__(self, organization, url, name):
         super().__init__(organization, url, name, 'vanillator')
+        self.categories = ['ok', 'failure']

     def process_one_line(self, one_line):
         """Process a single line from the JSONL file."""
-        super().process_one_line(one_line)
-        test_keys = one_line.get('test_keys', {})
-        success = test_keys.get('success', False)
-
-        # Normalize result to be either "OK" or "Failure"
-        result = 'OK' if success else 'Failure'
+        if super().process_one_line(one_line):
+            return
+        result = 'ok' if one_line['test_keys']['success'] else 'failure'

         # Update the last entry in all_results with the new test-specific data
-        self.all_results[-1] = self.all_results[-1][:2] + (result,)
+        self.all_results[-1] = self.all_results[-1] + (result,)

     def batch_add_to_iyp(self):
         super().batch_add_to_iyp()

         vanillator_id = self.iyp.get_node('Tag', {'label': label}, create=True)
-        censored_links = []
-
-        link_properties = defaultdict(lambda: defaultdict(int))
-
-        # Ensure all IDs are present and process results
-        for asn, country, result in self.all_results:
-            asn_id = self.node_ids['asn'].get(asn)
-            country_id = self.node_ids['country'].get(country)
-
-            if asn_id and country_id:
-                props = self.reference.copy()
-                if (asn, country) in self.all_percentages:
-                    percentages = self.all_percentages[(asn, country)].get(
-                        'percentages', {}
-                    )
-                    counts = self.all_percentages[(asn, country)].get(
-                        'category_counts', {}
-                    )
-                    total_count = self.all_percentages[(asn, country)].get(
-                        'total_count', 0
-                    )
-
-                    for category in ['OK', 'Failure']:
-                        props[f'percentage_{category}'] = percentages.get(category, 0)
-                        props[f'count_{category}'] = counts.get(category, 0)
-                    props['total_count'] = total_count
-
-                link_properties[(asn_id, vanillator_id)] = props
-
-        for (asn_id, vanillator_id), props in link_properties.items():
-            if (asn_id, vanillator_id) not in self.unique_links['CENSORED']:
-                self.unique_links['CENSORED'].add((asn_id, vanillator_id))
-                censored_links.append(
-                    {'src_id': asn_id, 'dst_id': vanillator_id, 'props': [props]}
-                )
-
-        # Batch add the links (this is faster than adding them one by one)
+        censored_links = list()
+
+        # Create one link per ASN-country pair.
+        for (asn, country), result_dict in self.all_percentages.items():
+            asn_id = self.node_ids['asn'][asn]
+            props = dict()
+            for category in self.categories:
+                props[f'percentage_{category}'] = result_dict['percentages'][category]
+                props[f'count_{category}'] = result_dict['category_counts'][category]
+            props['total_count'] = result_dict['total_count']
+            props['country_code'] = country
+            censored_links.append(
+                {'src_id': asn_id, 'dst_id': vanillator_id, 'props': [props, self.reference]}
+            )
+
         self.iyp.batch_add_links('CENSORED', censored_links)

-    def calculate_percentages(self):
+    def aggregate_results(self):
         target_dict = defaultdict(lambda: defaultdict(int))

-        # Initialize counts for all categories
-        categories = ['OK', 'Failure']
-
         # Populate the target_dict with counts
         for entry in self.all_results:
             asn, country, result = entry
             target_dict[(asn, country)][result] += 1

-        self.all_percentages = {}
-
         for (asn, country), counts in target_dict.items():
-            total_count = sum(counts.values())
-            for category in categories:
-                counts[category] = counts.get(category, 0)
-
-            percentages = {
-                category: (
-                    (counts[category] / total_count) * 100 if total_count > 0 else 0
-                )
-                for category in categories
-            }
-
-            result_dict = {
-                'total_count': total_count,
-                'category_counts': dict(counts),
-                'percentages': percentages,
-            }
-            self.all_percentages[(asn, country)] = result_dict
+            self.all_percentages[(asn, country)] = self.make_result_dict(counts)

     def unit_test(self):
         return super().unit_test(['CENSORED'])
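The next file imports `process_dns_queries` from the package `__init__`; the helper itself is not part of this diff. From its call sites here and in stun_reachability.py, a plausible shape is:

    import ipaddress

    # Assumed helper (the real one lives in iyp/crawlers/ooni/__init__.py):
    # flatten OONI's 'queries' list into a set of (hostname, ip) tuples.
    def process_dns_queries(queries):
        host_ip_set = set()
        for query in queries or list():
            hostname = query.get('hostname')
            for answer in query.get('answers') or list():
                for af in ('ipv4', 'ipv6'):
                    if af in answer:
                        ip = ipaddress.ip_address(answer[af]).compressed
                        host_ip_set.add((hostname, ip))
        return host_ip_set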
diff --git a/iyp/crawlers/ooni/webconnectivity.py b/iyp/crawlers/ooni/webconnectivity.py
index ba335f3..9ca56ce 100644
--- a/iyp/crawlers/ooni/webconnectivity.py
+++ b/iyp/crawlers/ooni/webconnectivity.py
@@ -8,7 +8,7 @@
 import tldextract

-from iyp.crawlers.ooni import OoniCrawler
+from iyp.crawlers.ooni import OoniCrawler, process_dns_queries

 ORG = 'OONI'
 URL = 's3://ooni-data-eu-fra/raw/'
@@ -22,241 +22,135 @@ class Crawler(OoniCrawler):

     def __init__(self, organization, url, name):
         super().__init__(organization, url, name, 'webconnectivity')
         self.all_urls = set()
-        self.all_hostnames = set()
+        self.all_hostname_ips = set()
+        self.all_ip_urls = set()
+        self.categories = ['ok', 'confirmed', 'failure', 'anomaly']

     # Process a single line from the jsonl file and store the results locally
     def process_one_line(self, one_line):
-        super().process_one_line(one_line)
-
-        ips = {'ipv4': [], 'ipv6': []}
-        input_url = one_line.get('input')
-        test_keys = one_line.get('test_keys', {})
-        blocking = test_keys.get('blocking')
-        accessible = test_keys.get('accessible')
-
-        # Extract the IPs from the DNS replies, if they exist
-        queries = test_keys.get('queries', [])
-        if queries is not None:
-            for query in queries:
-                answers = query.get('answers')
-                if answers:
-                    for answer in answers:
-                        ipv4 = answer.get('ipv4', [])
-                        ipv6 = answer.get('ipv6', [])
-                        ips['ipv4'].extend(ipv4 if isinstance(ipv4, list) else [ipv4])
-                        ips['ipv6'].extend(ipv6 if isinstance(ipv6, list) else [ipv6])
-
-        # Remove duplicates if necessary
-        ips['ipv4'] = list(set(ips['ipv4']))
-        ips['ipv6'] = list(set(ips['ipv6']))
+        if super().process_one_line(one_line):
+            return
+
+        input_url = one_line['input']
+        test_keys = one_line['test_keys']
+        blocking = test_keys['blocking']
+        accessible = test_keys['accessible']
+
+        if not input_url.startswith('http'):
+            logging.warning(f'No HTTP URL: {input_url}')

         # Extract the hostname from the URL if it's not an IP address
         hostname = urlparse(input_url).hostname
-        hostname = (
-            tldextract.extract(input_url).fqdn
-            if hostname
-            and not (
-                hostname.replace('.', '').isdigit() and ipaddress.ip_address(hostname)
-            )
-            else hostname
+        try:
+            hostname = ipaddress.ip_address(hostname).compressed
+            hostname_is_ip = True
+        except ValueError:
+            hostname = tldextract.extract(input_url).fqdn
+            hostname_is_ip = False
+
+        host_ip_set = set()
+        if not hostname_is_ip:
+            # The test performs DNS queries even if the hostname is an IP
+            # address; since that makes no sense, those queries are ignored.
+            host_ip_set = process_dns_queries(test_keys['queries'])
+
+        # Determine the result based on the table
+        # (https://github.com/ooni/spec/blob/master/nettests/ts-017-web-connectivity.md)
+        if blocking is None and accessible is None:
+            result = 'failure'  # Could not assign values to the fields
+        elif blocking is False and accessible is False:
+            result = 'failure'  # Expected failures (e.g., the website down)
+        elif blocking is False and accessible is True:
+            result = 'ok'  # Expected success (i.e., no censorship)
+        elif blocking == 'dns' and accessible is False:
+            result = 'confirmed'  # DNS-based blocking
+        elif blocking == 'tcp_ip' and accessible is False:
+            result = 'confirmed'  # TCP-based blocking
+        elif blocking == 'http-failure' and accessible is False:
+            result = 'confirmed'  # HTTP or TLS based blocking
+        elif blocking == 'http-diff' and accessible is False:
+            result = 'confirmed'  # Blockpage rather than legit page
+        else:
+            result = 'anomaly'  # Default case if no other case matches
+
+        # Using the last result from the base class, add our unique variables
+        self.all_urls.add(input_url)
+        if hostname_is_ip:
+            self.all_ip_urls.add((hostname, input_url))
+        else:
+            self.all_hostname_ips.update(host_ip_set)
+        self.all_results[-1] = self.all_results[-1] + (
+            input_url,
+            result,
         )

-        # Ensure all required fields are present
-        if (
-            self.all_results[-1][0]
-            and self.all_results[-1][1]
-            and input_url
-            and test_keys
-        ):
-            # Determine the result based on the table
-            # (https://github.com/ooni/spec/blob/master/nettests/ts-017-web-connectivity.md)
-            if blocking is None and accessible is None:
-                result = 'Failure'  # Could not assign values to the fields
-            elif blocking is False and accessible is False:
-                result = 'Failure'  # Expected failures (e.g., the website down)
-            elif blocking is False and accessible is True:
-                result = 'OK'  # Expected success (i.e., no censorship)
-            elif blocking == 'dns' and accessible is False:
-                result = 'Confirmed'  # DNS-based blocking
-            elif blocking == 'tcp_ip' and accessible is False:
-                result = 'Confirmed'  # TCP-based blocking
-            elif blocking == 'http-failure' and accessible is False:
-                result = 'Confirmed'  # HTTP or TLS based blocking
-            elif blocking == 'http-diff' and accessible is False:
-                result = 'Confirmed'  # Blockpage rather than legit page
-            else:
-                result = 'Anomaly'  # Default case if no other case matches
-
-            # Using the last result from the base class, add our unique variables
-            self.all_urls.add(input_url)
-            self.all_hostnames.add(hostname)
-            self.all_results[-1] = self.all_results[-1][:2] + (
-                input_url,
-                result,
-                hostname,
-                ips,
-            )
-
-        if len(self.all_results[-1]) != 6:
-            self.all_results.pop()
-
     def batch_add_to_iyp(self):
         super().batch_add_to_iyp()

-        censored_links = []
-        resolves_to_links = []
-        part_of_links = []
+        censored_links = list()
+        resolves_to_links = list()
+        part_of_links = list()
+        ips = set()
+        hostnames = set()

-        # Collect all IP addresses first
-        all_ips = []
-        for asn, country, url, result, hostname, ips in self.all_results:
-            if result == 'OK' and hostname and ips:
-                for ip_type in ips.values():
-                    all_ips.extend(
-                        ipaddress.ip_address(ip).compressed for ip in ip_type
-                    )
+        for hostname, ip in self.all_hostname_ips:
+            hostnames.add(hostname)
+            ips.add(ip)
+            resolves_to_links.append({'src_id': hostname, 'dst_id': ip, 'props': [self.reference]})

-        # Fetch all IP nodes in one batch
-        ip_id_map = self.iyp.batch_get_nodes_by_single_prop('IP', 'ip', all_ips, all=False)
+        for ip, url in self.all_ip_urls:
+            ips.add(ip)
+            part_of_links.append({'src_id': ip, 'dst_id': url, 'props': [self.reference]})

         self.node_ids.update(
             {
-                'url': self.iyp.batch_get_nodes_by_single_prop(
-                    'URL', 'url', self.all_urls, all=False
+                'ip': self.iyp.batch_get_nodes_by_single_prop(
+                    'IP', 'ip', ips, all=False
                 ),
                 'hostname': self.iyp.batch_get_nodes_by_single_prop(
-                    'HostName', 'name', self.all_hostnames, all=False
+                    'HostName', 'name', hostnames, all=False
+                ),
+                'url': self.iyp.batch_get_nodes_by_single_prop(
+                    'URL', 'url', self.all_urls, all=False
                 ),
             }
         )

-        # Ensure all IDs are present and process results
-        for asn, country, url, result, hostname, ips in self.all_results:
-            asn_id = self.node_ids['asn'].get(asn)
-            url_id = self.node_ids['url'].get(url)
-            hostname_id = self.node_ids['hostname'].get(hostname)
-
-            if asn_id and url_id:
-                props = self.reference.copy()
-                if (asn, country, url) in self.all_percentages:
-                    percentages = self.all_percentages[(asn, country, url)].get(
-                        'percentages', {}
-                    )
-                    counts = self.all_percentages[(asn, country, url)].get(
-                        'category_counts', {}
-                    )
-                    total_count = self.all_percentages[(asn, country, url)].get(
-                        'total_count', 0
-                    )
-
-                    for category in ['OK', 'Confirmed', 'Failure', 'Anomaly']:
-                        props[f'percentage_{category}'] = percentages.get(category, 0)
-                        props[f'count_{category}'] = counts.get(category, 0)
-                    props['total_count'] = total_count
-
-                censored_links.append(
-                    {'src_id': asn_id, 'dst_id': url_id, 'props': [props]}
-                )
-
-            if result == 'OK' and hostname and ips:
-                compressed_ips = [
-                    ipaddress.ip_address(ip).compressed
-                    for ip_type in ips.values()
-                    for ip in ip_type
-                ]
-                for ip in compressed_ips:
-                    ip_id = ip_id_map.get(ip)
-                    if (
-                        hostname_id
-                        and ip_id
-                        and (hostname_id, ip_id) not in self.unique_links['RESOLVES_TO']
-                    ):
-                        self.unique_links['RESOLVES_TO'].add((hostname_id, ip_id))
-                        resolves_to_links.append(
-                            {
-                                'src_id': hostname_id,
-                                'dst_id': ip_id,
-                                'props': [self.reference],
-                            }
-                        )
-                    if url_id and ip_id:
-                        if lambda ip: True if ipaddress.ip_address(ip) else False:
-                            if (
-                                ip_id
-                                and url_id
-                                and (ip_id, url_id) not in self.unique_links['PART_OF']
-                            ):
-                                self.unique_links['PART_OF'].add((ip_id, url_id))
-                                part_of_links.append(
-                                    {
-                                        'src_id': ip_id,
-                                        'dst_id': url_id,
-                                        'props': [self.reference],
-                                    }
-                                )
-
-            if hostname_id and url_id:
-                # Check if the url is a valid IP address
-                if not (
-                    lambda url: (
-                        lambda hostname: (
-                            True
-                            if hostname
-                            and not isinstance(
-                                ValueError, type(ipaddress.ip_address(hostname))
-                            )
-                            else False
-                        )
-                    )(urlparse(url).hostname)
-                ):
-                    if (url_id, hostname_id) not in self.unique_links['PART_OF']:
-                        self.unique_links['PART_OF'].add((url_id, hostname_id))
-                        part_of_links.append(
-                            {
-                                'src_id': url_id,
-                                'dst_id': hostname_id,
-                                'props': [self.reference],
-                            }
-                        )
-
-        # Batch add the links (this is faster than adding them one by one)
+        for link in resolves_to_links:
+            link['src_id'] = self.node_ids['hostname'][link['src_id']]
+            link['dst_id'] = self.node_ids['ip'][link['dst_id']]
+
+        for link in part_of_links:
+            link['src_id'] = self.node_ids['ip'][link['src_id']]
+            link['dst_id'] = self.node_ids['url'][link['dst_id']]
+
+        for (asn, country, url), result_dict in self.all_percentages.items():
+            asn_id = self.node_ids['asn'][asn]
+            url_id = self.node_ids['url'][url]
+            props = dict()
+            for category in self.categories:
+                props[f'percentage_{category}'] = result_dict['percentages'][category]
+                props[f'count_{category}'] = result_dict['category_counts'][category]
+            props['total_count'] = result_dict['total_count']
+            props['country_code'] = country
+            censored_links.append(
+                {'src_id': asn_id, 'dst_id': url_id, 'props': [props, self.reference]}
+            )
+
         self.iyp.batch_add_links('CENSORED', censored_links)
         self.iyp.batch_add_links('RESOLVES_TO', resolves_to_links)
         self.iyp.batch_add_links('PART_OF', part_of_links)

-    def calculate_percentages(self):
+    def aggregate_results(self):
         target_dict = defaultdict(lambda: defaultdict(int))

         # Populate the target_dict with counts
         for entry in self.all_results:
-            asn, country, target, result, hostname, ips = entry
+            asn, country, target, result = entry
             target_dict[(asn, country, target)][result] += 1

-        self.all_percentages = {}
-
-        # Define all possible result categories to ensure they are included
-        possible_results = ['OK', 'Confirmed', 'Failure', 'Anomaly']
-
         for (asn, country, target), counts in target_dict.items():
-            total_count = sum(counts.values())
-
-            # Initialize counts for all possible results to ensure they are included
-            for result in possible_results:
-                counts[result] = counts.get(result, 0)
-
-            percentages = {
-                category: (
-                    (counts[category] / total_count) * 100 if total_count > 0 else 0
-                )
-                for category in possible_results
-            }
-
-            result_dict = {
-                'total_count': total_count,
-                'category_counts': dict(counts),
-                'percentages': percentages,
-            }
-            self.all_percentages[(asn, country, target)] = result_dict
+            self.all_percentages[(asn, country, target)] = self.make_result_dict(counts)

     def unit_test(self):
         return super().unit_test(['CENSORED', 'RESOLVES_TO', 'PART_OF', 'COUNTRY'])
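Among the removed lines above is the guard `if lambda ip: True if ipaddress.ip_address(ip) else False:`. A bare lambda in an `if` is tested as an object, not called, so the condition was always true and never validated the IP; the rewrite drops it along with the equally ineffective nested-lambda hostname check. A quick demonstration:

    # A function object is truthy regardless of what it would return.
    check = lambda ip: False
    assert bool(check)  # True -- the lambda itself is tested, never called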
diff --git a/iyp/crawlers/ooni/whatsapp.py b/iyp/crawlers/ooni/whatsapp.py
index 9d34b90..5c6342a 100644
--- a/iyp/crawlers/ooni/whatsapp.py
+++ b/iyp/crawlers/ooni/whatsapp.py
@@ -17,145 +17,82 @@ class Crawler(OoniCrawler):

     def __init__(self, organization, url, name):
         super().__init__(organization, url, name, 'whatsapp')
+        self.categories = [
+            'total_ok',
+            'total_blocked',
+            'endpoint_ok',
+            'endpoint_blocked',
+            'registration_server_ok',
+            'registration_server_blocked',
+            'web_ok',
+            'web_blocked',
+        ]

     def process_one_line(self, one_line):
         """Process a single line from the JSONL file."""
-        super().process_one_line(one_line)
-        test_keys = one_line.get('test_keys', {})
+        if super().process_one_line(one_line):
+            return
+        test_keys = one_line['test_keys']

         # Determine the status and failure for each category
-        server_status = test_keys.get('registration_server_status', '').lower()
-        server_failure = test_keys.get('registration_server_failure')
-        endpoint_status = test_keys.get('whatsapp_endpoints_status', '').lower()
-        web_status = test_keys.get('whatsapp_web_status', '').lower()
-        web_failure = test_keys.get('whatsapp_web_failure')
-
-        server_result = (
-            'server_failure'
-            if server_failure is not None
-            else f'server_{server_status}'
-        )
-        endpoint_result = f'endpoint_{endpoint_status}'
-        web_result = 'web_failure' if web_failure is not None else f'web_{web_status}'
+        server_status = test_keys['registration_server_status']
+        endpoint_status = test_keys['whatsapp_endpoints_status']
+        web_status = test_keys['whatsapp_web_status']
+        server_result = f'registration_server_{server_status}'
+        endpoint_result = f'endpoint_{endpoint_status}'
+        web_result = f'web_{web_status}'
+
+        total = 'total_ok'
+        if (
+            server_result == 'registration_server_blocked'
+            or endpoint_result == 'endpoint_blocked'
+            or web_result == 'web_blocked'
+        ):
+            total = 'total_blocked'

         # Update the last entry in all_results with the new test-specific data
-        self.all_results[-1] = self.all_results[-1][:2] + (
+        self.all_results[-1] = self.all_results[-1] + (
+            total,
             server_result,
             endpoint_result,
             web_result,
         )

-        if len(self.all_results[-1]) != 5:
-            self.all_results.pop()
-
     def batch_add_to_iyp(self):
         super().batch_add_to_iyp()

         whatsapp_id = self.iyp.get_node('Tag', {'label': label}, create=True)
-        censored_links = []
+        censored_links = list()
+
+        # Create one link per ASN-country pair.
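+        # The per-category counts/percentages and the probe country code all
+        # become properties of a single CENSORED link to the WhatsApp Tag node.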
+        for (asn, country), result_dict in self.all_percentages.items():
+            asn_id = self.node_ids['asn'][asn]
+            props = dict()
+            for category in self.categories:
+                props[f'percentage_{category}'] = result_dict['percentages'][category]
+                props[f'count_{category}'] = result_dict['category_counts'][category]
+            props['total_count'] = result_dict['total_count']
+            props['country_code'] = country
+            censored_links.append(
+                {'src_id': asn_id, 'dst_id': whatsapp_id, 'props': [props, self.reference]}
+            )

-        # Ensure all IDs are present and process results
-        for (
-            asn,
-            country,
-            server_result,
-            endpoint_result,
-            web_result,
-        ) in self.all_results:
-            asn_id = self.node_ids['asn'].get(asn)
-            country_id = self.node_ids['country'].get(country)
-
-            if asn_id and country_id:
-                props = self.reference.copy()
-                if (asn, country) in self.all_percentages:
-                    percentages = self.all_percentages[(asn, country)].get(
-                        'percentages', {}
-                    )
-                    counts = self.all_percentages[(asn, country)].get(
-                        'category_counts', {}
-                    )
-                    total_count = self.all_percentages[(asn, country)].get(
-                        'total_count', 0
-                    )
-
-                    for category in [
-                        'server_failure',
-                        'server_ok',
-                        'server_blocked',
-                        'endpoint_ok',
-                        'endpoint_blocked',
-                        'web_failure',
-                        'web_ok',
-                        'no_server_failure',
-                        'no_server_ok',
-                        'no_server_blocked',
-                        'no_endpoint_ok',
-                        'no_endpoint_blocked',
-                        'no_web_failure',
-                        'no_web_ok',
-                    ]:
-                        props[f'percentage_{category}'] = percentages.get(category, 0)
-                        props[f'count_{category}'] = counts.get(category, 0)
-                    props['total_count'] = total_count
-
-                if (asn_id, whatsapp_id) not in self.unique_links['CENSORED']:
-                    self.unique_links['CENSORED'].add((asn_id, whatsapp_id))
-                    censored_links.append(
-                        {'src_id': asn_id, 'dst_id': whatsapp_id, 'props': [props]}
-                    )
-
-        # Batch add the links (this is faster than adding them one by one)
         self.iyp.batch_add_links('CENSORED', censored_links)

-    def calculate_percentages(self):
+    def aggregate_results(self):
         target_dict = defaultdict(lambda: defaultdict(int))

-        # Initialize counts for all categories
-        categories = [
-            'server_failure',
-            'server_ok',
-            'server_blocked',
-            'endpoint_ok',
-            'endpoint_blocked',
-            'web_failure',
-            'web_ok',
-            'no_server_failure',
-            'no_server_ok',
-            'no_server_blocked',
-            'no_endpoint_ok',
-            'no_endpoint_blocked',
-            'no_web_failure',
-            'no_web_ok',
-        ]
-
-        # Populate the target_dict with counts
         for entry in self.all_results:
-            asn, country, server_result, endpoint_result, web_result = entry
+            asn, country, total, server_result, endpoint_result, web_result = entry
+            target_dict[(asn, country)][total] += 1
             target_dict[(asn, country)][server_result] += 1
             target_dict[(asn, country)][endpoint_result] += 1
             target_dict[(asn, country)][web_result] += 1

-        self.all_percentages = {}
-
         for (asn, country), counts in target_dict.items():
-            total_count = sum(counts.values())
-            for category in categories:
-                counts[category] = counts.get(category, 0)
-
-            percentages = {
-                category: (
-                    (counts[category] / total_count) * 100 if total_count > 0 else 0
-                )
-                for category in categories
-            }
-
-            result_dict = {
-                'total_count': total_count,
-                'category_counts': dict(counts),
-                'percentages': percentages,
-            }
-            self.all_percentages[(asn, country)] = result_dict
+            total_count = counts['total_ok'] + counts['total_blocked']
+            self.all_percentages[(asn, country)] = self.make_result_dict(counts, total_count)

     def unit_test(self):
         return super().unit_test(['CENSORED'])
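For reference, one CENSORED link produced by this crawler has the following shape (illustrative values; there is one percentage_*/count_* pair per entry in self.categories):

    link = {
        'src_id': 42,  # node ID of the AS node
        'dst_id': 7,   # node ID of the Tag fetched via get_node('Tag', {'label': label})
        'props': [
            {
                'percentage_total_ok': 90.0, 'count_total_ok': 18,
                'percentage_total_blocked': 10.0, 'count_total_blocked': 2,
                # ... remaining categories ...
                'total_count': 20,
                'country_code': 'VE',
            },
            # self.reference (crawler provenance) is passed as the second props dict
        ],
    }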