Skip to content

Commit

Permalink
Fix multiple issues with Watch
Browse files Browse the repository at this point in the history
* Fixes an error where watching a compacted revision would return events, followed by an error.
  The watch is now immediately cancelled with an appropriate error, and the compact/current
  revision fields set.

* Fixes an error where a high event rate could flood the GRPC stream with single-event watch
  responses. The event channel reader now merges all available events, and sends them in a
  single watch message. This was most frequently an issue on sqlite, where all inserts are done
  locally and immediately wake the polling goroutine to send the event without any opportunity
  for batching.

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Nov 3, 2023
1 parent 5fd1780 commit 1b48554
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 56 deletions.
22 changes: 15 additions & 7 deletions pkg/drivers/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,23 +843,29 @@ func (d *Driver) Update(ctx context.Context, key string, value []byte, revision,

}

func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {

func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult {
ctx, cancel := context.WithCancel(ctx)
watcher, err := d.kv.(*kv.EncodedKV).Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx))

if revision > 0 {
revision--
}
_, events, err := d.listAfter(ctx, prefix, revision)

result := make(chan []*server.Event, 100)
wr := server.WatchResult{Events: result}

rev, events, err := d.listAfter(ctx, prefix, revision)
if err != nil {
logrus.Errorf("failed to create watcher %s for revision %d", prefix, revision)
if err == server.ErrCompacted {
compact, _ := d.compactRevision()
wr.CompactRevision = compact
wr.CurrentRevision = rev
}
cancel()
}

result := make(chan []*server.Event, 100)

go func() {

if len(events) > 0 {
result <- events
revision = events[len(events)-1].KV.ModRevision
Expand Down Expand Up @@ -915,11 +921,13 @@ func (d *Driver) Watch(ctx context.Context, prefix string, revision int64) <-cha
if err := watcher.Stop(); err != nil && err != nats.ErrBadSubscription {
logrus.Warnf("error stopping %s watcher: %v", prefix, err)
}
close(result)
cancel()
return
}
}
}()
return result
return wr
}

// getPreviousEntry returns the nats.KeyValueEntry previous to the one provided, if the previous entry is a nats.KeyValuePut
Expand Down
48 changes: 30 additions & 18 deletions pkg/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (

type Log interface {
Start(ctx context.Context) error
CompactRevision(ctx context.Context) (int64, error)
CurrentRevision(ctx context.Context) (int64, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Expand Down Expand Up @@ -288,13 +289,13 @@ func (l *LogStructured) ttl(ctx context.Context) {

eventKV := loadTTLEventKV(rwMutex, ttlEventKVMap, event.KV.Key)
if eventKV == nil {
logrus.Tracef("Add ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision)
expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV)
logrus.Tracef("TTL add event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires)
queue.AddAfter(event.KV.Key, expires)
} else {
if event.KV.ModRevision > eventKV.modRevision {
logrus.Tracef("Update ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision)
expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV)
logrus.Tracef("TTL update event key=%v, modRev=%v, ttl=%v", event.KV.Key, event.KV.ModRevision, expires)
queue.AddAfter(event.KV.Key, expires)
}
}
Expand All @@ -312,13 +313,13 @@ func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMut

eventKV := loadTTLEventKV(rwMutex, store, key.(string))
if eventKV == nil {
logrus.Errorf("Failed to find ttl event for key %v", key)
logrus.Errorf("TTL event not found for key=%v", key)
return true
}

if eventKV.expiredAt.After(time.Now()) {
logrus.Tracef("TTL event key %v has not expired yet, the latest expiration time is %v, requeuing", key, eventKV.expiredAt)
queue.AddAfter(key, time.Until(eventKV.expiredAt))
if expires := time.Until(eventKV.expiredAt); expires > 0 {
logrus.Tracef("TTL has not expired for key=%v, ttl=%v, requeuing", key, expires)
queue.AddAfter(key, expires)
return true
}

Expand All @@ -327,19 +328,19 @@ func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMut
}

func (l *LogStructured) deleteTTLEvent(ctx context.Context, rwMutex *sync.RWMutex, queue workqueue.DelayingInterface, store map[string]*ttlEventKV, preEventKV *ttlEventKV) {
logrus.Tracef("Delete ttl event key %v, modRev %v", preEventKV.key, preEventKV.modRevision)
logrus.Tracef("TTL delete key=%v, modRev=%v", preEventKV.key, preEventKV.modRevision)
_, _, _, err := l.Delete(ctx, preEventKV.key, preEventKV.modRevision)

rwMutex.Lock()
defer rwMutex.Unlock()
curEventKV := store[preEventKV.key]
if curEventKV.expiredAt.After(preEventKV.expiredAt) {
logrus.Tracef("TTL event key %v has updated, requeuing", curEventKV.key)
queue.AddAfter(curEventKV.key, time.Until(curEventKV.expiredAt))
if expires := time.Until(preEventKV.expiredAt); expires > 0 {
logrus.Tracef("TTL changed for key=%v, ttl=%v, requeuing", curEventKV.key, expires)
queue.AddAfter(curEventKV.key, expires)
return
}
if err != nil {
logrus.Errorf("Failed to delete key %v at end of lease: %v, requeuing", curEventKV.key, err)
logrus.Errorf("TTL delete trigger failed for key=%v: %v, requeuing", curEventKV.key, err)
queue.AddAfter(curEventKV.key, retryInterval)
return
}
Expand All @@ -366,7 +367,7 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false)
for len(events) > 0 {
if err != nil {
logrus.Errorf("Failed to read old events for ttl: %v", err)
logrus.Errorf("TTL event list failed: %v", err)
break
}

Expand All @@ -389,17 +390,22 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
defer wg.Done()
revision := <-lastListRevision
if revision == 0 {
logrus.Error("TTL events last list revision is zero, retry to process ttl events")
logrus.Error("TTL event watch failed to get start revision")
return
}
for events := range l.Watch(ctx, "/", revision) {
wr := l.Watch(ctx, "/", revision)
if wr.CompactRevision != 0 {
logrus.Errorf("TTL event watch failed: %v", server.ErrCompacted)
return
}
for events := range wr.Events {
for _, event := range events {
if event.KV.Lease > 0 {
result <- event
}
}
}
logrus.Info("TTL events watch channel was closed")
logrus.Info("TTL events watch channel closed")
}()

return result
Expand All @@ -423,7 +429,7 @@ func storeTTLEventKV(rwMutex *sync.RWMutex, store map[string]*ttlEventKV, eventK
return expires
}

func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {
func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) server.WatchResult {
logrus.Tracef("WATCH %s, revision=%d", prefix, revision)

// starting watching right away so we don't miss anything
Expand All @@ -436,10 +442,16 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
}

result := make(chan []*server.Event, 100)
wr := server.WatchResult{Events: result}

rev, kvs, err := l.log.After(ctx, prefix, revision, 0)
if err != nil {
logrus.Errorf("failed to list %s for revision %d", prefix, revision)
logrus.Errorf("Failed to list %s for revision %d: %v", prefix, revision, err)
if err == server.ErrCompacted {
compact, _ := l.log.CompactRevision(ctx)
wr.CompactRevision = compact
wr.CurrentRevision = rev
}
cancel()
}

Expand All @@ -463,7 +475,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
cancel()
}()

return result
return wr
}

func filter(events []*server.Event, rev int64) []*server.Event {
Expand Down
25 changes: 22 additions & 3 deletions pkg/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
return s.d.CurrentRevision(ctx)
}

func (s *SQLLog) CompactRevision(ctx context.Context) (int64, error) {
return s.d.GetCompactRevision(ctx)
}

func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) {
if strings.HasSuffix(prefix, "/") {
prefix += "%"
Expand All @@ -247,8 +251,21 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64
}

rev, compact, result, err := RowsToEvents(rows)

if revision > 0 && len(result) == 0 {
// a zero length result won't have the compact or current revisions so get them manually
rev, err = s.d.CurrentRevision(ctx)
if err != nil {
return 0, nil, err
}
compact, err = s.d.GetCompactRevision(ctx)
if err != nil {
return 0, nil, err
}
}

if revision > 0 && revision < compact {
return rev, result, server.ErrCompacted
return rev, nil, server.ErrCompacted
}

return rev, result, err
Expand Down Expand Up @@ -299,11 +316,11 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
}

if revision > rev {
return rev, result, server.ErrFutureRev
return rev, nil, server.ErrFutureRev
}

if revision > 0 && revision < compact {
return rev, result, server.ErrCompacted
return rev, nil, server.ErrCompacted
}

select {
Expand Down Expand Up @@ -420,6 +437,8 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
continue
}

logrus.Tracef("POLL AFTER %d, limit=%d, events=%d", last, pollBatchSize, len(events))

if len(events) == 0 {
continue
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"

"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/etcdserverpb"
)

Expand All @@ -25,6 +26,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
if err != nil {
return nil, err
}
logrus.Tracef("LIST COUNT key=%s, end=%s, revision=%d, currentRev=%d count=%d", r.Key, r.RangeEnd, r.Revision, rev, count)
return &RangeResponse{
Header: txnHeader(rev),
Count: count,
Expand All @@ -41,6 +43,7 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
return nil, err
}

logrus.Tracef("LIST key=%s, end=%s, revision=%d, currentRev=%d count=%d, limit=%d", r.Key, r.RangeEnd, r.Revision, rev, len(kvs), r.Limit)
resp := &RangeResponse{
Header: txnHeader(rev),
Count: int64(len(kvs)),
Expand Down
8 changes: 7 additions & 1 deletion pkg/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Backend interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix string) (int64, int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error)
Watch(ctx context.Context, key string, revision int64) <-chan []*Event
Watch(ctx context.Context, key string, revision int64) WatchResult
DbSize(ctx context.Context) (int64, error)
}

Expand Down Expand Up @@ -77,6 +77,12 @@ type Event struct {
PrevKV *KeyValue
}

type WatchResult struct {
CurrentRevision int64
CompactRevision int64
Events <-chan []*Event
}

func unsupported(field string) error {
return status.New(codes.Unimplemented, field+" is not implemented by kine").Err()
}
Loading

0 comments on commit 1b48554

Please sign in to comment.