diff --git a/go.mod b/go.mod index aef53829..76c9b72b 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,6 @@ require ( go.opentelemetry.io/otel/trace v1.0.1 // indirect go.opentelemetry.io/proto/otlp v0.9.0 // indirect go.uber.org/atomic v1.7.0 // indirect - go.uber.org/automaxprocs v1.5.3 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect golang.org/x/crypto v0.12.0 // indirect diff --git a/go.sum b/go.sum index 5718c32b..deec8137 100644 --- a/go.sum +++ b/go.sum @@ -203,8 +203,6 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= -github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -237,8 +235,6 @@ github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hl github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc= -github.com/nats-io/nats-server/v2 v2.9.18 h1:00muGH0qu/7NAw1b/2eFcpIvdHcTghj6PFjUVhy8zEo= -github.com/nats-io/nats-server/v2 v2.9.18/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw= github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk= github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= @@ -363,8 +359,6 @@ go.opentelemetry.io/proto/otlp v0.9.0/go.mod h1:1vKfU9rv61e9EVGthD1zNvUbiwPcimSs go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= -go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= diff --git a/pkg/drivers/nats/backend_test.go b/pkg/drivers/nats/backend_test.go index e102ae3f..27f22bb9 100644 --- a/pkg/drivers/nats/backend_test.go +++ b/pkg/drivers/nats/backend_test.go @@ -11,7 +11,6 @@ import ( "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats-server/v2/test" "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" "github.com/sirupsen/logrus" ) @@ -83,11 +82,8 @@ func setupBackend(t *testing.T) (*server.Server, *nats.Conn, *Backend) { }) noErr(t, err) - njs, err := jetstream.New(nc) - noErr(t, err) - ctx := context.Background() - ekv := NewKeyValue(ctx, bkt, njs) + ekv := NewKeyValue(ctx, bkt, js) l := logrus.New() l.SetOutput(ioutil.Discard) diff --git a/pkg/drivers/nats/kv.go b/pkg/drivers/nats/kv.go index e6d60cda..fec4518c 100644 --- a/pkg/drivers/nats/kv.go +++ b/pkg/drivers/nats/kv.go @@ -9,7 +9,6 @@ import ( "time" "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" "github.com/sirupsen/logrus" "github.com/tidwall/btree" ) @@ -48,7 +47,7 @@ func (e *entry) Operation() nats.KeyValueOp { return e.entry.Operation() } type KeyValue struct { nkv nats.KeyValue - njs jetstream.JetStream + js nats.JetStreamContext kc *keyCodec vc *valueCodec bt *btree.Map[string, []*seqOp] @@ -63,8 +62,7 @@ type seqOp struct { } type streamWatcher struct { - con jetstream.Consumer - conctx jetstream.ConsumeContext + sub *nats.Subscription keyCodec *keyCodec valueCodec *valueCodec updates chan nats.KeyValueEntry @@ -88,7 +86,7 @@ func (w *streamWatcher) Stop() error { if w.cancel != nil { w.cancel() } - w.conctx.Stop() + w.sub.Drain() return nil } @@ -115,68 +113,6 @@ func (e *kvEntry) Created() time.Time { return e.created } func (e *kvEntry) Delta() uint64 { return e.delta } func (e *kvEntry) Operation() nats.KeyValueOp { return e.operation } -func (e *KeyValue) newStreamWatcher(ctx context.Context, con jetstream.Consumer, keyPrefix string) (nats.KeyWatcher, error) { - w := &streamWatcher{ - con: con, - keyCodec: e.kc, - valueCodec: e.vc, - updates: make(chan nats.KeyValueEntry, 32), - keyPrefix: keyPrefix, - } - - w.ctx, w.cancel = context.WithCancel(ctx) - - var ( - conc jetstream.ConsumeContext - err error - ) - - subjectPrefix := fmt.Sprintf("$KV.%s.", e.nkv.Bucket()) - - conc, err = con.Consume(func(msg jetstream.Msg) { - md, _ := msg.Metadata() - key := strings.TrimPrefix(msg.Subject(), subjectPrefix) - - if keyPrefix != "" { - dkey, err := e.kc.Decode(strings.TrimPrefix(key, ".")) - if err != nil || !strings.HasPrefix(dkey, keyPrefix) { - return - } - } - - // Default is PUT - var op nats.KeyValueOp - switch msg.Headers().Get("KV-Operation") { - case "DEL": - op = nats.KeyValueDelete - case "PURGE": - op = nats.KeyValuePurge - } - // Not currently used... - delta := 0 - - w.updates <- &entry{ - kc: e.kc, - vc: e.vc, - entry: &kvEntry{ - key: key, - bucket: e.nkv.Bucket(), - value: msg.Data(), - revision: md.Sequence.Stream, - created: md.Timestamp, - delta: uint64(delta), - operation: op, - }, - } - }) - if err != nil { - return nil, err - } - w.conctx = conc - - return w, nil -} - func (e *KeyValue) Get(key string) (nats.KeyValueEntry, error) { ek, err := e.kc.Encode(key) if err != nil { @@ -256,7 +192,7 @@ func (e *KeyValue) Delete(key string, opts ...nats.DeleteOpt) error { func (e *KeyValue) Watch(ctx context.Context, keys string, startRev int64) (nats.KeyWatcher, error) { // Everything but the last token will be treated as a filter - // on the watcher. The last token will used as a receipt-time filter. + // on the watcher. The last token will used as a deliver-time filter. filter := keys if !strings.HasSuffix(filter, "/") { idx := strings.LastIndexByte(filter, '/') @@ -265,38 +201,80 @@ func (e *KeyValue) Watch(ctx context.Context, keys string, startRev int64) (nats } } - return e.watchStream(ctx, filter, keys, uint64(startRev)) -} - -func (e *KeyValue) watchStream(ctx context.Context, filter, keyPrefix string, startRev uint64) (nats.KeyWatcher, error) { - var cfg jetstream.OrderedConsumerConfig + sopts := []nats.SubOpt{ + nats.BindStream(fmt.Sprintf("KV_%s", e.nkv.Bucket())), + nats.OrderedConsumer(), + } if filter != "" { p, err := e.kc.EncodeRange(filter) if err != nil { return nil, err } - filter := fmt.Sprintf("$KV.%s.%s", e.nkv.Bucket(), p) - cfg.FilterSubjects = []string{filter} + filter = fmt.Sprintf("$KV.%s.%s", e.nkv.Bucket(), p) } if startRev <= 0 { - cfg.DeliverPolicy = jetstream.DeliverLastPerSubjectPolicy + sopts = append(sopts, nats.DeliverLastPerSubject()) } else { - cfg.DeliverPolicy = jetstream.DeliverByStartSequencePolicy - cfg.OptStartSeq = startRev + sopts = append(sopts, nats.StartSequence(uint64(startRev))) } - con, err := e.njs.OrderedConsumer(ctx, fmt.Sprintf("KV_%s", e.nkv.Bucket()), cfg) - if err != nil { - return nil, err + wctx, cancel := context.WithCancel(ctx) + updates := make(chan nats.KeyValueEntry, 32) + subjectPrefix := fmt.Sprintf("$KV.%s.", e.nkv.Bucket()) + + handler := func(msg *nats.Msg) { + md, _ := msg.Metadata() + key := strings.TrimPrefix(msg.Subject, subjectPrefix) + + if keys != "" { + dkey, err := e.kc.Decode(strings.TrimPrefix(key, ".")) + if err != nil || !strings.HasPrefix(dkey, keys) { + return + } + } + + // Default is PUT + var op nats.KeyValueOp + switch msg.Header.Get("KV-Operation") { + case "DEL": + op = nats.KeyValueDelete + case "PURGE": + op = nats.KeyValuePurge + } + // Not currently used... + delta := 0 + + updates <- &entry{ + kc: e.kc, + vc: e.vc, + entry: &kvEntry{ + key: key, + bucket: e.nkv.Bucket(), + value: msg.Data, + revision: md.Sequence.Stream, + created: md.Timestamp, + delta: uint64(delta), + operation: op, + }, + } } - w, err := e.newStreamWatcher(ctx, con, keyPrefix) + sub, err := e.js.Subscribe(filter, handler, sopts...) if err != nil { return nil, err } + w := &streamWatcher{ + sub: sub, + keyCodec: e.kc, + valueCodec: e.vc, + updates: updates, + ctx: wctx, + cancel: cancel, + } + return w, nil } @@ -499,10 +477,10 @@ func (e *KeyValue) List(prefix, startKey string, limit, revision int64) ([]nats. return entries, nil } -func NewKeyValue(ctx context.Context, bucket nats.KeyValue, njs jetstream.JetStream) *KeyValue { +func NewKeyValue(ctx context.Context, bucket nats.KeyValue, js nats.JetStreamContext) *KeyValue { kv := &KeyValue{ nkv: bucket, - njs: njs, + js: js, kc: &keyCodec{}, vc: &valueCodec{}, bt: btree.NewMap[string, []*seqOp](0), diff --git a/pkg/drivers/nats/new.go b/pkg/drivers/nats/new.go index ffb2b529..ae059988 100644 --- a/pkg/drivers/nats/new.go +++ b/pkg/drivers/nats/new.go @@ -11,7 +11,6 @@ import ( "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" "github.com/sirupsen/logrus" ) @@ -155,12 +154,7 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega logrus.Infof("bucket initialized: %s", config.bucket) - njs, err := jetstream.New(nc) - if err != nil { - return nil, fmt.Errorf("failed to create JetStream context: %w", err) - } - - ekv := NewKeyValue(ctx, bucket, njs) + ekv := NewKeyValue(ctx, bucket, js) // Reference the global logger, since it appears log levels are // applied globally.