forked from snarfed/bridgy-fed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathatproto_hub.py
138 lines (110 loc) · 4.34 KB
/
atproto_hub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""Single-instance hub for ATProto subscription (firehose) server and client."""
from functools import lru_cache
from ipaddress import ip_address, ip_network
import logging
import os
from pathlib import Path
import socket
import threading
from threading import Thread, Timer
import arroba.server
from arroba import xrpc_sync
from flask import Flask, render_template
import lexrpc.client
import lexrpc.flask_server
from oauth_dropins.webutil.appengine_info import DEBUG, LOCAL_SERVER
from oauth_dropins.webutil import appengine_config, flask_util
import pytz
# all protocols
import activitypub, atproto, web
import atproto_firehose
import common
import models
# as of 2024-07-10
BSKY_TEAM_CIDRS = (
# https://discord.com/channels/1097580399187738645/1115973909624397855/1260356452162469969
ip_network('209.249.133.120/29'),
ip_network('108.179.139.0/24'),
# https://github.com/bluesky-social/atproto/discussions/3036#discussioncomment-11431550
ip_network('67.213.161.32/29')
)
BSKY_TEAM_HOSTS = (
'zip.zayo.com', # maybe? https://github.com/bluesky-social/atproto/discussions/3036#discussioncomment-11399854
)
# WARNING: when this is higher than 1, we start seeing ndb context exceptions,
# "ContextError: No current context," in _handle, even though it has an ndb context
# from handler. No clue why. They happen more often as the number of threads
# increases. Are ndb clients/contexts not thread safe?!
# https://github.com/snarfed/bridgy-fed/issues/1315
# https://console.cloud.google.com/errors/detail/CJrBqKnRzPfNRA;time=PT1H;refresh=true;locations=global?project=bridgy-federated
HANDLE_THREADS = 10
logger = logging.getLogger(__name__)
models.reset_protocol_properties()
# Flask app
app = Flask(__name__)
app.json.compact = False
app_dir = Path(__file__).parent
app.config.from_pyfile(app_dir / 'config.py')
app.wsgi_app = flask_util.ndb_context_middleware(
app.wsgi_app, client=appengine_config.ndb_client,
# limited context-local cache. avoid full one due to this bug:
# https://github.com/googleapis/python-ndb/issues/888
cache_policy=common.cache_policy,
global_cache=common.global_cache,
global_cache_policy=common.global_cache_policy,
global_cache_timeout_policy=common.global_cache_timeout_policy)
@app.get('/liveness_check')
@app.get('/readiness_check')
def health_check():
"""App Engine Flex health checks.
https://cloud.google.com/appengine/docs/flexible/reference/app-yaml?tab=python#updated_health_checks
"""
return 'OK'
# ATProto XRPC server
lexrpc.flask_server.init_flask(arroba.server.server, app)
@app.post('/queue/atproto-commit')
@flask_util.cloud_tasks_only(log=False)
def atproto_commit():
"""Handler for atproto-commit tasks.
Triggers `subscribeRepos` to check for new commits.
"""
xrpc_sync.send_events()
return 'OK'
@lru_cache
def gethostbyaddr(addr):
"""Wrapper for :func:`socket.gethostbyaddr` that caches the result."""
for subnet in BSKY_TEAM_CIDRS:
if ip_address(addr) in subnet:
return 'bsky'
try:
return socket.gethostbyaddr(addr)[0]
except socket.herror:
return None
@app.get('/admin/atproto')
def atproto_admin():
return render_template(
'atproto.html',
subscribers=lexrpc.flask_server.subscribers,
gethostbyaddr=gethostbyaddr,
pytz=pytz,
)
# start firehose consumer threads
if LOCAL_SERVER or not DEBUG:
threads = [t.name for t in threading.enumerate()]
assert 'atproto_firehose.subscriber' not in threads
assert 'atproto_firehose.handler' not in threads
Thread(target=atproto_firehose.subscriber, name='atproto_firehose.subscriber').start()
for i in range(HANDLE_THREADS):
Thread(target=atproto_firehose.handler, name=f'atproto_firehose.handler-{i}').start()
# send requestCrawl to relay every 5m.
# delay 15s at startup because we're not up and serving XRPCs at this point yet.
# not sure why not.
if 'GAE_INSTANCE' in os.environ: # prod
def request_crawl():
bgs = lexrpc.client.Client(f'https://{os.environ["BGS_HOST"]}',
headers={'User-Agent': common.USER_AGENT})
resp = bgs.com.atproto.sync.requestCrawl({'hostname': os.environ['PDS_HOST']})
logger.info(resp)
Timer(5 * 60, request_crawl).start()
Timer(15, request_crawl).start()
logger.info('Will send relay requestCrawl in 15s')