forked from kvnZero/IGPSPORT2Xingzhe
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathActivitySync.py
156 lines (130 loc) · 6.05 KB
/
ActivitySync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import base64
from datetime import datetime
from zoneinfo import ZoneInfo
import os
import requests, json
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
import garth
import zipfile
def encrpt(password, public_key):
rsa = RSA.importKey(public_key)
cipher = PKCS1_v1_5.new(rsa)
return base64.b64encode(cipher.encrypt(password.encode())).decode()
def syncData(username, password, garmin_email = None, garmin_password = None):
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
"Accept-Encoding" : "gzip, deflate",
}
igp_host = "my.igpsport.com"
if os.getenv("IGPSPORT_REGION") == "global":
igp_host = "i.igpsport.com"
session = requests.session()
type = 1 #default igp
if garmin_password is not None and garmin_password != '':
type = 2 #garmin
# login account
if type == 2:
print("同步佳明数据")
garth.configure(domain="garmin.cn")
garth.login(garmin_email, garmin_password)
activities = garth.connectapi(
f"/activitylist-service/activities/search/activities",
params={"activityType": "cycling", "limit": 10, "start": 0, 'excludeChildren': False},
)
else:
print("同步IGP数据")
url = "https://%s/Auth/Login" % igp_host
data = {
'username': username,
'password': password,
}
res = session.post(url, data, headers=headers)
# get igpsport list
url = "https://%s/Activity/ActivityList" % igp_host
res = session.get(url)
result = json.loads(res.text, strict=False)
activities = result["item"]
# login xingzhe account
url = "https://www.imxingzhe.com/user/login"
res = session.get(url, headers=headers) # need flush cookie
cookie = session.cookies.get_dict()
rd = cookie['rd']
safe_password = password + ';' + rd
encrypter_public_key = "-----BEGIN PUBLIC KEY-----\nMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDmuQkBbijudDAJgfffDeeIButq\nWHZvUwcRuvWdg89393FSdz3IJUHc0rgI/S3WuU8N0VePJLmVAZtCOK4qe4FY/eKm\nWpJmn7JfXB4HTMWjPVoyRZmSYjW4L8GrWmh51Qj7DwpTADadF3aq04o+s1b8LXJa\n8r6+TIqqL5WUHtRqmQIDAQAB\n-----END PUBLIC KEY-----\n"
safe_password = encrpt(safe_password, encrypter_public_key)
url = "https://www.imxingzhe.com/api/v4/account/login"
data = {
'account': username,
'password': safe_password,
"source": "web"
}
res = session.post(url, json.dumps(data), headers=headers)
# get user info
url = "https://www.imxingzhe.com/api/v4/account/get_user_info/"
res = session.get(url, headers=headers)
result = json.loads(res.text, strict=False)
userId = result["userid"]
# get current month data
url = "https://www.imxingzhe.com/api/v4/user_month_info/?user_id="+str(userId)
res = session.get(url, headers=headers)
result = json.loads(res.text, strict=False)
data = result["data"]["wo_info"]
sync_data = []
# get not upload activity
timezone = ZoneInfo('Asia/Shanghai') # to Shanghai timezero in Gtihub Action env
for activity in activities:
if type == 2: #garmin
dt = datetime.strptime(activity["startTimeLocal"], "%Y-%m-%d %H:%M:%S")
dt2 = datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, tzinfo=timezone)
s_time = dt2.timestamp()
mk_time = int(s_time) * 1000
else:
dt = datetime.strptime(activity["StartTime"], "%Y-%m-%d %H:%M:%S")
dt2 = datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, tzinfo=timezone)
s_time = dt2.timestamp()
mk_time = int(s_time) * 1000
need_sync = True
for item in data:
if item["start_time"] == mk_time:
need_sync = False
break
if need_sync:
sync_data.append(activity)
if len(sync_data) == 0:
print("nothing data need sync")
else:
#down file
upload_url = "https://www.imxingzhe.com/api/v4/upload_fits"
for sync_item in sync_data:
if type == 2: # garmin
rid = sync_item['activityId']
rid = str(rid)
print("sync rid:" + rid)
res = garth.download(
f"/download-service/files/activity/{rid}",
)
with open(rid+".zip", "wb") as f:
f.write(res)
with zipfile.ZipFile(rid+".zip", 'r') as zip_ref:
zip_ref.extractall(rid)
with open(rid+"/"+rid+"_ACTIVITY.fit", 'rb') as fd:
result = session.post(upload_url, files={
"title": (None, 'Garmin-'+sync_item["startTimeLocal"], None),
"device": (None, 6, None), #IGPS
"sport": (None, 3, None), #骑行
"upload_file_name": (rid+"_ACTIVITY.fit", fd.read(), 'application/octet-stream')
})
else:
rid = sync_item["RideId"]
rid = str(rid)
print("sync rid:" + rid)
fit_url = "https://%s/fit/activity?type=0&rideid=%s" % (igp_host, rid)
res = session.get(fit_url)
result = session.post(upload_url, files={
"title": (None, 'IGPSPORT-'+sync_item["StartTime"], None),
"device": (None, 3, None), #IGPS
"sport": (None, 3, None), #骑行
"upload_file_name": (sync_item["StartTime"]+'.fit', res.content, 'application/octet-stream')
})
activity = syncData(os.getenv("USERNAME"), os.getenv("PASSWORD"), os.getenv("GARMIN_EMAIL"), os.getenv("GARMIN_PASSWORD"))