Skip to content

Commit

Permalink
Revert to legacy JS ordered consumer for speed comparison
Browse files Browse the repository at this point in the history
Signed-off-by: Byron Ruth <[email protected]>
  • Loading branch information
bruth committed Aug 20, 2023
1 parent 003b4c9 commit e99c1a9
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 103 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ require (
go.opentelemetry.io/otel/trace v1.0.1 // indirect
go.opentelemetry.io/proto/otlp v0.9.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk=
github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -237,8 +235,6 @@ github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hl
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc=
github.com/nats-io/nats-server/v2 v2.9.18 h1:00muGH0qu/7NAw1b/2eFcpIvdHcTghj6PFjUVhy8zEo=
github.com/nats-io/nats-server/v2 v2.9.18/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw=
github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk=
github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU=
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
Expand Down Expand Up @@ -363,8 +359,6 @@ go.opentelemetry.io/proto/otlp v0.9.0/go.mod h1:1vKfU9rv61e9EVGthD1zNvUbiwPcimSs
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
6 changes: 1 addition & 5 deletions pkg/drivers/nats/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -83,11 +82,8 @@ func setupBackend(t *testing.T) (*server.Server, *nats.Conn, *Backend) {
})
noErr(t, err)

njs, err := jetstream.New(nc)
noErr(t, err)

ctx := context.Background()
ekv := NewKeyValue(ctx, bkt, njs)
ekv := NewKeyValue(ctx, bkt, js)

l := logrus.New()
l.SetOutput(ioutil.Discard)
Expand Down
146 changes: 62 additions & 84 deletions pkg/drivers/nats/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/sirupsen/logrus"
"github.com/tidwall/btree"
)
Expand Down Expand Up @@ -48,7 +47,7 @@ func (e *entry) Operation() nats.KeyValueOp { return e.entry.Operation() }

type KeyValue struct {
nkv nats.KeyValue
njs jetstream.JetStream
js nats.JetStreamContext
kc *keyCodec
vc *valueCodec
bt *btree.Map[string, []*seqOp]
Expand All @@ -63,8 +62,7 @@ type seqOp struct {
}

type streamWatcher struct {
con jetstream.Consumer
conctx jetstream.ConsumeContext
sub *nats.Subscription
keyCodec *keyCodec
valueCodec *valueCodec
updates chan nats.KeyValueEntry
Expand All @@ -88,7 +86,7 @@ func (w *streamWatcher) Stop() error {
if w.cancel != nil {
w.cancel()
}
w.conctx.Stop()
w.sub.Drain()
return nil
}

Expand All @@ -115,68 +113,6 @@ func (e *kvEntry) Created() time.Time { return e.created }
func (e *kvEntry) Delta() uint64 { return e.delta }
func (e *kvEntry) Operation() nats.KeyValueOp { return e.operation }

func (e *KeyValue) newStreamWatcher(ctx context.Context, con jetstream.Consumer, keyPrefix string) (nats.KeyWatcher, error) {
w := &streamWatcher{
con: con,
keyCodec: e.kc,
valueCodec: e.vc,
updates: make(chan nats.KeyValueEntry, 32),
keyPrefix: keyPrefix,
}

w.ctx, w.cancel = context.WithCancel(ctx)

var (
conc jetstream.ConsumeContext
err error
)

subjectPrefix := fmt.Sprintf("$KV.%s.", e.nkv.Bucket())

conc, err = con.Consume(func(msg jetstream.Msg) {
md, _ := msg.Metadata()
key := strings.TrimPrefix(msg.Subject(), subjectPrefix)

if keyPrefix != "" {
dkey, err := e.kc.Decode(strings.TrimPrefix(key, "."))
if err != nil || !strings.HasPrefix(dkey, keyPrefix) {
return
}
}

// Default is PUT
var op nats.KeyValueOp
switch msg.Headers().Get("KV-Operation") {
case "DEL":
op = nats.KeyValueDelete
case "PURGE":
op = nats.KeyValuePurge
}
// Not currently used...
delta := 0

w.updates <- &entry{
kc: e.kc,
vc: e.vc,
entry: &kvEntry{
key: key,
bucket: e.nkv.Bucket(),
value: msg.Data(),
revision: md.Sequence.Stream,
created: md.Timestamp,
delta: uint64(delta),
operation: op,
},
}
})
if err != nil {
return nil, err
}
w.conctx = conc

return w, nil
}

func (e *KeyValue) Get(key string) (nats.KeyValueEntry, error) {
ek, err := e.kc.Encode(key)
if err != nil {
Expand Down Expand Up @@ -256,7 +192,7 @@ func (e *KeyValue) Delete(key string, opts ...nats.DeleteOpt) error {

func (e *KeyValue) Watch(ctx context.Context, keys string, startRev int64) (nats.KeyWatcher, error) {
// Everything but the last token will be treated as a filter
// on the watcher. The last token will used as a receipt-time filter.
// on the watcher. The last token will used as a deliver-time filter.
filter := keys
if !strings.HasSuffix(filter, "/") {
idx := strings.LastIndexByte(filter, '/')
Expand All @@ -265,38 +201,80 @@ func (e *KeyValue) Watch(ctx context.Context, keys string, startRev int64) (nats
}
}

return e.watchStream(ctx, filter, keys, uint64(startRev))
}

func (e *KeyValue) watchStream(ctx context.Context, filter, keyPrefix string, startRev uint64) (nats.KeyWatcher, error) {
var cfg jetstream.OrderedConsumerConfig
sopts := []nats.SubOpt{
nats.BindStream(fmt.Sprintf("KV_%s", e.nkv.Bucket())),
nats.OrderedConsumer(),
}

if filter != "" {
p, err := e.kc.EncodeRange(filter)
if err != nil {
return nil, err
}
filter := fmt.Sprintf("$KV.%s.%s", e.nkv.Bucket(), p)
cfg.FilterSubjects = []string{filter}
filter = fmt.Sprintf("$KV.%s.%s", e.nkv.Bucket(), p)
}

if startRev <= 0 {
cfg.DeliverPolicy = jetstream.DeliverLastPerSubjectPolicy
sopts = append(sopts, nats.DeliverLastPerSubject())
} else {
cfg.DeliverPolicy = jetstream.DeliverByStartSequencePolicy
cfg.OptStartSeq = startRev
sopts = append(sopts, nats.StartSequence(uint64(startRev)))
}

con, err := e.njs.OrderedConsumer(ctx, fmt.Sprintf("KV_%s", e.nkv.Bucket()), cfg)
if err != nil {
return nil, err
wctx, cancel := context.WithCancel(ctx)
updates := make(chan nats.KeyValueEntry, 32)
subjectPrefix := fmt.Sprintf("$KV.%s.", e.nkv.Bucket())

handler := func(msg *nats.Msg) {
md, _ := msg.Metadata()
key := strings.TrimPrefix(msg.Subject, subjectPrefix)

if keys != "" {
dkey, err := e.kc.Decode(strings.TrimPrefix(key, "."))
if err != nil || !strings.HasPrefix(dkey, keys) {
return
}
}

// Default is PUT
var op nats.KeyValueOp
switch msg.Header.Get("KV-Operation") {
case "DEL":
op = nats.KeyValueDelete
case "PURGE":
op = nats.KeyValuePurge
}
// Not currently used...
delta := 0

updates <- &entry{
kc: e.kc,
vc: e.vc,
entry: &kvEntry{
key: key,
bucket: e.nkv.Bucket(),
value: msg.Data,
revision: md.Sequence.Stream,
created: md.Timestamp,
delta: uint64(delta),
operation: op,
},
}
}

w, err := e.newStreamWatcher(ctx, con, keyPrefix)
sub, err := e.js.Subscribe(filter, handler, sopts...)
if err != nil {
return nil, err
}

w := &streamWatcher{
sub: sub,
keyCodec: e.kc,
valueCodec: e.vc,
updates: updates,
ctx: wctx,
cancel: cancel,
}

return w, nil
}

Expand Down Expand Up @@ -499,10 +477,10 @@ func (e *KeyValue) List(prefix, startKey string, limit, revision int64) ([]nats.
return entries, nil
}

func NewKeyValue(ctx context.Context, bucket nats.KeyValue, njs jetstream.JetStream) *KeyValue {
func NewKeyValue(ctx context.Context, bucket nats.KeyValue, js nats.JetStreamContext) *KeyValue {
kv := &KeyValue{
nkv: bucket,
njs: njs,
js: js,
kc: &keyCodec{},
vc: &valueCodec{},
bt: btree.NewMap[string, []*seqOp](0),
Expand Down
8 changes: 1 addition & 7 deletions pkg/drivers/nats/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/tls"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -155,12 +154,7 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega

logrus.Infof("bucket initialized: %s", config.bucket)

njs, err := jetstream.New(nc)
if err != nil {
return nil, fmt.Errorf("failed to create JetStream context: %w", err)
}

ekv := NewKeyValue(ctx, bucket, njs)
ekv := NewKeyValue(ctx, bucket, js)

// Reference the global logger, since it appears log levels are
// applied globally.
Expand Down

0 comments on commit e99c1a9

Please sign in to comment.