Implement zoneTracker for coordinating downscales across statefulsets using a file in object storage for the prepare_downscale admission webhook #96

Closed
Changes from 3 of 15 commits
10 changes: 9 additions & 1 deletion cmd/rollout-operator/main.go
@@ -19,6 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
v1 "k8s.io/api/admission/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -54,6 +55,8 @@ type config struct {
serverSelfSignedCertExpiration model.Duration

updateWebhooksWithSelfSignedCABundle bool

useZoneTracker bool
}

func (cfg *config) register(fs *flag.FlagSet) {
@@ -77,6 +80,7 @@ func (cfg *config) register(fs *flag.FlagSet) {
cfg.serverSelfSignedCertExpiration = defaultServerSelfSignedCertExpiration

fs.BoolVar(&cfg.updateWebhooksWithSelfSignedCABundle, "webhooks.update-ca-bundle", true, "Update the CA bundle in the properly labeled webhook configurations with the self-signed certificate (-server-tls.self-signed-cert.enabled should be enabled).")
fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use zone tracker to prevent simultaneous downscales in different zones")
}

func (cfg config) validate() error {
@@ -185,10 +189,14 @@ func maybeStartTLSServer(cfg config, logger log.Logger, kubeClient *kubernetes.C
check(tlscert.PatchCABundleOnMutatingWebhooks(context.Background(), logger, kubeClient, cfg.kubeNamespace, cert.CA))
}

prepDownscaleAdmitFunc := func(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api *kubernetes.Clientset) *v1.AdmissionResponse {
return admission.PrepareDownscale(ctx, logger, ar, api, cfg.useZoneTracker)
}

tlsSrv, err := newTLSServer(cfg.serverTLSPort, logger, cert)
check(errors.Wrap(err, "failed to create tls server"))
tlsSrv.Handle(admission.NoDownscaleWebhookPath, admission.Serve(admission.NoDownscale, logger, kubeClient))
tlsSrv.Handle(admission.PrepareDownscaleWebhookPath, admission.Serve(admission.PrepareDownscale, logger, kubeClient))
tlsSrv.Handle(admission.PrepareDownscaleWebhookPath, admission.Serve(prepDownscaleAdmitFunc, logger, kubeClient))
check(errors.Wrap(tlsSrv.Start(), "failed to start tls server"))
}

6 changes: 4 additions & 2 deletions go.mod
@@ -21,6 +21,8 @@ require (
k8s.io/client-go v0.28.3
)

require github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4 // indirect

require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
@@ -65,7 +67,7 @@
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
@@ -85,7 +87,6 @@
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc3 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
@@ -98,6 +99,7 @@
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.16.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/thanos-io/objstore v0.0.0-20231025225615-ff7faac741fb
github.com/theupdateframework/notary v0.7.0 // indirect
github.com/vbatts/tar-split v0.11.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
8 changes: 6 additions & 2 deletions go.sum
@@ -119,6 +119,8 @@ github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 h1:UhxFibDNY/bfvqU5CAUmr9zpesgbU6SWc8/B4mflAE4=
github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE=
github.com/dvsekhvalnov/jose2go v0.0.0-20170216131308-f21a8cedbbae/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4 h1:rydBwnBoywKQMjWF0z8SriYtQ+uUcaFsxuijMjJr5PI=
github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4/go.mod h1:kQa0V74HNYMfuJH6jiPiwNdpWXl4xd/K4tzlrcvYDQI=
github.com/emicklei/go-restful/v3 v3.10.1 h1:rc42Y5YTp7Am7CS630D7JmhRjq4UlEUuEKfrDac4bSQ=
github.com/emicklei/go-restful/v3 v3.10.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -289,8 +291,8 @@ github.com/k3d-io/k3d/v5 v5.6.0 h1:XMRSQXyPErOcDCdOJVi6HUPjJZuWd/N6Dss7QeCDRhk=
github.com/k3d-io/k3d/v5 v5.6.0/go.mod h1:t/hRD2heCSkO9TJJdzFT72jXGCY8PjsCsClgjcmMoAA=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
@@ -447,6 +449,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/thanos-io/objstore v0.0.0-20231025225615-ff7faac741fb h1:fZuIuOSHsaUOJqvcWlIgt1lACXLF1073TmRuzoByQqw=
github.com/thanos-io/objstore v0.0.0-20231025225615-ff7faac741fb/go.mod h1:q369VBtseI5OQbK9IsGDfQCfcVu1fsur7ynUcojxnDA=
github.com/theupdateframework/notary v0.7.0 h1:QyagRZ7wlSpjT5N2qQAh/pN+DVqgekv4DzbAiAiEL3c=
github.com/theupdateframework/notary v0.7.0/go.mod h1:c9DRxcmhHmVLDay4/2fUYdISnHqbFDGRSlXPO0AhYWw=
github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8=
106 changes: 76 additions & 30 deletions pkg/admission/prep_downscale.go
@@ -23,24 +23,33 @@ import (

"github.com/grafana/rollout-operator/pkg/config"
"github.com/grafana/rollout-operator/pkg/util"
"github.com/thanos-io/objstore"
)

const (
PrepareDownscaleWebhookPath = "/admission/prepare-downscale"
)

func PrepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api *kubernetes.Clientset) *v1.AdmissionResponse {
func PrepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api *kubernetes.Clientset, useZoneTracker bool) *v1.AdmissionResponse {
client := &http.Client{
Timeout: 5 * time.Second,
}
return prepareDownscale(ctx, logger, ar, api, client)

var zt *zoneTracker
if useZoneTracker {
// TODO(jordanrushing): fix bucket creation semantics and wire-in config supporting multiple CSPs
bkt := objstore.NewInMemBucket()
zt = newZoneTracker(bkt, "testkey")
}

return prepareDownscale(ctx, logger, ar, api, client, zt)
Reviewer: Like I mentioned in our 1:1, I think the easiest option for now is to turn the zoneTracker param into a string. If it's set, require that it be one of gcs, s3, whatever the Azure storage is called, etc. That's probably easier than trying to wire up a config file, a ConfigMap that populates that config file, and so on, just so a storage config in that file can specify the zoneTracker backend.

Author: I ended up adding the necessary config with flags passed to the zoneTracker instantiation before passing zt through to prepareDownscale. (A sketch of the backend-selection idea follows right after this function.)

}
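
The suggestion above can be sketched as follows. This is a minimal illustration, not code from the PR: selectBucket is a hypothetical helper, and only the in-memory bucket (the one this commit actually constructs) is real; the gcs/s3/azure branches only mark where the thanos-io/objstore provider constructors would be wired in.

```go
package main

import (
	"fmt"

	"github.com/thanos-io/objstore"
)

// selectBucket is a hypothetical helper implementing the reviewer's
// suggestion: choose an objstore.Bucket implementation from a backend
// name passed as a flag.
func selectBucket(backend string) (objstore.Bucket, error) {
	switch backend {
	case "", "inmem":
		// What this commit effectively does: an in-memory bucket for testing.
		return objstore.NewInMemBucket(), nil
	case "gcs", "s3", "azure":
		// A real implementation would build the matching provider bucket
		// from its own config here (objstore's gcs/s3/azure packages).
		return nil, fmt.Errorf("backend %q not wired up in this sketch", backend)
	default:
		return nil, fmt.Errorf("unknown zone tracker backend %q", backend)
	}
}

func main() {
	bkt, err := selectBucket("inmem")
	fmt.Println(bkt != nil, err) // true <nil>
}
```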

type httpClient interface {
Post(url, contentType string, body io.Reader) (resp *http.Response, err error)
}

func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api kubernetes.Interface, client httpClient) *v1.AdmissionResponse {
func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api kubernetes.Interface, client httpClient, zt *zoneTracker) *v1.AdmissionResponse {
logger = log.With(logger, "name", ar.Request.Name, "resource", ar.Request.Resource.Resource, "namespace", ar.Request.Namespace)

oldObj, oldGVK, err := codecs.UniversalDeserializer().Decode(ar.Request.OldObject.Raw, nil, nil)
@@ -155,27 +164,54 @@ func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionRev
)
}

foundSts, err := findDownscalesDoneMinTimeAgo(stsList, ar.Request.Name)
if err != nil {
level.Warn(logger).Log("msg", "downscale not allowed due to error while parsing downscale annotations", "err", err)
return deny(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because parsing downscale annotations failed.",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas,
)
}
if foundSts != nil {
msg := fmt.Sprintf("downscale of %s/%s in %s from %d to %d replicas is not allowed because statefulset %v was downscaled at %v and is labelled to wait %s between zone downscales",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas, foundSts.name, foundSts.lastDownscaleTime, foundSts.waitTime)
level.Warn(logger).Log("msg", msg, "err", err)
return deny(msg)
}
// If using zoneTracker instead of downscale annotations, check the zone file for recent downscaling.
// Otherwise, check the downscale annotations.
if zt != nil {
Reviewer: You could make two types here (one backed by the zones file and one with the existing annotation logic) that implement the same interface. That way the two code paths are not intermingled as they are here.

Author: Good suggestion for a follow-up PR, IMO. (A sketch of such an interface follows at the end of this hunk.)

if err := zt.loadZones(ctx, stsList); err != nil {
level.Warn(logger).Log("msg", "downscale not allowed due to error while loading zones", "err", err)
return deny(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because loading zones failed.",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas,
)
}
// Check if the zone has been downscaled recently.
foundSts, err := zt.findDownscalesDoneMinTimeAgo(stsList, ar.Request.Name)
if err != nil {
level.Warn(logger).Log("msg", "downscale not allowed due to error while parsing downscale timestamps from the zone file", "err", err)
return deny(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because parsing parsing downscale timestamps from the zone file failed.",
Reviewer (on lines +181 to +183): Do we need a log line here in addition to returning a denial? Couldn't this just return the result, or an empty result and an error? Then we would only deal with logging once, wherever prepareDownscale is called, when the result is a denied downscale.

Author: I think that's fair; generally I'm just trying to follow the pattern we've implemented elsewhere in this webhook. As far as I can tell the extra logging has no real negative trade-offs, and this way you have visibility into the root cause of the failure whether you're looking at the operator directly or at the admission responses. (A sketch of the return-an-error alternative follows after this if block.)

ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas,
)
}
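
The refactor the reviewer describes might look like the following minimal sketch; checkZoneDownscale and admit are hypothetical names, and the real deny helper builds a richer AdmissionResponse than is shown here.

```go
package admission

import (
	"fmt"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	v1 "k8s.io/api/admission/v1"
)

// checkZoneDownscale returns a denial reason (or an error) instead of
// logging and denying inline. A real version would load the zones and look
// for a too-recent downscale.
func checkZoneDownscale() (reason string, err error) {
	return "", nil
}

func admit(logger log.Logger) *v1.AdmissionResponse {
	reason, err := checkZoneDownscale()
	if err != nil {
		reason = fmt.Sprintf("checking recent zone downscales failed: %v", err)
	}
	if reason != "" {
		// The single place a denial gets logged, whichever check produced it.
		level.Warn(logger).Log("msg", "downscale not allowed", "reason", reason, "err", err)
		return &v1.AdmissionResponse{Allowed: false}
	}
	return &v1.AdmissionResponse{Allowed: true}
}
```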
if foundSts != nil {
msg := fmt.Sprintf("downscale of %s/%s in %s from %d to %d replicas is not allowed because statefulset %v was downscaled at %v and is labelled to wait %s between zone downscales",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas, foundSts.name, foundSts.lastDownscaleTime, foundSts.waitTime)
level.Warn(logger).Log("msg", msg, "err", err)
return deny(msg)
}
} else {
foundSts, err := findDownscalesDoneMinTimeAgo(stsList, ar.Request.Name)
Reviewer: Do we still need the old way? If the initial load of the zones adds the waiting time to the initial times, won't that be enough to migrate?

Author: The two conditionals maintain compatibility for users of the prepare_downscale webhook who have no issues with the Kubernetes annotations for this metadata and don't want to use the new zoneTracker.

if err != nil {
level.Warn(logger).Log("msg", "downscale not allowed due to error while parsing downscale annotations", "err", err)
Reviewer: Same here; this log line raises the same question as the logging comment above.

return deny(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because parsing downscale annotations failed.",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas,
)
}
if foundSts != nil {
msg := fmt.Sprintf("downscale of %s/%s in %s from %d to %d replicas is not allowed because statefulset %v was downscaled at %v and is labelled to wait %s between zone downscales",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas, foundSts.name, foundSts.lastDownscaleTime, foundSts.waitTime)
level.Warn(logger).Log("msg", msg, "err", err)
return deny(msg)
}

foundSts = findStatefulSetWithNonUpdatedReplicas(ctx, api, ar.Request.Namespace, stsList)
if foundSts != nil {
msg := fmt.Sprintf("downscale of %s/%s in %s from %d to %d replicas is not allowed because statefulset %v has %d non-updated replicas and %d non-ready replicas",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas, foundSts.name, foundSts.nonUpdatedReplicas, foundSts.nonReadyReplicas)
level.Warn(logger).Log("msg", msg)
return deny(msg)
foundSts = findStatefulSetWithNonUpdatedReplicas(ctx, api, ar.Request.Namespace, stsList)
if foundSts != nil {
msg := fmt.Sprintf("downscale of %s/%s in %s from %d to %d replicas is not allowed because statefulset %v has %d non-updated replicas and %d non-ready replicas",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas, foundSts.name, foundSts.nonUpdatedReplicas, foundSts.nonReadyReplicas)
level.Warn(logger).Log("msg", msg)
return deny(msg)
}
}
}
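
The interface suggested in the earlier comment (two types, one backed by the zone file and one by annotations) could look like this sketch; downscaleGate and every other name here are invented for illustration and are not the repository's actual helpers.

```go
package admission

import "context"

type statefulSetDownscale struct {
	name              string
	lastDownscaleTime string
	waitTime          string
}

// downscaleGate answers "was a sibling zone downscaled too recently?" and
// records a downscale once one is allowed, so prepareDownscale never has to
// branch on "zt != nil".
type downscaleGate interface {
	findRecentDownscale(ctx context.Context, stsName string) (*statefulSetDownscale, error)
	recordDownscale(ctx context.Context, stsName string) error
}

// zoneFileGate would wrap the zoneTracker introduced in this PR.
type zoneFileGate struct{ /* zt *zoneTracker */ }

// annotationGate would wrap findDownscalesDoneMinTimeAgo and
// addDownscaledAnnotationToStatefulSet, the pre-existing annotation logic.
type annotationGate struct{ /* api kubernetes.Interface */ }
```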

@@ -241,13 +277,23 @@ func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionRev
)
}

err = addDownscaledAnnotationToStatefulSet(ctx, api, ar.Request.Namespace, ar.Request.Name)
if err != nil {
level.Error(logger).Log("msg", "downscale not allowed due to error while adding annotation", "err", err)
return deny(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because adding an annotation to the statefulset failed.",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas,
)
if zt != nil {
if err := zt.setDownscaled(ctx, ar.Request.Name); err != nil {
level.Error(logger).Log("msg", "downscale not allowed due to error while setting downscale timestamp in the zone file", "err", err)
return deny(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because setting downscale timestamp in the zone file failed.",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas,
)
}
} else {
err := addDownscaledAnnotationToStatefulSet(ctx, api, ar.Request.Namespace, ar.Request.Name)
if err != nil {
level.Error(logger).Log("msg", "downscale not allowed due to error while adding annotation", "err", err)
return deny(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because adding an annotation to the statefulset failed.",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas,
)
}
}

// Down-scale operation is allowed because all pods successfully prepared for shutdown
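
zone_tracker.go itself is outside this three-commit view, so the sketch below reconstructs a plausible shape from the call sites above (newZoneTracker, loadZones, findDownscalesDoneMinTimeAgo, setDownscaled). The JSON zone-file layout and the method bodies are assumptions, not the PR's implementation; findDownscalesDoneMinTimeAgo is omitted for brevity.

```go
package admission

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"time"

	"github.com/thanos-io/objstore"
)

type zoneTracker struct {
	bkt objstore.Bucket
	key string // object holding the shared zone file, e.g. "testkey" above
}

func newZoneTracker(bkt objstore.Bucket, key string) *zoneTracker {
	return &zoneTracker{bkt: bkt, key: key}
}

// loadZonesRaw fetches the shared zone file: a map of statefulset/zone name
// to the RFC3339 timestamp of its last downscale. (A real implementation
// would treat a missing object as an empty map via objstore.IsObjNotFoundErr.)
func (zt *zoneTracker) loadZonesRaw(ctx context.Context) (map[string]string, error) {
	rc, err := zt.bkt.Get(ctx, zt.key)
	if err != nil {
		return nil, err
	}
	defer rc.Close()
	b, err := io.ReadAll(rc)
	if err != nil {
		return nil, err
	}
	zones := map[string]string{}
	return zones, json.Unmarshal(b, &zones)
}

// setDownscaled records that stsName was downscaled now by rewriting the file.
func (zt *zoneTracker) setDownscaled(ctx context.Context, stsName string) error {
	zones, err := zt.loadZonesRaw(ctx)
	if err != nil {
		return err
	}
	zones[stsName] = time.Now().UTC().Format(time.RFC3339)
	b, err := json.Marshal(zones)
	if err != nil {
		return err
	}
	return zt.bkt.Upload(ctx, zt.key, bytes.NewReader(b))
}
```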