From f1225c0fb9a9d051da565b7562eecf8b9f568c23 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Fri, 3 Nov 2023 15:02:59 -0400 Subject: [PATCH] Retry on failed updates Signed-off-by: Byron Ruth --- pkg/drivers/nats/new.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/pkg/drivers/nats/new.go b/pkg/drivers/nats/new.go index 3962da41..977d9956 100644 --- a/pkg/drivers/nats/new.go +++ b/pkg/drivers/nats/new.go @@ -233,17 +233,26 @@ func getOrCreateBucket(ctx context.Context, js jetstream.JetStream, config *Conf } func disableDirectGets(ctx context.Context, js jetstream.JetStream, config *Config) error { - str, err := js.Stream(ctx, fmt.Sprintf("KV_%s", config.bucket)) - if err != nil { - return fmt.Errorf("failed to get stream info: %w", err) - } - scfg := str.CachedInfo().Config - scfg.AllowDirect = false + for { + str, err := js.Stream(ctx, fmt.Sprintf("KV_%s", config.bucket)) + if errors.Is(err, context.DeadlineExceeded) { + continue + } + if err != nil { + return fmt.Errorf("failed to get stream info: %w", err) + } - _, err = js.UpdateStream(ctx, scfg) - if err != nil { - return fmt.Errorf("failed to update stream config: %w", err) - } + scfg := str.CachedInfo().Config + scfg.AllowDirect = false + + _, err = js.UpdateStream(ctx, scfg) + if errors.Is(err, context.DeadlineExceeded) { + continue + } + if err != nil { + return fmt.Errorf("failed to update stream config: %w", err) + } - return nil + return nil + } }