-
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathservice.py
41 lines (29 loc) · 1.19 KB
/
service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from __future__ import annotations
from typing import TYPE_CHECKING
from launart import Launart, Service, any_completed
from avilla.onebot.v11.net.ws_client import OneBot11WsClientNetworking
from avilla.onebot.v11.net.ws_server import OneBot11WsServerNetworking
if TYPE_CHECKING:
from .protocol import OneBot11Protocol
class OneBot11Service(Service):
id = "onebot11.service"
required: set[str] = set()
stages: set[str] = {"preparing", "blocking", "cleanup"}
connections: list[OneBot11WsClientNetworking | OneBot11WsServerNetworking]
protocol: OneBot11Protocol
def __init__(self, protocol: OneBot11Protocol):
self.connections = []
self.protocol = protocol
super().__init__()
async def launch(self, manager: Launart):
for conn in self.connections:
manager.add_component(conn)
async with self.stage("preparing"):
...
async with self.stage("blocking"):
await any_completed(
manager.status.wait_for_sigexit(),
*(conn.status.wait_for("blocking-completed") for conn in self.connections),
)
async with self.stage("cleanup"):
pass