From 5d4e47c8cd301acabb89117df01705f5a28ab4e4 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sun, 13 Aug 2023 07:14:09 -0400 Subject: [PATCH] Rewrite backend The main new component is a btree cache to optimize List and Count operations. The List operation assumes an ordered set of results since a limit can be defined. A set of tests has been added. Support for embedded `dataDir` path and `credsFile` URL options has been added. Signed-off-by: Byron Ruth --- go.mod | 9 +- go.sum | 18 +- pkg/drivers/nats/backend.go | 379 +++++++++++++++++++ pkg/drivers/nats/backend_test.go | 312 ++++++++++++++-- pkg/drivers/nats/codec.go | 89 +++++ pkg/drivers/nats/codec_test.go | 93 +++++ pkg/drivers/nats/config.go | 6 + pkg/drivers/nats/kv.go | 523 ++++++++++++++++++++++++++ pkg/drivers/nats/kv/etcd_encoder.go | 104 ------ pkg/drivers/nats/kv/kv.go | 327 ----------------- pkg/drivers/nats/nats.go | 548 ---------------------------- pkg/drivers/nats/new.go | 13 +- 12 files changed, 1393 insertions(+), 1028 deletions(-) create mode 100644 pkg/drivers/nats/backend.go create mode 100644 pkg/drivers/nats/codec.go create mode 100644 pkg/drivers/nats/codec_test.go create mode 100644 pkg/drivers/nats/kv.go delete mode 100644 pkg/drivers/nats/kv/etcd_encoder.go delete mode 100644 pkg/drivers/nats/kv/kv.go delete mode 100644 pkg/drivers/nats/nats.go diff --git a/go.mod b/go.mod index 57b4f204..a6cfbfb8 100644 --- a/go.mod +++ b/go.mod @@ -12,13 +12,14 @@ require ( github.com/mattn/go-sqlite3 v1.14.17 github.com/nats-io/jsm.go v0.0.31-0.20220317133147-fe318f464eee github.com/nats-io/nats-server/v2 v2.9.18 - github.com/nats-io/nats.go v1.27.1-0.20230619112143-ec00e662324e + github.com/nats-io/nats.go v1.28.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.14.0 github.com/rancher/wrangler v1.1.1-0.20230425173236-39a4707f0689 github.com/shengdoushi/base58 v1.0.0 github.com/sirupsen/logrus v1.9.0 github.com/soheilhy/cmux v0.1.5 + github.com/tidwall/btree v1.6.0 
github.com/urfave/cli v1.22.4 go.etcd.io/etcd/api/v3 v3.5.9 go.etcd.io/etcd/client/pkg/v3 v3.5.9 @@ -78,10 +79,10 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect - golang.org/x/crypto v0.10.0 // indirect + golang.org/x/crypto v0.12.0 // indirect golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.9.0 // indirect - golang.org/x/text v0.10.0 // indirect + golang.org/x/sys v0.11.0 // indirect + golang.org/x/text v0.12.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/go.sum b/go.sum index 1d165168..7eb16f36 100644 --- a/go.sum +++ b/go.sum @@ -238,8 +238,8 @@ github.com/nats-io/nats-server/v2 v2.7.5-0.20220309212130-5c0d1999ff72/go.mod h1 github.com/nats-io/nats-server/v2 v2.9.18 h1:00muGH0qu/7NAw1b/2eFcpIvdHcTghj6PFjUVhy8zEo= github.com/nats-io/nats-server/v2 v2.9.18/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw= github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nats.go v1.27.1-0.20230619112143-ec00e662324e h1:+JoNIXzRg65A/J5MeA011xAWRW/gx5EL9F3+9O0glTg= -github.com/nats-io/nats.go v1.27.1-0.20230619112143-ec00e662324e/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= +github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= @@ -308,6 +308,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg= +github.com/tidwall/btree v1.6.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/urfave/cli v1.22.4 h1:u7tSpNPPswAFymm8IehJhy4uJMlUuU/GmqSkvJ1InXA= @@ -371,8 +373,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM= -golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= +golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -501,8 +503,8 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= -golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -512,8 +514,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.10.0 h1:UpjohKhiEgNc0CSauXmwYftY1+LlaC75SJwh0SgCX58= -golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/pkg/drivers/nats/backend.go b/pkg/drivers/nats/backend.go new file mode 100644 index 00000000..4ad03e7c --- /dev/null +++ b/pkg/drivers/nats/backend.go @@ -0,0 
+1,379 @@ +package nats + +import ( + "context" + "encoding/json" + "time" + + "github.com/k3s-io/kine/pkg/server" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" +) + +// TODO: version this data structure to simplify and optimize for size. +type natsData struct { + // v1 fields + KV *server.KeyValue `json:"KV"` + PrevRevision int64 `json:"PrevRevision"` + Create bool `json:"Create"` + Delete bool `json:"Delete"` + + CreateTime time.Time `json:"-"` +} + +func (d *natsData) Encode() ([]byte, error) { + buf, err := json.Marshal(d) + return buf, err +} + +func (d *natsData) Decode(e nats.KeyValueEntry) error { + if e == nil || e.Value() == nil { + return nil + } + + err := json.Unmarshal(e.Value(), d) + if err != nil { + return err + } + d.KV.ModRevision = int64(e.Revision()) + if d.KV.CreateRevision == 0 { + d.KV.CreateRevision = d.KV.ModRevision + } + d.CreateTime = e.Created() + return nil +} + +var ( + // Ensure Backend implements server.Backend. + _ server.Backend = (&Backend{}) +) + +type Backend struct { + nc *nats.Conn + js nats.JetStreamContext + kv *KeyValue + l *logrus.Logger +} + +// isExpiredKey checks if the key is expired based on the create time and lease. +func (b *Backend) isExpiredKey(value *natsData) bool { + if value.KV.Lease == 0 { + return false + } + + return time.Now().After(value.CreateTime.Add(time.Second * time.Duration(value.KV.Lease))) +} + +// get returns the key-value entry for the given key and revision, if specified. +// This takes into account entries that have been marked as deleted or expired. +func (b *Backend) get(ctx context.Context, key string, revision int64, allowDeletes bool) (int64, *natsData, error) { + var ( + entry nats.KeyValueEntry + err error + ) + + // Get latest revision if not specified. 
+ if revision <= 0 { + entry, err = b.kv.Get(key) + } else { + entry, err = b.kv.GetRevision(key, uint64(revision)) + } + if err != nil { + return 0, nil, err + } + + rev := int64(entry.Revision()) + + var val natsData + err = val.Decode(entry) + if err != nil { + return rev, nil, err + } + + if val.Delete && !allowDeletes { + return rev, nil, nats.ErrKeyNotFound + } + + if b.isExpiredKey(&val) { + err := b.kv.Delete(val.KV.Key, nats.LastRevision(uint64(rev))) + if err != nil { + b.l.Warnf("Failed to delete expired key %s: %v", val.KV.Key, err) + } + // Return a zero indicating the key was deleted. + return 0, nil, nats.ErrKeyNotFound + } + + return rev, &val, nil +} + +// Start starts the backend. +// See https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L97 +func (b *Backend) Start(ctx context.Context) error { + if _, err := b.Create(ctx, "/registry/health", []byte(`{"health":"true"}`), 0); err != nil { + if err != server.ErrKeyExists { + b.l.Errorf("Failed to create health check key: %v", err) + } + } + return nil +} + +// DbSize get the kineBucket size from JetStream. +func (b *Backend) DbSize(context.Context) (int64, error) { + return b.kv.BucketSize() +} + +// Count returns an exact count of the number of matching keys and the current revision of the database. +func (b *Backend) Count(ctx context.Context, prefix string) (int64, int64, error) { + count, err := b.kv.Count(prefix) + if err != nil { + return 0, 0, err + } + + storeRev := b.kv.BucketRevision() + return storeRev, count, nil +} + +// Get returns the store's current revision, the associated server.KeyValue or an error. +func (b *Backend) Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (int64, *server.KeyValue, error) { + // Get the kv entry and return the revision. 
+ _, nv, err := b.get(ctx, key, revision, false) + if err != nil { + return 0, nil, err + } + + storeRev := b.kv.BucketRevision() + return storeRev, nv.KV, nil +} + +// Create attempts to create the key-value entry and returns the revision number. +func (b *Backend) Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) { + // Check if key exists already. If the entry exists even if marked as expired or deleted, + // the revision will be returned to apply an update. + rev, pnv, err := b.get(ctx, key, 0, false) + if pnv != nil { + return rev, server.ErrKeyExists + } + // If an error other than key not found, return. + if err != nil && err != nats.ErrKeyNotFound { + return 0, err + } + + nv := natsData{ + Delete: false, + Create: true, + PrevRevision: 0, + KV: &server.KeyValue{ + Key: key, + CreateRevision: 0, + ModRevision: 0, + Value: value, + Lease: lease, + }, + } + + data, err := nv.Encode() + if err != nil { + return 0, err + } + + // An update with a zero revision will create the key. + seq, err := b.kv.Create(key, data) + if err != nil { + // This may occur if a concurrent writer created the key. + if jsWrongLastSeqErr.Is(err) { + b.l.Warnf("create: key=%s, err=%s", key, err) + return 0, server.ErrKeyExists + } + + return 0, err + } + + return int64(seq), nil +} + +func (b *Backend) Delete(ctx context.Context, key string, revision int64) (int64, *server.KeyValue, bool, error) { + // Get the key, allow deletes. + rev, value, err := b.get(ctx, key, 0, true) + if err != nil { + if err == nats.ErrKeyNotFound { + return rev, nil, true, nil + } + // TODO: if false, get the current KV. + // Expected last version is in the header. 
+ return rev, nil, false, err + } + + // If deleted + if value.Delete { + return rev, value.KV, true, nil + } + + if revision != 0 && value.KV.ModRevision != revision { + return rev, value.KV, false, nil + } + + nv := natsData{ + Delete: true, + PrevRevision: rev, + KV: value.KV, + } + data, err := nv.Encode() + if err != nil { + return rev, nil, false, err + } + + // Update with a tombstone. + drev, err := b.kv.Update(key, data, uint64(rev)) + if err != nil { + return rev, value.KV, false, nil + } + + err = b.kv.Delete(key, nats.LastRevision(drev)) + if err != nil { + return rev, value.KV, false, nil + } + + return int64(drev), value.KV, true, nil +} + +func (b *Backend) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *server.KeyValue, bool, error) { + // Get the latest revision of the key. + rev, pnd, err := b.get(ctx, key, 0, false) + // TODO: correct semantics for these various errors? + if err != nil { + if err == nats.ErrKeyNotFound { + return rev, nil, false, nil + } + return rev, nil, false, err + } + + // Return nothing? + if pnd == nil { + return 0, nil, false, nil + } + + // Incorrect revision, return the current value. + if pnd.KV.ModRevision != revision { + return rev, pnd.KV, false, nil + } + + updateValue := natsData{ + Delete: false, + Create: false, + PrevRevision: pnd.KV.ModRevision, + KV: &server.KeyValue{ + Key: key, + CreateRevision: pnd.KV.CreateRevision, + Value: value, + Lease: lease, + }, + } + + valueBytes, err := updateValue.Encode() + if err != nil { + return 0, nil, false, err + } + + seq, err := b.kv.Update(key, valueBytes, uint64(revision)) + if err != nil { + return 0, nil, false, err + } + + updateValue.KV.ModRevision = int64(seq) + + return int64(seq), updateValue.KV, true, nil +} + +// List returns a range of keys starting with the prefix. +// This would translated to one or more tokens, e.g. `a.b.c`. 
+// The startKey would be the next set of tokens that follow the prefix +// that are alphanumerically equal to or greater than the startKey. +// If limit is provided, the maximum set of matches is limited. +// If revision is provided, this indicates the maximum revision to return. +func (b *Backend) List(ctx context.Context, prefix, startKey string, limit, maxRevision int64) (int64, []*server.KeyValue, error) { + matches, err := b.kv.List(prefix, startKey, limit, maxRevision) + if err != nil { + return 0, nil, err + } + + kvs := make([]*server.KeyValue, 0, len(matches)) + for _, e := range matches { + var nd natsData + err = nd.Decode(e) + if err != nil { + return 0, nil, err + } + kvs = append(kvs, nd.KV) + } + + storeRev := b.kv.BucketRevision() + return storeRev, kvs, nil +} + +func (b *Backend) Watch(ctx context.Context, prefix string, startRevision int64) <-chan []*server.Event { + events := make(chan []*server.Event, 32) + + go func() { + defer close(events) + + var w nats.KeyWatcher + for { + var err error + w, err = b.kv.Watch(ctx, prefix, startRevision) + if err == nil { + break + } + b.l.Warnf("watch: prefix=%s, err=%s", prefix, err) + time.Sleep(time.Second) + } + + for { + select { + case <-ctx.Done(): + if err := ctx.Err(); err != nil { + b.l.Warnf("watch ctx: prefix=%s, err=%s", prefix, err) + } + return + + case e := <-w.Updates(): + if e.Operation() != nats.KeyValuePut { + continue + } + + key := e.Key() + + // Tombstone event in NATS KV, so this requires looking up + // the previous value based on the Expected-Last-Subject-Sequence + // header. 
+ // TODO + //if op == nats.KeyValueDelete { + //} + + var nd natsData + err := nd.Decode(e) + if err != nil { + b.l.Warnf("watch decode: key=%s, err=%s", key, err) + continue + } + + event := server.Event{ + Create: nd.Create, + Delete: nd.Delete, + KV: nd.KV, + PrevKV: &server.KeyValue{}, + } + + if nd.PrevRevision > 0 { + _, pnd, err := b.get(ctx, key, nd.PrevRevision, false) + if err == nil { + event.PrevKV = pnd.KV + } + } + + events <- []*server.Event{&event} + } + } + }() + + return events +} diff --git a/pkg/drivers/nats/backend_test.go b/pkg/drivers/nats/backend_test.go index c35a3e81..f665e76c 100644 --- a/pkg/drivers/nats/backend_test.go +++ b/pkg/drivers/nats/backend_test.go @@ -7,10 +7,11 @@ import ( "testing" "time" - "github.com/k3s-io/kine/pkg/drivers/nats/kv" + kserver "github.com/k3s-io/kine/pkg/server" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats-server/v2/test" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/sirupsen/logrus" ) @@ -28,20 +29,41 @@ func expErr(t *testing.T, err error) { } } -func expEqualErr(t *testing.T, got, want error) { +func expEqualErr(t *testing.T, want, got error) { t.Helper() if !errors.Is(want, got) { t.Fatalf("expected %v, got %v", want, got) } } -func expEqual[T comparable](t *testing.T, got, want T) { +func expEqual[T comparable](t *testing.T, want, got T) { t.Helper() if got != want { t.Fatalf("expected %v, got %v", want, got) } } +func expSortedKeys(t *testing.T, ents []*kserver.KeyValue) { + t.Helper() + var prev string + for _, ent := range ents { + if prev != "" { + if prev > ent.Key { + t.Fatalf("keys not sorted: %s > %s", prev, ent.Key) + } + } + prev = ent.Key + } +} + +func expEqualKeys(t *testing.T, want []string, got []*kserver.KeyValue) { + t.Helper() + expEqual(t, len(want), len(got)) + for i, k := range want { + expEqual(t, k, got[i].Key) + } +} + func setupBackend(t *testing.T) (*server.Server, *nats.Conn, *Backend) { ns := 
test.RunServer(&server.Options{ Port: -1, @@ -61,7 +83,11 @@ func setupBackend(t *testing.T) (*server.Server, *nats.Conn, *Backend) { }) noErr(t, err) - ekv := kv.NewEncodedKV(bkt, &kv.EtcdKeyCodec{}, &kv.S2ValueCodec{}) + njs, err := jetstream.New(nc) + noErr(t, err) + + ctx := context.Background() + ekv := NewKeyValue(ctx, bkt, njs) l := logrus.New() l.SetOutput(ioutil.Discard) @@ -75,60 +101,280 @@ func setupBackend(t *testing.T) (*server.Server, *nats.Conn, *Backend) { return ns, nc, &b } -func TestBackend(t *testing.T) { +func TestBackend_Create(t *testing.T) { ns, nc, b := setupBackend(t) defer ns.Shutdown() defer nc.Drain() ctx := context.Background() - // Create a key with a lease of 1 second. - rev, err := b.Create(ctx, "/foo", []byte("bar"), 1) + // Create a key. + rev, err := b.Create(ctx, "/a", nil, 0) noErr(t, err) expEqual(t, 1, rev) - rev, ent, err := b.Get(ctx, "/foo", "", 0, 0) + // Attempt to create again. + _, err = b.Create(ctx, "/a", nil, 0) + expEqualErr(t, err, kserver.ErrKeyExists) + + rev, err = b.Create(ctx, "/a/b", nil, 0) noErr(t, err) - expEqual(t, 1, rev) - expEqual(t, "bar", string(ent.Value)) - expEqual(t, "/foo", ent.Key) + expEqual(t, 2, rev) + + rev, err = b.Create(ctx, "/a/b/c", nil, 0) + noErr(t, err) + expEqual(t, 3, rev) + + rev, err = b.Create(ctx, "/b", nil, 1) + noErr(t, err) + expEqual(t, 4, rev) + + time.Sleep(2 * time.Millisecond) + + srev, count, err := b.Count(ctx, "/") + noErr(t, err) + expEqual(t, 4, srev) + expEqual(t, 4, count) + + time.Sleep(time.Second) + + srev, count, err = b.Count(ctx, "/") + noErr(t, err) + expEqual(t, 4, srev) + expEqual(t, 3, count) + + // Create /b again. Rev is 6 due to the internal delete. + // on read. 
+ rev, err = b.Create(ctx, "/b", nil, 0) + noErr(t, err) + expEqual(t, 6, rev) + + time.Sleep(2 * time.Millisecond) + + srev, count, err = b.Count(ctx, "/") + noErr(t, err) + expEqual(t, 6, srev) + expEqual(t, 4, count) +} + +func TestBackend_Get(t *testing.T) { + ns, nc, b := setupBackend(t) + defer ns.Shutdown() + defer nc.Drain() + + ctx := context.Background() + + // Create with lease. + rev, err := b.Create(ctx, "/a", []byte("b"), 1) + noErr(t, err) + + time.Sleep(2 * time.Millisecond) + + srev, ent, err := b.Get(ctx, "/a", "", 0, 0) + noErr(t, err) + expEqual(t, 1, srev) + expEqual(t, "/a", ent.Key) + expEqual(t, "b", string(ent.Value)) expEqual(t, 1, ent.Lease) expEqual(t, 1, ent.ModRevision) - expEqual(t, 0, ent.CreateRevision) + expEqual(t, 1, ent.CreateRevision) + + time.Sleep(time.Second) + + // Latest is gone. + _, ent, err = b.Get(ctx, "/a", "", 0, 0) + expEqualErr(t, nats.ErrKeyNotFound, err) - // Count the items. - rev, count, err := b.Count(ctx, "/foo") + // Get at a revision will fail also. + _, ent, err = b.Get(ctx, "/a", "", 0, 1) + expEqualErr(t, nats.ErrKeyNotFound, err) + + // Get at later revision, does not exist. + _, _, err = b.Get(ctx, "/a", "", 0, 2) + expEqualErr(t, nats.ErrKeyNotFound, err) + + // Create it again and update it. + rev, err = b.Create(ctx, "/a", []byte("c"), 0) noErr(t, err) - expEqual(t, 1, rev) - expEqual(t, int64(1), count) + expEqual(t, 3, rev) - // List the keys. - rev, ents, err := b.List(ctx, "/foo", "", 0, 0) + _, _, _, err = b.Update(ctx, "/a", []byte("d"), rev, 0) noErr(t, err) - expEqual(t, 1, rev) - expEqual(t, 1, len(ents)) - // Expire the lease. - time.Sleep(time.Second) + // Get at prior version. 
+ srev, ent, err = b.Get(ctx, "/a", "", 0, rev) + noErr(t, err) + expEqual(t, 4, srev) + expEqual(t, "/a", ent.Key) + expEqual(t, "c", string(ent.Value)) + expEqual(t, 0, ent.Lease) + expEqual(t, 3, ent.ModRevision) + expEqual(t, 3, ent.CreateRevision) +} + +func TestBackend_Update(t *testing.T) { + ns, nc, b := setupBackend(t) + defer ns.Shutdown() + defer nc.Drain() - // Try to get again. - rev, ent, err = b.Get(ctx, "/foo", "", 0, 0) - expEqualErr(t, err, nats.ErrKeyNotFound) + ctx := context.Background() - // Should be no items. - rev, count, err = b.Count(ctx, "/foo") + // Create with lease. + b.Create(ctx, "/a", []byte("b"), 1) + rev, ent, ok, err := b.Update(ctx, "/a", []byte("c"), 1, 0) noErr(t, err) expEqual(t, 2, rev) - expEqual(t, 0, count) + expEqual(t, true, ok) + expEqual(t, "/a", ent.Key) + expEqual(t, "c", string(ent.Value)) + expEqual(t, 0, ent.Lease) + expEqual(t, 2, ent.ModRevision) + expEqual(t, 1, ent.CreateRevision) - // Re-create the key without a lease. - rev, err = b.Create(ctx, "/foo", []byte("bar"), 0) + rev, ent, ok, err = b.Update(ctx, "/a", []byte("d"), 2, 1) noErr(t, err) expEqual(t, 3, rev) + expEqual(t, true, ok) + expEqual(t, "/a", ent.Key) + expEqual(t, "d", string(ent.Value)) + expEqual(t, 1, ent.Lease) + expEqual(t, 3, ent.ModRevision) + expEqual(t, 1, ent.CreateRevision) - // Get the key again. Expect revision to be 3 since the - // the key was deleted => 2 and create => 3. - rev, ent, err = b.Get(ctx, "/foo", "", 0, 0) + // Update with wrong revision. + rev, _, ok, err = b.Update(ctx, "/a", []byte("e"), 2, 1) noErr(t, err) expEqual(t, 3, rev) + expEqual(t, false, ok) +} + +func TestBackend_Delete(t *testing.T) { + ns, nc, b := setupBackend(t) + defer ns.Shutdown() + defer nc.Drain() + + ctx := context.Background() + + // Create with lease. + b.Create(ctx, "/a", []byte("b"), 1) + + // Note, deleting first performs an update to tombstone + // the key, followed by a KV delete. 
+ rev, ent, ok, err := b.Delete(ctx, "/a", 1) + noErr(t, err) + expEqual(t, 2, rev) + expEqual(t, true, ok) + expEqual(t, "/a", ent.Key) + expEqual(t, "b", string(ent.Value)) + expEqual(t, 1, ent.Lease) + expEqual(t, 1, ent.ModRevision) + expEqual(t, 1, ent.CreateRevision) + + // Create again. + b.Create(ctx, "/a", []byte("b"), 0) + + // Fail to delete since the revision is not the same. + rev, _, ok, err = b.Delete(ctx, "/a", 1) + expEqual(t, 4, rev) + expEqual(t, false, ok) + expEqualErr(t, nil, err) + + // No revision, will delete the latest. + rev, _, ok, err = b.Delete(ctx, "/a", 0) + expEqual(t, 5, rev) + expEqual(t, true, ok) + expEqualErr(t, nil, err) +} + +func TestBackend_List(t *testing.T) { + ns, nc, b := setupBackend(t) + defer ns.Shutdown() + defer nc.Drain() + + ctx := context.Background() + + // Create a key. + b.Create(ctx, "/a/b/c", nil, 0) + b.Create(ctx, "/a", nil, 0) + b.Create(ctx, "/b", nil, 0) + b.Create(ctx, "/a/b", nil, 0) + b.Create(ctx, "/c", nil, 0) + b.Create(ctx, "/d/a", nil, 0) + b.Create(ctx, "/d/b", nil, 0) + + // Wait for the btree to be updated. + time.Sleep(time.Millisecond) + + // List the keys. + rev, ents, err := b.List(ctx, "/", "", 0, 0) + noErr(t, err) + expEqual(t, 7, rev) + expEqual(t, 7, len(ents)) + expSortedKeys(t, ents) + + // List the keys with prefix. + rev, ents, err = b.List(ctx, "/a", "", 0, 0) + noErr(t, err) + expEqual(t, 7, rev) + expEqual(t, 3, len(ents)) + expSortedKeys(t, ents) + + // List the keys >= start key. + rev, ents, err = b.List(ctx, "/", "b", 0, 0) + noErr(t, err) + expEqual(t, 7, rev) + expEqual(t, 4, len(ents)) + expSortedKeys(t, ents) + + // List the keys up to a revision. + rev, ents, err = b.List(ctx, "/", "", 0, 3) + noErr(t, err) + expEqual(t, 7, rev) + expEqual(t, 3, len(ents)) + expSortedKeys(t, ents) + expEqualKeys(t, []string{"/a", "/a/b/c", "/b"}, ents) + + // List the keys with a limit. 
+ rev, ents, err = b.List(ctx, "/", "", 4, 0) + noErr(t, err) + expEqual(t, 7, rev) + expEqual(t, 4, len(ents)) + expSortedKeys(t, ents) + expEqualKeys(t, []string{"/a", "/a/b", "/a/b/c", "/b"}, ents) + + // List the keys with a limit after some start key. + rev, ents, err = b.List(ctx, "/", "b", 2, 0) + noErr(t, err) + expEqual(t, 7, rev) + expEqual(t, 2, len(ents)) + expSortedKeys(t, ents) + expEqualKeys(t, []string{"/b", "/c"}, ents) +} + +func TestBackend_Watch(t *testing.T) { + ns, nc, b := setupBackend(t) + defer ns.Shutdown() + defer nc.Drain() + + ctx := context.Background() + + cctx, cancel := context.WithCancel(ctx) + defer cancel() + + rev1, _ := b.Create(ctx, "/a", nil, 0) + rev2, _ := b.Create(ctx, "/a/1", nil, 0) + rev1, _, _, _ = b.Update(ctx, "/a", nil, rev1, 0) + b.Delete(ctx, "/a", rev1) + b.Update(ctx, "/a/1", nil, rev2, 0) + + ech := b.Watch(cctx, "/a", 0) + time.Sleep(20 * time.Millisecond) + cancel() + + var events []*kserver.Event + for es := range ech { + events = append(events, es...) + } + + expEqual(t, 5, len(events)) } diff --git a/pkg/drivers/nats/codec.go b/pkg/drivers/nats/codec.go new file mode 100644 index 00000000..07479a5e --- /dev/null +++ b/pkg/drivers/nats/codec.go @@ -0,0 +1,89 @@ +package nats + +import ( + "fmt" + "io" + "strings" + + "github.com/klauspost/compress/s2" + "github.com/nats-io/nats.go" + "github.com/shengdoushi/base58" +) + +var ( + keyAlphabet = base58.BitcoinAlphabet +) + +// keyCodec turns keys like /this/is/a.test.key into Base58 encoded values +// split on `.` This is because NATS keys are split on . rather than /. 
+type keyCodec struct{} + +func (e *keyCodec) EncodeRange(prefix string) (string, error) { + if prefix == "/" { + return ">", nil + } + + ek, err := e.Encode(prefix) + if err != nil { + return "", err + } + + return fmt.Sprintf("%s.>", ek), nil +} + +func (*keyCodec) Encode(key string) (retKey string, e error) { + if key == "" { + return "", nats.ErrInvalidKey + } + + // Trim leading and trailing slashes. + key = strings.Trim(key, "/") + + var parts []string + for _, part := range strings.Split(key, "/") { + parts = append(parts, base58.Encode([]byte(part), keyAlphabet)) + } + + if len(parts) == 0 { + return "", nats.ErrInvalidKey + } + + return strings.Join(parts, "."), nil +} + +func (*keyCodec) Decode(key string) (retKey string, e error) { + var parts []string + + for _, s := range strings.Split(key, ".") { + decodedPart, err := base58.Decode(s, keyAlphabet) + if err != nil { + return "", err + } + parts = append(parts, string(decodedPart[:])) + } + + if len(parts) == 0 { + return "", nats.ErrInvalidKey + } + + return fmt.Sprintf("/%s", strings.Join(parts, "/")), nil +} + +// valueCodec is a codec that compresses values using s2. 
+type valueCodec struct{} + +func (*valueCodec) Encode(src []byte, dst io.Writer) error { + enc := s2.NewWriter(dst) + err := enc.EncodeBuffer(src) + if err != nil { + enc.Close() + return err + } + return enc.Close() +} + +func (*valueCodec) Decode(src io.Reader, dst io.Writer) error { + dec := s2.NewReader(src) + _, err := io.Copy(dst, dec) + return err +} diff --git a/pkg/drivers/nats/codec_test.go b/pkg/drivers/nats/codec_test.go new file mode 100644 index 00000000..6ef724c7 --- /dev/null +++ b/pkg/drivers/nats/codec_test.go @@ -0,0 +1,93 @@ +package nats + +import "testing" + +func TestKeyEncode(t *testing.T) { + tests := []struct { + In string + Out string + Err bool + }{ + {"", "", true}, + {"/", "", true}, + {"a", "2g", false}, + {"/a/a", "2g.2g", false}, + {"a/a", "2g.2g", false}, + {"a/a/a", "2g.2g.2g", false}, + {"a/*/a", "2g.j.2g", false}, + {"a/*/a/", "2g.j.2g", false}, + } + + codec := &keyCodec{} + + for _, test := range tests { + out, err := codec.Encode(test.In) + if err != nil { + if !test.Err { + t.Errorf("Expected no error for %q, got %v", test.In, err) + } + continue + } + if out != test.Out { + t.Errorf("Expected %q for %q, got %q", test.Out, test.In, out) + } + } +} + +func TestKeyDecode(t *testing.T) { + tests := []struct { + In string + Out string + Err bool + }{ + {"", "/", false}, + {"2g", "/a", false}, + {"2g.2g", "/a/a", false}, + {"2g.2g.2g", "/a/a/a", false}, + } + + codec := &keyCodec{} + + for _, test := range tests { + out, err := codec.Decode(test.In) + if err != nil { + if !test.Err { + t.Errorf("Expected no error for %q, got %v", test.In, err) + } + continue + } + if out != test.Out { + t.Errorf("Expected %q for %q, got %q", test.Out, test.In, out) + } + } +} + +func TestKeyEncodeRange(t *testing.T) { + tests := []struct { + In string + Out string + Err bool + }{ + {"", "", true}, + {"/", ">", false}, + {"a", "2g.>", false}, + {"/a/a", "2g.2g.>", false}, + {"a/a/a", "2g.2g.2g.>", false}, + {"a/*/a", "2g.j.2g.>", false}, + } + + 
codec := &keyCodec{} + + for _, test := range tests { + out, err := codec.EncodeRange(test.In) + if err != nil { + if !test.Err { + t.Errorf("Expected no error for %q, got %v", test.In, err) + } + continue + } + if out != test.Out { + t.Errorf("Expected %q for %q, got %q", test.Out, test.In, out) + } + } +} diff --git a/pkg/drivers/nats/config.go b/pkg/drivers/nats/config.go index 622e2e37..5480a6b3 100644 --- a/pkg/drivers/nats/config.go +++ b/pkg/drivers/nats/config.go @@ -112,6 +112,12 @@ func parseConnection(dsn string, tlsInfo tls.Config) (*Config, error) { config.clientOptions = append(config.clientOptions, nats.RootCAs(tlsInfo.CAFile)) } + // Simpler direct reference to creds file. + if f := queryMap.Get("credsFile"); f != "" { + config.clientOptions = append(config.clientOptions, nats.UserCredentials(f)) + } + + // Reference a full context file. Note this will override any other options. if f := queryMap.Get("contextFile"); f != "" { if u.Host != "" { return config, fmt.Errorf("when using context endpoint no host should be provided") diff --git a/pkg/drivers/nats/kv.go b/pkg/drivers/nats/kv.go new file mode 100644 index 00000000..c51d568c --- /dev/null +++ b/pkg/drivers/nats/kv.go @@ -0,0 +1,523 @@ +package nats + +import ( + "bytes" + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/sirupsen/logrus" + "github.com/tidwall/btree" +) + +type entry struct { + kc *keyCodec + vc *valueCodec + entry nats.KeyValueEntry +} + +func (e *entry) Key() string { + dk, err := e.kc.Decode(e.entry.Key()) + // should not happen + if err != nil { + // should not happen + logrus.Warnf("could not decode key %s: %v", e.entry.Key(), err) + return "" + } + + return dk +} + +func (e *entry) Bucket() string { return e.entry.Bucket() } +func (e *entry) Value() []byte { + buf := new(bytes.Buffer) + if err := e.vc.Decode(bytes.NewBuffer(e.entry.Value()), buf); err != nil { + // should not happen + 
logrus.Warnf("could not decode value for %s: %v", e.Key(), err) + } + return buf.Bytes() +} +func (e *entry) Revision() uint64 { return e.entry.Revision() } +func (e *entry) Created() time.Time { return e.entry.Created() } +func (e *entry) Delta() uint64 { return e.entry.Delta() } +func (e *entry) Operation() nats.KeyValueOp { return e.entry.Operation() } + +type KeyValue struct { + nkv nats.KeyValue + njs jetstream.JetStream + kc *keyCodec + vc *valueCodec + bt *btree.Map[string, []*seqOp] + btm sync.RWMutex + lastSeq uint64 +} + +type seqOp struct { + seq uint64 + op nats.KeyValueOp + ex time.Time +} + +type streamWatcher struct { + con jetstream.Consumer + conctx jetstream.ConsumeContext + keyCodec *keyCodec + valueCodec *valueCodec + updates chan nats.KeyValueEntry + keyPrefix string + ctx context.Context + cancel context.CancelFunc +} + +func (w *streamWatcher) Context() context.Context { + if w == nil { + return nil + } + return w.ctx +} + +func (w *streamWatcher) Updates() <-chan nats.KeyValueEntry { + return w.updates +} + +func (w *streamWatcher) Stop() error { + if w.cancel != nil { + w.cancel() + } + w.conctx.Stop() + return nil +} + +type kvEntry struct { + key string + bucket string + value []byte + revision uint64 + created time.Time + delta uint64 + operation nats.KeyValueOp +} + +func (e *kvEntry) Key() string { + return e.key +} + +func (e *kvEntry) Bucket() string { return e.bucket } +func (e *kvEntry) Value() []byte { + return e.value +} +func (e *kvEntry) Revision() uint64 { return e.revision } +func (e *kvEntry) Created() time.Time { return e.created } +func (e *kvEntry) Delta() uint64 { return e.delta } +func (e *kvEntry) Operation() nats.KeyValueOp { return e.operation } + +func (e *KeyValue) newStreamWatcher(ctx context.Context, con jetstream.Consumer, keyPrefix string) (nats.KeyWatcher, error) { + w := &streamWatcher{ + con: con, + keyCodec: e.kc, + valueCodec: e.vc, + updates: make(chan nats.KeyValueEntry, 32), + keyPrefix: keyPrefix, + } 
+ + w.ctx, w.cancel = context.WithCancel(ctx) + + var ( + conc jetstream.ConsumeContext + err error + ) + + subjectPrefix := fmt.Sprintf("$KV.%s.", e.nkv.Bucket()) + + conc, err = con.Consume(func(msg jetstream.Msg) { + md, _ := msg.Metadata() + key := strings.TrimPrefix(msg.Subject(), subjectPrefix) + + if keyPrefix != "" { + dkey, err := e.kc.Decode(strings.TrimPrefix(key, ".")) + if err != nil || !strings.HasPrefix(dkey, keyPrefix) { + return + } + } + + // Default is PUT + var op nats.KeyValueOp + switch msg.Headers().Get("KV-Operation") { + case "DEL": + op = nats.KeyValueDelete + case "PURGE": + op = nats.KeyValuePurge + } + // Not currently used... + delta := 0 + + w.updates <- &entry{ + kc: e.kc, + vc: e.vc, + entry: &kvEntry{ + key: key, + bucket: e.nkv.Bucket(), + value: msg.Data(), + revision: md.Sequence.Stream, + created: md.Timestamp, + delta: uint64(delta), + operation: op, + }, + } + }) + if err != nil { + return nil, err + } + w.conctx = conc + + return w, nil +} + +func (e *KeyValue) Get(key string) (nats.KeyValueEntry, error) { + ek, err := e.kc.Encode(key) + if err != nil { + return nil, err + } + + ent, err := e.nkv.Get(ek) + if err != nil { + return nil, err + } + + return &entry{ + kc: e.kc, + vc: e.vc, + entry: ent, + }, nil +} + +func (e *KeyValue) GetRevision(key string, revision uint64) (nats.KeyValueEntry, error) { + ek, err := e.kc.Encode(key) + if err != nil { + return nil, err + } + + ent, err := e.nkv.GetRevision(ek, revision) + if err != nil { + return nil, err + } + + return &entry{ + kc: e.kc, + vc: e.vc, + entry: ent, + }, nil +} + +func (e *KeyValue) Create(key string, value []byte) (uint64, error) { + ek, err := e.kc.Encode(key) + if err != nil { + return 0, err + } + + buf := new(bytes.Buffer) + + err = e.vc.Encode(value, buf) + if err != nil { + return 0, err + } + + return e.nkv.Create(ek, buf.Bytes()) +} + +func (e *KeyValue) Update(key string, value []byte, last uint64) (uint64, error) { + ek, err := e.kc.Encode(key) + if 
err != nil { + return 0, err + } + + buf := new(bytes.Buffer) + + err = e.vc.Encode(value, buf) + if err != nil { + return 0, err + } + + return e.nkv.Update(ek, buf.Bytes(), last) +} + +func (e *KeyValue) Delete(key string, opts ...nats.DeleteOpt) error { + ek, err := e.kc.Encode(key) + if err != nil { + return err + } + + return e.nkv.Delete(ek, opts...) +} + +func (e *KeyValue) Watch(ctx context.Context, keys string, startRev int64) (nats.KeyWatcher, error) { + // Everything but the last token will be treated as a filter + // on the watcher. The last token will used as a receipt-time filter. + filter := keys + if !strings.HasSuffix(filter, "/") { + idx := strings.LastIndexByte(filter, '/') + if idx > -1 { + filter = keys[:idx+1] + } + } + + return e.watchStream(ctx, filter, keys, uint64(startRev)) +} + +func (e *KeyValue) watchStream(ctx context.Context, filter, keyPrefix string, startRev uint64) (nats.KeyWatcher, error) { + var cfg jetstream.OrderedConsumerConfig + + if filter != "" { + p, err := e.kc.EncodeRange(filter) + if err != nil { + return nil, err + } + filter := fmt.Sprintf("$KV.%s.%s", e.nkv.Bucket(), p) + cfg.FilterSubjects = []string{filter} + } + + if startRev < 0 { + cfg.DeliverPolicy = jetstream.DeliverNewPolicy + } else if startRev == 0 { + cfg.DeliverPolicy = jetstream.DeliverAllPolicy + } else { + cfg.DeliverPolicy = jetstream.DeliverByStartSequencePolicy + cfg.OptStartSeq = startRev + } + + con, err := e.njs.OrderedConsumer(ctx, fmt.Sprintf("KV_%s", e.nkv.Bucket()), cfg) + if err != nil { + return nil, err + } + + w, err := e.newStreamWatcher(ctx, con, keyPrefix) + if err != nil { + return nil, err + } + + return w, nil +} + +// BucketSize returns the size of the bucket in bytes. +func (e *KeyValue) BucketSize() (int64, error) { + status, err := e.nkv.Status() + if err != nil { + return 0, err + } + return int64(status.Bytes()), nil +} + +// BucketRevision returns the latest revision of the bucket. 
+func (e *KeyValue) BucketRevision() int64 { + e.btm.RLock() + s := e.lastSeq + e.btm.RUnlock() + return int64(s) +} + +func (e *KeyValue) btreeWatcher(ctx context.Context) error { + w, err := e.nkv.WatchAll(nats.IncludeHistory(), nats.Context(ctx)) + if err != nil { + return err + } + defer w.Stop() + + status, _ := e.nkv.Status() + hsize := status.History() + + for { + select { + case <-ctx.Done(): + return nil + + case x := <-w.Updates(): + if x == nil { + continue + } + + seq := x.Revision() + op := x.Operation() + + key, err := e.kc.Decode(x.Key()) + if err != nil { + continue + } + + var ex time.Time + if op == nats.KeyValuePut { + xe := entry{ + kc: e.kc, + vc: e.vc, + entry: x, + } + + var nd natsData + err = nd.Decode(&xe) + if err != nil { + continue + } + if nd.KV.Lease > 0 { + ex = nd.CreateTime.Add(time.Second * time.Duration(nd.KV.Lease)) + } + } + + e.btm.Lock() + e.lastSeq = seq + val, ok := e.bt.Get(key) + if !ok { + val = make([]*seqOp, 0, hsize) + } + // Remove the oldest entry. + if len(val) == cap(val) { + val = append(val[:0], val[1:]...) 
+ } + val = append(val, &seqOp{ + seq: seq, + op: op, + ex: ex, + }) + e.bt.Set(key, val) + e.btm.Unlock() + } + } +} + +type keySeq struct { + key string + seq uint64 +} + +func (e *KeyValue) Count(prefix string) (int64, error) { + it := e.bt.Iter() + + if prefix != "" { + ok := it.Seek(prefix) + if !ok { + return 0, nil + } + } + + var count int64 + now := time.Now() + + e.btm.RLock() + defer e.btm.RUnlock() + + for { + k := it.Key() + if !strings.HasPrefix(k, prefix) { + break + } + v := it.Value() + so := v[len(v)-1] + + if so.op == nats.KeyValuePut { + if so.ex.IsZero() || so.ex.After(now) { + count++ + } + } + + if !it.Next() { + break + } + } + + return count, nil +} + +func (e *KeyValue) List(prefix, startKey string, limit, revision int64) ([]nats.KeyValueEntry, error) { + seekKey := prefix + if startKey != "" { + seekKey = strings.TrimSuffix(seekKey, "/") + seekKey = fmt.Sprintf("%s/%s", seekKey, startKey) + } + + it := e.bt.Iter() + if seekKey != "" { + ok := it.Seek(seekKey) + if !ok { + return nil, nil + } + } + + var matches []*keySeq + + e.btm.RLock() + defer e.btm.RUnlock() + + for { + if limit > 0 && len(matches) == int(limit) { + break + } + + k := it.Key() + if !strings.HasPrefix(k, prefix) { + break + } + + v := it.Value() + + // Get the latest update for the key. + if revision <= 0 { + so := v[len(v)-1] + if so.op == nats.KeyValuePut { + if so.ex.IsZero() || so.ex.After(time.Now()) { + matches = append(matches, &keySeq{key: k, seq: so.seq}) + } + } + } else { + // Find the latest update below the given revision. 
+ for i := len(v) - 1; i >= 0; i-- { + so := v[i] + if so.seq <= uint64(revision) { + if so.op == nats.KeyValuePut { + if so.ex.IsZero() || so.ex.After(time.Now()) { + matches = append(matches, &keySeq{key: k, seq: so.seq}) + } + } + break + } + } + } + + if !it.Next() { + break + } + } + + entries := make([]nats.KeyValueEntry, 0, len(matches)) + for _, m := range matches { + e, err := e.GetRevision(m.key, m.seq) + if err != nil { + return nil, err + } + entries = append(entries, e) + } + + return entries, nil +} + +func NewKeyValue(ctx context.Context, bucket nats.KeyValue, njs jetstream.JetStream) *KeyValue { + kv := &KeyValue{ + nkv: bucket, + njs: njs, + kc: &keyCodec{}, + vc: &valueCodec{}, + bt: btree.NewMap[string, []*seqOp](0), + } + + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err := kv.btreeWatcher(ctx) + if err != nil { + panic(err) + } + }() + + return kv +} diff --git a/pkg/drivers/nats/kv/etcd_encoder.go b/pkg/drivers/nats/kv/etcd_encoder.go deleted file mode 100644 index b0484968..00000000 --- a/pkg/drivers/nats/kv/etcd_encoder.go +++ /dev/null @@ -1,104 +0,0 @@ -package kv - -import ( - "fmt" - "io" - "io/ioutil" - "strings" - - "github.com/klauspost/compress/s2" - "github.com/nats-io/nats.go" - "github.com/shengdoushi/base58" -) - -// EtcdKeyCodec turns keys like /this/is/a.test.key into Base58 encoded values split on `/` -// This is because NATS Jetstream Keys are split on . 
rather than / -type EtcdKeyCodec struct{} - -type S2ValueCodec struct{} - -type PlainCodec struct{} - -var ( - keyAlphabet = base58.BitcoinAlphabet -) - -func (e *EtcdKeyCodec) EncodeRange(keys string) (string, error) { - ek, err := e.Encode(keys) - if err != nil { - return "", err - } - if strings.HasSuffix(ek, ".") { - return fmt.Sprintf("%s>", ek), nil - } - return ek, nil -} - -func (*EtcdKeyCodec) Encode(key string) (retKey string, e error) { - //defer func() { - // logrus.Debugf("encoded %s => %s", key, retKey) - //}() - parts := []string{} - for _, part := range strings.Split(strings.TrimPrefix(key, "/"), "/") { - if part == ">" || part == "*" { - parts = append(parts, part) - continue - } - parts = append(parts, base58.Encode([]byte(part), keyAlphabet)) - } - - if len(parts) == 0 { - return "", nats.ErrInvalidKey - } - - return strings.Join(parts, "."), nil -} - -func (*EtcdKeyCodec) Decode(key string) (retKey string, e error) { - //defer func() { - // logrus.Debugf("decoded %s => %s", key, retKey) - //}() - parts := []string{} - for _, s := range strings.Split(key, ".") { - decodedPart, err := base58.Decode(s, keyAlphabet) - if err != nil { - return "", err - } - parts = append(parts, string(decodedPart[:])) - } - if len(parts) == 0 { - return "", nats.ErrInvalidKey - } - return fmt.Sprintf("/%s", strings.Join(parts, "/")), nil -} - -func (*S2ValueCodec) Encode(src []byte, dst io.Writer) error { - enc := s2.NewWriter(dst) - err := enc.EncodeBuffer(src) - if err != nil { - enc.Close() - return err - } - return enc.Close() -} - -func (*S2ValueCodec) Decode(src io.Reader, dst io.Writer) error { - dec := s2.NewReader(src) - _, err := io.Copy(dst, dec) - return err -} - -func (*PlainCodec) Encode(src []byte, dst io.Writer) error { - _, err := dst.Write(src) - return err -} - -func (*PlainCodec) Decode(src io.Reader, dst io.Writer) error { - b, err := ioutil.ReadAll(src) - if err != nil { - return err - } - _, err = dst.Write(b) - - return err -} diff --git 
a/pkg/drivers/nats/kv/kv.go b/pkg/drivers/nats/kv/kv.go deleted file mode 100644 index a6e2488f..00000000 --- a/pkg/drivers/nats/kv/kv.go +++ /dev/null @@ -1,327 +0,0 @@ -package kv - -import ( - "bytes" - "context" - "io" - "sort" - "time" - - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" -) - -func NewEncodedKV(bucket nats.KeyValue, k KeyCodec, v ValueCodec) *EncodedKV { - return &EncodedKV{bucket: bucket, keyCodec: k, valueCodec: v} -} - -type WatcherWithCtx interface { - WatchWithCtx(ctx context.Context, keys string, opts ...nats.WatchOpt) nats.KeyWatcher -} - -type KeyCodec interface { - Encode(key string) (string, error) - Decode(key string) (string, error) - EncodeRange(keys string) (string, error) -} - -type ValueCodec interface { - Encode(src []byte, dst io.Writer) error - Decode(src io.Reader, dst io.Writer) error -} - -type EncodedKV struct { - WatcherWithCtx - bucket nats.KeyValue - keyCodec KeyCodec - valueCodec ValueCodec -} - -type watcher struct { - watcher nats.KeyWatcher - keyCodec KeyCodec - valueCodec ValueCodec - updates chan nats.KeyValueEntry - ctx context.Context - cancel context.CancelFunc -} - -func (w *watcher) Context() context.Context { - if w == nil { - return nil - } - return w.ctx -} - -type entry struct { - keyCodec KeyCodec - valueCodec ValueCodec - entry nats.KeyValueEntry -} - -func (e *entry) Key() string { - dk, err := e.keyCodec.Decode(e.entry.Key()) - // should not happen - if err != nil { - // should not happen - logrus.Warnf("could not decode key %s: %v", e.entry.Key(), err) - return "" - } - - return dk -} - -func (e *entry) Bucket() string { return e.entry.Bucket() } -func (e *entry) Value() []byte { - buf := new(bytes.Buffer) - if err := e.valueCodec.Decode(bytes.NewBuffer(e.entry.Value()), buf); err != nil { - // should not happen - logrus.Warnf("could not decode value for %s: %v", e.Key(), err) - } - return buf.Bytes() -} -func (e *entry) Revision() uint64 { return e.entry.Revision() } -func (e *entry) 
Created() time.Time { return e.entry.Created() } -func (e *entry) Delta() uint64 { return e.entry.Delta() } -func (e *entry) Operation() nats.KeyValueOp { return e.entry.Operation() } - -func (w *watcher) Updates() <-chan nats.KeyValueEntry { return w.updates } -func (w *watcher) Stop() error { - if w.cancel != nil { - w.cancel() - } - - return w.watcher.Stop() -} - -func (e *EncodedKV) newWatcher(w nats.KeyWatcher) nats.KeyWatcher { - watch := &watcher{ - watcher: w, - keyCodec: e.keyCodec, - valueCodec: e.valueCodec, - updates: make(chan nats.KeyValueEntry, 32)} - - if w.Context() == nil { - watch.ctx, watch.cancel = context.WithCancel(context.Background()) - } else { - watch.ctx, watch.cancel = context.WithCancel(w.Context()) - } - - go func() { - for { - select { - case ent := <-w.Updates(): - if ent == nil { - watch.updates <- nil - continue - } - - watch.updates <- &entry{ - keyCodec: e.keyCodec, - valueCodec: e.valueCodec, - entry: ent, - } - case <-watch.ctx.Done(): - return - } - } - }() - - return watch -} - -func (e *EncodedKV) Get(key string) (nats.KeyValueEntry, error) { - ek, err := e.keyCodec.Encode(key) - if err != nil { - return nil, err - } - - ent, err := e.bucket.Get(ek) - if err != nil { - return nil, err - } - - return &entry{ - keyCodec: e.keyCodec, - valueCodec: e.valueCodec, - entry: ent, - }, nil -} - -func (e *EncodedKV) GetRevision(key string, revision uint64) (nats.KeyValueEntry, error) { - ek, err := e.keyCodec.Encode(key) - if err != nil { - return nil, err - } - - ent, err := e.bucket.GetRevision(ek, revision) - if err != nil { - return nil, err - } - - return &entry{ - keyCodec: e.keyCodec, - valueCodec: e.valueCodec, - entry: ent, - }, nil -} - -func (e *EncodedKV) Create(key string, value []byte) (uint64, error) { - ek, err := e.keyCodec.Encode(key) - if err != nil { - return 0, err - } - - buf := new(bytes.Buffer) - - err = e.valueCodec.Encode(value, buf) - if err != nil { - return 0, err - } - - return e.bucket.Create(ek, 
buf.Bytes()) -} - -func (e *EncodedKV) Update(key string, value []byte, last uint64) (uint64, error) { - ek, err := e.keyCodec.Encode(key) - if err != nil { - return 0, err - } - - buf := new(bytes.Buffer) - - err = e.valueCodec.Encode(value, buf) - if err != nil { - return 0, err - } - - return e.bucket.Update(ek, buf.Bytes(), last) -} - -func (e *EncodedKV) Delete(key string, opts ...nats.DeleteOpt) error { - ek, err := e.keyCodec.Encode(key) - if err != nil { - return err - } - - return e.bucket.Delete(ek, opts...) -} - -func (e *EncodedKV) Purge(key string, opts ...nats.DeleteOpt) error { - ek, err := e.keyCodec.Encode(key) - if err != nil { - return err - } - - return e.bucket.Purge(ek, opts...) -} - -func (e *EncodedKV) Watch(keys string, opts ...nats.WatchOpt) (nats.KeyWatcher, error) { - ek, err := e.keyCodec.EncodeRange(keys) - if err != nil { - return nil, err - } - - nw, err := e.bucket.Watch(ek, opts...) - if err != nil { - return nil, err - } - - return e.newWatcher(nw), err -} - -func (e *EncodedKV) History(key string, opts ...nats.WatchOpt) ([]nats.KeyValueEntry, error) { - ek, err := e.keyCodec.Encode(key) - if err != nil { - return nil, err - } - - var res []nats.KeyValueEntry - hist, err := e.bucket.History(ek, opts...) - if err != nil { - return nil, err - } - - for _, ent := range hist { - res = append(res, &entry{e.keyCodec, e.valueCodec, ent}) - } - - return res, nil -} - -// GetKeys returns all keys matching the prefix. 
-func (e *EncodedKV) GetKeys(ctx context.Context, prefix string, sortResults bool) ([]string, error) { - watcher, err := e.Watch(prefix, nats.MetaOnly(), nats.IgnoreDeletes(), nats.Context(ctx)) - if err != nil { - return nil, err - } - defer func() { - err := watcher.Stop() - if err != nil { - logrus.Warnf("failed to stop %s getKeys watcher", prefix) - } - }() - - var keys []string - // grab all matching keys immediately - for entry := range watcher.Updates() { - if entry == nil { - break - } - keys = append(keys, entry.Key()) - } - - if sortResults { - sort.Strings(keys) - } - - return keys, nil -} - -// GetKeyValues returns a []nats.KeyValueEntry matching prefix -func (e *EncodedKV) GetKeyValues(ctx context.Context, prefix string, sortResults bool) ([]nats.KeyValueEntry, error) { - watcher, err := e.bucket.Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) - if err != nil { - return nil, err - } - defer func() { - err := watcher.Stop() - if err != nil { - logrus.Warnf("failed to stop %s getKeyValues watcher", prefix) - } - }() - - var entries []nats.KeyValueEntry - for entry := range watcher.Updates() { - if entry == nil { - break - } - entries = append(entries, entry) - } - - if sortResults { - sort.Slice(entries, func(i, j int) bool { - return entries[i].Key() < entries[j].Key() - }) - } - - return entries, nil -} - -// BucketSize returns the size of the bucket in bytes. -func (e *EncodedKV) BucketSize() (int64, error) { - status, err := e.bucket.Status() - if err != nil { - return 0, err - } - return int64(status.Bytes()), nil -} - -// BucketRevision returns the latest revision of the bucket. 
-func (e *EncodedKV) BucketRevision() (int64, error) { - status, err := e.bucket.Status() - if err != nil { - return 0, err - } - return int64(status.(*nats.KeyValueBucketStatus).StreamInfo().State.LastSeq), nil -} diff --git a/pkg/drivers/nats/nats.go b/pkg/drivers/nats/nats.go deleted file mode 100644 index aa8d7a4f..00000000 --- a/pkg/drivers/nats/nats.go +++ /dev/null @@ -1,548 +0,0 @@ -package nats - -import ( - "context" - "encoding/json" - "strings" - "time" - - "github.com/k3s-io/kine/pkg/drivers/nats/kv" - "github.com/k3s-io/kine/pkg/server" - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" -) - -// TODO: version this data structure to simplify and optimize for size. -type natsData struct { - // v1 fields - KV *server.KeyValue `json:"KV"` - PrevRevision int64 `json:"PrevRevision"` - Create bool `json:"Create"` - Delete bool `json:"Delete"` - - CreateTime time.Time `json:"-"` -} - -func (d *natsData) Encode() ([]byte, error) { - buf, err := json.Marshal(d) - return buf, err -} - -func (d *natsData) Decode(e nats.KeyValueEntry) error { - if e == nil || e.Value() == nil { - return nil - } - - err := json.Unmarshal(e.Value(), d) - if err != nil { - return err - } - d.KV.ModRevision = int64(e.Revision()) - d.CreateTime = e.Created() - return nil -} - -var ( - // Ensure Backend implements server.Backend. - _ server.Backend = (&Backend{}) -) - -type Backend struct { - nc *nats.Conn - js nats.JetStreamContext - kv *kv.EncodedKV - l *logrus.Logger -} - -// isExpiredKey checks if the key is expired based on the create time and lease. -func (b *Backend) isExpiredKey(value *natsData) bool { - if value.KV.Lease == 0 { - return false - } - - return time.Now().After(value.CreateTime.Add(time.Second * time.Duration(value.KV.Lease))) -} - -// get returns the key-value entry for the given key and revision, if specified. -// This takes into account entries that have been marked as deleted or expired. 
-func (b *Backend) get(ctx context.Context, key string, revision int64, allowDeletes bool) (int64, *natsData, error) { - var ( - entry nats.KeyValueEntry - err error - ) - - // Get latest revision if not specified. - if revision <= 0 { - entry, err = b.kv.Get(key) - } else { - entry, err = b.kv.GetRevision(key, uint64(revision)) - } - if err != nil { - return 0, nil, err - } - - rev := int64(entry.Revision()) - - var val natsData - err = val.Decode(entry) - if err != nil { - return rev, nil, err - } - - if val.Delete && !allowDeletes { - return rev, nil, nats.ErrKeyNotFound - } - - if b.isExpiredKey(&val) { - err := b.kv.Delete(val.KV.Key, nats.LastRevision(uint64(rev))) - if err != nil { - b.l.Warnf("Failed to delete expired key %s: %v", val.KV.Key, err) - } - // Return a zero indicating the key was deleted. - return 0, nil, nats.ErrKeyNotFound - } - - return rev, &val, nil -} - -// Start starts the backend. -// See https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L97 -func (b *Backend) Start(ctx context.Context) error { - if _, err := b.Create(ctx, "/registry/health", []byte(`{"health":"true"}`), 0); err != nil { - if err != server.ErrKeyExists { - b.l.Errorf("Failed to create health check key: %v", err) - } - } - return nil -} - -// DbSize get the kineBucket size from JetStream. -func (b *Backend) DbSize(context.Context) (int64, error) { - return b.kv.BucketSize() -} - -// Count returns an exact count of the number of matching keys and the current revision of the database. 
-func (b *Backend) Count(ctx context.Context, prefix string) (int64, int64, error) { - keys, err := b.kv.GetKeys(ctx, prefix, false) - if err != nil { - return 0, 0, err - } - storeRev, err := b.kv.BucketRevision() - if err != nil { - return 0, 0, err - } - return storeRev, int64(len(keys)), nil -} - -// Get returns the store's current revision, the associated server.KeyValue or an error. -func (b *Backend) Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (int64, *server.KeyValue, error) { - // Get the kv entry and return the revision. - rev, nv, err := b.get(ctx, key, revision, false) - if err != nil { - return 0, nil, err - } - - // Attempt to get the latest store revision. If this fails, use the key rev. - storeRev, err := b.kv.BucketRevision() - if err == nil && storeRev > rev { - rev = storeRev - } - - return rev, nv.KV, nil -} - -// Create attempts to create the key-value entry and returns the revision number. -func (b *Backend) Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) { - // Check if key exists already. If the entry exists even if marked as expired or deleted, - // the revision will be returned to apply an update. - rev, pnv, err := b.get(ctx, key, 0, false) - if pnv != nil { - return rev, server.ErrKeyExists - } - // If an error other than key not found, return. - if err != nil && err != nats.ErrKeyNotFound { - return 0, err - } - - nv := natsData{ - Delete: false, - Create: true, - PrevRevision: 0, - KV: &server.KeyValue{ - Key: key, - CreateRevision: 0, - ModRevision: 0, - Value: value, - Lease: lease, - }, - } - - data, err := nv.Encode() - if err != nil { - return 0, err - } - - // An update with a zero revision will create the key. - seq, err := b.kv.Create(key, data) - if err != nil { - // This may occur if a concurrent writer created the key. 
- if jsWrongLastSeqErr.Is(err) { - b.l.Warnf("create: key=%s, err=%s", key, err) - return 0, server.ErrKeyExists - } - - return 0, err - } - - return int64(seq), nil -} - -func (b *Backend) Delete(ctx context.Context, key string, revision int64) (int64, *server.KeyValue, bool, error) { - // Get the key, allow deletes. - rev, value, err := b.get(ctx, key, 0, true) - if err != nil { - if err == nats.ErrKeyNotFound { - return rev, nil, true, nil - } - // TODO: if false, get the current KV. - // Expected last version is in the header. - return rev, nil, false, err - } - - // If deleted - if value.Delete { - return rev, value.KV, true, nil - } - - if revision != 0 && value.KV.ModRevision != revision { - return rev, value.KV, false, nil - } - - nv := natsData{ - Delete: true, - PrevRevision: rev, - KV: value.KV, - } - data, err := nv.Encode() - if err != nil { - return rev, nil, false, err - } - - drev, err := b.kv.Update(key, data, uint64(rev)) - if err != nil { - return rev, value.KV, false, nil - } - - err = b.kv.Delete(key, nats.LastRevision(drev)) - if err != nil { - return rev, value.KV, false, nil - } - - return int64(drev), value.KV, true, nil -} - -func (b *Backend) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *server.KeyValue, bool, error) { - rev, pjv, err := b.get(ctx, key, 0, false) - - if err != nil { - if err == nats.ErrKeyNotFound { - return rev, nil, false, nil - } - return rev, nil, false, err - } - - if pjv == nil { - return 0, nil, false, nil - } - - if pjv.KV.ModRevision != revision { - return rev, pjv.KV, false, nil - } - - updateValue := natsData{ - Delete: false, - Create: false, - PrevRevision: pjv.KV.ModRevision, - KV: &server.KeyValue{ - Key: key, - CreateRevision: pjv.KV.CreateRevision, - Value: value, - Lease: lease, - }, - } - if pjv.KV.CreateRevision == 0 { - updateValue.KV.CreateRevision = rev - } - - valueBytes, err := updateValue.Encode() - if err != nil { - return 0, nil, false, err - } - - seq, 
err := b.kv.Update(key, valueBytes, uint64(revision)) - if err != nil { - return 0, nil, false, err - } - - updateValue.KV.ModRevision = int64(seq) - - return int64(seq), updateValue.KV, true, err -} - -func (b *Backend) List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*server.KeyValue, error) { - // its assumed that when there is a start key that that key exists. - if strings.HasSuffix(prefix, "/") { - if prefix == startKey || strings.HasPrefix(prefix, startKey) { - startKey = "" - } - } - - storeRev, err := b.kv.BucketRevision() - if err != nil { - return 0, nil, err - } - - kvs := make([]*server.KeyValue, 0) - var count int64 - - // startkey provided so get max revision after the startKey matching the prefix - if startKey != "" { - histories := make(map[string][]nats.KeyValueEntry) - var minRev int64 - //var innerEntry nats.KeyValueEntry - if entries, err := b.kv.History(startKey, nats.Context(ctx)); err == nil { - histories[startKey] = entries - for i := len(entries) - 1; i >= 0; i-- { - // find the matching startKey - if int64(entries[i].Revision()) <= revision { - minRev = int64(entries[i].Revision()) - b.l.Debugf("Found min revision=%d for key=%s", minRev, startKey) - break - } - } - } else { - return 0, nil, err - } - - keys, err := b.kv.GetKeys(ctx, prefix, true) - if err != nil { - return 0, nil, err - } - - for _, key := range keys { - if key != startKey { - if history, err := b.kv.History(key, nats.Context(ctx)); err == nil { - histories[key] = history - } else { - // should not happen - b.l.Warnf("no history for %s", key) - } - } - } - var nextRevID = minRev - var nextRevision nats.KeyValueEntry - for k, v := range histories { - b.l.Debugf("Checking %s history", k) - for i := len(v) - 1; i >= 0; i-- { - if int64(v[i].Revision()) > nextRevID && int64(v[i].Revision()) <= revision { - nextRevID = int64(v[i].Revision()) - nextRevision = v[i] - b.l.Debugf("found next rev=%d", nextRevID) - break - } else if 
int64(v[i].Revision()) <= nextRevID { - break - } - } - } - if nextRevision != nil { - var entry natsData - err := entry.Decode(nextRevision) - if err != nil { - return 0, nil, err - } - kvs = append(kvs, entry.KV) - } - - return storeRev, kvs, nil - } - - current := true - - if revision != 0 { - storeRev = revision - current = false - } - - if current { - entries, err := b.kv.GetKeyValues(ctx, prefix, true) - if err != nil { - return 0, nil, err - } - - for _, e := range entries { - if count < limit || limit == 0 { - var entry natsData - err := entry.Decode(e) - if !b.isExpiredKey(&entry) && err == nil { - kvs = append(kvs, entry.KV) - count++ - } - } else { - break - } - } - - } else { - keys, err := b.kv.GetKeys(ctx, prefix, true) - if err != nil { - return 0, nil, err - } - if revision == 0 && len(keys) == 0 { - return storeRev, nil, nil - } - - for _, key := range keys { - if count < limit || limit == 0 { - if history, err := b.kv.History(key, nats.Context(ctx)); err == nil { - for i := len(history) - 1; i >= 0; i-- { - if int64(history[i].Revision()) <= revision { - var entry natsData - if err := entry.Decode(history[i]); err == nil { - kvs = append(kvs, entry.KV) - count++ - } else { - b.l.Warnf("Could not decode %s rev=> %d", key, history[i].Revision()) - } - break - } - } - } else { - // should not happen - b.l.Warnf("no history for %s", key) - } - } - } - - } - return storeRev, kvs, nil -} - -func (d *Backend) listAfter(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error) { - entries, err := d.kv.GetKeyValues(ctx, prefix, false) - - if err != nil { - return 0, nil, err - } - - storeRev, err := d.kv.BucketRevision() - if err != nil { - return 0, nil, err - } - if revision != 0 { - storeRev = revision - } - events := make([]*server.Event, 0) - for _, e := range entries { - var kv natsData - err := kv.Decode(e) - if err == nil && int64(e.Revision()) > revision { - event := server.Event{ - Delete: kv.Delete, - Create: kv.Create, 
- KV: kv.KV, - PrevKV: &server.KeyValue{}, - } - if _, prevKV, err := d.Get(ctx, kv.KV.Key, "", 1, kv.PrevRevision); err == nil && prevKV != nil { - event.PrevKV = prevKV - } - - events = append(events, &event) - } - } - return storeRev, events, nil -} - -func (b *Backend) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { - // TODO: refactor to use a consumer to start after revision. - watcher, err := b.kv.Watch(prefix, nats.IgnoreDeletes(), nats.Context(ctx)) - if err != nil { - b.l.Errorf("failed to create watcher %s for revision %d", prefix, revision) - ch := make(chan []*server.Event, 0) - close(ch) - return ch - } - - if revision > 0 { - revision-- - } - - _, events, err := b.listAfter(ctx, prefix, revision) - if err != nil { - b.l.Errorf("failed to create watcher %s for revision %d", prefix, revision) - } - - result := make(chan []*server.Event, 100) - - go func() { - if len(events) > 0 { - result <- events - revision = events[len(events)-1].KV.ModRevision - } - - for { - select { - case i := <-watcher.Updates(): - if i != nil { - if int64(i.Revision()) > revision { - events := make([]*server.Event, 1) - var err error - value := &natsData{ - KV: &server.KeyValue{}, - PrevRevision: 0, - Create: false, - Delete: false, - } - prevValue := &natsData{ - KV: &server.KeyValue{}, - PrevRevision: 0, - Create: false, - Delete: false, - } - lastEntry := i - - err = value.Decode(lastEntry) - if err != nil { - b.l.Warnf("watch event: could not decode %s seq %d", i.Key(), i.Revision()) - } - if _, prevEntry, prevErr := b.get(ctx, i.Key(), value.PrevRevision, false); prevErr == nil { - if prevEntry != nil { - prevValue = prevEntry - } - } - if err == nil { - event := &server.Event{ - Create: value.Create, - Delete: value.Delete, - KV: value.KV, - PrevKV: prevValue.KV, - } - events[0] = event - result <- events - } else { - b.l.Warnf("error decoding %s event %v", i.Key(), err) - continue - } - } - } - case <-ctx.Done(): - b.l.Infof("watcher: 
%s context cancelled", prefix) - if err := watcher.Stop(); err != nil && err != nats.ErrBadSubscription { - b.l.Warnf("error stopping %s watcher: %v", prefix, err) - } - return - } - } - }() - - return result -} diff --git a/pkg/drivers/nats/new.go b/pkg/drivers/nats/new.go index 8de99a50..ffb2b529 100644 --- a/pkg/drivers/nats/new.go +++ b/pkg/drivers/nats/new.go @@ -7,11 +7,11 @@ import ( "os/signal" "time" - "github.com/k3s-io/kine/pkg/drivers/nats/kv" natsserver "github.com/k3s-io/kine/pkg/drivers/nats/server" "github.com/k3s-io/kine/pkg/server" "github.com/k3s-io/kine/pkg/tls" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/sirupsen/logrus" ) @@ -115,12 +115,12 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega logrus.Infof("using bucket: %s", config.bucket) - conn, err := nats.Connect(config.clientURL, nopts...) + nc, err := nats.Connect(config.clientURL, nopts...) if err != nil { return nil, fmt.Errorf("failed to connect to NATS server: %w", err) } - js, err := conn.JetStream() + js, err := nc.JetStream() if err != nil { return nil, fmt.Errorf("failed to get JetStream context: %w", err) } @@ -155,7 +155,12 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega logrus.Infof("bucket initialized: %s", config.bucket) - ekv := kv.NewEncodedKV(bucket, &kv.EtcdKeyCodec{}, &kv.S2ValueCodec{}) + njs, err := jetstream.New(nc) + if err != nil { + return nil, fmt.Errorf("failed to create JetStream context: %w", err) + } + + ekv := NewKeyValue(ctx, bucket, njs) // Reference the global logger, since it appears log levels are // applied globally.