Skip to content

Commit

Permalink
0.6.2
Browse files Browse the repository at this point in the history
  • Loading branch information
kitUIN committed Nov 7, 2020
1 parent 3bd0c2d commit df4defd
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 0 deletions.
8 changes: 8 additions & 0 deletions PicImageSearch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .saucenao import SauceNAO
from .tracemoe import TraceMoe

__author__ = 'kitUIN'
__license__ = 'Apache-2.0 License'
__maintainer__ = 'kitUIN'
__email__ = '[email protected]'
__status__ = 'Production'
154 changes: 154 additions & 0 deletions PicImageSearch/saucenao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import io

import requests
from PIL import Image
from loguru import logger


class SauceNAONorm:
def __init__(self, data):
result_header = data['header']
result_data = data['data']
self.raw: dict = data
self.similarity: float = float(result_header['similarity'])
self.thumbnail: str = result_header['thumbnail']
self.index_id: int = result_header['index_id']
self.index_name: str = result_header['index_name']
self.title: str = self._get_title(result_data)
self.urls: str = self._get_urls(result_data)
self.author: str = self._get_author(result_data)
self.pixiv_id: str = self._get_pixiv_id(result_data)
self.member_id: str = self._get_member_id(result_data)

@staticmethod
def _get_title(data):
if 'title' in data:
return data['title']
elif 'eng_name' in data:
return data['eng_name']
elif 'material' in data:
return data['material']
elif 'source' in data:
return data['source']
elif 'created_at' in data:
return data['created_at']

@staticmethod
def _get_urls(data):
if 'ext_urls' in data:
return data['ext_urls'][0]
elif 'getchu_id' in data:
return f'http://www.getchu.com/soft.phtml?id={data["getchu_id"]}'
return []

@staticmethod
def _get_author(data):
if 'author' in data:
return data['author']
elif 'author_name' in data:
return data['author_name']
elif 'member_name' in data:
return data['member_name']
elif 'pawoo_user_username' in data:
return data['pawoo_user_username']
elif 'company' in data:
return data['company']
elif 'creator' in data:
if isinstance(data['creator'], list):
return data['creator'][0]
return data['creator']

@staticmethod
def _get_pixiv_id(data):
if 'pixiv_id' in data:
return data['pixiv_id']
else:
return ''

@staticmethod
def _get_member_id(data):
if 'member_id' in data:
return data['member_id']
else:
return ''

def __repr__(self):
return f'<NormSauce(title={repr(self.title)}, similarity={self.similarity:.2f})>'


class SauceNAOResponse:
def __init__(self, resp):
self.raw: list = []
resp_header = resp['header']
resp_results = resp['results']
for i in resp_results:
self.raw.append(SauceNAONorm(i))
self.origin: dict = resp
self.short_remaining: int = resp_header['short_remaining'] # 每30秒访问额度
self.long_remaining: int = resp_header['long_remaining'] # 每天访问额度
self.user_id: int = resp_header['user_id']
self.account_type: int = resp_header['account_type']
self.short_limit: str = resp_header['short_limit']
self.long_limit: str = resp_header['long_limit']
self.status: int = resp_header['status']
self.results_requested: int = resp_header['results_requested']
self.search_depth: str = resp_header['search_depth']
self.minimum_similarity: float = resp_header['minimum_similarity']
self.results_returned: int = resp_header['results_returned']

@staticmethod
def _sort(data):
if data is None:
return []
sorts = sorted(data, key=lambda r: float(r['header']['similarity']), reverse=True)
return sorts

def __repr__(self):
return (f'<SauceResponse(count={repr(len(self.raw))}, long_remaining={repr(self.long_remaining)}, '
f'short_remaining={repr(self.short_remaining)})>')


class SauceNAO:
SauceNAOURL = 'https://saucenao.com/search.php'

def __init__(self,
api_key: str = None,
*,
output_type: int = 2,
testmode: int = 0,
numres: int = 10
) -> None:
"""
:param api_key:用于SauceNAO的访问密钥
:param output_type: 0=正常(默认) html 1=xml api(未实现) 2=json api
:param testmode: 测试模式 0=正常 1=测试
:param numres: 输出数量 默认10
"""
# minsim 控制最小相似度
# todo 完善所有类型params
params = dict()
if api_key is not None:
params['api_key'] = api_key
params['testmode'] = testmode
params['numres'] = numres
params['output_type'] = output_type
self.params = params

def search(self, url: str, files=None):
try:
params = self.params
if url[:4] == 'http': # 网络url
params['url'] = url
else: # 文件
image = Image.open(url)
imageData = io.BytesIO()
image.save(imageData, format='PNG')
files = {'file': ("image.png", imageData.getvalue())}
imageData.close()
resp = requests.post(self.SauceNAOURL, params=params, files=files)
status_code = resp.status_code
logger.info(status_code)
data = resp.json()
return SauceNAOResponse(data)
except Exception as e:
logger.error(e)
152 changes: 152 additions & 0 deletions PicImageSearch/tracemoe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import base64
from urllib import parse

import requests
from loguru import logger


# todo 完善注释
class TraceMoe:
def __init__(self):
self.Url = 'https://trace.moe/api/search' # 按图像 URL 搜索
self.img = '' # 本地图片base64转码结果
self.raws = []

@staticmethod
def _base_64(filename):
with open(filename, 'rb') as f:
coding = base64.b64encode(f.read()) # 读取文件内容,转换为base64编码
# print('本地base64转码~')
return coding.decode()

'''
def errors(self,code):
if code == 200:
response = 'trace.moe访问正常。'
return response
elif code == 413:
response = '图片体积太大。'
return response
elif code == 400:
response = '你没上传图片?'
return response
elif code == 403:
response = 'token无效。'
return response
elif code == 429:
response = '请求太快了,缓一缓吧。'
return response
elif code == 500 or code == 503:
response = '服务器错误 或者 你传错了图片格式。'
return response
else:
response = '未知错误'
return
'''

def arrange(self, data): # todo 更新成class形式
self.raw_all = data # 总返回
self.RawDocsCount = data['RawDocsCount'] # 搜索的帧总数
self.RawDocsSearchTime = data['RawDocsSearchTime'] # 从数据库检索帧所用的时间
self.ReRankSearchTime = data['ReRankSearchTime'] # 比较帧所用的时间
self.CacheHit = data['CacheHit'] # 是否缓存搜索结果
self.trial = data['trial'] # 搜索时间
self.limit = data['limit'] # 剩余搜索限制数
self.limit_ttl = data['limit_ttl'] # 限制重置之前的时间(秒)
self.quota = data['quota'] # 剩余搜索配额数
self.quota_ttl = data['quota_ttl'] # 配额重置之前的时间(秒)
docs = data['docs'][0]
self.raw = docs # 最匹配项总结果
self.From = docs['from'] # 匹配场景的开始时间
self.to = docs['to'] # 匹配场景的结束时间
self.anilist_id = docs['anilist_id'] # 匹配的Anilist IDhttps://anilist.co/
self.at = docs['at'] # 匹配场景的确切时间
self.season = docs['season'] # 发布时间
self.anime = docs['anime'] # 番剧名字
self.filename = docs['filename'] # 找到匹配项的文件名
self.episode = docs['episode'] # 估计的匹配的番剧的集数
self.tokenthumb = docs['tokenthumb'] # 用于生成预览的token
self.similarity = docs['similarity'] # 相似度,相似性低于 87% 的搜索结果可能是不正确的结果
self.title = docs['title'] # 番剧名字
self.title_native = docs['title_native'] # 番剧世界命名
self.title_chinese = docs['title_chinese'] # 番剧中文命名
self.title_english = docs['title_english'] # 番剧英文命名
self.title_romaji = docs['title_romaji'] # 番剧罗马命名
self.mal_id = docs['mal_id'] # 匹配的MyAnimelist IDhttps://myanimelist.net/
self.synonyms = docs['synonyms'] # 备用英文标题
self.synonyms_chinese = docs['synonyms_chinese'] # 备用中文标题
self.is_adult = docs['is_adult'] # 是否R18
self.thumbnail = self.preview_image() # 缩略图预览地址
self.viedo = self.preview_video() # 视频预览地址
for i in range(len(data['docs'])):
self.raws.append(data['docs'][i]) # 分开搜索结果

def preview_image(self): # 预览

anilist_id = self.anilist_id
filename = parse.quote(self.filename)
at = self.at
tokenthumb = self.tokenthumb
url = "https://trace.moe/thumbnail.php?anilist_id={}&file={}&t={}&token={}".format(anilist_id, filename, at,
tokenthumb)
return url

def preview_video(self, mute=False):
"""
创建预览视频
:param mute:预览视频是否静音,True为静音
:return: 预览视频url地址
"""
anilist_id = self.anilist_id
filename = parse.quote(self.filename)
at = self.at
tokenthumb = self.tokenthumb
url = 'https://media.trace.moe/video/{}/{}?t={}&token={}'.format(anilist_id, filename, at, tokenthumb)
if mute:
url = url + '&mute'
return url

def search(self, url, file=False, Filter=0):
"""
搜索
:param url:网络地址或本地
:param file: 是否是本地文件(默认否)
:param Filter: 搜索限制为特定的 Anilist ID(默认无)
:return:
"""
try:
if file: # 是否是本地文件
URl = self.Url
self.img = self._base_64(url)
res = requests.post(URl, json={"image": self.img, "filter": Filter})
self.code = res.status_code
data = res.json()
self.arrange(data)
elif not file: # 网络url
URl = self.Url + '?url=' + url
res = requests.get(URl)
self.code = res.status_code
data = res.json()
self.arrange(data)
except Exception as e:
logger.info(e)

def download_image(self, thumbnail):
"""
下载缩略图
:param thumbnail:缩略图地址
"""
with requests.get(thumbnail, stream=True) as resp:
with open('image.png', 'wb') as fd:
for chunk in resp.iter_content():
fd.write(chunk)

def download_video(self, video):
"""
下载预览视频
:param video :缩略图地址
"""
with requests.get(video, stream=True) as resp:
with open('video.mp4', 'wb') as fd:
for chunk in resp.iter_content():
fd.write(chunk)

0 comments on commit df4defd

Please sign in to comment.