Skip to content

Commit

Permalink
华为云函数支持参数验证
Browse files Browse the repository at this point in the history
  • Loading branch information
xxyz30 committed Sep 19, 2023
1 parent 9fad1f2 commit 75edf5d
Showing 1 changed file with 56 additions and 15 deletions.
71 changes: 56 additions & 15 deletions cloud_functions/skyland.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
import hashlib
import hmac
import json
import logging
import threading
import time
from urllib import parse

import requests

header = {
'cred': 'cred',
'User-Agent': 'Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0',
'Accept-Encoding': 'gzip',
'Connection': 'close',

# 老版本请求头,新版本要验参
"vName": "1.0.1",
"vCode": "100001014",
"dId": "de9759a5afaa634f",
"platform": "1"
'Connection': 'close'
}

header_login = {
'User-Agent': 'Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0',
'Accept-Encoding': 'gzip',
'Connection': 'close',
'Connection': 'close'
}

# 老版本请求头,新版本要验参
"vName": "1.0.1",
"vCode": "100001014",
"dId": "de9759a5afaa634f",
"platform": "1"
# 签名请求头一定要这个顺序,否则失败
# timestamp是必填的,其它三个随便填,不要为none即可
header_for_sign = {
'platform': '',
'timestamp': '',
'dId': '',
'vName': ''
}

# 签到url
Expand All @@ -40,6 +42,43 @@

app_code = '4ca99fa6b56cc2ba'

sign_token = threading.local()


def generate_signature(token: str, path, body_or_query):
"""
获得签名头
接口地址+方法为Get请求?用query否则用body+时间戳+ 请求头的四个重要参数(dId,platform,timestamp,vName).toJSON()
将此字符串做HMAC加密,算法为SHA-256,密钥token为请求cred接口会返回的一个token值
再将加密后的字符串做MD5即得到sign
:param token: 拿cred时候的token
:param path: 请求路径(不包括网址)
:param body_or_query: 如果是GET,则是它的query。POST则为它的body
:return: 计算完毕的sign
"""
t = str(int(time.time()))
token = token.encode('utf-8')
header_ca = json.loads(json.dumps(header_for_sign))
header_ca['timestamp'] = t
header_ca_str = json.dumps(header_ca, separators=(',', ':'))
s = path + body_or_query + t + header_ca_str
hex_s = hmac.new(token, s.encode('utf-8'), hashlib.sha256).hexdigest()
md5 = hashlib.md5(hex_s.encode('utf-8')).hexdigest().encode('utf-8').decode('utf-8')
logging.info(f'算出签名: {md5}')
return md5, header_ca


def get_sign_header(url: str, method, body, old_header):
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == 'get':
h['sign'], header_ca = generate_signature(sign_token.token, p.path, p.query)
else:
h['sign'], header_ca = generate_signature(sign_token.token, p.path, json.dumps(body))
for i in header_ca:
h[i] = header_ca[i]
return h


def copy_header(cred):
v = json.loads(json.dumps(header))
Expand All @@ -64,6 +103,7 @@ def get_cred(grant):
}, headers=header_login).json()
if resp['code'] != 0:
raise Exception(f'获得cred失败:{resp["messgae"]}')
sign_token.token = resp['data']['token']
return resp['data']['cred']


Expand All @@ -80,7 +120,7 @@ def get_grant_code(token):

def get_binding_list(cred):
v = []
resp = requests.get(url=binding_url, headers=copy_header(cred)).json()
resp = requests.get(url=binding_url, headers=get_sign_header(binding_url, 'get', None, copy_header(cred))).json()
if resp['code'] != 0:
logging.error(f"请求角色列表出现问题:{resp['message']}")
if resp.get('message') == '用户未登录':
Expand All @@ -100,7 +140,8 @@ def do_sign(cred):
'uid': i.get('uid'),
'gameId': 1
}
resp = requests.post(sign_url, headers=copy_header(cred), json=body).json()
resp = requests.post(sign_url, headers=get_sign_header(sign_url, 'post', body, copy_header(cred)),
json=body).json()
if resp['code'] != 0:
logging.error(f'角色{i.get("nickName")}({i.get("channelName")})签到失败了!原因:{resp.get("message")}')
continue
Expand Down

0 comments on commit 75edf5d

Please sign in to comment.