-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathworker.py
executable file
·122 lines (91 loc) · 3.19 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
from aiogram import Bot
import database
from handlers.commands.image import worker as txt2img_worker # NOQA: 引入处理器
from handlers.member_captcha.events import new_member_check, unban_member # NOQA: 引入处理器
from manager import manager
# Run flag for the polling loop; cleared until the worker starts.
is_running = False

# Logger shared through the project's manager object.
logger = manager.logger
# logger.level('DEBUG')

# Schema: messages queued for deletion once `deleted_at` has passed.
SQL_CREATE_MESSAGES = """
create table if not exists lazy_delete_messages(
id integer primary key autoincrement,
chat int,
msg int,
deleted_at timestamp with time zone
)
"""

# Schema: pending sessions, dispatched by `type` once `checkout_at` has passed.
SQL_CREATE_NEW_MEMBER_SESSION = """
create table if not exists lazy_sessions(
id integer primary key autoincrement,
chat int,
msg int,
member int,
type text,
checkout_at timestamp with time zone
)
"""

# Up to 500 overdue deletions, oldest first (compared against local time).
SQL_FETCH_LAZY_DELETE_MESSAGES = (
    "select id,chat,msg from lazy_delete_messages "
    "where deleted_at < datetime('now','localtime') "
    "order by deleted_at limit 500"
)

# Up to 500 overdue sessions, oldest first (compared against local time).
SQL_FETCH_SESSIONS = (
    "select id,chat,msg,member,type from lazy_sessions "
    "where checkout_at < datetime('now','localtime') "
    "order by checkout_at limit 500"
)
async def lazy_messages(bot: Bot):
    """
    Delete every queued message whose scheduled deletion time has passed.

    Runs ``SQL_FETCH_LAZY_DELETE_MESSAGES`` (at most 500 rows, oldest first),
    calls ``manager.delete_message(chat, msg)`` for each row, and removes the
    row from ``lazy_delete_messages`` only when that call returns a truthy
    value. The removals are committed together after the whole batch.

    :param bot: Not used in this function; accepted so it is called the same
        way as ``lazy_sessions``.
    """
    async with database.connection() as conn:
        cursor = await conn.execute(SQL_FETCH_LAZY_DELETE_MESSAGES)
        due_rows = list(await cursor.fetchall())
        await cursor.close()

        for row_id, chat, msg in due_rows:
            # NOTE(review): a row whose deletion keeps failing is never
            # removed and will be fetched again on every poll — confirm that
            # unlimited retry is intended.
            if await manager.delete_message(chat, msg):
                # Positional "?" placeholder: binding a tuple to the
                # named-style "$1" is deprecated in sqlite3 (Python 3.12) and
                # becomes an error in 3.14.
                await conn.execute(
                    "delete from lazy_delete_messages where id=?", (row_id,)
                )
        await conn.commit()
async def lazy_sessions(bot: Bot):
    """
    Dispatch every stored session whose checkout time has passed.

    Runs ``SQL_FETCH_SESSIONS`` (at most 500 rows, oldest first) on one
    connection, then, on a second connection, looks up the handler registered
    in ``manager.events`` under the session's ``type`` and awaits it with
    ``(bot, chat, msg, member)``. Each processed session row is deleted and
    the deletions are committed together after the batch.

    :param bot: Bot instance forwarded as the first argument to each handler.
    """
    async with database.connection() as conn:
        cursor = await conn.execute(SQL_FETCH_SESSIONS)
        due_rows = list(await cursor.fetchall())
        await cursor.close()

    # Nothing is due: skip opening the second connection.
    if not due_rows:
        return

    async with database.connection() as conn:
        for session_id, chat, msg, member, session_type in due_rows:
            handler = manager.events.get(session_type)
            if handler and callable(handler):
                await handler(bot, chat, msg, member)

            # NOTE(review): the row is removed even when no handler is
            # registered for its type, so unknown sessions are not re-fetched
            # on every poll — confirm this matches the intended behaviour.
            # Positional "?" placeholder: binding a tuple to the named-style
            # "$1" is deprecated in sqlite3 (Python 3.12), an error in 3.14.
            await conn.execute(
                "delete from lazy_sessions where id=?", (session_id,)
            )
            logger.info(f"lazy session is touched:{session_id} {session_type}")
        await conn.commit()
async def main():
    """
    Worker entry point: prepare storage, then poll for due work.

    Loads and sets up the manager, creates the two tables if they do not
    exist, removes old ``lazy_sessions`` rows, logs the registered events,
    starts the txt2img worker as a background task, and then runs
    ``lazy_messages`` and ``lazy_sessions`` every 0.25 seconds for as long as
    the module-level ``is_running`` flag is true.
    """
    # Without this declaration the assignment below would bind a function
    # local, leaving the module-level flag permanently False.
    global is_running

    manager.load_config()
    manager.setup()
    bot = manager.bot

    async with database.connection() as conn:
        await conn.execute(SQL_CREATE_MESSAGES)
        await conn.execute(SQL_CREATE_NEW_MEMBER_SESSION)
        # Drop sessions that are no longer needed.
        # NOTE(review): this cutoff uses datetime('now', ...) (UTC) while the
        # fetch queries compare against datetime('now','localtime') — confirm
        # which clock `checkout_at` is written in.
        await conn.execute("delete from lazy_sessions where checkout_at < datetime('now','-60 seconds')")
        # Commit explicitly, as the other writers in this file do, so the
        # cleanup is not lost when the connection is released.
        await conn.commit()

    for name, func in manager.events.items():
        logger.info(f"event:{name} => {func}")

    is_running = True

    # Start the txt2img background worker. The event loop keeps only a weak
    # reference to tasks, so hold one here to stop it being collected mid-run.
    txt2img_task = asyncio.create_task(txt2img_worker())  # noqa: F841

    # Poll for delayed message deletions and due sessions.
    while is_running:
        await asyncio.sleep(0.25)
        await lazy_messages(bot)
        await lazy_sessions(bot)

    print("worker closed")
# Script entry: run the worker until it returns or the user interrupts it.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C: clear the module-level run flag and exit without a traceback.
        is_running = False