routinepool.go

package routinepool

import (
	"log"
	"sync"
	"sync/atomic"
	"time"
)

type poolStatus int32

const (
	running poolStatus = iota
	shutting
)

type RoutinePool struct {
	corePool    chan struct{} // one token per live core goroutine
	tempPool    chan struct{} // one token per live temporary goroutine
	queue       chan poolTask // buffered queue of pending tasks
	maxIdleTime time.Duration // idle time after which a temporary goroutine exits
	rwmu        sync.RWMutex  // synchronizes Schedule with ShutDown
	shutOnce    sync.Once
	status      poolStatus
	active      uint64 // number of live worker goroutines
}

type poolTask struct {
	task         func()
	panicHandler func(any)
}

// PoolRejectsScheduleErr is returned by Schedule when a task cannot be accepted.
type PoolRejectsScheduleErr struct {
	reason string
}

func (e *PoolRejectsScheduleErr) Error() string {
	return "pool rejected the task: " + e.reason
}

// New creates a goroutine pool.
// When a task arrives and fewer than corePoolSize goroutines exist,
// a core goroutine is created immediately to handle it.
// Subsequent tasks are placed in the task queue while it has free space.
// A task that cannot be queued is handled by a newly created temporary goroutine.
// Core and temporary goroutines keep polling tasks from the queue,
// but a temporary goroutine is destroyed once it has been idle for longer than maxIdleTime.
// A task cannot be scheduled when (1) the task queue is full, (2) all goroutines are busy,
// (3) the number of goroutines has reached maxPoolSize, or (4) the pool is shutting down.
func New(corePoolSize, maxPoolSize, queueSize uint64, maxIdleTime time.Duration) *RoutinePool {
	if maxPoolSize == 0 {
		panic("maxPoolSize should be greater than 0")
	}
	if maxPoolSize < corePoolSize {
		panic("maxPoolSize should be greater than or equal to corePoolSize")
	}
	return &RoutinePool{
		corePool:    make(chan struct{}, corePoolSize),
		tempPool:    make(chan struct{}, maxPoolSize-corePoolSize),
		queue:       make(chan poolTask, queueSize),
		maxIdleTime: maxIdleTime,
		status:      running,
		active:      0,
	}
}
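
// A minimal usage sketch (the sizes below are illustrative, not taken from
// this file): create a pool, schedule work, then shut the pool down.
//
//	pool := routinepool.New(4, 16, 128, time.Minute)
//	err := pool.Schedule(func() {
//		// do the actual work here
//	}, func(r any) {
//		log.Printf("task panicked: %v", r)
//	})
//	if err != nil {
//		// the pool rejected the task; retry, drop, or run it inline
//	}
//	pool.ShutDown()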

// Schedule submits a task to the pool. panicHandler may be nil; when non-nil
// it is called with the recovered value if the task panics. A non-nil error
// means the task was rejected.
func (p *RoutinePool) Schedule(task func(), panicHandler func(any)) error {
	if task == nil {
		return &PoolRejectsScheduleErr{reason: "task is nil"}
	}
	if p.getPoolStatus() != running {
		return &PoolRejectsScheduleErr{reason: "pool is not running"}
	}
	p.rwmu.RLock()
	defer p.rwmu.RUnlock()
	// Re-check under the read lock: ShutDown takes the write lock before
	// closing the channels, so the sends below can never hit a closed channel.
	if p.status != running {
		return &PoolRejectsScheduleErr{reason: "pool is not running"}
	}
	t := poolTask{
		task:         task,
		panicHandler: panicHandler,
	}
	// 1. Start a core goroutine if the core pool still has capacity.
	select {
	case p.corePool <- struct{}{}:
		go p.work(t)
		return nil
	default:
	}
	// 2. Otherwise enqueue the task if the queue has free space.
	select {
	case p.queue <- t:
		return nil
	default:
	}
	// 3. Otherwise start a temporary goroutine if maxPoolSize is not yet reached.
	select {
	case p.tempPool <- struct{}{}:
		go p.workWithExpiration(t)
		return nil
	default:
	}
	return &PoolRejectsScheduleErr{reason: "pool was full at scheduling time"}
}
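
// Callers can detect rejection with errors.As, since Schedule wraps every
// rejection in *PoolRejectsScheduleErr (sketch; assumes the caller imports
// the standard "errors" package):
//
//	var rejectErr *PoolRejectsScheduleErr
//	if err := pool.Schedule(task, nil); errors.As(err, &rejectErr) {
//		log.Printf("schedule rejected: %v", rejectErr)
//	}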

// ShutDown stops the pool from accepting new tasks. Tasks already in the
// queue are still drained by the running goroutines. It is safe to call
// ShutDown more than once.
func (p *RoutinePool) ShutDown() {
	p.shutOnce.Do(func() {
		p.rwmu.Lock()
		defer p.rwmu.Unlock()
		p.setPoolStatus(shutting)
		close(p.corePool)
		close(p.tempPool)
		close(p.queue)
	})
}

// work runs t and then keeps draining the queue; a core goroutine lives
// until ShutDown closes the queue.
func (p *RoutinePool) work(t poolTask) {
	p.incRunningRoutine()
	defer p.decRunningRoutine()
	// Track the task currently being run so the recover below invokes the
	// panicking task's own handler (the range variable would otherwise be
	// shadowed and the first task's handler used for every queued task).
	current := t
	defer func() {
		if err := recover(); err != nil {
			if current.panicHandler != nil {
				current.panicHandler(err)
			} else {
				log.Printf("recover panic: %v", err)
			}
			if p.getPoolStatus() != shutting {
				// A core goroutine died from a panic; start a replacement
				// with a no-op first task.
				go p.work(poolTask{task: func() {}})
			}
		}
	}()
	t.task()
	for qt := range p.queue {
		current = qt
		qt.task()
	}
	// The loop exits only after ShutDown closed the queue and all queued
	// tasks were handled.
}

// workWithExpiration runs t and then drains the queue, exiting once it has
// been idle for longer than maxIdleTime.
func (p *RoutinePool) workWithExpiration(t poolTask) {
	p.incRunningRoutine()
	defer p.decRunningRoutine()
	defer func() { <-p.tempPool }() // release the temporary-goroutine slot
	current := t // as in work: run the recover with the panicking task's handler
	defer func() {
		if err := recover(); err != nil {
			if current.panicHandler != nil {
				current.panicHandler(err)
			} else {
				log.Printf("recover panic: %v", err)
			}
		}
	}()
	t.task()
	for {
		select {
		// Note: time.After allocates a fresh timer per iteration; before
		// Go 1.23 an unfired timer is not garbage-collected until it expires.
		case <-time.After(p.maxIdleTime):
			return
		case qt, ok := <-p.queue:
			if !ok {
				return // ShutDown was called and all queued tasks are handled
			}
			current = qt
			qt.task()
		}
	}
}

// IsShuttingDown reports whether ShutDown has been called.
func (p *RoutinePool) IsShuttingDown() bool {
	return p.getPoolStatus() == shutting
}

// IsTerminated reports whether the pool is shut down and every worker
// goroutine has exited.
func (p *RoutinePool) IsTerminated() bool {
	return p.getPoolStatus() == shutting && atomic.LoadUint64(&p.active) == 0
}
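
// The pool offers no blocking wait, so a caller that must know when every
// worker has exited could poll IsTerminated after ShutDown (sketch):
//
//	pool.ShutDown()
//	for !pool.IsTerminated() {
//		time.Sleep(10 * time.Millisecond)
//	}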

func (p *RoutinePool) incRunningRoutine() {
	atomic.AddUint64(&p.active, 1)
}

func (p *RoutinePool) decRunningRoutine() {
	// Adding ^uint64(0) (all bits set) decrements the unsigned counter by one.
	atomic.AddUint64(&p.active, ^uint64(0))
}

func (p *RoutinePool) getPoolStatus() poolStatus {
	return poolStatus(atomic.LoadInt32((*int32)(&p.status)))
}

func (p *RoutinePool) setPoolStatus(s poolStatus) {
	atomic.StoreInt32((*int32)(&p.status), int32(s))
}