diff --git a/go.mod b/go.mod index 82a83897..91a83a1c 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( go.etcd.io/etcd/client/v3 v3.5.9 go.etcd.io/etcd/server/v3 v3.5.9 google.golang.org/grpc v1.56.3 + k8s.io/apimachinery v0.25.4 k8s.io/client-go v0.25.4 ) @@ -90,7 +91,6 @@ require ( google.golang.org/protobuf v1.30.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - k8s.io/apimachinery v0.25.4 // indirect k8s.io/klog/v2 v2.70.1 // indirect k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect sigs.k8s.io/yaml v1.3.0 // indirect diff --git a/main.go b/main.go index 5b4f944f..6a05e904 100644 --- a/main.go +++ b/main.go @@ -96,6 +96,12 @@ func main() { Usage: "Enable net/http/pprof handlers on the metrics bind address. Default is false.", Destination: &metricsConfig.EnableProfiling, }, + cli.DurationFlag{ + Name: "watch-progress-notify-interval", + Usage: "Interval between periodic watch progress notifications. Default is 10m.", + Destination: &config.NotifyInterval, + Value: time.Minute * 10, + }, cli.BoolFlag{Name: "debug"}, } app.Action = run diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index 0dfa7b1f..2a6abad7 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -6,6 +6,7 @@ import ( "net" "os" "strings" + "time" "github.com/k3s-io/kine/pkg/drivers/dqlite" "github.com/k3s-io/kine/pkg/drivers/generic" @@ -44,6 +45,7 @@ type Config struct { ServerTLSConfig tls.Config BackendTLSConfig tls.Config MetricsRegisterer prometheus.Registerer + NotifyInterval time.Duration } type ETCDConfig struct { @@ -80,7 +82,7 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { } // set up GRPC server and register services - b := server.New(backend, endpointScheme(config)) + b := server.New(backend, endpointScheme(config), config.NotifyInterval) grpcServer, err := grpcServer(config) if err != nil { return ETCDConfig{}, errors.Wrap(err, "creating GRPC server") diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index a2201df1..59cf6ebf 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -26,6 +26,7 @@ type SQLLog struct { broadcaster broadcaster.Broadcaster ctx context.Context notify chan int64 + currentRev int64 } func New(d server.Dialect) *SQLLog { @@ -109,7 +110,7 @@ func (s *SQLLog) compactor(interval time.Duration) { t := time.NewTicker(interval) defer t.Stop() compactRev, _ := s.d.GetCompactRevision(s.ctx) - targetCompactRev, _ := s.d.CurrentRevision(s.ctx) + targetCompactRev, _ := s.CurrentRevision(s.ctx) logrus.Tracef("COMPACT starting compactRev=%d targetCompactRev=%d", compactRev, targetCompactRev) outer: @@ -233,6 +234,9 @@ func (s *SQLLog) postCompact() error { } func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { + if s.currentRev != 0 { + return s.currentRev, nil + } return s.d.CurrentRevision(ctx) } @@ -254,7 +258,7 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64 if revision > 0 && len(result) == 0 { // a zero length result won't have the compact or current revisions so get them manually - rev, err = s.d.CurrentRevision(ctx) + rev, err = s.CurrentRevision(ctx) if err != nil { return 0, nil, err } @@ -305,7 +309,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis if revision > 0 && len(result) == 0 { // a zero length result won't have the compact or current revisions so get them manually - rev, err = s.d.CurrentRevision(ctx) + rev, err = s.CurrentRevision(ctx) if err != nil { return 0, nil, err } @@ -386,7 +390,7 @@ func filter(events interface{}, checkPrefix bool, prefix string) ([]*server.Even } func (s *SQLLog) startWatch() (chan interface{}, error) { - pollStart, err := s.d.GetCompactRevision(s.ctx) + pollStart, err := s.d.CurrentRevision(s.ctx) if err != nil { return nil, err } @@ -400,8 +404,9 @@ func (s *SQLLog) startWatch() (chan interface{}, error) { } func (s *SQLLog) poll(result chan interface{}, pollStart int64) { + s.currentRev = pollStart + var ( - last = pollStart skip int64 skipTime time.Time waitForMore = true @@ -417,7 +422,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { case <-s.ctx.Done(): return case check := <-s.notify: - if check <= last { + if check <= s.currentRev { continue } case <-wait.C: @@ -425,7 +430,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { } waitForMore = true - rows, err := s.d.After(s.ctx, "%", last, pollBatchSize) + rows, err := s.d.After(s.ctx, "%", s.currentRev, pollBatchSize) if err != nil { logrus.Errorf("fail to list latest changes: %v", err) continue @@ -437,7 +442,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { continue } - logrus.Tracef("POLL AFTER %d, limit=%d, events=%d", last, pollBatchSize, len(events)) + logrus.Tracef("POLL AFTER %d, limit=%d, events=%d", s.currentRev, pollBatchSize, len(events)) if len(events) == 0 { continue @@ -445,7 +450,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { waitForMore = len(events) < 100 - rev := last + rev := s.currentRev var ( sequential []*server.Event saveLast bool @@ -507,7 +512,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { } if saveLast { - last = rev + s.currentRev = rev if len(sequential) > 0 { result <- sequential } diff --git a/pkg/server/limited.go b/pkg/server/limited.go index 5675a0b6..7957c84d 100644 --- a/pkg/server/limited.go +++ b/pkg/server/limited.go @@ -2,13 +2,15 @@ package server import ( "context" + "time" "go.etcd.io/etcd/api/v3/etcdserverpb" ) type LimitedServer struct { - backend Backend - scheme string + notifyInterval time.Duration + backend Backend + scheme string } func (l *LimitedServer) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 4823c62e..5aca5e57 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1,6 +1,8 @@ package server import ( + "time" + "go.etcd.io/etcd/api/v3/etcdserverpb" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -12,11 +14,12 @@ type KVServerBridge struct { limited *LimitedServer } -func New(backend Backend, scheme string) *KVServerBridge { +func New(backend Backend, scheme string, notifyInterval time.Duration) *KVServerBridge { return &KVServerBridge{ limited: &LimitedServer{ - backend: backend, - scheme: scheme, + notifyInterval: notifyInterval, + backend: backend, + scheme: scheme, }, } } diff --git a/pkg/server/watch.go b/pkg/server/watch.go index b20bbb8f..2ca2bbae 100644 --- a/pkg/server/watch.go +++ b/pkg/server/watch.go @@ -11,42 +11,35 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "k8s.io/apimachinery/pkg/util/wait" ) var watchID int64 -var progressReportInterval = 10 * time.Minute // explicit interface check var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) -// GetProgressReportInterval returns the current progress report interval, with some jitter -func GetProgressReportInterval() time.Duration { - interval := progressReportInterval - - // add rand(1/10*progressReportInterval) as jitter so that kine will not +// getProgressReportInterval returns the configured progress report interval, with some jitter +func (s *KVServerBridge) getProgressReportInterval() time.Duration { + // add rand(1/10*notifyInterval) as jitter so that kine will not // send progress notifications to watchers at the same time even when watchers // are created at the same time. - jitter := time.Duration(rand.Int63n(int64(interval) / 10)) - - return interval + jitter + jitter := time.Duration(rand.Int63n(int64(s.limited.notifyInterval) / 10)) + return s.limited.notifyInterval + jitter } func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { - interval := GetProgressReportInterval() - progressTicker := time.NewTicker(interval) - defer progressTicker.Stop() - w := watcher{ server: ws, backend: s.limited.backend, watches: map[int64]func(){}, - progress: map[int64]int64{}, + progress: map[int64]chan<- int64{}, } defer w.Close() logrus.Tracef("WATCH SERVER CREATE") - go w.DoProgress(ws.Context(), progressTicker) + go wait.PollInfiniteWithContext(ws.Context(), s.getProgressReportInterval(), w.ProgressIfSynced) for { msg, err := ws.Recv() @@ -61,21 +54,28 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) w.Cancel(cr.WatchId, 0, 0, nil) } + if pr := msg.GetProgressRequest(); pr != nil { + w.Progress(ws.Context()) + } } } type watcher struct { - sync.Mutex - - wg sync.WaitGroup - backend Backend - server etcdserverpb.Watch_WatchServer - watches map[int64]func() - progress map[int64]int64 - progressRev int64 + sync.RWMutex + + wg sync.WaitGroup + backend Backend + server etcdserverpb.Watch_WatchServer + watches map[int64]func() + progress map[int64]chan<- int64 } func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) { + if r.WatchId != clientv3.AutoWatchID { + logrus.Warnf("WATCH START id=%d ignoring request with client-provided id", r.WatchId) + return + } + w.Lock() defer w.Unlock() @@ -86,12 +86,15 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) w.wg.Add(1) key := string(r.Key) + startRevision := r.StartRevision + var progressCh chan int64 if r.ProgressNotify { - w.progress[id] = w.progressRev + progressCh = make(chan int64) + w.progress[id] = progressCh } - logrus.Tracef("WATCH START id=%d, count=%d, key=%s, revision=%d, progressNotify=%v", id, len(w.watches), key, r.StartRevision, r.ProgressNotify) + logrus.Tracef("WATCH START id=%d, key=%s, revision=%d, progressNotify=%v, watchCount=%d", id, key, startRevision, r.ProgressNotify, len(w.watches)) go func() { defer w.wg.Done() @@ -104,7 +107,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) return } - wr := w.backend.Watch(ctx, key, r.StartRevision) + wr := w.backend.Watch(ctx, key, startRevision) // If the watch result has a non-zero CompactRevision, then the watch request failed due to // the requested start revision having been compacted. Pass the current and and compact @@ -114,53 +117,61 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) return } + trace := logrus.IsLevelEnabled(logrus.TraceLevel) outer := true for outer { - // Block on initial read from channel - reads := 1 - events := <-wr.Events - - // Collect additional queued events from the channel - inner := true - for inner { - select { - case e, ok := <-wr.Events: - reads++ - events = append(events, e...) - if !ok { - // channel was closed, break out of both loops + var reads int + var events []*Event + var revision int64 + + // Block on initial read from events or progress channel + select { + case events = <-wr.Events: + // got events; read additional queued events from the channel and add to batch + reads++ + inner := true + for inner { + select { + case e, ok := <-wr.Events: + reads++ + events = append(events, e...) + if !ok { + // channel was closed, break out of both loops + inner = false + outer = false + } + default: inner = false - outer = false } - default: - inner = false } + case revision = <-progressCh: + // have been requested to send progress with no events } - // Send collected events in a single response + // get max revision from collected events if len(events) > 0 { - revision := events[len(events)-1].KV.ModRevision - w.Lock() - if r, ok := w.progress[id]; ok && r == w.progressRev { - w.progress[id] = revision - } - w.Unlock() - if logrus.IsLevelEnabled(logrus.TraceLevel) { + revision = events[len(events)-1].KV.ModRevision + if trace { for _, event := range events { logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) } } + } + + // send response. note that there are no events if this is a progress response. + if revision >= startRevision { wr := &etcdserverpb.WatchResponse{ Header: txnHeader(revision), WatchId: id, Events: toEvents(events...), } - logrus.Tracef("WATCH SEND id=%d, events=%d, size=%d reads=%d", id, len(wr.Events), wr.Size(), reads) + logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads) if err := w.server.Send(wr); err != nil { w.Cancel(id, 0, 0, err) } } } + w.Cancel(id, 0, 0, nil) logrus.Tracef("WATCH CLOSE id=%d, key=%s", id, key) }() @@ -190,10 +201,13 @@ func toEvent(event *Event) *mvccpb.Event { func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) { w.Lock() + if progressCh, ok := w.progress[watchID]; ok { + close(progressCh) + delete(w.progress, watchID) + } if cancel, ok := w.watches[watchID]; ok { cancel() delete(w.watches, watchID) - delete(w.progress, watchID) } w.Unlock() @@ -218,36 +232,68 @@ func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) { func (w *watcher) Close() { logrus.Tracef("WATCH SERVER CLOSE") w.Lock() - for id, v := range w.watches { + for id, progressCh := range w.progress { + close(progressCh) delete(w.progress, id) - v() + } + for id, cancel := range w.watches { + cancel() + delete(w.watches, id) } w.Unlock() w.wg.Wait() } -func (w *watcher) DoProgress(ctx context.Context, ticker *time.Ticker) { - for { +// Progress sends a progress report if all watchers are synced. +// Ref: https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/watchable_store.go#L500-L504 +func (w *watcher) Progress(ctx context.Context) { + w.RLock() + defer w.RUnlock() + + logrus.Tracef("WATCH REQUEST PROGRESS") + + // All synced watchers will be blocked in the outer loop and able to receive on the progress channel. + // If any cannot be sent to, then it is not synced and has pending events to be sent. + // Send revision 0, as we don't actually want the watchers to send a progress response if they do receive. + for id, progressCh := range w.progress { select { - case <-ctx.Done(): + case progressCh <- 0: + default: + logrus.Tracef("WATCH SEND PROGRESS FAILED NOT SYNCED id=%d ", id) return - case <-ticker.C: - rev, err := w.backend.CurrentRevision(ctx) - if err != nil { - logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) - continue - } + } + } - w.Lock() - for id, r := range w.progress { - if r == w.progressRev { - logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev) - go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id}) - } - w.progress[id] = rev - } - w.progressRev = rev - w.Unlock() + // If all watchers are synced, send a broadcast progress notification with the latest revision. + id := int64(clientv3.InvalidWatchID) + rev, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return + } + + logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev) + go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id}) +} + +// ProgressIfSynced sends a progress report on any channels that are synced and blocked on the outer loop +func (w *watcher) ProgressIfSynced(ctx context.Context) (bool, error) { + logrus.Tracef("WATCH PROGRESS TICK") + revision, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return false, nil + } + + w.RLock() + defer w.RUnlock() + + // Send revision to all synced channels + for _, progressCh := range w.progress { + select { + case progressCh <- revision: + default: } } + return false, nil }