From 1b485542c680f7d6f71308f7e7d1bb60c17d7f72 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Tue, 31 Oct 2023 19:44:52 +0000 Subject: [PATCH] Fix multiple issues with Watch * Fixes an error where watching a compacted revision would return events, followed by an error. The watch is now immediately cancelled with an appropriate error, and the compact/current revision fields set. * Fixes an error where a high event rate could flood the GRPC stream with single-event watch responses. The event channel reader now merges all available events, and sends them in a single watch message. This was most frequently an issue on sqlite, where all inserts are done locally and immediately wake the polling goroutine to send the event without any opportunity for batching. Signed-off-by: Brad Davidson --- pkg/drivers/nats/nats.go | 22 +++++--- pkg/logstructured/logstructured.go | 48 ++++++++++------ pkg/logstructured/sqllog/sql.go | 25 +++++++- pkg/server/list.go | 3 + pkg/server/types.go | 8 ++- pkg/server/watch.go | 91 +++++++++++++++++++++--------- 6 files changed, 141 insertions(+), 56 deletions(-) diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go index dd707ced..ee5f6b37 100644 --- a/pkg/drivers/nats/nats.go +++ b/pkg/drivers/nats/nats.go @@ -843,23 +843,29 @@ func (d *Driver) Update(ctx context.Context, key string, value []byte, revision, } -func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { - +func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult { + ctx, cancel := context.WithCancel(ctx) watcher, err := d.kv.(*kv.EncodedKV).Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) if revision > 0 { revision-- } - _, events, err := d.listAfter(ctx, prefix, revision) + result := make(chan []*server.Event, 100) + wr := server.WatchResult{Events: result} + + rev, events, err := d.listAfter(ctx, prefix, revision) if err != nil { logrus.Errorf("failed to create watcher %s for revision %d", prefix, revision) + if err == server.ErrCompacted { + compact, _ := d.compactRevision() + wr.CompactRevision = compact + wr.CurrentRevision = rev + } + cancel() } - result := make(chan []*server.Event, 100) - go func() { - if len(events) > 0 { result <- events revision = events[len(events)-1].KV.ModRevision @@ -915,11 +921,13 @@ func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-cha if err := watcher.Stop(); err != nil && err != nats.ErrBadSubscription { logrus.Warnf("error stopping %s watcher: %v", prefix, err) } + close(result) + cancel() return } } }() - return result + return wr } // getPreviousEntry returns the nats.KeyValueEntry previous to the one provided, if the previous entry is a nats.KeyValuePut diff --git a/pkg/logstructured/logstructured.go b/pkg/logstructured/logstructured.go index 2b5b51f6..729693b9 100644 --- a/pkg/logstructured/logstructured.go +++ b/pkg/logstructured/logstructured.go @@ -17,6 +17,7 @@ const ( type Log interface { Start(ctx context.Context) error + CompactRevision(ctx context.Context) (int64, error) CurrentRevision(ctx context.Context) (int64, error) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) @@ -288,13 +289,13 @@ func (l *LogStructured) ttl(ctx context.Context) { eventKV := loadTTLEventKV(rwMutex, ttlEventKVMap, event.KV.Key) if eventKV == nil { - logrus.Tracef("Add ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision) expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV) + logrus.Tracef("TTL add event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires) queue.AddAfter(event.KV.Key, expires) } else { if event.KV.ModRevision > eventKV.modRevision { - logrus.Tracef("Update ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision) expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV) + logrus.Tracef("TTL update event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires) queue.AddAfter(event.KV.Key, expires) } } @@ -312,13 +313,13 @@ func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMut eventKV := loadTTLEventKV(rwMutex, store, key.(string)) if eventKV == nil { - logrus.Errorf("Failed to find ttl event for key %v", key) + logrus.Errorf("TTL event not found for key=%v", key) return true } - if eventKV.expiredAt.After(time.Now()) { - logrus.Tracef("TTL event key %v has not expired yet, the latest expiration time is %v, requeuing", key, eventKV.expiredAt) - queue.AddAfter(key, time.Until(eventKV.expiredAt)) + if expires := time.Until(eventKV.expiredAt); expires > 0 { + logrus.Tracef("TTL has not expired for key=%v, ttl=%v, requeuing", key, expires) + queue.AddAfter(key, expires) return true } @@ -327,19 +328,19 @@ func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMut } func (l *LogStructured) deleteTTLEvent(ctx context.Context, rwMutex *sync.RWMutex, queue workqueue.DelayingInterface, store map[string]*ttlEventKV, preEventKV *ttlEventKV) { - logrus.Tracef("Delete ttl event key %v, modRev %v", preEventKV.key, preEventKV.modRevision) + logrus.Tracef("TTL delete key=%v, modRev=%v", preEventKV.key, preEventKV.modRevision) _, _, _, err := l.Delete(ctx, preEventKV.key, preEventKV.modRevision) rwMutex.Lock() defer rwMutex.Unlock() curEventKV := store[preEventKV.key] - if curEventKV.expiredAt.After(preEventKV.expiredAt) { - logrus.Tracef("TTL event key %v has updated, requeuing", curEventKV.key) - queue.AddAfter(curEventKV.key, time.Until(curEventKV.expiredAt)) + if expires := time.Until(preEventKV.expiredAt); expires > 0 { + logrus.Tracef("TTL changed for key=%v, ttl=%v, requeuing", curEventKV.key, expires) + queue.AddAfter(curEventKV.key, expires) return } if err != nil { - logrus.Errorf("Failed to delete key %v at end of lease: %v, requeuing", curEventKV.key, err) + logrus.Errorf("TTL delete trigger failed for key=%v: %v, requeuing", curEventKV.key, err) queue.AddAfter(curEventKV.key, retryInterval) return } @@ -366,7 +367,7 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false) for len(events) > 0 { if err != nil { - logrus.Errorf("Failed to read old events for ttl: %v", err) + logrus.Errorf("TTL event list failed: %v", err) break } @@ -389,17 +390,22 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { defer wg.Done() revision := <-lastListRevision if revision == 0 { - logrus.Error("TTL events last list revision is zero, retry to process ttl events") + logrus.Error("TTL event watch failed to get start revision") return } - for events := range l.Watch(ctx, "/", revision) { + wr := l.Watch(ctx, "/", revision) + if wr.CompactRevision != 0 { + logrus.Errorf("TTL event watch failed: %v", server.ErrCompacted) + return + } + for events := range wr.Events { for _, event := range events { if event.KV.Lease > 0 { result <- event } } } - logrus.Info("TTL events watch channel was closed") + logrus.Info("TTL events watch channel closed") }() return result @@ -423,7 +429,7 @@ func storeTTLEventKV(rwMutex *sync.RWMutex, store map[string]*ttlEventKV, eventK return expires } -func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { +func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult { logrus.Tracef("WATCH %s, revision=%d", prefix, revision) // starting watching right away so we don't miss anything @@ -436,10 +442,16 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 } result := make(chan []*server.Event, 100) + wr := server.WatchResult{Events: result} rev, kvs, err := l.log.After(ctx, prefix, revision, 0) if err != nil { - logrus.Errorf("failed to list %s for revision %d", prefix, revision) + logrus.Errorf("Failed to list %s for revision %d: %v", prefix, revision, err) + if err == server.ErrCompacted { + compact, _ := l.log.CompactRevision(ctx) + wr.CompactRevision = compact + wr.CurrentRevision = rev + } cancel() } @@ -463,7 +475,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 cancel() }() - return result + return wr } func filter(events []*server.Event, rev int64) []*server.Event { diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index bf9c4462..a2201df1 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -236,6 +236,10 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { return s.d.CurrentRevision(ctx) } +func (s *SQLLog) CompactRevision(ctx context.Context) (int64, error) { + return s.d.GetCompactRevision(ctx) +} + func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) { if strings.HasSuffix(prefix, "/") { prefix += "%" @@ -247,8 +251,21 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64 } rev, compact, result, err := RowsToEvents(rows) + + if revision > 0 && len(result) == 0 { + // a zero length result won't have the compact or current revisions so get them manually + rev, err = s.d.CurrentRevision(ctx) + if err != nil { + return 0, nil, err + } + compact, err = s.d.GetCompactRevision(ctx) + if err != nil { + return 0, nil, err + } + } + if revision > 0 && revision < compact { - return rev, result, server.ErrCompacted + return rev, nil, server.ErrCompacted } return rev, result, err @@ -299,11 +316,11 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis } if revision > rev { - return rev, result, server.ErrFutureRev + return rev, nil, server.ErrFutureRev } if revision > 0 && revision < compact { - return rev, result, server.ErrCompacted + return rev, nil, server.ErrCompacted } select { @@ -420,6 +437,8 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { continue } + logrus.Tracef("POLL AFTER %d, limit=%d, events=%d", last, pollBatchSize, len(events)) + if len(events) == 0 { continue } diff --git a/pkg/server/list.go b/pkg/server/list.go index 5c98cf2e..ebec0df7 100644 --- a/pkg/server/list.go +++ b/pkg/server/list.go @@ -6,6 +6,7 @@ import ( "fmt" "strings" + "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" ) @@ -25,6 +26,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest) if err != nil { return nil, err } + logrus.Tracef("LIST COUNT key=%s, end=%s, revision=%d, currentRev=%d count=%d", r.Key, r.RangeEnd, r.Revision, rev, count) return &RangeResponse{ Header: txnHeader(rev), Count: count, @@ -41,6 +43,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest) return nil, err } + logrus.Tracef("LIST key=%s, end=%s, revision=%d, currentRev=%d count=%d, limit=%d", r.Key, r.RangeEnd, r.Revision, rev, len(kvs), r.Limit) resp := &RangeResponse{ Header: txnHeader(rev), Count: int64(len(kvs)), diff --git a/pkg/server/types.go b/pkg/server/types.go index 4cf55680..797e1dd8 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -25,7 +25,7 @@ type Backend interface { List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix string) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error) - Watch(ctx context.Context, key string, revision int64) <-chan []*Event + Watch(ctx context.Context, key string, revision int64) WatchResult DbSize(ctx context.Context) (int64, error) } @@ -77,6 +77,12 @@ type Event struct { PrevKV *KeyValue } +type WatchResult struct { + CurrentRevision int64 + CompactRevision int64 + Events <-chan []*Event +} + func unsupported(field string) error { return status.New(codes.Unimplemented, field+" is not implemented by kine").Err() } diff --git a/pkg/server/watch.go b/pkg/server/watch.go index 51b8894f..b6a7af15 100644 --- a/pkg/server/watch.go +++ b/pkg/server/watch.go @@ -24,17 +24,20 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } defer w.Close() + logrus.Tracef("WATCH SERVER CREATE") + for { msg, err := ws.Recv() if err != nil { return err } - if msg.GetCreateRequest() != nil { - w.Start(ws.Context(), msg.GetCreateRequest()) - } else if msg.GetCancelRequest() != nil { - logrus.Tracef("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId()) - w.Cancel(msg.GetCancelRequest().WatchId, nil) + if cr := msg.GetCreateRequest(); cr != nil { + w.Start(ws.Context(), cr) + } + if cr := msg.GetCancelRequest(); cr != nil { + logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) + w.Cancel(cr.WatchId, 0, 0, nil) } } } @@ -69,31 +72,62 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) Created: true, WatchId: id, }); err != nil { - w.Cancel(id, err) + w.Cancel(id, 0, 0, err) return } - for events := range w.backend.Watch(ctx, key, r.StartRevision) { - if len(events) == 0 { - continue - } + wr := w.backend.Watch(ctx, key, r.StartRevision) + + // If the watch result has a non-zero CompactRevision, then the watch request failed due to + // the requested start revision having been compacted. Pass the current and and compact + // revision to the client via the cancel response, along with the correct error message. + if wr.CompactRevision != 0 { + w.Cancel(id, wr.CurrentRevision, wr.CompactRevision, ErrCompacted) + return + } - if logrus.IsLevelEnabled(logrus.DebugLevel) { - for _, event := range events { - logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + outer := true + for outer { + // Block on initial read from channel + reads := 1 + events := <-wr.Events + + // Collect additional queued events from the channel + inner := true + for inner { + select { + case e, ok := <-wr.Events: + reads++ + events = append(events, e...) + if !ok { + // channel was closed, break out of both loops + inner = false + outer = false + } + default: + inner = false } } - if err := w.server.Send(&etcdserverpb.WatchResponse{ - Header: txnHeader(events[len(events)-1].KV.ModRevision), - WatchId: id, - Events: toEvents(events...), - }); err != nil { - w.Cancel(id, err) - continue + // Send collected events in a single response + if len(events) > 0 { + if logrus.IsLevelEnabled(logrus.TraceLevel) { + for _, event := range events { + logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + } + } + wr := &etcdserverpb.WatchResponse{ + Header: txnHeader(events[len(events)-1].KV.ModRevision), + WatchId: id, + Events: toEvents(events...), + } + logrus.Tracef("WATCH SEND id=%d, events=%d, size=%d reads=%d", id, len(wr.Events), wr.Size(), reads) + if err := w.server.Send(wr); err != nil { + w.Cancel(id, 0, 0, err) + } } } - w.Cancel(id, nil) + w.Cancel(id, 0, 0, nil) logrus.Tracef("WATCH CLOSE id=%d, key=%s", id, key) }() } @@ -120,7 +154,7 @@ func toEvent(event *Event) *mvccpb.Event { return e } -func (w *watcher) Cancel(watchID int64, err error) { +func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) { w.Lock() if cancel, ok := w.watches[watchID]; ok { cancel() @@ -132,12 +166,14 @@ func (w *watcher) Cancel(watchID int64, err error) { if err != nil { reason = err.Error() } - logrus.Tracef("WATCH CANCEL id=%d reason=%s", watchID, reason) + logrus.Tracef("WATCH CANCEL id=%d, reason=%s, compactRev=%d", watchID, reason, compactRev) + serr := w.server.Send(&etcdserverpb.WatchResponse{ - Header: &etcdserverpb.ResponseHeader{}, - Canceled: true, - CancelReason: "watch closed", - WatchId: watchID, + Header: txnHeader(revision), + Canceled: err != nil, + CancelReason: reason, + WatchId: watchID, + CompactRevision: compactRev, }) if serr != nil && err != nil && !clientv3.IsConnCanceled(serr) { logrus.Errorf("WATCH Failed to send cancel response for watchID %d: %v", watchID, serr) @@ -145,6 +181,7 @@ func (w *watcher) Cancel(watchID int64, err error) { } func (w *watcher) Close() { + logrus.Tracef("WATCH SERVER CLOSE") w.Lock() for _, v := range w.watches { v()