-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
79 lines (66 loc) · 1.58 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package workerpool
import (
"context"
"sync"
)
// Worker is the task handler executed by the pool. A worker goroutine calls it
// once for every task it receives, passing the context that was given to Start.
type Worker[Task any] func(ctx context.Context, task Task)
// Pool is a worker pool: a task queue drained by worker goroutines that all
// invoke the same handler. Create it with NewPool.
type Pool[Task any] struct {
// worker is the handler invoked for every task; Start does nothing when it is nil.
worker Worker[Task]
// tasks is the task queue; its capacity doubles as the number of workers.
tasks chan Task
// wg tracks the running worker goroutines so that Close can wait for them.
wg *sync.WaitGroup
}
// NewPool creates a worker pool that runs worker for every queued task.
//
// workersCount sets both the capacity of the task queue and the number of
// goroutines launched by Start (WorkersCount reports the queue capacity).
// Values below 1 are raised to 1: a negative capacity would make the channel
// allocation panic, and a zero capacity would yield a pool that starts no
// workers, so AddTask would block forever.
//
// A nil worker is accepted; Start is then a no-op.
func NewPool[Task any](worker Worker[Task], workersCount int) *Pool[Task] {
	if workersCount < 1 {
		workersCount = 1
	}
	return &Pool[Task]{
		worker: worker,
		tasks:  make(chan Task, workersCount),
		wg:     new(sync.WaitGroup),
	}
}
// WorkersCount reports the capacity of the task queue, which is also the
// number of worker goroutines that one call to Start launches.
func (p *Pool[Task]) WorkersCount() int {
	queue := p.tasks
	return cap(queue)
}
// Close blocks until every worker goroutine launched by Start has returned and
// then closes the task queue. It always returns nil.
//
// Workers return once the context passed to Start is cancelled, so cancel that
// context before calling Close. Call Close at most once and do not call
// AddTask afterwards: closing an already closed channel and sending on a
// closed channel both panic.
func (p *Pool[Task]) Close() error {
	workers, queue := p.wg, p.tasks
	workers.Wait()
	close(queue)
	return nil
}
// Start launches WorkersCount worker goroutines, each of which processes
// queued tasks until ctx is cancelled. It returns immediately and does nothing
// when the pool has no worker handler.
//
// Every call launches a fresh set of goroutines, so call Start once per pool.
func (p *Pool[Task]) Start(ctx context.Context) {
	if p.worker == nil {
		return
	}
	n := p.WorkersCount()
	// Register all workers up front so a waiter cannot observe a partial count.
	p.wg.Add(n)
	for ; n > 0; n-- {
		go p.start(ctx)
	}
}
// start is the loop run by each worker goroutine. It hands every received
// task to the pool's worker handler and returns when ctx is cancelled or the
// task queue is closed. The goroutine is marked done on the pool's wait group
// on return.
func (p *Pool[Task]) start(ctx context.Context) {
	defer p.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case task, ok := <-p.tasks:
			if !ok {
				// A closed queue yields zero values forever; stop instead of
				// feeding them to the handler in a busy loop.
				return
			}
			p.worker(ctx, task)
		}
	}
}
// AddTask puts task on the pool's queue. The call blocks while the queue is
// full and returns as soon as the task has been accepted.
func (p *Pool[Task]) AddTask(task Task) {
	queue := p.tasks
	queue <- task
}
// QueueSize reports how many tasks are waiting in the queue, i.e. tasks that
// were added but have not yet been picked up by a worker.
func (p *Pool[Task]) QueueSize() int {
	pending := len(p.tasks)
	return pending
}
// Clear discards the tasks that are currently waiting in the queue. Tasks a
// worker has already received are not affected.
//
// Clear never blocks: each task is removed with a non-blocking receive, so a
// worker taking the last task concurrently cannot leave Clear waiting for (and
// then swallowing) a task added later. It also returns once a closed queue has
// been emptied.
func (p *Pool[Task]) Clear() {
	for {
		select {
		case _, ok := <-p.tasks:
			if !ok {
				// Closed and fully drained: further receives would succeed
				// forever with zero values.
				return
			}
		default:
			// Nothing queued right now.
			return
		}
	}
}