Skip to content

Commit

Permalink
Add support for WatchProgressRequest
Browse files Browse the repository at this point in the history
Also fixes progress notifications to be more reliable

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Jan 18, 2024
1 parent 4929f3e commit 04e0f5a
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 67 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.9
go.etcd.io/etcd/server/v3 v3.5.9
google.golang.org/grpc v1.56.3
k8s.io/apimachinery v0.25.4
k8s.io/client-go v0.25.4
)

Expand Down Expand Up @@ -90,7 +91,6 @@ require (
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/apimachinery v0.25.4 // indirect
k8s.io/klog/v2 v2.70.1 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
Expand Down
178 changes: 112 additions & 66 deletions pkg/server/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/util/wait"
)

var watchID int64
Expand All @@ -21,32 +22,25 @@ var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil)

// GetProgressReportInterval returns the current progress report interval, with some jitter
func GetProgressReportInterval() time.Duration {
interval := progressReportInterval

// add rand(1/10*progressReportInterval) as jitter so that kine will not
// send progress notifications to watchers at the same time even when watchers
// are created at the same time.
jitter := time.Duration(rand.Int63n(int64(interval) / 10))

return interval + jitter
jitter := time.Duration(rand.Int63n(int64(progressReportInterval) / 10))
return progressReportInterval + jitter
}

func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
interval := GetProgressReportInterval()
progressTicker := time.NewTicker(interval)
defer progressTicker.Stop()

w := watcher{
server: ws,
backend: s.limited.backend,
watches: map[int64]func(){},
progress: map[int64]int64{},
progress: map[int64]chan<- int64{},
}
defer w.Close()

logrus.Tracef("WATCH SERVER CREATE")

go w.DoProgress(ws.Context(), progressTicker)
go wait.PollInfiniteWithContext(ws.Context(), GetProgressReportInterval(), w.ProgressIfSynced)

for {
msg, err := ws.Recv()
Expand All @@ -61,21 +55,28 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId)
w.Cancel(cr.WatchId, 0, 0, nil)
}
if pr := msg.GetProgressRequest(); pr != nil {
w.Progress(ws.Context())
}
}
}

type watcher struct {
sync.Mutex

wg sync.WaitGroup
backend Backend
server etcdserverpb.Watch_WatchServer
watches map[int64]func()
progress map[int64]int64
progressRev int64
sync.RWMutex

wg sync.WaitGroup
backend Backend
server etcdserverpb.Watch_WatchServer
watches map[int64]func()
progress map[int64]chan<- int64
}

func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) {
if r.WatchId != clientv3.AutoWatchID {
logrus.Warnf("WATCH START id=%d ignoring request with client-provided id", r.WatchId)
return
}

w.Lock()
defer w.Unlock()

Expand All @@ -87,11 +88,14 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)

key := string(r.Key)

var progressCh chan int64
if r.ProgressNotify {
w.progress[id] = w.progressRev
progressCh = make(chan int64)
w.progress[id] = progressCh
}

logrus.Tracef("WATCH START id=%d, count=%d, key=%s, revision=%d, progressNotify=%v", id, len(w.watches), key, r.StartRevision, r.ProgressNotify)
startRevision := r.StartRevision

go func() {
defer w.wg.Done()
Expand All @@ -116,51 +120,58 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)

outer := true
for outer {
// Block on initial read from channel
reads := 1
events := <-wr.Events

// Collect additional queued events from the channel
inner := true
for inner {
select {
case e, ok := <-wr.Events:
reads++
events = append(events, e...)
if !ok {
// channel was closed, break out of both loops
var reads int
var events []*Event
var revision int64

// Block on initial read from events or progress channel
select {
case events = <-wr.Events:
// got events; read additional queued events from the channel and add to batch
reads++
inner := true
for inner {
select {
case e, ok := <-wr.Events:
reads++
events = append(events, e...)
if !ok {
// channel was closed, break out of both loops
inner = false
outer = false
}
default:
inner = false
outer = false
}
default:
inner = false
}
case revision = <-progressCh:
// have been requested to send progress with no events
}

// Send collected events in a single response
// get max revision from collected events
if len(events) > 0 {
revision := events[len(events)-1].KV.ModRevision
w.Lock()
if r, ok := w.progress[id]; ok && r == w.progressRev {
w.progress[id] = revision
}
w.Unlock()
revision = events[len(events)-1].KV.ModRevision
if logrus.IsLevelEnabled(logrus.TraceLevel) {
for _, event := range events {
logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision)
}
}
}

// send response. note that there are no events if this is a progress response.
if revision >= startRevision {
wr := &etcdserverpb.WatchResponse{
Header: txnHeader(revision),
WatchId: id,
Events: toEvents(events...),
}
logrus.Tracef("WATCH SEND id=%d, events=%d, size=%d reads=%d", id, len(wr.Events), wr.Size(), reads)
logrus.Tracef("WATCH SEND id=%d, revision=%d, events=%d, size=%d reads=%d", id, revision, len(wr.Events), wr.Size(), reads)
if err := w.server.Send(wr); err != nil {
w.Cancel(id, 0, 0, err)
}
}
}

w.Cancel(id, 0, 0, nil)
logrus.Tracef("WATCH CLOSE id=%d, key=%s", id, key)
}()
Expand Down Expand Up @@ -190,10 +201,13 @@ func toEvent(event *Event) *mvccpb.Event {

func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) {
w.Lock()
if progressCh, ok := w.progress[watchID]; ok {
close(progressCh)
delete(w.progress, watchID)
}
if cancel, ok := w.watches[watchID]; ok {
cancel()
delete(w.watches, watchID)
delete(w.progress, watchID)
}
w.Unlock()

Expand All @@ -218,36 +232,68 @@ func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) {
func (w *watcher) Close() {
logrus.Tracef("WATCH SERVER CLOSE")
w.Lock()
for id, v := range w.watches {
for id, progressCh := range w.progress {
close(progressCh)
delete(w.progress, id)
v()
}
for id, cancel := range w.watches {
cancel()
delete(w.watches, id)
}
w.Unlock()
w.wg.Wait()
}

func (w *watcher) DoProgress(ctx context.Context, ticker *time.Ticker) {
for {
// Progress sends a progress report if all watchers are synced.
// Ref: https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/watchable_store.go#L500-L504
func (w *watcher) Progress(ctx context.Context) {
w.RLock()
defer w.RUnlock()

logrus.Tracef("WATCH REQUEST PROGRESS")

// All synced watchers will be blocked in the outer loop and able to receive on the progress channel.
// If any cannot be sent to, then it is not synced and has pending events to be sent.
// Send revision 0, as we don't actually want the watchers to send a progress response if they do receive.
for id, progressCh := range w.progress {
select {
case <-ctx.Done():
case progressCh <- 0:
default:
logrus.Tracef("WATCH SEND PROGRESS FAILED NOT SYNCED id=%d ", id)
return
case <-ticker.C:
rev, err := w.backend.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err)
continue
}
}
}

w.Lock()
for id, r := range w.progress {
if r == w.progressRev {
logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev)
go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id})
}
w.progress[id] = rev
}
w.progressRev = rev
w.Unlock()
// If all watchers are synced, send a broadcast progress notification with the latest revision.
id := int64(clientv3.InvalidWatchID)
rev, err := w.backend.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err)
return
}

logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev)
go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id})
}

// ProgressIfSynced sends a progress report on any channels that are synced and blocked on the outer loop
func (w *watcher) ProgressIfSynced(ctx context.Context) (bool, error) {
logrus.Tracef("WATCH PROGRESS TICK")
revision, err := w.backend.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err)
return false, nil
}

w.RLock()
defer w.RUnlock()

// Send revision to all synced channels
for _, progressCh := range w.progress {
select {
case progressCh <- revision:
default:
}
}
return false, nil
}

0 comments on commit 04e0f5a

Please sign in to comment.