-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathjointworker.go
95 lines (81 loc) · 2.78 KB
/
jointworker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package conveyor
import (
"fmt"
"log"
)
// JointWorkerPool is the worker pool for a Joint, i.e. the connection between
// nodes. It embeds *ConcreteJointWorker and additionally keeps the joint's
// input and output channel slices.
type JointWorkerPool struct {
*ConcreteJointWorker
// nextWorkerCount is not referenced by any of the methods reviewed here.
// NOTE(review): confirm whether it is still needed.
nextWorkerCount int
// inputChannels is passed to Executor.ExecuteLoop by Start as the input side.
inputChannels []chan map[string]interface{}
// outputChannels is passed to Executor.ExecuteLoop by Start as the output
// side; every channel in it is closed by WaitAndStop.
outputChannels []chan map[string]interface{}
}
// NewJointWorkerPool builds a JointWorkerPool around the given executor and
// returns it as a JointWorker.
//
// The pool is named after the executor: executor.GetName() with the suffix
// "_worker". No input or output channels are created here.
func NewJointWorkerPool(executor JointExecutor) JointWorker {
	pool := &WPool{Name: executor.GetName() + "_worker"}
	worker := &ConcreteJointWorker{
		WPool:    pool,
		Executor: executor,
	}
	return &JointWorkerPool{ConcreteJointWorker: worker}
}
// CreateChannels appends one new input channel per executor input.
//
// Executor.InputCount() channels, each with capacity buffer, are appended to
// the existing input channels; channels already present are kept. Output
// channels are not touched.
func (jwp *JointWorkerPool) CreateChannels(buffer int) {
	for made := 0; made < jwp.Executor.InputCount(); made++ {
		input := make(chan map[string]interface{}, buffer)
		jwp.inputChannels = append(jwp.inputChannels, input)
	}
}
// GetInputChannels returns the input channels of the JointWorkerPool.
//
// The internal slice itself is returned (not a copy). The error is always nil.
func (jwp *JointWorkerPool) GetInputChannels() ([]chan map[string]interface{}, error) {
	inputs := jwp.inputChannels
	return inputs, nil
}
// GetOutputChannels returns the output channels of the JointWorkerPool.
//
// The internal slice itself is returned (not a copy). The error is always nil.
func (jwp *JointWorkerPool) GetOutputChannels() ([]chan map[string]interface{}, error) {
	outputs := jwp.outputChannels
	return outputs, nil
}
// SetInputChannels replaces the input channels of the JointWorkerPool with
// inChans. It always returns nil.
func (jwp *JointWorkerPool) SetInputChannels(inChans []chan map[string]interface{}) error {
	// The caller's slice is stored as-is; no copy is made.
	jwp.inputChannels = inChans

	return nil
}
// SetOutputChannels replaces the output channels of the JointWorkerPool with
// outChans. It always returns nil.
func (jwp *JointWorkerPool) SetOutputChannels(outChans []chan map[string]interface{}) error {
	// The caller's slice is stored as-is; no copy is made.
	jwp.outputChannels = outChans

	return nil
}
// AddInputChannel appends inChan to the joint's input channels.
//
// The channel is added after any inputs already present; the output channels
// are left untouched. It always returns nil.
func (jwp *JointWorkerPool) AddInputChannel(inChan chan map[string]interface{}) error {
	// Append to the input side. Putting an input channel on outputChannels
	// would hand it to the executor as an output and have WaitAndStop close it.
	jwp.inputChannels = append(jwp.inputChannels, inChan)
	return nil
}
// AddOutputChannel appends outChan to the joint's output channels.
//
// The channel is added after any outputs already present. It always returns
// nil.
func (jwp *JointWorkerPool) AddOutputChannel(outChan chan map[string]interface{}) error {
	outputs := append(jwp.outputChannels, outChan)
	jwp.outputChannels = outputs
	return nil
}
// Start launches the JointWorkerPool's workers.
//
// One goroutine is started per Executor.Count(); each is registered on jwp.Wg
// before it starts and runs Executor.ExecuteLoop with the pool's input and
// output channels. Start does not wait for the goroutines and always returns
// nil.
//
// If ExecuteLoop returns a non-nil error, the error is reported through
// ctx.SendLog and log.Fatalf is then called, which terminates the process.
func (jwp *JointWorkerPool) Start(ctx CnvContext) error {
	// runLoop is the body of a single worker goroutine.
	runLoop := func() {
		defer jwp.Wg.Done()

		err := jwp.Executor.ExecuteLoop(ctx, jwp.inputChannels, jwp.outputChannels)
		if err == nil {
			return
		}
		ctx.SendLog(0, fmt.Sprintf("Executor:[%s]", jwp.Executor.GetUniqueIdentifier()), err)
		// log.Fatalf exits the process, so nothing after it runs.
		log.Fatalf("Improper setup of Executor[%s], ExecuteLoop() method is required", jwp.Executor.GetUniqueIdentifier())
	}

	for started := 0; started < jwp.Executor.Count(); started++ {
		jwp.Wg.Add(1)
		go runLoop()
	}
	return nil
}
// WaitAndStop blocks on jwp.Wg.Wait() and then closes every output channel of
// the JointWorkerPool.
//
// Input channels are not closed here. It always returns nil.
func (jwp *JointWorkerPool) WaitAndStop() error {
	jwp.Wg.Wait()

	// Close outputs only after the wait has returned.
	outputs := jwp.outputChannels
	for idx := 0; idx < len(outputs); idx++ {
		close(outputs[idx])
	}
	return nil
}