Skip to content

Commit

Permalink
Progress...
Browse files Browse the repository at this point in the history
Signed-off-by: Byron Ruth <[email protected]>
  • Loading branch information
bruth committed Aug 13, 2023
1 parent 076f7ae commit 0659a48
Show file tree
Hide file tree
Showing 8 changed files with 898 additions and 798 deletions.
134 changes: 134 additions & 0 deletions pkg/drivers/nats/backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package nats

import (
"context"
"errors"
"io/ioutil"
"testing"
"time"

"github.com/k3s-io/kine/pkg/drivers/nats/kv"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)

func noErr(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatal(err)
}
}

func expErr(t *testing.T, err error) {
t.Helper()
if err == nil {
t.Fatal("expected error")
}
}

func expEqualErr(t *testing.T, got, want error) {
t.Helper()
if !errors.Is(want, got) {
t.Fatalf("expected %v, got %v", want, got)
}
}

func expEqual[T comparable](t *testing.T, got, want T) {
t.Helper()
if got != want {
t.Fatalf("expected %v, got %v", want, got)
}
}

func setupBackend(t *testing.T) (*server.Server, *nats.Conn, *Backend) {
ns := test.RunServer(&server.Options{
Port: -1,
JetStream: true,
StoreDir: t.TempDir(),
})

nc, err := nats.Connect(ns.ClientURL())
noErr(t, err)

js, err := nc.JetStream()
noErr(t, err)

bkt, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "kine",
History: 10,
})
noErr(t, err)

ekv := kv.NewEncodedKV(bkt, &kv.EtcdKeyCodec{}, &kv.S2ValueCodec{})

l := logrus.New()
l.SetOutput(ioutil.Discard)

b := Backend{
l: l,
kv: ekv,
js: js,
}

return ns, nc, &b
}

func TestBackend(t *testing.T) {
ns, nc, b := setupBackend(t)
defer ns.Shutdown()
defer nc.Drain()

ctx := context.Background()

// Create a key with a lease of 1 second.
rev, err := b.Create(ctx, "/foo", []byte("bar"), 1)
noErr(t, err)
expEqual(t, 1, rev)

rev, ent, err := b.Get(ctx, "/foo", "", 0, 0)
noErr(t, err)
expEqual(t, 1, rev)
expEqual(t, "bar", string(ent.Value))
expEqual(t, "/foo", ent.Key)
expEqual(t, 1, ent.Lease)
expEqual(t, 1, ent.ModRevision)
expEqual(t, 0, ent.CreateRevision)

// Count the items.
rev, count, err := b.Count(ctx, "/foo")
noErr(t, err)
expEqual(t, 1, rev)
expEqual(t, int64(1), count)

// List the keys.
rev, ents, err := b.List(ctx, "/foo", "", 0, 0)
noErr(t, err)
expEqual(t, 1, rev)
expEqual(t, 1, len(ents))

// Expire the lease.
time.Sleep(time.Second)

// Try to get again.
rev, ent, err = b.Get(ctx, "/foo", "", 0, 0)
expEqualErr(t, err, nats.ErrKeyNotFound)

// Should be no items.
rev, count, err = b.Count(ctx, "/foo")
noErr(t, err)
expEqual(t, 2, rev)
expEqual(t, 0, count)

// Re-create the key without a lease.
rev, err = b.Create(ctx, "/foo", []byte("bar"), 0)
noErr(t, err)
expEqual(t, 3, rev)

// Get the key again. Expect revision to be 3 since the
// the key was deleted => 2 and create => 3.
rev, ent, err = b.Get(ctx, "/foo", "", 0, 0)
noErr(t, err)
expEqual(t, 3, rev)
}
181 changes: 181 additions & 0 deletions pkg/drivers/nats/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package nats

import (
"fmt"
"net/url"
"strconv"
"strings"
"time"

natsserver "github.com/k3s-io/kine/pkg/drivers/nats/server"
"github.com/k3s-io/kine/pkg/tls"
"github.com/nats-io/jsm.go/natscontext"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)

type Config struct {
// Client URL which could be a list of comma separated URLs.
clientURL string
// Client connection options.
clientOptions []nats.Option
// Number of revisions to keep in history. Defaults to 10.
revHistory uint8
// Name of the bucket. Defaults to "kine".
bucket string
// Number of replicas for the bucket. Defaults to 1
replicas int
// Indicates the duration of a method before it is considered slow. Defaults to 500ms.
slowThreshold time.Duration
// If true, an embedded server will not be used.
noEmbed bool
// If true, use a socket for the embedded server.
dontListen bool
// Path to a server configuration file when embedded.
serverConfig string
// If true, the embedded server will log to stdout.
stdoutLogging bool
// The explicit host to listen on when embedded.
host string
// The explicit port to listen on when embedded.
port int
// Data directory.
dataDir string
}

// parseConnection returns nats connection url, bucketName and []nats.Option, error
func parseConnection(dsn string, tlsInfo tls.Config) (*Config, error) {
config := &Config{
slowThreshold: defaultSlowMethod,
revHistory: defaultRevHistory,
bucket: defaultBucket,
replicas: defaultReplicas,
}

// Parse the first URL in the connection string which contains the
// query parameters.
connections := strings.Split(dsn, ",")
u, err := url.Parse(connections[0])
if err != nil {
return nil, err
}

// Extract the host and port if embedded server is used.
config.host = u.Hostname()
if u.Port() != "" {
config.port, _ = strconv.Atoi(u.Port())
}

// Extract the query parameters to build configuration.
queryMap, err := url.ParseQuery(u.RawQuery)
if err != nil {
return nil, err
}

if v := queryMap.Get("bucket"); v != "" {
config.bucket = v
}

if v := queryMap.Get("replicas"); v != "" {
if r, err := strconv.ParseUint(v, 10, 8); err == nil {
if r >= 1 && r <= 5 {
config.replicas = int(r)
} else {
return nil, fmt.Errorf("invalid replicas, must be >= 1 and <= 5")
}
}
}

if d := queryMap.Get("slowMethod"); d != "" {
if dur, err := time.ParseDuration(d); err == nil {
config.slowThreshold = dur
} else {
return nil, fmt.Errorf("invalid slowMethod duration: %w", err)
}
}

if r := queryMap.Get("revHistory"); r != "" {
if revs, err := strconv.ParseUint(r, 10, 8); err == nil {
if revs >= 2 && revs <= 64 {
config.revHistory = uint8(revs)
} else {
return nil, fmt.Errorf("invalid revHistory, must be >= 2 and <= 64")
}
}
}

if tlsInfo.KeyFile != "" && tlsInfo.CertFile != "" {
config.clientOptions = append(config.clientOptions, nats.ClientCert(tlsInfo.CertFile, tlsInfo.KeyFile))
}

if tlsInfo.CAFile != "" {
config.clientOptions = append(config.clientOptions, nats.RootCAs(tlsInfo.CAFile))
}

if f := queryMap.Get("contextFile"); f != "" {
if u.Host != "" {
return config, fmt.Errorf("when using context endpoint no host should be provided")
}

logrus.Debugf("loading nats context file: %s", f)

natsContext, err := natscontext.NewFromFile(f)
if err != nil {
return nil, err
}

connections = strings.Split(natsContext.ServerURL(), ",")

// command line options provided to kine will override the file
// https://github.com/nats-io/jsm.go/blob/v0.0.29/natscontext/context.go#L257
// allows for user, creds, nke, token, certifcate, ca, inboxprefix from the context.json
natsClientOpts, err := natsContext.NATSOptions(config.clientOptions...)
if err != nil {
return nil, err
}
config.clientOptions = natsClientOpts
}

connBuilder := strings.Builder{}
for idx, c := range connections {
if idx > 0 {
connBuilder.WriteString(",")
}

u, err := url.Parse(c)
if err != nil {
return nil, err
}

if u.Scheme != "nats" {
return nil, fmt.Errorf("invalid connection string=%s", c)
}

connBuilder.WriteString("nats://")

if u.User != nil && idx == 0 {
userInfo := strings.Split(u.User.String(), ":")
if len(userInfo) > 1 {
config.clientOptions = append(config.clientOptions, nats.UserInfo(userInfo[0], userInfo[1]))
} else {
config.clientOptions = append(config.clientOptions, nats.Token(userInfo[0]))
}
}
connBuilder.WriteString(u.Host)
}

config.clientURL = connBuilder.String()

// Config options only relevant if built with embedded NATS.
if natsserver.Embedded {
config.noEmbed = queryMap.Has("noEmbed")
config.serverConfig = queryMap.Get("serverConfig")
config.stdoutLogging = queryMap.Has("stdoutLogging")
config.dontListen = queryMap.Has("dontListen")
config.dataDir = queryMap.Get("dataDir")
}

logrus.Debugf("using config %#v", config)

return config, nil
}
Loading

0 comments on commit 0659a48

Please sign in to comment.