From 4bd2b10b54d1bb103f847fad533082bb8681ad7c Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 21 Feb 2023 15:28:55 +0100 Subject: [PATCH] Add retry (#113) Signed-off-by: Jarema --- controllers/jetstream/consumer.go | 43 ++++++++++++++++++------------- controllers/jetstream/stream.go | 37 +++++++++++++++++--------- 2 files changed, 50 insertions(+), 30 deletions(-) diff --git a/controllers/jetstream/consumer.go b/controllers/jetstream/consumer.go index 1e8cc847..a5fb78ac 100644 --- a/controllers/jetstream/consumer.go +++ b/controllers/jetstream/consumer.go @@ -19,6 +19,7 @@ import ( k8sapi "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" ) func (c *Controller) runConsumerQueue() { @@ -434,15 +435,18 @@ func setConsumerOK(ctx context.Context, s *apis.Consumer, i typed.ConsumerInterf Message: "Consumer successfully created", }) - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - res, err := i.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to set consumer %q status: %w", s.Spec.DurableName, err) - } - - return res, nil + var res *apis.Consumer + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + var err error + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + res, err = i.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to set consumer %q status: %w", s.Spec.DurableName, err) + } + return nil + }) + return res, err } func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.ConsumerInterface, err error) (*apis.Consumer, error) { @@ -459,13 +463,16 @@ func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.Consume Message: err.Error(), }) - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - res, err := sif.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to set consumer errored status: %w", err) - } - - return res, nil + var res *apis.Consumer + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + var err error + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + res, err = sif.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to set consumer errored status: %w", err) + } + return nil + }) + return res, err } diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index 52f93fbf..d25ca8ff 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -31,6 +31,7 @@ import ( k8sapi "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" ) func (c *Controller) runStreamQueue() { @@ -517,12 +518,18 @@ func setStreamErrored(ctx context.Context, s *apis.Stream, sif typed.StreamInter ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - res, err := sif.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to set stream errored status: %w", err) - } - - return res, nil + var res *apis.Stream + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + var err error + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + res, err = sif.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to set stream errored status: %w", err) + } + return nil + }) + return res, err } func setStreamOK(ctx context.Context, s *apis.Stream, i typed.StreamInterface) (*apis.Stream, error) { @@ -540,12 +547,18 @@ func setStreamOK(ctx context.Context, s *apis.Stream, i typed.StreamInterface) ( ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - res, err := i.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to set stream %q status: %w", s.Spec.Name, err) - } - - return res, nil + var res *apis.Stream + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + var err error + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + res, err = i.UpdateStatus(ctx, sc, k8smeta.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to set stream %q status: %w", s.Spec.Name, err) + } + return nil + }) + return res, err } func getMaxAge(v string) (time.Duration, error) {