-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
48 lines (43 loc) · 1.55 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package pipeline
import (
"context"
"sync"
"sync/atomic"
)
/*
NewWorkerPoolStep creates a pipeline step that runs nested pipelines in a thread pool.
The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever.
The step waits until all pipelines are finished.
* If the given ParallelResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
* The pipelines are executed in a pool of a number of Go routines indicated by size.
* If size is 1, the pipelines are effectively run in sequence.
* If size is 0 or less, the function panics.
*/
func NewWorkerPoolStep[T context.Context](name string, size int, pipelineSupplier Supplier[T], handler ParallelResultHandler[T]) Step[T] {
if size < 1 {
panic("pool size cannot be lower than 1")
}
step := Step[T]{Name: name}
step.Action = func(ctx T) error {
pipelineChan := make(chan *Pipeline[T], size)
m := sync.Map{}
var wg sync.WaitGroup
count := uint64(0)
go pipelineSupplier(ctx, pipelineChan)
for i := 0; i < size; i++ {
wg.Add(1)
go poolWork(ctx, pipelineChan, &wg, &count, &m)
}
wg.Wait()
res := collectResults(ctx, handler, &m)
return setResultErrorFromContext(ctx, name, res)
}
return step
}
func poolWork[T context.Context](ctx T, pipelineChan chan *Pipeline[T], wg *sync.WaitGroup, i *uint64, m *sync.Map) {
defer wg.Done()
for pipe := range pipelineChan {
n := atomic.AddUint64(i, 1) - 1
m.Store(n, pipe.RunWithContext(ctx))
}
}