-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathoperationworker.go
142 lines (111 loc) · 3.78 KB
/
operationworker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package conveyor
import (
"fmt"
"log"
"golang.org/x/sync/semaphore"
)
// OperationWorkerPool struct provides the worker pool infra for Operation interface
type OperationWorkerPool struct {
*ConcreteNodeWorker
nextWorkerCount int
inputChannel chan map[string]interface{}
outputChannel chan map[string]interface{}
}
// OperationNode structue
type OperationNode struct {
Pool *OperationWorkerPool
}
// NewOperationWorkerPool creates a new OperationWorkerPool
func NewOperationWorkerPool(executor NodeExecutor, mode WorkerMode) NodeWorker {
cnw := newConcreteNodeWorker(executor, mode)
fwp := &OperationWorkerPool{ConcreteNodeWorker: cnw}
return fwp
}
// CreateChannels creates channels for the Operation WorkerPool
func (fwp *OperationWorkerPool) CreateChannels(buffer int) {
fwp.inputChannel = make(chan map[string]interface{}, buffer)
}
// GetInputChannel returns the input channel of Operation WorkerPool
func (fwp *OperationWorkerPool) GetInputChannel() (chan map[string]interface{}, error) {
return fwp.inputChannel, nil
}
// GetOutputChannel returns the output channel of Operation WorkerPool
func (fwp *OperationWorkerPool) GetOutputChannel() (chan map[string]interface{}, error) {
return fwp.outputChannel, nil
}
// SetInputChannel updates the input channel of Operation WorkerPool
func (fwp *OperationWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error {
fwp.inputChannel = inChan
return nil
}
// SetOutputChannel updates the output channel of Operation WorkerPool
func (fwp *OperationWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error {
fwp.outputChannel = outChan
return nil
}
// Start Operation Worker Pool
func (fwp *OperationWorkerPool) Start(ctx CnvContext) error {
if fwp.Mode == WorkerModeTransaction {
return fwp.startTransactionMode(ctx)
} else if fwp.Mode == WorkerModeLoop {
return fwp.startLoopMode(ctx)
} else {
return ErrInvalidWorkerMode
}
}
// startLoopMode OperationWorkerPool
func (fwp *OperationWorkerPool) startLoopMode(ctx CnvContext) error {
return fwp.ConcreteNodeWorker.startLoopMode(ctx, fwp.inputChannel, fwp.outputChannel)
}
// startTransactionMode starts OperationWorkerPool in transaction mode
func (fwp *OperationWorkerPool) startTransactionMode(ctx CnvContext) error {
fwp.sem = semaphore.NewWeighted(int64(fwp.WorkerCount))
workerLoop:
for {
select {
case <-ctx.Done():
break workerLoop
default:
}
inData, ok := <-fwp.inputChannel
if !ok {
ctx.SendLog(0, fmt.Sprintf("Executor:[%s] Operation's input channel closed", fwp.Executor.GetUniqueIdentifier()), nil)
break workerLoop
}
if err := fwp.sem.Acquire(ctx, 1); err != nil {
ctx.SendLog(0, fmt.Sprintf("Executor:[%s], sem acquire failed", fwp.Executor.GetUniqueIdentifier()), err)
break workerLoop
}
go func(data map[string]interface{}) {
defer fwp.recovery(ctx, "OperationWorkerPool")
defer fwp.sem.Release(1)
out, err := fwp.Executor.Execute(ctx, data)
if err == nil {
select {
case <-ctx.Done():
return
default:
}
fwp.outputChannel <- out
} else if err == ErrExecuteNotImplemented {
ctx.SendLog(0, fmt.Sprintf("Executor:[%s]", fwp.Executor.GetUniqueIdentifier()), err)
log.Fatalf("Improper setup of Executor[%s], Execute() method is required", fwp.Executor.GetUniqueIdentifier())
} else {
ctx.SendLog(2, fmt.Sprintf("Worker:[%s] for Executor:[%s] Execute() Call Failed.",
fwp.Name, fwp.Executor.GetUniqueIdentifier()), err)
}
return
}(inData)
}
return nil
}
// WorkerType returns the type of worker
func (fwp *OperationWorkerPool) WorkerType() string {
return WorkerTypeOperation
}
// WaitAndStop OperationWorkerPool
func (fwp *OperationWorkerPool) WaitAndStop(ctx CnvContext) error {
_ = fwp.ConcreteNodeWorker.WaitAndStop(ctx)
close(fwp.outputChannel)
return nil
}