-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadPool.py
124 lines (108 loc) · 2.98 KB
/
threadPool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
######################################################################
#
# Copyright (c) 2013 Baidu.com, Inc. All Rights Reserved.
#
######################################################################
"""
A thread-pool with message queue implemented using threading.
Users only provide a work routine.
Usage:
import threadPool
tp = threadPool.ThreadPool(threadNum, timeOut)
tp.start()
tp.addJob(work_func, args)
tp.stop()
Authors: zhangzhibiao01([email protected])
Date: 2013/02/18 17:24:26
"""
import Queue
import threading
class Worker(threading.Thread):
"""
Worker, which does the real job.
"""
def __init__(self, threadPool, *args, **kargs):
threading.Thread.__init__(self)
self.threadPool = threadPool
self.setDaemon(True)
self.state = None #线程工作状态
self.start()
def run(self):
"""
Running routine.
"""
while True:
if self.state == 'STOP':
break
try:
func, args, kargs = self.threadPool.workQueue.get(self.threadPool.timeOut)
except Queue.Empty:
continue
try:
res = func(*args, **kargs)
self.threadPool.resultQueue.put(res)
self.threadPool.workDone()
except:
break
def stop(self):
"""
Stop workers.
"""
self.state = 'STOP'
class ThreadPool(object):
"""
Thread pool module.
"""
def __init__(self, threadNum, timeOut):
"""
Constructor.
@threadNum, num of threads
@timeOut, time waiting on queue in seconds.
"""
#工作队列
self.workQueue = Queue.Queue()
#结果队列
self.resultQueue = Queue.Queue()
#线程池
self.threadPool = []
#线程数目
self.threadNum = threadNum
#队列读等待超时时间,
#写无须设置超时时间,
#因为队列无长度限制
self.timeOut = timeOut
def start_threads(self):
"""
Start threads.
"""
for i in range(self.threadNum):
self.threadPool.append(Worker(self))
def work_join(self, *args, **kargs):
"""
Join the workers.
"""
self.workQueue.join()
def add_job(self, func, *args, **kargs):
"""
Add an job to pool.
"""
self.workQueue.put((func, args, kargs))
def work_done(self, *args):
"""
Called when job done.
"""
self.workQueue.task_done()
def get_result(self, *args, **kargs):
"""
Get result from result queue
"""
return self.resultQueue.get(*args, **kargs)
def stop_threads(self):
"""
Stop all threads.
"""
for thread in self.threadPool:
#thread.join()
thread.stop()
del self.threadPool[:]