-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathworker.py
87 lines (74 loc) · 3.01 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#!/usr/bin/env python3
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
from mapper import Mapper
from reducer import Reducer, RPCMapperClient
import sys
import socket
import _thread
import time
import cfg
import fake_fs
import yadfs.client.client
class Worker:
    """MapReduce worker node that the job tracker (JT) drives over XML-RPC.

    The worker owns one Mapper and one Reducer and forwards every public
    call to one of them. A background thread keeps reporting the worker's
    address to the job tracker for as long as ``self.on`` stays true.
    """

    def __init__(self, fs, name, addr, opts):
        """Create the job-tracker proxy, the mapper and the reducer.

        fs   - file-system client handed to the mapper and the reducer
        name - suffix appended to "map" / "reduce" to name the two helpers
        addr - address of this worker; sent with every heartbeat
        opts - options mapping; must provide "jt_addr" (job tracker URL)
        """
        self.on = True  # the heartbeat loop keeps running while this is true
        self.hb_timeout = 0.2  # pause between two heartbeats, in seconds
        self.addr = addr

        jt_addr = opts["jt_addr"]
        self.jt_addr = jt_addr
        self.jt = ServerProxy(jt_addr)

        # Mapper is built before Reducer, exactly as callers have always seen.
        self.mapper = Mapper(opts, fs, "map" + name, addr)
        mapper_client = RPCMapperClient()
        self.reducer = Reducer(fs, "reduce" + name, addr, opts, mapper_client)

    def start(self):
        """Spawn the heartbeat loop in a background thread and return."""
        print("Init worker")
        print("Start sending heartbeats to", self.jt_addr)
        heartbeat_loop = self._heartbeat
        _thread.start_new_thread(heartbeat_loop, ())
        print("Server is ready")

    def _heartbeat(self):
        """Report this worker's address to the JT until ``self.on`` is false."""
        while True:
            if not self.on:
                break
            self._beat_once()
            time.sleep(self.hb_timeout)

    def _beat_once(self):
        """Send a single heartbeat; a failure is printed, never propagated."""
        try:
            self.jt.heartbeat(self.addr)
        except Exception as err:
            # Best effort: the loop must survive a job tracker that is
            # temporarily unreachable.
            print(err)

    def map(self, task_id, rds_count, chunk_path, map_script, restart_task=False):
        """Map a chunk of data by delegating to the mapper.

        task_id      - unique task id
        rds_count    - number of reducers for the task
        chunk_path   - DFS path to the chunk file to map
        map_script   - DFS path to the script holding the map function
        restart_task - if True, restart the map task even if it is already
                       completed or executing now

        Returns whatever the mapper returns.
        """
        outcome = self.mapper.map(
            task_id, rds_count, chunk_path, map_script, restart_task
        )
        return outcome

    def get_status(self, task_id, chunk_path):
        """Return the mapper's execution status for the given task and chunk."""
        status = self.mapper.get_status(task_id, chunk_path)
        return status

    def read_mapped_data(self, task_id, region_number):
        """Read the mapped data of one region by delegating to the mapper.

        task_id       - unique task id
        region_number - integer region assigned to the requesting reducer

        Expected result, as documented for the mapper: a dict
        {status: Status.ok, data: list of tuples}; status is
        Status.not_found when the file does not exist, and an empty file
        yields ok with an empty list.
        """
        mapped = self.mapper.read_mapped_data(task_id, region_number)
        return mapped

    def reduce(self, task_id, region, mappers, script_path):
        """Handle the JT's signal to start reducing; delegates to the reducer.

        task_id     - unique task id
        region      - region this reducer is responsible for
        mappers     - mappers that hold data for the current task
        script_path - path in DFS to the files (reduce script)

        Returns whatever the reducer returns.
        """
        outcome = self.reducer.reduce(task_id, region, mappers, script_path)
        return outcome
if __name__ == '__main__':
    # Script entry point. Usage: worker.py <cfg_path>
    # Loads the options file, builds a Worker that advertises
    # http://<local-ip>:8888 to the job tracker, and serves the worker's
    # public methods over XML-RPC until the process is killed.
    if len(sys.argv) < 2:
        # Fail with a readable message instead of a bare IndexError.
        sys.exit("usage: " + sys.argv[0] + " <cfg_path>")
    cfg_path = sys.argv[1]
    opts = cfg.load(cfg_path)

    host = socket.gethostbyname(socket.gethostname())
    port = 8888  # fixed listening port; also used as the worker's name suffix
    addr = 'http://' + host + ":" + str(port)

    fs = yadfs.client.client.Client()
    worker = Worker(fs, str(port), addr, opts)

    # Bind and register BEFORE the first heartbeat goes out: the server
    # constructor already binds and listens on the socket, so any call the
    # job tracker makes in response to a heartbeat is queued instead of
    # being refused. If the bind fails, the worker is never advertised.
    server = SimpleXMLRPCServer((host, port))
    server.register_introspection_functions()
    server.register_instance(worker)

    worker.start()
    server.serve_forever()