-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunner_pool.go
229 lines (189 loc) · 5.43 KB
/
runner_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package runnerpool
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
// Pool represents a pool of runners, where a runner is any func(func()) function,
// for example func(f func()) { go f() }.
type Pool interface {
	// Worker returns a Worker if one can be acquired before the context is canceled;
	// otherwise it returns an error (WorkerPool wraps the ctx.Err() result in an
	// ErrCantAcquireWorker).
	Worker(ctx context.Context) (Worker, error)
	// Stats provides statistics about the pool's size, status & config.
	Stats() Stats
}
// Stats provides statistics about the pool's size, status & config.
type Stats struct {
	// MaxWorkers is the configured number of workers (Config.Workers for WorkerPool).
	MaxWorkers int32
	// Workers is the number of workers that have been created and whose loop has not exited yet.
	Workers int32
	// Acquired is the number of workers that were handed out through Worker and have
	// neither finished running their function nor been released yet.
	Acquired int32
	// Running is the number of workers currently executing a function passed to Run.
	Running int32
}
//go:generate mockery -outpkg runnerpoolmock -output runnerpoolmock -case underscore -name Pool

// Compile-time check that *WorkerPool satisfies the Pool interface.
var _ Pool = (*WorkerPool)(nil)
// Worker represents a pool worker that offers a one-time usable runner.
type Worker interface {
	// Run runs the given function.
	// The context provided to the function will be canceled if WorkerPool.Stop is called.
	Run(func(context.Context))
	// Release should be called at least once every time a worker is acquired.
	// It is safe to call Release twice, and calling Release on a worker that is already running has no effect,
	// so the safest way is to defer worker.Release() once the worker has been acquired.
	Release()
}
//go:generate mockery -outpkg runnerpoolmock -output runnerpoolmock -case underscore -name Worker

// Compile-time check that *worker satisfies the Worker interface.
var _ Worker = (*worker)(nil)
// New builds a WorkerPool from cfg. The returned pool must be started with Start
// before it is used.
//
// runner is invoked once per created worker with that worker's loop, e.g.
// func(f func()) { go f() }; it is the place to decide whether panics are
// recovered or reported.
// NOTE(review): an earlier version of this comment warned that recovering from
// panics exhausts the workers; createWorker frees its semaphore slot in a defer,
// so confirm whether that warning still applies.
//
// cfg.Workers is used as the capacity of the semaphore channel, so a negative
// value makes New panic.
func New(cfg Config, runner func(func())) *WorkerPool {
	poolCtx, cancel := context.WithCancel(context.Background())

	pool := new(WorkerPool)
	pool.config = cfg
	pool.runner = runner
	pool.sem = make(chan struct{}, cfg.Workers)
	pool.ctx = poolCtx
	pool.stop = cancel
	return pool
}
// Config is the configuration for the pool implementation.
type Config struct {
	// Workers is the maximum number of workers alive at the same time; it sizes the
	// pool's semaphore, and Start refuses to run with a value of 0.
	// NOTE(review): the `default` tag is presumably consumed by an external
	// configuration loader — nothing in this file reads it.
	Workers int `default:"100"`
}
// WorkerPool implements Pool using a runner & goroutines pool.
type WorkerPool struct {
	// workers hands idle workers over to callers of Worker; it is created by Start.
	workers chan *worker
	// config is the configuration the pool was built with.
	config Config
	// runner is called with the loop of every worker the pool creates.
	runner func(func())
	// stats holds the counters exposed through Stats; they are updated and read atomically.
	stats struct {
		// workers counts created workers whose loop has not exited.
		workers int32
		// acquired counts workers handed out and not yet done or released.
		acquired int32
		// running counts workers currently executing a function.
		running int32
	}
	// sem bounds the number of live workers: a slot is taken before a worker is
	// created and freed when its loop exits. Stop fills it to wait for all workers.
	sem chan struct{}
	// ctx is canceled by stop and is passed to every function run by a worker.
	ctx context.Context
	// stop cancels ctx.
	stop func()
}
// worker is the Worker implementation handed out by WorkerPool.
type worker struct {
	// f carries the function given to Run over to the goroutine serving this worker.
	f chan func(context.Context)
	// once guarantees that released is closed a single time.
	once sync.Once
	// released is closed by Release to signal that the worker will not be used.
	released chan struct{}
}

// Run hands f over to the goroutine serving this worker, blocking until it is received.
func (w *worker) Run(f func(context.Context)) {
	handoff := w.f
	handoff <- f
}

// Release signals that the worker is not needed anymore. Calling it more than once is safe.
func (w *worker) Release() {
	w.once.Do(w.closeReleased)
}

// closeReleased closes the released channel; it must only be invoked through once.
func (w *worker) closeReleased() {
	close(w.released)
}
// Worker blocks until a worker can be acquired and returns it. If ctx is done
// first, or the pool has been stopped, it returns an ErrCantAcquireWorker
// wrapping the reason instead (ctx.Err() in the former case).
func (p *WorkerPool) Worker(ctx context.Context) (Worker, error) {
	var cause error
	select {
	case acquired := <-p.workers:
		return acquired, nil
	case <-ctx.Done():
		cause = ctx.Err()
	case <-p.ctx.Done():
		cause = fmt.Errorf("pool is already stopped")
	}
	return nil, ErrCantAcquireWorker{cause}
}
// Start creates the channel used to hand out workers and launches the background
// goroutine that keeps creating workers while semaphore slots are available and
// the pool has not been stopped.
// It returns an error when the pool is configured with 0 workers.
func (p *WorkerPool) Start() error {
	if p.config.Workers == 0 {
		return fmt.Errorf("can't start a pool with 0 workers")
	}
	p.workers = make(chan *worker)

	// stopped reports, without blocking, whether the pool's context is already done.
	stopped := func() bool {
		select {
		case <-p.ctx.Done():
			return true
		default:
			return false
		}
	}

	go func() {
		// The stop check is always evaluated on its own before competing for a
		// semaphore slot, so an already observed cancellation never creates a worker.
		for !stopped() {
			select {
			case <-p.ctx.Done():
				return
			case p.sem <- struct{}{}:
				p.createWorker()
			}
		}
	}()
	return nil
}
// Stats returns a snapshot of the pool's configuration and counters. Every counter
// is loaded atomically, one after another, so the call is safe for concurrent use.
func (p *WorkerPool) Stats() Stats {
	var snapshot Stats
	snapshot.MaxWorkers = int32(p.config.Workers)
	snapshot.Workers = atomic.LoadInt32(&p.stats.workers)
	snapshot.Acquired = atomic.LoadInt32(&p.stats.acquired)
	snapshot.Running = atomic.LoadInt32(&p.stats.running)
	return snapshot
}
// createWorker counts a new worker and asks the runner to execute its loop.
// The loop keeps offering a fresh one-time worker on p.workers until the pool's
// context is done; when it exits, the worker counter is decremented and the
// semaphore slot taken by the caller is freed.
func (p *WorkerPool) createWorker() {
	p.inc(&p.stats.workers)

	// execute runs task with the pool's context, tracking it in the running counter.
	execute := func(task func(context.Context)) {
		p.inc(&p.stats.running)
		defer p.dec(&p.stats.running)
		task(p.ctx)
	}

	// attend serves a worker that has just been acquired: it waits until the
	// worker is either given a function to run or released.
	attend := func(offered *worker) {
		p.inc(&p.stats.acquired)
		defer p.dec(&p.stats.acquired)
		select {
		case task := <-offered.f:
			execute(task)
		case <-offered.released:
			// nothing to run, go back to offering a new worker
		}
	}

	p.runner(func() {
		// Deferred calls run in reverse order: the counter is decremented first,
		// then the semaphore slot is freed.
		defer func() { <-p.sem }()
		defer p.dec(&p.stats.workers)

		for {
			offered := &worker{
				f:        make(chan func(context.Context)),
				released: make(chan struct{}),
			}
			select {
			case p.workers <- offered:
				attend(offered)
			case <-p.ctx.Done():
				return
			}
		}
	})
}
// inc atomically adds one to the counter pointed to by ptr.
func (p *WorkerPool) inc(ptr *int32) {
	const delta int32 = 1
	atomic.AddInt32(ptr, delta)
}
// dec atomically subtracts one from the counter pointed to by ptr.
func (p *WorkerPool) dec(ptr *int32) {
	const delta int32 = -1
	atomic.AddInt32(ptr, delta)
}
// Stop cancels the pool's context, which is the one provided to every function run
// by a worker, and then waits until all workers are stopped or until ctx is done,
// in which case it returns a wrapped ctx.Err().
//
// Waiting is done by taking as many semaphore slots as workers are configured:
// once all of them are held by Stop, no worker can still be alive. The slots are
// not given back afterwards.
func (p *WorkerPool) Stop(ctx context.Context) error {
	p.stop()

	for pending := p.config.Workers; pending > 0; pending-- {
		select {
		case <-ctx.Done():
			return fmt.Errorf("can't stop all workers: %w", ctx.Err())
		case p.sem <- struct{}{}:
			// one more slot is owned by Stop
		}
	}
	return nil
}
// ErrCantAcquireWorker is returned by Pool.Worker() when a worker can't be acquired.
// It wraps the underlying cause, which stays reachable through errors.Is / errors.As.
type ErrCantAcquireWorker struct {
	// cause is the reason why the worker could not be acquired; nil for the zero value.
	cause error
}

// Unwrap implements the errors unwrapping, returning the wrapped cause (nil if there is none).
func (e ErrCantAcquireWorker) Unwrap() error {
	return e.cause
}

// Error implements the error interface.
// The cause field is unexported, so code outside this package can only build the
// zero value; for it the cause suffix is omitted instead of rendering "%!s(<nil>)".
func (e ErrCantAcquireWorker) Error() string {
	if e.cause == nil {
		return "can't acquire worker"
	}
	return fmt.Sprintf("can't acquire worker: %s", e.cause)
}