forked from wangxyd/nicecoze
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdowmd.py
116 lines (106 loc) · 4.85 KB
/
dowmd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# encoding:utf-8
import re
import plugins
from bridge.reply import Reply, ReplyType
from lib import itchat
from plugins import *
from config import conf
@plugins.register(
name="dow_markdown",
desire_priority=66,
desc="优化markdown返回结果中的图片和网址链接。",
version="0.7",
author="Kubbo",
hidden=False
)
class dow_markdown(Plugin):
def __init__(self):
super().__init__()
try:
self.config =super().load_config()
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
self.handlers[Event.ON_DECORATE_REPLY] = self.on_decorate_reply
logger.info("[dow_markdown] inited.")
except Exception as e:
logger.warn("[dow_markdown] init failed, ignore.")
raise e
def on_handle_context(self, e_context: EventContext):
send_msg = e_context["context"]
try:
text = send_msg["content"]
if any(word in send_msg["content"] for word in [".画", ".sd"]):
receiver = send_msg.get("receiver")
itchat.send("我正在绘画中,可能需要多等待一会,请稍后...", toUserName=receiver)
logger.info("[WX] sendMsg={}, receiver={}".format(text, receiver))
e_context.action = EventAction.BREAK_PASS
except Exception as e:
text = send_msg["content"]
logger.warn(f"[dow_markdown] on_handle_context failed, content={text}, error={e}")
finally:
e_context.action = EventAction.CONTINUE
def on_decorate_reply(self, e_context: EventContext):
if e_context["reply"].type != ReplyType.TEXT:
return
logger.info("[on_decorate_reply] yooooooooooooooooo")
try:
content = e_context["reply"].content.strip()
# 避免图片无法下载时,重复调用插件导致没有响应的问题
if content.startswith("[DOWNLOAD_ERROR]"):
return
content_arr = [m for m in content.split("<br>") if m.strip()]
if len(content_arr) > 0:
for index, msg in enumerate(content_arr):
self.handle_send(msg, e_context, index == len(content_arr) - 1)
except Exception as e:
logger.warn(f"[dow_markdown] on_decorate_reply failed, error={e}")
finally:
e_context.action = EventAction.CONTINUE
def get_help_text(self, **kwargs):
return "优化返回结果中的图片和网址链接。"
def handle_send(self, content, e_context, is_last):
host = os.environ.get('DIFY_API_BASE', '')
if host.endswith('/v1'):
host = host[:-3]
if 'coze' == conf().get('model'):
host = "https://s.coze.cn/t/"
format_content = self.format_content(content)
parts = re.split(r'&分块&', format_content)
parts = [p for p in parts if p.strip()]
channel = e_context["channel"]
context = e_context["context"]
logger.info("[handle_send] parts={}".format(parts))
if len(parts) > 0:
for index, part in enumerate(parts):
reply = Reply(content=part.strip(), type=ReplyType.TEXT)
if re.search(r"\.(gif|jpg|png|jpeg|webp)", part):
reply.type = ReplyType.IMAGE_URL
reply.content = self.extract_url(part.strip(), host)
elif re.search(r"\.(mp4)", part):
reply.type = ReplyType.VIDEO_URL
reply.content = self.extract_url(part.strip(), host)
try:
if index == len(parts) - 1 and is_last:
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS
else:
channel.send(reply, context)
except Exception as e:
source_type = {ReplyType.IMAGE_URL: "图片", ReplyType.VIDEO_URL: "视频"}[reply.type]
itchat.send(f"[{source_type}]加载失败", toUserName=context.get("receiver"))
logger.warn("[dow_markdown] handle_send failed, error={}".format(e))
def extract_url(self, text, host):
text = text.strip()
if text.endswith(")"):
text = text[:-1]
if not text.startswith('http'):
if text.startswith('/t/') and 'coze' == conf().get('model'):
text = text[3:]
text = host + text
logger.info("[extract_url] text={}".format(text))
return text
def format_content(self, content):
regex_pattern = self.config.get("regex_pattern",r"\!\[([^\]]*)\]\(?([^)\\\s]+)\)?")
replacement_pattern = self.config.get("replacement_pattern",r"&分块&\2&分块&")
content = re.sub(regex_pattern, replacement_pattern, content)
content = re.sub(r"\\n", "\n", content)
return content