Skip to content

Commit

Permalink
Improve logic to get or create the bucket
Browse files Browse the repository at this point in the history
Fix missing reference.

Signed-off-by: Byron Ruth <[email protected]>
  • Loading branch information
bruth committed Nov 3, 2023
1 parent be2a008 commit 5922299
Showing 1 changed file with 66 additions and 38 deletions.
104 changes: 66 additions & 38 deletions pkg/drivers/nats/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nats

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -90,9 +91,9 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega
// Start the server.
go ns.Start()
logrus.Infof("started embedded NATS server")
time.Sleep(100 * time.Millisecond)

// Wait for the server to be ready.
// TODO: limit the number of retries?
var retries int
for {
if ns.Ready() {
Expand Down Expand Up @@ -143,47 +144,14 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega
return nil, fmt.Errorf("failed to get JetStream context: %w", err)
}

// Create the bucket if it doesn't exist. Note, this is a no-op if the bucket
// already exists with the same configuration.
var bucket jetstream.KeyValue
for {
bucket, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: config.bucket,
Description: "Holds kine key/values",
History: config.revHistory,
Replicas: config.replicas,
})
if err == nil || jetstream.ErrStreamNameAlreadyInUse.APIError().Is(err) {
break
}
if err == context.DeadlineExceeded {
logrus.Warnf("timed out waiting for bucket %s to be created. retrying", config.bucket)
continue
}
// Check for temporary JetStream errors when the cluster is unhealthy and retry.
if jsClusterNotAvailErr.Is(err) || jsNoSuitablePeersErr.Is(err) {
logrus.Warnf(err.Error())
time.Sleep(time.Second)
continue
}
if err != nil {
cancel()
return nil, fmt.Errorf("failed to initialize KV bucket: %w", err)
}
}

str, err := js.Stream(ctx, fmt.Sprintf("KV_%s", config.bucket))
bucket, err := getOrCreateBucket(ctx, js, config)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to get stream info: %w", err)
return nil, fmt.Errorf("failed to get or create bucket: %w", err)
}
scfg := str.CachedInfo().Config
scfg.AllowDirect = false

_, err = js.UpdateStream(ctx, scfg)
if err != nil {
if err := disableDirectGets(ctx, js, config); err != nil {
cancel()
return nil, fmt.Errorf("failed to update stream config: %w", err)
return nil, fmt.Errorf("failed to disable direct gets: %w", err)
}

logrus.Infof("bucket initialized: %s", config.bucket)
Expand All @@ -195,6 +163,7 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega
l := logrus.StandardLogger()

backend := Backend{
nc: nc,
l: l,
kv: ekv,
js: js,
Expand All @@ -219,3 +188,62 @@ func newBackend(ctx context.Context, connection string, tlsInfo tls.Config, lega
threshold: config.slowThreshold,
}, nil
}

func getOrCreateBucket(ctx context.Context, js jetstream.JetStream, config *Config) (jetstream.KeyValue, error) {
bucket, err := js.KeyValue(ctx, config.bucket)
if err == nil {
return bucket, nil
}

// If it does not exist, attempt to create it.
for {
bucket, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: config.bucket,
Description: "Holds kine key/values",
History: config.revHistory,
Replicas: config.replicas,
})
if err == nil {
return bucket, nil
}

// Check for timeout errors and retry.
if errors.Is(err, context.DeadlineExceeded) {
logrus.Warnf("timed out waiting for bucket %s to be created. retrying", config.bucket)
continue
}

// Concurrent creation can cause this error.
if jetstream.ErrStreamNameAlreadyInUse.APIError().Is(err) {
return js.KeyValue(ctx, config.bucket)
}

// Check for temporary JetStream errors when the cluster is unhealthy and retry.
if jsClusterNotAvailErr.Is(err) || jsNoSuitablePeersErr.Is(err) {
logrus.Warnf(err.Error())
time.Sleep(time.Second)
continue
}

// Some unexpected error.
if err != nil {
return nil, fmt.Errorf("failed to initialize KV bucket: %w", err)
}
}
}

func disableDirectGets(ctx context.Context, js jetstream.JetStream, config *Config) error {
str, err := js.Stream(ctx, fmt.Sprintf("KV_%s", config.bucket))
if err != nil {
return fmt.Errorf("failed to get stream info: %w", err)
}
scfg := str.CachedInfo().Config
scfg.AllowDirect = false

_, err = js.UpdateStream(ctx, scfg)
if err != nil {
return fmt.Errorf("failed to update stream config: %w", err)
}

return nil
}

0 comments on commit 5922299

Please sign in to comment.