forked from SkySoft-ATM/gorillaz
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgetAndWatch_stream_provider.go
189 lines (166 loc) · 6.03 KB
/
getAndWatch_stream_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package gorillaz
import (
"encoding/base64"
"time"
"github.com/skysoft-atm/gorillaz/mux"
"github.com/skysoft-atm/gorillaz/stream"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// GetAndWatchStreamProvider is the provider side of a stream whose type is
// stream.StreamType_GET_AND_WATCH. Submitted events and deletions are handed
// to a mux.StateBroadcaster under the base64 encoding of their key.
type GetAndWatchStreamProvider struct {
	// streamDef holds the name and data type the provider was created with.
	streamDef *StreamDefinition
	// config is the default configuration with the caller's options applied.
	config *GetAndWatchConfig
	// broadcaster receives every Submit/Delete and feeds the per-consumer channels registered by sendLoop.
	broadcaster *mux.StateBroadcaster
	// metrics groups the counters and gauges updated by Submit and sendLoop.
	metrics providerMetricsHolder
	// gaz is the Gaz instance that created the provider; CloseStream delegates to it.
	gaz *Gaz
}
// streamType reports the kind of stream served by this provider, which is
// always stream.StreamType_GET_AND_WATCH.
func (p *GetAndWatchStreamProvider) streamType() stream.StreamType {
	return stream.StreamType_GET_AND_WATCH
}
// GetAndWatchConfigOpt is a functional option that mutates a GetAndWatchConfig.
// NewGetAndWatchStreamProvider applies the options, in order, on top of the
// default configuration.
type GetAndWatchConfigOpt func(p *GetAndWatchConfig)
// GetAndWatchConfig is the configuration that will be applied to a GetAndWatchStreamProvider.
type GetAndWatchConfig struct {
	// InputBufferLen is the size of the input channel (default: 256).
	InputBufferLen int
	// SubscriberInputBufferLen is the size of the channel used to forward events to each client (default: 256).
	SubscriberInputBufferLen int
	// OnBackPressure is the function called when a consumer cannot consume fast enough and events are dropped (default: log a warning).
	OnBackPressure func(streamName string)
	// Ttl is handed to mux.NewNonBlockingStateBroadcaster when the provider is created (default: 0).
	// NOTE(review): presumably the time-to-live of the values kept by the broadcaster — confirm against the mux package.
	Ttl time.Duration
}
// defaultGetAndWatchConfig builds the configuration used when no option
// overrides it: both buffers hold 256 entries, backpressure is reported as a
// warning in the log, and Ttl is zero.
func defaultGetAndWatchConfig() *GetAndWatchConfig {
	// Default backpressure handler: only logs the name of the affected stream.
	warnOnBackPressure := func(streamName string) {
		Log.Warn("backpressure applied, an event won't be delivered because it can't consume fast enough", zap.String("stream", streamName))
	}

	cfg := new(GetAndWatchConfig)
	cfg.InputBufferLen = 256
	cfg.SubscriberInputBufferLen = 256
	cfg.OnBackPressure = warnOnBackPressure
	cfg.Ttl = 0
	return cfg
}
// NewGetAndWatchStreamProvider creates a get-and-watch provider for
// streamName, registers it in the stream registry of g and returns it.
//
// The provider starts from the default configuration; each option in opts is
// then applied in order. Only one provider instance should be created for a
// given streamName.
func (g *Gaz) NewGetAndWatchStreamProvider(streamName, dataType string, opts ...GetAndWatchConfigOpt) *GetAndWatchStreamProvider {
	Log.Info("creating stream", zap.String("stream", streamName))

	cfg := defaultGetAndWatchConfig()
	for i := range opts {
		opts[i](cfg)
	}

	// The broadcaster is built only after the options ran, so it sees the
	// final buffer length and Ttl.
	stateBroadcaster := mux.NewNonBlockingStateBroadcaster(cfg.InputBufferLen, cfg.Ttl)
	definition := &StreamDefinition{Name: streamName, DataType: dataType}

	provider := &GetAndWatchStreamProvider{
		streamDef:   definition,
		config:      cfg,
		broadcaster: stateBroadcaster,
		metrics:     pMetricHolder(g, streamName),
		gaz:         g,
	}
	g.streamRegistry.register(provider)
	return provider
}
// Submit pushes the event to all subscribers and stores it by its key for new
// subscribers appearing on the stream. The sent counter and the last-event
// timestamp are updated before the event is handed to the broadcaster, which
// receives it under the standard base64 encoding of evt.Key.
func (p *GetAndWatchStreamProvider) Submit(evt *stream.Event) {
	p.metrics.sentCounter.Inc()
	p.metrics.lastEventTimestamp.SetToCurrentTime()

	encodedKey := base64.StdEncoding.EncodeToString(evt.Key)
	p.broadcaster.Submit(encodedKey, evt)
}
// Delete asks the broadcaster to delete the entry stored under key. The key is
// translated to its standard base64 encoding, the same form Submit uses.
func (p *GetAndWatchStreamProvider) Delete(key []byte) {
	encodedKey := base64.StdEncoding.EncodeToString(key)
	p.broadcaster.Delete(encodedKey)
}
// sendHelloMessage sends one GetAndWatchEvent that carries nothing but an
// empty metadata map to the consumer on strm.
//
// peer is only used to enrich the log entry written when the send fails.
// The returned error is the marshalling error or the error reported by
// SendMsg; nil means the message was handed to the stream.
func (p *GetAndWatchStreamProvider) sendHelloMessage(strm grpc.ServerStream, peer Peer) error {
	hello := &stream.GetAndWatchEvent{
		Metadata: &stream.Metadata{
			KeyValue: make(map[string]string),
		},
	}
	payload, err := proto.Marshal(hello)
	if err != nil {
		Log.Error("Error while marshalling GetAndWatchEvent", zap.Error(err))
		return err
	}
	// strm is already a grpc.ServerStream, so it is used directly instead of
	// being asserted to its own type.
	if err := strm.SendMsg(payload); err != nil {
		Log.Info("consumer disconnected", zap.Error(err), zap.String("stream", p.streamDef.Name), zap.String("peer", peer.address), zap.String("peer service", peer.serviceName))
		return err
	}
	return nil
}
// sendLoop registers a dedicated channel on the provider's broadcaster and
// forwards every state update received on it to strm as a marshalled
// stream.GetAndWatchEvent. The client counter is incremented for the duration
// of the call and the channel is unregistered on return.
//
// opts.disconnectOnBackpressure additionally enables DisconnectOnBackpressure
// on the consumer configuration; peer is only used in log entries.
//
// The loop ends, and sendLoop returns:
//   - nil when the channel is closed and the broadcaster reports being closed;
//   - a codes.DataLoss status error when the channel is closed while the
//     broadcaster is still open;
//   - the marshalling or SendMsg error when one of them fails;
//   - strm.Context().Err() when the stream context is done.
//
// A delete update whose key cannot be base64-decoded is logged and skipped.
func (p *GetAndWatchStreamProvider) sendLoop(strm grpc.ServerStream, peer Peer, opts sendLoopOpts) error {
	streamName := p.streamDef.Name

	p.metrics.clientCounter.Inc()
	defer p.metrics.clientCounter.Dec()

	broadcaster := p.broadcaster
	streamCh := make(chan *mux.StateUpdate, p.config.SubscriberInputBufferLen)
	broadcaster.Register(streamCh, func(config *mux.ConsumerConfig) error {
		// The backpressure callback notifies the configured handler and
		// increments the backpressure counter.
		config.OnBackpressure(func(interface{}) {
			p.config.OnBackPressure(streamName)
			p.metrics.backPressureCounter.Inc()
		})
		if opts.disconnectOnBackpressure {
			config.DisconnectOnBackpressure()
		}
		return nil
	})
	defer broadcaster.Unregister(streamCh)

	for {
		select {
		case su, ok := <-streamCh:
			if !ok {
				// if the broadcaster is closed, then there are no more values to be sent to consumers
				if broadcaster.Closed() {
					return nil
				}
				// otherwise, it's just for this consumer, it's because the consumer is not consuming fast enough
				return status.Error(codes.DataLoss, "not consuming fast enough")
			}

			// Start with empty, non-nil metadata; it is replaced below only
			// when the event provides usable metadata.
			gwe := stream.GetAndWatchEvent{
				Metadata: &stream.Metadata{
					KeyValue: make(map[string]string),
				},
			}
			if su.UpdateType == mux.Delete {
				// Deletions carry the base64-encoded key as their value.
				key := su.Value.(string)
				rawKey, err := base64.StdEncoding.DecodeString(key)
				if err != nil {
					Log.Error("Unable to decode key", zap.String("key", key), zap.Error(err))
					continue
				}
				gwe.Key = rawKey
				gwe.EventType = stream.EventType_DELETE
			} else {
				se := su.Value.(*stream.Event)
				if su.UpdateType == mux.Update {
					gwe.EventType = stream.EventType_UPDATE
				} else {
					gwe.EventType = stream.EventType_INITIAL_STATE
				}
				gwe.Key = se.Key
				gwe.Value = se.Value

				md, err := stream.EventMetadata(se)
				if err != nil {
					// The event is still delivered; only the metadata may be incomplete.
					Log.Error("failed to inject context data into metadata", zap.Error(err))
				}
				// Never replace the initialized metadata with nil.
				if md != nil {
					gwe.Metadata = md
				}
			}

			payload, err := proto.Marshal(&gwe)
			if err != nil {
				Log.Error("Error while marshalling GetAndWatchEvent", zap.Error(err))
				return err
			}
			// strm is already a grpc.ServerStream, so no type assertion is needed.
			if err := strm.SendMsg(payload); err != nil {
				Log.Info("consumer disconnected", zap.Error(err), zap.String("stream", streamName), zap.String("peer", peer.address), zap.String("peer service", peer.serviceName))
				return err
			}
		case <-strm.Context().Done():
			Log.Info("consumer disconnected", zap.String("stream", streamName), zap.String("peer", peer.address), zap.String("peer service", peer.serviceName))
			return strm.Context().Err()
		}
	}
}
// CloseStream delegates to closeStream on the Gaz instance that created the
// provider and returns its result.
func (p *GetAndWatchStreamProvider) CloseStream() error {
	return p.gaz.closeStream(p)
}
// close closes the provider's broadcaster. sendLoop treats a closed consumer
// channel together with a closed broadcaster as a normal end of stream.
func (p *GetAndWatchStreamProvider) close() {
	p.broadcaster.Close()
}
// streamDefinition returns the StreamDefinition (name and data type) the
// provider was created with.
func (p *GetAndWatchStreamProvider) streamDefinition() *StreamDefinition {
	return p.streamDef
}