forked from lizouzt/DHT
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcollectord.py
executable file
·132 lines (109 loc) · 4.26 KB
/
collectord.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python
# coding: utf-8
import os,pdb
from twisted.application import service, internet
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor, utils, task
from twisted.python import log
from twisted.python.log import ILogObserver, FileLogObserver
from twisted.python.logfile import DailyLogFile
class CollectorFactory(Factory):
'''
采集器工厂
'''
def __init__(self, serv):
self._serv = serv
def buildProtocol(self, addr):
return CollectorProtocol(self._serv)
class CollectorProtocol(LineReceiver):
'''
接口协议,处理状态查询,服务控制
'''
def __init__(self, service):
self._service = service
def connectionMade(self):
#接到node KRPC对接,添加当前握手node到处理队列
self._service.add_query_protocol(self)
def lostConnection(self):
#node节点断开连接,从队列中去除
self._service.del_query_protocol(self)
class CollectorServices(service.Service):
'''
采集进程控制服务
'''
_query_protocols = []
def __init__(self, port):
self._before_cmds = ['/bin/rm -r -f '
'*.log collections/* libtorrent_logs*']
self._run_cmd = '/usr/bin/python'
self._run_args = ('dhtcollector.py', 'result.json', 'collector.stat')
self._timeout = 10 * 60
self._restart_times = 0
self._work_d = None
self._task = None
self._work_stat = None
self._serv = None
self._port = port
def startService(self):
log.msg('start run collectord')
if self._work_d is None:
self._start_work()
#每10.0ms循环进行_readstat信息读取
self._task = task.LoopingCall(self._readstat, 'collector.stat')
self._task.start(10.0)
log.msg('start listen %d' % self._port)
#启动TCP监听node KRPC交互
self._serv = reactor.listenTCP(self._port, CollectorFactory(self))
def stopService(self):
log.msg('stop run collectord')
def add_query_protocol(self, protocol):
self._query_protocols.append(protocol)
def del_query_protocol(self, protocol):
self._query_protocols.remove(protocol)
def _start_work(self):
self._restart_times += 1
log.msg('restart task times %d' % self._restart_times)
for cmd in self._before_cmds:
os.system(cmd)
'''
utils创建一个deferred对象,该对象创建一个新进程执行参数命令
deferred执行的结果经回调传递给监听函数处理
'''
self._work_d = utils.getProcessOutput(self._run_cmd,
self._run_args)
self._work_d.addCallbacks(self._work_finish, self._work_err)
def _work_finish(self, result):
log.msg('process done, msg[%s]' % result)
self._work_d = None
def _work_err(self, result):
log.err('process exit, error[%s]' % result.getErrorMessage())
self._work_d = None
def _readstat(self, statfile):
if self._work_d is None:
#作为peer读取DHT网络中获取的UDP数据
self._start_work()
log.msg('_readstat arund');
if self._query_protocols and os.path.isfile(statfile):
#处理请求队列,将自己数据发送给其他peer
try:
with open(statfile, 'rb') as f:
self._work_stat = f.read()
except Exception as err:
self._work_stat = err.message
log.err(err.message)
out = ['run times: %d\n\n' % (self._restart_times),
self._work_stat]
out = ''.join(out)
for protocol in self._query_protocols:
protocol.sendLine(out)
# 创建log目录
if not os.path.isdir('./collectord_log'):
os.mkdir('./collectord_log')
application = service.Application('collectord')
logfile = DailyLogFile('collectord.log', './collectord_log')
log.startLogging(open("./collectord_log/collectord.log",'w'))
application.setComponent(ILogObserver, FileLogObserver(logfile).emit)
services = CollectorServices(32900)
services.setServiceParent(application)
services.startService()