diff --git a/go.mod b/go.mod index 908345a5..f1743b64 100644 --- a/go.mod +++ b/go.mod @@ -18,13 +18,13 @@ require ( github.com/rancher/wrangler v1.1.1-0.20230425173236-39a4707f0689 github.com/shengdoushi/base58 v1.0.0 github.com/sirupsen/logrus v1.9.0 - github.com/soheilhy/cmux v0.1.5 github.com/urfave/cli v1.22.4 go.etcd.io/etcd/api/v3 v3.5.9 go.etcd.io/etcd/client/pkg/v3 v3.5.9 go.etcd.io/etcd/client/v3 v3.5.9 go.etcd.io/etcd/server/v3 v3.5.9 google.golang.org/grpc v1.56.3 + k8s.io/client-go v0.25.4 ) require ( @@ -36,6 +36,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect + github.com/go-logr/logr v1.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.4.2 // indirect github.com/golang/protobuf v1.5.3 // indirect @@ -61,6 +62,7 @@ require ( github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/russross/blackfriday/v2 v2.0.1 // indirect github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect + github.com/soheilhy/cmux v0.1.5 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect @@ -87,5 +89,8 @@ require ( google.golang.org/protobuf v1.30.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + k8s.io/apimachinery v0.25.4 // indirect + k8s.io/klog/v2 v2.70.1 // indirect + k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index ba56f793..ff54ba25 100644 --- a/go.sum +++ b/go.sum @@ -104,6 +104,10 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= +github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -301,6 +305,7 @@ github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -686,6 +691,15 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +k8s.io/apimachinery v0.25.4 h1:CtXsuaitMESSu339tfhVXhQrPET+EiWnIY1rcurKnAc= +k8s.io/apimachinery v0.25.4/go.mod h1:jaF9C/iPNM1FuLl7Zuy5b9v+n35HGSh6AQ4HYRkCqwo= +k8s.io/client-go v0.25.4 h1:3RNRDffAkNU56M/a7gUfXaEzdhZlYhoW8dgViGy5fn8= +k8s.io/client-go v0.25.4/go.mod h1:8trHCAC83XKY0wsBIpbirZU4NTUpbuhc2JnI7OruGZw= +k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= +k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ= +k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4= +k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/pkg/logstructured/logstructured.go b/pkg/logstructured/logstructured.go index 7a144b7a..2b5b51f6 100644 --- a/pkg/logstructured/logstructured.go +++ b/pkg/logstructured/logstructured.go @@ -5,8 +5,14 @@ import ( "sync" "time" - "github.com/k3s-io/kine/pkg/server" "github.com/sirupsen/logrus" + "k8s.io/client-go/util/workqueue" + + "github.com/k3s-io/kine/pkg/server" +) + +const ( + retryInterval = 250 * time.Millisecond ) type Log interface { @@ -20,6 +26,12 @@ type Log interface { DbSize(ctx context.Context) (int64, error) } +type ttlEventKV struct { + key string + modRevision int64 + expiredAt time.Time +} + type LogStructured struct { log Log } @@ -252,66 +264,163 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re return rev, updateEvent.KV, true, err } +func (l *LogStructured) ttl(ctx context.Context) { + queue := workqueue.NewDelayingQueue() + rwMutex := &sync.RWMutex{} + ttlEventKVMap := make(map[string]*ttlEventKV) + go func() { + for l.handleTTLEvents(ctx, rwMutex, queue, ttlEventKVMap) { + } + }() + + for { + select { + case <-ctx.Done(): + queue.ShutDown() + return + default: + } + + for event := range l.ttlEvents(ctx) { + if event.Delete { + continue + } + + eventKV := loadTTLEventKV(rwMutex, ttlEventKVMap, event.KV.Key) + if eventKV == nil { + logrus.Tracef("Add ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision) + expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV) + queue.AddAfter(event.KV.Key, expires) + } else { + if event.KV.ModRevision > eventKV.modRevision { + logrus.Tracef("Update ttl event key %v, modRev %v", event.KV.Key, event.KV.ModRevision) + expires := storeTTLEventKV(rwMutex, ttlEventKVMap, event.KV) + queue.AddAfter(event.KV.Key, expires) + } + } + } + } +} + +func (l *LogStructured) handleTTLEvents(ctx context.Context, rwMutex *sync.RWMutex, queue workqueue.DelayingInterface, store map[string]*ttlEventKV) bool { + key, shutdown := queue.Get() + if shutdown { + logrus.Info("TTL events work queue has shut down") + return false + } + defer queue.Done(key) + + eventKV := loadTTLEventKV(rwMutex, store, key.(string)) + if eventKV == nil { + logrus.Errorf("Failed to find ttl event for key %v", key) + return true + } + + if eventKV.expiredAt.After(time.Now()) { + logrus.Tracef("TTL event key %v has not expired yet, the latest expiration time is %v, requeuing", key, eventKV.expiredAt) + queue.AddAfter(key, time.Until(eventKV.expiredAt)) + return true + } + + l.deleteTTLEvent(ctx, rwMutex, queue, store, eventKV) + return true +} + +func (l *LogStructured) deleteTTLEvent(ctx context.Context, rwMutex *sync.RWMutex, queue workqueue.DelayingInterface, store map[string]*ttlEventKV, preEventKV *ttlEventKV) { + logrus.Tracef("Delete ttl event key %v, modRev %v", preEventKV.key, preEventKV.modRevision) + _, _, _, err := l.Delete(ctx, preEventKV.key, preEventKV.modRevision) + + rwMutex.Lock() + defer rwMutex.Unlock() + curEventKV := store[preEventKV.key] + if curEventKV.expiredAt.After(preEventKV.expiredAt) { + logrus.Tracef("TTL event key %v has updated, requeuing", curEventKV.key) + queue.AddAfter(curEventKV.key, time.Until(curEventKV.expiredAt)) + return + } + if err != nil { + logrus.Errorf("Failed to delete key %v at end of lease: %v, requeuing", curEventKV.key, err) + queue.AddAfter(curEventKV.key, retryInterval) + return + } + + delete(store, curEventKV.key) +} + func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { result := make(chan *server.Event) + lastListRevision := make(chan int64) wg := sync.WaitGroup{} wg.Add(2) go func() { wg.Wait() close(result) + close(lastListRevision) }() go func() { defer wg.Done() + var lastRev int64 + rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false) for len(events) > 0 { if err != nil { - logrus.Errorf("failed to read old events for ttl") - return + logrus.Errorf("Failed to read old events for ttl: %v", err) + break } for _, event := range events { if event.KV.Lease > 0 { result <- event } + + if event.KV.ModRevision > lastRev { + lastRev = event.KV.ModRevision + } } _, events, err = l.log.List(ctx, "/", events[len(events)-1].KV.Key, 1000, rev, false) } + lastListRevision <- lastRev }() go func() { defer wg.Done() - for events := range l.log.Watch(ctx, "/") { + revision := <-lastListRevision + if revision == 0 { + logrus.Error("TTL events last list revision is zero, retry to process ttl events") + return + } + for events := range l.Watch(ctx, "/", revision) { for _, event := range events { if event.KV.Lease > 0 { result <- event } } } + logrus.Info("TTL events watch channel was closed") }() return result } -func (l *LogStructured) ttl(ctx context.Context) { - // vary naive TTL support - mutex := &sync.Mutex{} - for event := range l.ttlEvents(ctx) { - go func(event *server.Event) { - select { - case <-ctx.Done(): - return - case <-time.After(time.Duration(event.KV.Lease) * time.Second): - } - mutex.Lock() - if _, _, _, err := l.Delete(ctx, event.KV.Key, event.KV.ModRevision); err != nil { - logrus.Errorf("failed to delete expired key: %v", err) - } - mutex.Unlock() - }(event) +func loadTTLEventKV(rwMutex *sync.RWMutex, store map[string]*ttlEventKV, key string) *ttlEventKV { + rwMutex.RLock() + defer rwMutex.RUnlock() + return store[key] +} + +func storeTTLEventKV(rwMutex *sync.RWMutex, store map[string]*ttlEventKV, eventKV *server.KeyValue) time.Duration { + rwMutex.Lock() + defer rwMutex.Unlock() + expires := time.Duration(eventKV.Lease) * time.Second + store[eventKV.Key] = &ttlEventKV{ + key: eventKV.Key, + modRevision: eventKV.ModRevision, + expiredAt: time.Now().Add(expires), } + return expires } func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {