-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmiddleware.py
65 lines (57 loc) · 2.28 KB
/
middleware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import asyncio
import re
from datetime import datetime
import background_tasks
import data_types as t
import database
import dns
import notifications
def _normalize_domain(raw: str) -> str:
    """Reduce user input (a bare domain or a pasted URL) to a lowercase host string.

    Strips surrounding whitespace, lowercases, removes a leading ``http://`` or
    ``https://`` scheme, and drops everything from the first ``/``, ``?`` or
    ``#`` onward, so that ``https://example.com/page`` becomes ``example.com``
    rather than ``example.com/page``.
    """
    without_scheme = re.sub(r"^(http(s)?://)?", "", raw.strip().lower())
    # Path, query and fragment are not part of the host name.
    return re.split(r"[/?#]", without_scheme, maxsplit=1)[0]


def test_domain(domain: str, resolvers: list[t.DNSResolver]) -> dict[str, str | list[dict[str, str | int]]]:
    """Probe *domain* against *resolvers* and report the outcome per resolver.

    The input is normalized first (see ``_normalize_domain``), then
    ``dns.run_full_check`` is driven to completion with ``asyncio.run``.

    When the overall result is ``BLOCKED`` or ``PARTIALLY_BLOCKED``:

    * one ``BlockingInstance`` is stored for every individual response that is
      ``BLOCKED``,
    * the domain is passed to ``database.add_blocked_domain``, and
    * ``notifications.domain_blocked`` is called if that call returns a truthy
      value (treated here as "this block was not recorded before").

    Args:
        domain: Domain name or http(s) URL as entered by the caller.
        resolvers: Resolvers to run the check against.

    Returns:
        A dict with the normalized ``"domain"``, the name of the overall
        ``"final_result"``, and a ``"responses"`` list holding one dict per
        probe result (resolver name, response name, duration, whether the
        resolver is flagged ``is_blocking``, and the address protocol).

    Note:
        ``asyncio.run`` cannot be called while another event loop is running
        in the same thread; it raises ``RuntimeError`` in that case.
    """
    domain = _normalize_domain(domain)
    results = asyncio.run(dns.run_full_check(domain, resolvers))

    blocked_outcomes = (
        t.FullProbeResponseType.BLOCKED,
        t.FullProbeResponseType.PARTIALLY_BLOCKED,
    )
    if results.final_result in blocked_outcomes:
        # One timestamp for the whole check keeps all records of this run consistent.
        checked_at = datetime.now()
        database.add_blocking_instances([
            t.BlockingInstance(domain, result.resolver.isp, checked_at)
            for result in results.responses
            if result.response == t.SingleProbeResponseType.BLOCKED
        ])
        # The 2nd and 4th positional fields are left unset; presumably
        # added_by and site - confirm against t.BlockedDomain.
        is_new_block = database.add_blocked_domain(
            t.BlockedDomain(domain, None, checked_at, None)
        )
        if is_new_block:
            notifications.domain_blocked(domain)

    return {
        "domain": domain,
        "final_result": results.final_result.name,
        "responses": [
            {
                "resolver": result.resolver.name,
                "response": result.response.name,
                "duration": result.duration,
                "obeys_cuii": result.resolver.is_blocking,
                "protocol": result.resolver.address.protocol,
            }
            for result in results.responses
        ],
    }
def get_resolvers():
    """Return the current resolver health report as a list of plain dicts.

    Each entry from ``background_tasks.get_resolver_health()`` is flattened
    into a dict with the keys ``resolver``, ``isp``, ``protocol``, ``health``,
    ``ping`` and ``obeys_cuii``.
    """
    report = []
    for entry in background_tasks.get_resolver_health():
        report.append({
            "resolver": entry.resolver.name,
            "isp": entry.resolver.isp,
            "protocol": entry.resolver.address.protocol,
            "health": entry.health.name,
            "ping": entry.ping,
            # Exposed under the public key "obeys_cuii"; sourced from is_blocking.
            "obeys_cuii": entry.resolver.is_blocking,
        })
    return report
def get_blocked_domains():
    """Return every blocked domain from the database as a list of plain dicts.

    Each dict carries the ``domain``, the ISO-formatted date (no time part) of
    ``first_blocked_on``, and the ``site`` name, or ``None`` when the record
    has no site set.
    """
    listing = []
    for record in database.get_blocked_domains():
        if record.site:
            site_name = record.site.name
        else:
            site_name = None
        listing.append({
            "domain": record.domain,
            # "added_by" is intentionally not exposed for now.
            "first_blocked_on": record.first_blocked_on.date().isoformat(),
            "site": site_name,
        })
    return listing