-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhub.go
159 lines (137 loc) · 3.4 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package hub
import (
"reflect"
"runtime"
"sync/atomic"
"github.com/rs/zerolog/log"
)
// stackBufferSize is the size, in bytes, of the buffer handed to
// runtime.Stack when a recovered panic is logged. A value of zero or less
// makes the panic handler log the panic value without a stack trace.
const stackBufferSize = 2048
// Hub listens on multiple channels: its processing loop receives from every
// registered producer channel and hands each received value to the processors.
type Hub struct {
// producer is the control channel that carries add/remove requests to the loop.
producer chan producerOp
// processors is the queue of data processors applied to each received value.
processors *Queue
// keep stores the latest snapshot of the select cases ([]reflect.SelectCase);
// the loop resumes from it when it is restarted after a panic.
keep atomic.Value
// working stores a bool: the loop sets it to true on start, newHub and Stop
// set it to false.
working atomic.Value
}
// producerOpType identifies the kind of request carried by a producerOp.
type producerOpType int

// Control requests understood by the hub loop.
const (
	// addProducer asks the loop to start receiving from a channel.
	addProducer producerOpType = iota + 1
	// removeProducer asks the loop to stop receiving from a channel.
	removeProducer
)

// producerOp is a single control request sent to the hub loop.
// The field order is relied upon by positional literals elsewhere in the file.
type producerOp struct {
	producer chan interface{} // channel the request refers to
	op       producerOpType   // what the loop should do with the channel
	cb       func()           // optional callback run by the loop; may be nil
}
// newHub builds a Hub and starts its processing loop in a new goroutine.
//
// producerLen is the buffer size of the control channel that Add and Remove
// send on. recovery selects what the loop does after a panic: -1 always
// restarts it, 0 never restarts it, and a positive value is the number of
// restarts allowed. processors are wrapped in a queue via newQueue.
func newHub(producerLen int, recovery int, processors ...IDataProcessor) *Hub {
	h := new(Hub)
	h.processors = newQueue(processors)
	h.producer = make(chan producerOp, producerLen)

	// Give working a concrete bool before anyone can read it.
	h.working.Store(false)

	go h.process(recovery)
	return h
}
// Add asks the hub to start receiving from producer.
//
// The request is sent on the hub's control channel. cb, when non-nil, is
// invoked by the hub loop after the channel has been registered.
func (h *Hub) Add(producer chan interface{}, cb func()) {
	req := producerOp{
		producer: producer,
		op:       addProducer,
		cb:       cb,
	}
	h.producer <- req
}
// Remove asks the hub to stop receiving from producer.
//
// The request is sent on the hub's control channel. cb, when non-nil, is
// invoked by the hub loop only if the channel was found and removed.
func (h *Hub) Remove(producer chan interface{}, cb func()) {
	req := producerOp{
		producer: producer,
		op:       removeProducer,
		cb:       cb,
	}
	h.producer <- req
}
// process is the hub's event loop. It multiplexes the control channel
// (h.producer) and every registered producer channel through reflect.Select,
// applies add/remove requests, and passes every other received value through
// the processor queue until a processor returns nil or the queue is exhausted.
//
// recovery controls what happens when the loop panics: -1 restarts the loop
// every time, 0 never restarts it, and a positive value is the number of
// restarts still allowed.
//
// The loop ends on its own once no channel is left to receive from, i.e. the
// control channel has been closed and every producer channel has been closed
// or removed.
func (h *Hub) process(recovery int) {
	h.working.Store(true)

	var cases []reflect.SelectCase
	if backup := h.keep.Load(); backup != nil {
		// A snapshot exists, so this is a restart after a panic: resume with
		// the channels that were being watched.
		cases = backup.([]reflect.SelectCase)
	} else {
		cases = []reflect.SelectCase{{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(h.producer),
		}}
		h.keep.Store(cases)
	}

	// drained is set right before the loop returns by itself, so the deferred
	// handler can tell a clean exit from a panic.
	drained := false
	defer func() {
		if drained {
			h.working.Store(false)
			return
		}
		if r := recover(); r != nil {
			if stackBufferSize > 0 {
				buf := make([]byte, stackBufferSize)
				n := runtime.Stack(buf, false)
				log.Printf("%v: %s", r, buf[:n])
			} else {
				log.Printf("%v", r)
			}
		}
		restart := recovery != 0
		if recovery > 0 {
			recovery--
		}
		log.Warn().Bool("recovery", restart).Int("remain", recovery).Msg("panic recovery")
		if !restart {
			// Nothing will run the loop any more; do not keep reporting the
			// hub as working.
			h.working.Store(false)
			return
		}
		go h.process(recovery)
	}()

	for {
		if len(cases) == 0 {
			// reflect.Select with no cases blocks forever. The control
			// channel is closed at this point, so nothing can be added
			// again: end the goroutine instead of leaking it.
			drained = true
			return
		}

		chosen, recv, recvOK := reflect.Select(cases)
		if !recvOK {
			// The chosen channel was closed: stop watching it.
			cases = append(cases[:chosen], cases[chosen+1:]...)
			continue
		}

		switch value := recv.Interface().(type) {
		case producerOp:
			switch value.op {
			case addProducer:
				cases = append(cases, reflect.SelectCase{
					Dir:  reflect.SelectRecv,
					Chan: reflect.ValueOf(value.producer),
				})
				if value.cb != nil {
					// The callback is caller code and may panic; snapshot
					// first so a restart still watches the new producer.
					h.keep.Store(cases)
					value.cb()
				}
				log.Trace().Int("len", len(cases)).Msg("append producer")
			case removeProducer:
				removed := false
				for i, c := range cases {
					// The control channel has a different element type, so
					// the assertion fails for it and it is never removed here.
					ch, ok := c.Chan.Interface().(chan interface{})
					if ok && ch == value.producer {
						cases = append(cases[:i], cases[i+1:]...)
						removed = true
						break
					}
				}
				log.Trace().Int("len", len(cases)).Bool("removed", removed).Bool("cb", value.cb != nil).Msg("remove producer")
				if removed && value.cb != nil {
					// The in-place delete above rewrote the array an older
					// snapshot points at; refresh it before running caller code.
					h.keep.Store(cases)
					value.cb()
				}
			}
		default:
			// Snapshot the watched channels before the risky call into
			// user-supplied processors.
			h.keep.Store(cases)
			data := recv.Interface()
			cursor := h.processors.Cursor()
			for data != nil && cursor.Next() {
				data = cursor.Value().OnData(data)
			}
		}
	}
}
// Stop closes the hub's control channel and marks the hub as not working.
// Producer channels that were already registered are not closed here.
//
// NOTE(review): closing an already-closed channel panics, and so does sending
// on a closed channel, so a second Stop or an Add/Remove after Stop would
// panic — confirm callers guard against this.
func (h *Hub) Stop() {
// Close the control channel that Add and Remove send on.
close(h.producer)
h.working.Store(false)
}
// IsWorking reports the hub's working flag: true once the processing loop has
// started, false after Stop.
//
// It also returns false, instead of panicking, for a Hub whose flag has never
// been stored (for example a zero-value Hub not built by the constructor).
func (h *Hub) IsWorking() bool {
	// Load returns nil until the first Store; the comma-ok assertion turns
	// that into false rather than a failed type assertion.
	working, ok := h.working.Load().(bool)
	return ok && working
}