-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.py
349 lines (316 loc) · 13.6 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import logging.handlers
import math
import os
import platform
import random
import sched
import shutil
import signal
import socket
import subprocess
import threading
import time
from urllib import request, parse
import psutil
import osutil
import settings
# Make sure the directory that holds the watchdog log exists. exist_ok=True
# replaces the separate os.path.exists() test, which was open to a
# check-then-create race.
os.makedirs('/data/python', exist_ok=True)

# Root logger: everything from DEBUG upwards goes to a rotating log file
# (16 MiB per file, one backup kept).
log = logging.getLogger()
log.setLevel(logging.DEBUG)
fh = logging.handlers.RotatingFileHandler("/data/python/watchdog.log", maxBytes=16 * 1024 * 1024,
                                          backupCount=1, encoding="UTF-8")
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter('$: %(asctime)s > %(levelname)-5s > %(filename)s:%(lineno)s > %(message)s')
fh.setFormatter(formatter)
log.addHandler(fh)

# Publish this machine's host name through the shared settings module.
hostname = socket.gethostname()
settings.CACHE_HOST_NAME = hostname

# Initialise the sched module's scheduler.
# The first argument is a function returning a timestamp; the second one
# blocks until the next scheduled event is due.
schedule = sched.scheduler(time.time, time.sleep)
# Callback fired periodically by the scheduler.
def heart():
    """Start one heartbeat thread, then queue the next heartbeat."""
    startHeartThread().start()
    # A sched event fires only once, so the callback re-registers itself.
    schedule.enter(delay=settings.HTTP_HEARTBEAT, priority=0, action=heart)
def start():
    """Queue the first heartbeat and run the scheduler loop.

    Blocks inside ``schedule.run()`` for as long as events remain queued.
    """
    # Fixed 30-second delay so the projects can finish starting up.
    time.sleep(30)
    # enter() takes: delay, priority, the callback, and optionally its
    # arguments (a single argument needs a trailing comma: (xxx,)).
    schedule.enter(delay=settings.HTTP_HEARTBEAT, priority=0, action=heart)
    schedule.run()
# Heartbeat request: periodically fetches the latest tasks.
class startHeartThread(threading.Thread):
    """Thread that sends one heartbeat to the server and dispatches the tasks it returns.

    The heartbeat POSTs host metrics (memory, CPU, network, disk) to
    ``settings.HTTP_URL + "/task"``. When the JSON reply has ``code == 0``,
    every task in ``data`` whose id is not yet in ``settings.CACHE_TASK_IDS``
    is recorded there and handed to a new ``execTaskThread``.
    """

    def __init__(self):
        super().__init__()
        self.daemon = False

    def run(self):
        """Send the heartbeat; request and reply-handling errors are logged, not raised."""
        url = settings.HTTP_URL + "/task"
        now = int(time.time())
        nonce = ''.join(random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 32))
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36',
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
        }
        mem = psutil.virtual_memory()
        cpu_percent = psutil.cpu_percent(0.5)
        net = psutil.net_io_counters(pernic=False, nowrap=True)
        hdd = psutil.disk_usage('/')
        data = {
            'appid': settings.HTTP_SECRET_ID,
            'nonce': nonce,
            'timestamp': str(now),
            'apps': ','.join(settings.APP_LIST),
            'hostname': settings.CACHE_HOST_NAME,
            'mem_total': str(mem.total),
            'mem_used': str(mem.used),
            'mem_free': str(mem.free),
            'mem_percent': str(mem.percent),
            'cpu_percent': str(cpu_percent),
            'net_sent': str(net.bytes_sent),
            'net_recv': str(net.bytes_recv),
            'net_tcp': str(osutil.netstat()),
            'hdd_total': str(hdd.total),
            'hdd_used': str(hdd.used),
            'hdd_free': str(hdd.free),
            'hdd_percent': str(hdd.percent),
        }
        # The signature is computed over the payload before 'sign' is added.
        data['sign'] = osutil.createSign(settings.HTTP_SECRET_KEY, data)
        data = parse.urlencode(data).encode('utf-8')
        try:
            req = request.Request(url, data, headers, None, None, 'POST')
            # Close the HTTP response deterministically instead of leaving it
            # to the garbage collector.
            with request.urlopen(req, None, settings.HTTP_TIMEOUT) as resp:
                page = resp.read()
            result = page.decode('utf-8')
            log.debug(result)
            res_data = json.loads(result)
            if res_data['code'] == 0:
                for task in res_data['data']:
                    task_id = task['id']
                    # Task ids are cached so the same task is never executed twice.
                    # %s formatting keeps a non-string id from raising TypeError,
                    # which would abort the remaining tasks of this heartbeat.
                    if task_id in settings.CACHE_TASK_IDS:
                        log.debug('任务已存在,任务ID:::%s', task_id)
                    else:
                        settings.CACHE_TASK_IDS.append(task_id)
                        execTaskThread(task_id, task).start()
                        log.debug('任务已添加,任务ID:::%s', task_id)
        except Exception as e:
            log.error(str(e) + ':::' + url)
# Kill a process.
def killProcess(pid):
    """Forcefully terminate the process identified by ``pid``.

    Args:
        pid: Process id; anything ``int()`` accepts (an int or a numeric string).

    Returns:
        A dict with ``code`` and ``msg``:
        ``code == 0``  - the kill signal was delivered;
        ``code == -1`` - the operating system is neither Linux nor Windows;
        ``code == -2`` - ``pid`` was not a usable number or ``os.kill`` failed.
    """
    sysname = platform.system()
    if sysname == 'Linux':
        sig = signal.SIGKILL
    elif sysname == 'Windows':
        sig = -1
    else:
        return {'code': -1, 'msg': '不支持的操作系统::' + sysname}

    try:
        os.kill(int(pid), sig)
    except (OSError, ValueError, TypeError, OverflowError) as e:
        # ValueError/TypeError/OverflowError come from a malformed pid. They
        # are turned into an error result so the caller can still report the
        # failure instead of dying on an uncaught exception.
        log.error('杀死进程出错::' + str(e))
        return {'code': -2, 'msg': '杀死进程出错::' + str(e)}
    return {'code': 0, 'msg': '已杀死PID为 %s 的进程' % (pid,)}
# Launches a jar application.
class startJavaThread(threading.Thread):
    """Thread that runs one Java application with ``java -jar`` and waits for it to exit."""

    def __init__(self, appname):
        super().__init__()
        self.daemon = False
        self.appname = appname

    def run(self):
        """Assemble the java command line for ``self.appname`` and run it through the shell."""
        app = self.appname
        # The child process gets the current environment plus APPNAME.
        child_env = dict(os.environ, APPNAME=app)
        app_version = osutil.getAppVersion(app)
        conf_version = osutil.getConfVersion(app)
        app_home = settings.APP_ROOT + app
        jar_path = app_home + '/app/' + app_version + '/' + app + '.jar'
        conf_dir = app_home + '/conf/' + conf_version + '/'
        # Per-application JVM options are optional.
        try:
            jvm_opts = settings.APP_OPTS[app]
        except KeyError:
            jvm_opts = ''
        command = ''.join((
            'java ',
            ' -Dnutz.boot.configure.properties.dir=',
            conf_dir,
            ' ',
            jvm_opts,
            ' -jar ',
            jar_path,
        ))
        log.info(command)
        subprocess.call(command, close_fds=True, shell=True, env=child_env)
        # Alternative kept from the original: send the output to a per-app log file.
        # with open("/data/python/" + self.appname + ".log", "w") as f:
        #     subprocess.call(cmd, close_fds=True, shell=True, env=_env, stdout=f, stderr=f)
# Executes a task command.
# Thread that carries out one server task ('stop' or 'start') and reports the
# outcome through execReportThread (status 2 = success, 3 = failure).
# NOTE(review): this copy of the file has lost all indentation. The code below
# is left exactly as found; restore the nesting from version control before
# relying on it.
class execTaskThread(threading.Thread):
# Keeps the task id and the task dict; the thread is explicitly non-daemon.
def __init__(self, taskid, task):
threading.Thread.__init__(self)
self.taskid = taskid
self.task = task
self.daemon = False
# Dispatches on task['action']; only 'stop' and 'start' are handled here.
def run(self):
action = self.task['action']
# 'stop': kill the process named by task['processId'].
if 'stop' == action:
result = killProcess(self.task['processId'])
if result['code'] == 0:
# Success: drop the id from the task cache and report status 2.
settings.CACHE_TASK_IDS.remove(self.taskid)
r = execReportThread(self.taskid, 2, '执行成功')
r.start()
else:
# Failure: report status 3 with killProcess's message; the cached id is not removed.
r = execReportThread(self.taskid, 3, result['msg'])
r.start()
# 'start': make sure the requested jar and configuration versions are
# present, switch the version markers, then launch the application.
if 'start' == action:
appname = self.task['name']
appversion = self.task['appVersion']
confversion = self.task['confVersion']
# ok is cleared by any failed step below and gates the later steps.
ok = True
# Check whether this application version exists; download it if it does
# not, and move the version file into the new folder.
if not os.path.exists(settings.APP_ROOT + appname + '/app/' + appversion + '/' + appname + '.jar'):
try:
os.makedirs(settings.APP_ROOT + appname + '/app/' + appversion)
except Exception as e:
# Directory creation errors are only logged; the download is still attempted.
log.error(str(e))
# NOTE(review): with indentation lost it cannot be told whether this download
# check is nested inside the os.path.exists() test above or runs
# unconditionally - confirm against the original file.
if not dowload('jar', appname, appversion):
ok = False
r = execReportThread(self.taskid, 3, 'Jar包下载失败')
r.start()
# Check whether the configuration file exists; if it exists, download and
# overwrite it, otherwise download it.
# NOTE(review): the comment above describes a download in both cases, while
# the code tests os.path.exists() first; whether dowload('conf', ...) is
# nested under that test is not recoverable from this copy - confirm.
if not os.path.exists(settings.APP_ROOT + appname + '/conf/' + confversion + '/' + appname + '.properties'):
try:
os.makedirs(settings.APP_ROOT + appname + '/conf/' + confversion)
except Exception as e:
log.error(str(e))
if not dowload('conf', appname, confversion):
ok = False
r = execReportThread(self.taskid, 3, '配置文件下载失败')
r.start()
# Switch versions: when the current version differs from the requested one,
# the 'version' marker file is moved from the old version folder to the new one.
if ok:
try:
oldappversion = osutil.getAppVersion(appname)
if oldappversion != appversion:
shutil.move(settings.APP_ROOT + appname + '/app/' + oldappversion + '/version',
settings.APP_ROOT + appname + '/app/' + appversion + '/version')
oldconfversion = osutil.getConfVersion(appname)
if oldconfversion != confversion:
shutil.move(settings.APP_ROOT + appname + '/conf/' + oldconfversion + '/version',
settings.APP_ROOT + appname + '/conf/' + confversion + '/version')
except Exception as e:
ok = False
log.error(str(e))
r = execReportThread(self.taskid, 3, '切换版本出错')
r.start()
# Everything succeeded: launch the application in its own thread, drop the
# id from the task cache and report status 2.
if ok:
t = startJavaThread(appname)
t.start()
settings.CACHE_TASK_IDS.remove(self.taskid)
r = execReportThread(self.taskid, 2, '执行成功')
r.start()
def dowload(type, name, version):
    """Download one application artifact from the server into ``settings.APP_ROOT``.

    Args:
        type: ``'conf'`` fetches the ``.properties`` file from
            ``/conf/download``; any other value fetches the ``.jar`` from
            ``/jar/download``. (The name shadows the builtin but is kept so
            existing callers keep working.)
        name: Application name; sent to the server and used in the target path.
        version: Version string; sent to the server and used in the target path.

    Returns:
        True when the file was stored, False when the request or the write
        failed (the error is logged).
    """
    is_conf = type == 'conf'
    if is_conf:
        url = settings.HTTP_URL + "/conf/download"
    else:
        url = settings.HTTP_URL + "/jar/download"
    now = int(time.time())
    nonce = ''.join(random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 32))
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
    }
    data = {
        'appid': settings.HTTP_SECRET_ID,
        'nonce': nonce,
        'timestamp': str(now),
        'hosts': ','.join(settings.APP_LIST),
        'hostname': settings.CACHE_HOST_NAME,
        'name': name,
        'version': version,
    }
    # The signature is computed over the payload before 'sign' is added.
    data['sign'] = osutil.createSign(settings.HTTP_SECRET_KEY, data)
    data = parse.urlencode(data).encode('utf-8')

    tmppath = None
    try:
        req = request.Request(url, data, headers, None, None, 'POST')
        if is_conf:
            filepath = settings.APP_ROOT + name + '/conf/' + version + "/" + name + '.properties'
        else:
            filepath = settings.APP_ROOT + name + '/app/' + version + "/" + name + '.jar'
        # Stream into a temporary sibling file and rename it afterwards, so a
        # failed or truncated download never leaves a file at the final path
        # that a later existence check would mistake for a complete artifact.
        tmppath = filepath + '.part'
        with request.urlopen(req, None, settings.HTTP_TIMEOUT) as resp, open(tmppath, "wb") as out:
            shutil.copyfileobj(resp, out)
        os.replace(tmppath, filepath)
        return True
    except Exception as e:
        log.error(str(e) + ':::' + url)
        if tmppath is not None:
            try:
                os.remove(tmppath)
            except OSError:
                # Best-effort cleanup: the temporary file may never have been created.
                pass
        return False
# Reports an execution result. status: 0 - pending, 1 - running,
# 2 - succeeded, 3 - failed, 4 - task revoked.
class execReportThread(threading.Thread):
    """Thread that POSTs one task result to ``settings.HTTP_URL + "/report"``.

    Args:
        taskid: Id of the task being reported.
        status: Numeric status code (see the list above).
        msg: Human-readable message sent along with the status.
    """

    def __init__(self, taskid, status, msg):
        super().__init__()
        self.taskid = taskid
        self.status = status
        self.msg = msg
        self.daemon = False

    def run(self):
        """Send the report; request errors are logged, not raised."""
        url = settings.HTTP_URL + "/report"
        now = int(time.time())
        nonce = ''.join(random.sample('abcdefghijklmnopqrstuvwxyz0123456789', 32))
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36',
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
        }
        data = {
            'appid': settings.HTTP_SECRET_ID,
            'nonce': nonce,
            'timestamp': str(now),
            'hostname': settings.CACHE_HOST_NAME,
            'taskid': self.taskid,
            'status': str(self.status),
            'msg': self.msg,
        }
        # The signature is computed over the payload before 'sign' is added.
        data['sign'] = osutil.createSign(settings.HTTP_SECRET_KEY, data)
        data = parse.urlencode(data).encode('utf-8')
        try:
            req = request.Request(url, data, headers, None, None, 'POST')
            # The reply body is read and discarded; the context manager makes
            # sure the response is closed even when reading fails.
            with request.urlopen(req, None, settings.HTTP_TIMEOUT) as resp:
                resp.read()
        except Exception as e:
            log.error(str(e) + ':::' + url)
# Entry routine: prepares the run directory on first use, starts the
# applications listed in settings.APP_LIST, then hands over to start().
# NOTE(review): this copy of the file has lost all indentation. The code below
# is left exactly as found; restore the nesting from version control before
# relying on it.
def init():
# Initialise the run directory.
if not os.path.exists(settings.APP_ROOT):
# Timestamp used for the elapsed-time log line at the end of initialisation.
now = time.time()
log.info('初始化运行目录: ' + settings.APP_ROOT)
os.makedirs(settings.APP_ROOT)
for appName in settings.APP_LIST:
if not os.path.exists(settings.APP_ROOT + appName):
try:
# Seed version "0": create app/0 and conf/0, then copy the bundled jar and
# properties file from settings.APP_JARS into them.
os.makedirs(settings.APP_ROOT + appName + '/app/0/')
os.makedirs(settings.APP_ROOT + appName + '/conf/0/')
shutil.copyfile(settings.APP_JARS + appName + '.jar',
settings.APP_ROOT + appName + '/app/0/' + appName + '.jar')
shutil.copyfile(settings.APP_JARS + appName + '.properties',
settings.APP_ROOT + appName + '/conf/0/' + appName + '.properties')
# Create empty 'version' marker files in app/0 and conf/0.
# NOTE(review): the files are opened without a with-block, so they stay open
# if write() raises.
file = open(settings.APP_ROOT + appName + '/app/0/version', 'w')
file.write("")
file.close()
file = open(settings.APP_ROOT + appName + '/conf/0/version', 'w')
file.write("")
file.close()
except Exception as e:
# An initialisation error for one application is logged only.
log.error('运行目录初始化出错::' + str(e))
log.info('初始化运行目录完成,耗时: ' + str(math.ceil(1000 * (time.time() - now))) + "ms")
else:
log.info('运行目录已存在,开始启动项目...')
# NOTE(review): with indentation lost it cannot be told whether the start loop
# below belongs to the else branch or runs after the if/else in both cases -
# confirm against the original file.
for appName in settings.APP_LIST:
try:
t = startJavaThread(appName)
t.start()
# Pause five seconds after starting each application.
time.sleep(5)
except Exception as e:
log.error(str(e))
# Hand over to the heartbeat scheduler.
start()
# Script entry point: run the watchdog only when this file is executed directly.
if __name__ == "__main__":
    init()