From df4defd28697ab05d136099dd86265bc0733920b Mon Sep 17 00:00:00 2001
From: kitUIN <8119808+kituin@user.noreply.gitee.com>
Date: Sat, 7 Nov 2020 21:54:41 +0800
Subject: [PATCH] 0.6.2

---
 PicImageSearch/__init__.py |   8 ++
 PicImageSearch/saucenao.py | 153 +++++++++++++++++++++++++++++++++++++
 PicImageSearch/tracemoe.py | 129 ++++++++++++++++++++++++++++++
 3 files changed, 290 insertions(+)
 create mode 100644 PicImageSearch/__init__.py
 create mode 100644 PicImageSearch/saucenao.py
 create mode 100644 PicImageSearch/tracemoe.py

diff --git a/PicImageSearch/__init__.py b/PicImageSearch/__init__.py
new file mode 100644
index 00000000..8504e183
--- /dev/null
+++ b/PicImageSearch/__init__.py
@@ -0,0 +1,8 @@
+from .saucenao import SauceNAO
+from .tracemoe import TraceMoe
+
+__author__ = 'kitUIN'
+__license__ = 'Apache-2.0 License'
+__maintainer__ = 'kitUIN'
+__email__ = 'kulujun@gmail.com'
+__status__ = 'Production'
diff --git a/PicImageSearch/saucenao.py b/PicImageSearch/saucenao.py
new file mode 100644
index 00000000..245897c9
--- /dev/null
+++ b/PicImageSearch/saucenao.py
@@ -0,0 +1,153 @@
+import io
+
+import requests
+from PIL import Image
+from loguru import logger
+
+
+class SauceNAONorm:
+    """One SauceNAO result, flattened from its ``header`` and ``data`` parts."""
+
+    def __init__(self, data):
+        result_header = data['header']
+        result_data = data['data']
+        self.raw: dict = data
+        self.similarity: float = float(result_header['similarity'])
+        self.thumbnail: str = result_header['thumbnail']
+        self.index_id: int = result_header['index_id']
+        self.index_name: str = result_header['index_name']
+        self.title: str = self._get_title(result_data)
+        self.urls: str = self._get_urls(result_data)
+        self.author: str = self._get_author(result_data)
+        self.pixiv_id: str = self._get_pixiv_id(result_data)
+        self.member_id: str = self._get_member_id(result_data)
+
+    @staticmethod
+    def _get_title(data):
+        """Return the first title-like field present in ``data``, else None."""
+        for key in ('title', 'eng_name', 'material', 'source', 'created_at'):
+            if key in data:
+                return data[key]
+        return None
+
+    @staticmethod
+    def _get_urls(data):
+        """Return the first external URL, a getchu URL, or ``[]`` if neither."""
+        if data.get('ext_urls'):  # also skips an empty or null list
+            return data['ext_urls'][0]
+        if 'getchu_id' in data:
+            return f'http://www.getchu.com/soft.phtml?id={data["getchu_id"]}'
+        return []  # kept as-is so existing callers see the same "no URL" value
+
+    @staticmethod
+    def _get_author(data):
+        """Return the first author-like field present in ``data``, else None."""
+        for key in ('author', 'author_name', 'member_name',
+                    'pawoo_user_username', 'company'):
+            if key in data:
+                return data[key]
+        creator = data.get('creator')
+        if isinstance(creator, list):  # a list yields its first entry
+            return creator[0] if creator else None
+        return creator
+
+    @staticmethod
+    def _get_pixiv_id(data):
+        """Return ``data['pixiv_id']``, or '' when the key is absent."""
+        return data.get('pixiv_id', '')
+
+    @staticmethod
+    def _get_member_id(data):
+        """Return ``data['member_id']``, or '' when the key is absent."""
+        return data.get('member_id', '')
+
+    def __repr__(self):
+        return (f'<SauceNAONorm(title={self.title!r}, '
+                f'similarity={self.similarity:.2f})>')
+
+
+class SauceNAOResponse:
+    """Parsed SauceNAO JSON reply: header fields plus the list of results."""
+
+    def __init__(self, resp):
+        resp_header = resp['header']
+        # One SauceNAONorm per entry, in the order the API returned them.
+        self.raw: list = [SauceNAONorm(item) for item in resp['results']]
+        self.origin: dict = resp
+        self.short_remaining: int = resp_header['short_remaining']  # quota left per 30 s
+        self.long_remaining: int = resp_header['long_remaining']  # quota left per day
+        self.user_id: int = resp_header['user_id']
+        self.account_type: int = resp_header['account_type']
+        self.short_limit: str = resp_header['short_limit']
+        self.long_limit: str = resp_header['long_limit']
+        self.status: int = resp_header['status']
+        self.results_requested: int = resp_header['results_requested']
+        self.search_depth: str = resp_header['search_depth']
+        self.minimum_similarity: float = resp_header['minimum_similarity']
+        self.results_returned: int = resp_header['results_returned']
+
+    @staticmethod
+    def _sort(data):
+        """Return raw result dicts sorted by similarity, highest first."""
+        if data is None:
+            return []
+        return sorted(data, key=lambda r: float(r['header']['similarity']),
+                      reverse=True)
+
+    def __repr__(self):
+        return (f'<SauceNAOResponse(count={len(self.raw)}, '
+                f'long_remaining={self.long_remaining!r}, '
+                f'short_remaining={self.short_remaining!r})>')
+
+
+class SauceNAO:
+    """Minimal client for the SauceNAO search endpoint."""
+
+    SauceNAOURL = 'https://saucenao.com/search.php'
+
+    def __init__(self,
+                 api_key: str = None,
+                 *,
+                 output_type: int = 2,
+                 testmode: int = 0,
+                 numres: int = 10
+                 ) -> None:
+        """
+        :param api_key: access key for SauceNAO
+        :param output_type: 0=normal html 1=xml api (not implemented) 2=json api (default here)
+        :param testmode: test mode, 0=normal 1=test
+        :param numres: number of results, default 10
+        """
+        # minsim controls the minimum similarity
+        # todo support the remaining request params
+        params = dict()
+        if api_key is not None:
+            params['api_key'] = api_key
+        params['testmode'] = testmode
+        params['numres'] = numres
+        params['output_type'] = output_type
+        self.params = params
+
+    def search(self, url: str, files=None):
+        """
+        Search by image URL or by local image file.
+
+        :param url: http(s) URL, or path of a local image file
+        :param files: optional ``requests`` files mapping sent with a URL search
+        :return: SauceNAOResponse, or None when the request or parsing failed
+        """
+        try:
+            # Copy so a 'url' set here does not leak into later file searches.
+            params = dict(self.params)
+            if url.startswith(('http://', 'https://')):  # remote url
+                params['url'] = url
+            else:  # local file, re-encoded as PNG in memory
+                with Image.open(url) as image, io.BytesIO() as buffer:
+                    image.save(buffer, format='PNG')
+                    files = {'file': ("image.png", buffer.getvalue())}
+            resp = requests.post(self.SauceNAOURL, params=params, files=files)
+            logger.info(resp.status_code)
+            return SauceNAOResponse(resp.json())
+        except Exception as e:
+            # Best effort by design: failures are logged and None is returned.
+            logger.error(e)
diff --git a/PicImageSearch/tracemoe.py b/PicImageSearch/tracemoe.py
new file mode 100644
index 00000000..55adab29
--- /dev/null
+++ b/PicImageSearch/tracemoe.py
@@ -0,0 +1,129 @@
+import base64
+from urllib import parse
+
+import requests
+from loguru import logger
+
+
+# todo improve the comments
+class TraceMoe:
+    """Client for the trace.moe search API; the last result is kept on self."""
+
+    def __init__(self):
+        self.Url = 'https://trace.moe/api/search'  # search endpoint
+        self.img = ''  # base64 text of the last local image searched
+        self.raws = []  # every match of the last search
+
+    @staticmethod
+    def _base_64(filename):
+        """Return the content of ``filename`` as base64 text."""
+        with open(filename, 'rb') as f:
+            coding = base64.b64encode(f.read())
+        return coding.decode()
+
+    # Meaning of the HTTP status stored in ``self.code`` by ``search``:
+    #   200 ok, 400 no image sent, 403 invalid token, 413 image too large,
+    #   429 too many requests, 500/503 server error or wrong image format.
+
+    def arrange(self, data):  # todo turn this into a class
+        """Copy the search reply and its first (best) match onto self."""
+        self.raw_all = data  # whole reply
+        self.RawDocsCount = data['RawDocsCount']  # total number of frames searched
+        self.RawDocsSearchTime = data['RawDocsSearchTime']  # time spent retrieving frames
+        self.ReRankSearchTime = data['ReRankSearchTime']  # time spent comparing frames
+        self.CacheHit = data['CacheHit']  # whether the result was cached
+        self.trial = data['trial']  # search time
+        self.limit = data['limit']  # remaining search limit
+        self.limit_ttl = data['limit_ttl']  # seconds until the limit resets
+        self.quota = data['quota']  # remaining search quota
+        self.quota_ttl = data['quota_ttl']  # seconds until the quota resets
+        # Replace rather than append, so earlier searches do not pile up.
+        self.raws = list(data['docs'])  # every match, one entry each
+        docs = data['docs'][0]
+        self.raw = docs  # best match, complete
+        self.From = docs['from']  # start time of the matched scene
+        self.to = docs['to']  # end time of the matched scene
+        self.anilist_id = docs['anilist_id']  # matching Anilist id, https://anilist.co/
+        self.at = docs['at']  # exact time of the matched scene
+        self.season = docs['season']  # release time
+        self.anime = docs['anime']  # anime name
+        self.filename = docs['filename']  # file in which the match was found
+        self.episode = docs['episode']  # estimated episode number
+        self.tokenthumb = docs['tokenthumb']  # token used to build previews
+        self.similarity = docs['similarity']  # matches below 87% may be wrong
+        self.title = docs['title']  # anime title
+        self.title_native = docs['title_native']  # native title
+        self.title_chinese = docs['title_chinese']  # Chinese title
+        self.title_english = docs['title_english']  # English title
+        self.title_romaji = docs['title_romaji']  # romaji title
+        self.mal_id = docs['mal_id']  # matching MyAnimeList id, https://myanimelist.net/
+        self.synonyms = docs['synonyms']  # alternative English titles
+        self.synonyms_chinese = docs['synonyms_chinese']  # alternative Chinese titles
+        self.is_adult = docs['is_adult']  # whether it is R18
+        self.thumbnail = self.preview_image()  # thumbnail preview url
+        self.video = self.preview_video()  # video preview url
+        self.viedo = self.video  # original misspelled name, kept for callers
+
+    def preview_image(self):
+        """Build the thumbnail url of the current best match."""
+        return 'https://trace.moe/thumbnail.php?anilist_id={}&file={}&t={}&token={}'.format(
+            self.anilist_id, parse.quote(self.filename), self.at, self.tokenthumb)
+
+    def preview_video(self, mute=False):
+        """
+        Build the preview video url of the current best match.
+
+        :param mute: when True, ``&mute`` is appended to the url
+        :return: preview video url
+        """
+        url = 'https://media.trace.moe/video/{}/{}?t={}&token={}'.format(
+            self.anilist_id, parse.quote(self.filename), self.at, self.tokenthumb)
+        if mute:
+            url = url + '&mute'
+        return url
+
+    def search(self, url, file=False, Filter=0):
+        """
+        Run a search and store the result on this object (see ``arrange``).
+
+        :param url: image URL, or a local file path when ``file`` is True
+        :param file: whether ``url`` is a local file (default False)
+        :param Filter: limit the search to one Anilist ID (default: none);
+            only sent for local-file searches
+        :return: None
+        """
+        try:
+            if file:  # local file, uploaded as base64
+                self.img = self._base_64(url)
+                res = requests.post(self.Url, json={"image": self.img, "filter": Filter})
+            else:  # remote url; ``params`` percent-encodes it into the query
+                res = requests.get(self.Url, params={'url': url})
+            self.code = res.status_code
+            self.arrange(res.json())
+        except Exception as e:
+            # Best effort by design: failures are logged, not raised.
+            logger.error(e)
+
+    def download_image(self, thumbnail):
+        """
+        Download a thumbnail to ``image.png`` in the working directory.
+
+        :param thumbnail: thumbnail url
+        """
+        self._download(thumbnail, 'image.png')
+
+    def download_video(self, video):
+        """
+        Download a preview video to ``video.mp4`` in the working directory.
+
+        :param video: preview video url
+        """
+        self._download(video, 'video.mp4')
+
+    @staticmethod
+    def _download(url, path, chunk_size=8192):
+        """Stream ``url`` into ``path``; iter_content defaults to 1-byte chunks."""
+        with requests.get(url, stream=True) as resp:
+            with open(path, 'wb') as fd:
+                for chunk in resp.iter_content(chunk_size=chunk_size):
+                    fd.write(chunk)