forked from nfe-w/aio-dynamic-push
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
47 lines (35 loc) · 1.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import time
import schedule
import push_channel
import query_task
from common.config import global_config
from common.logger import log
def init_push_channel(push_channel_config_list: list):
log.info("开始初始化推送通道")
for config in push_channel_config_list:
if config.get('enable', False):
if push_channel.push_channel_dict.get(config.get('name', '')) is not None:
raise ValueError(f"推送通道名称重复: {config.get('name', '')}")
push_channel.push_channel_dict[config.get('name', '')] = push_channel.get_push_channel(config)
log.info(f"初始化推送通道: {config.get('name', '')},通道类型: {config.get('type', None)}")
def init_query_task(query_task_config_list: list):
log.info("初始化查询任务")
for config in query_task_config_list:
if config.get('enable', False):
current_query = query_task.get_query_task(config).query
schedule.every(config.get("intervals_second", 60)).seconds.do(current_query)
log.info(f"初始化查询任务: {config.get('name', '')},任务类型: {config.get('type', None)}")
# 先执行一次
current_query()
while True:
schedule.run_pending()
time.sleep(1)
def main():
query_task_config_list = global_config.get_query_task_config()
push_channel_config_list = global_config.get_push_channel_config()
# 初始化推送通道
init_push_channel(push_channel_config_list)
# 初始化查询任务
init_query_task(query_task_config_list)
if __name__ == '__main__':
main()