-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathris_live.py
48 lines (37 loc) · 1.37 KB
/
ris_live.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import asyncio
import websockets
import orjson
async def main():
async for websocket in websockets.connect("wss://ris-live.ripe.net/v1/ws"):
# Subscribe to everything
await websocket.send(
orjson.dumps(
{
"type": "ris_subscribe",
"data": {
"moreSpecific": True,
"socketOptions": {"includeRaw": False},
},
}
)
)
# Blindly process forever!
try:
async for message in websocket:
await process(message)
except websockets.ConnectionClosed:
continue
async def process(message):
data = orjson.loads(message)
# as `path` may contain mixed data types Array(UInt32 | Array(Uint32)) (see https://ris-live.ripe.net/manual/)
# check `path` for AS_SET (the array in the array) and pop it into it's own key
if "path" in data["data"] and data["data"]["path"]:
if type(data["data"]["path"][-1]) == type(list()):
data["data"]["as_set"] = data["data"]["path"][-1].copy()
data["data"]["path"].pop()
# print and pipe into whatever
print(orjson.dumps(data["data"]).decode("utf-8"))
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.run_forever()