From decf681e3a9f62c862d57c4d69be30e204200b18 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 4 Oct 2024 15:06:25 +0300 Subject: [PATCH 1/5] Fix deployment status propagation when scaling from zero --- pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go | 17 +++++++ .../autoscaling/v1alpha1/pa_lifecycle_test.go | 2 + pkg/apis/autoscaling/v1alpha1/pa_types.go | 5 ++ pkg/apis/serving/v1/revision_lifecycle.go | 7 +++ pkg/reconciler/autoscaling/kpa/kpa.go | 9 +++- pkg/reconciler/autoscaling/kpa/kpa_test.go | 47 +++++++++--------- pkg/reconciler/autoscaling/kpa/scaler.go | 49 ++++++++++++++++++- pkg/reconciler/autoscaling/kpa/scaler_test.go | 4 +- pkg/reconciler/revision/table_test.go | 10 ++-- pkg/resources/pods.go | 12 +++++ pkg/testing/functional.go | 10 ++++ test/e2e/autoscale.go | 2 +- test/e2e/e2e.go | 8 ++- test/e2e/resource_quota_error_test.go | 31 ++++++++++++ test/ha/activator_test.go | 2 +- test/ha/autoscaler_test.go | 2 +- test/upgrade/preupgrade.go | 4 +- 17 files changed, 181 insertions(+), 40 deletions(-) diff --git a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go index 9cff0ab9f547..b4c35c188b7f 100644 --- a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go +++ b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go @@ -35,6 +35,7 @@ var podCondSet = apis.NewLivingConditionSet( PodAutoscalerConditionActive, PodAutoscalerConditionScaleTargetInitialized, PodAutoscalerConditionSKSReady, + PodAutoscalerConditionScaleTargetScaled, ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -215,6 +216,12 @@ func (pas *PodAutoscalerStatus) MarkScaleTargetInitialized() { podCondSet.Manage(pas).MarkTrue(PodAutoscalerConditionScaleTargetInitialized) } +// ScaleTargetNotScaledAfterFailure returns true if the PodAutoscaler's scale target has been +// scaled successfully. +func (pas *PodAutoscalerStatus) ScaleTargetNotScaledAfterFailure() bool { + return pas.GetCondition(PodAutoscalerConditionScaleTargetScaled).IsFalse() +} + // MarkSKSReady marks the PA condition denoting that SKS is ready. func (pas *PodAutoscalerStatus) MarkSKSReady() { podCondSet.Manage(pas).MarkTrue(PodAutoscalerConditionSKSReady) @@ -250,6 +257,16 @@ func (pas *PodAutoscalerStatus) MarkInactive(reason, message string) { podCondSet.Manage(pas).MarkFalse(PodAutoscalerConditionActive, reason, message) } +// MarkWithNoScaleFailures marks the PA as free of scale failures. +func (pas *PodAutoscalerStatus) MarkWithNoScaleFailures() { + podCondSet.Manage(pas).MarkTrue(PodAutoscalerConditionScaleTargetScaled) +} + +// MarkWithScaleFailures marks the PA as failed due to scale failures. +func (pas *PodAutoscalerStatus) MarkWithScaleFailures(reason, message string) { + podCondSet.Manage(pas).MarkFalse(PodAutoscalerConditionScaleTargetScaled, reason, message) +} + // MarkResourceNotOwned changes the "Active" condition to false to reflect that the // resource of the given kind and name has already been created, and we do not own it. func (pas *PodAutoscalerStatus) MarkResourceNotOwned(kind, name string) { diff --git a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go index e2e23399c2da..0c72455fe2b0 100644 --- a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go +++ b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go @@ -1056,6 +1056,7 @@ func TestTypicalFlow(t *testing.T) { // When we see traffic, mark ourselves active. r.MarkActive() r.MarkScaleTargetInitialized() + r.MarkWithNoScaleFailures() apistest.CheckConditionSucceeded(r, PodAutoscalerConditionScaleTargetInitialized, t) apistest.CheckConditionSucceeded(r, PodAutoscalerConditionSKSReady, t) apistest.CheckConditionSucceeded(r, PodAutoscalerConditionActive, t) @@ -1064,6 +1065,7 @@ func TestTypicalFlow(t *testing.T) { // Check idempotency. r.MarkActive() r.MarkScaleTargetInitialized() + r.MarkWithNoScaleFailures() r.MarkSKSReady() apistest.CheckConditionSucceeded(r, PodAutoscalerConditionScaleTargetInitialized, t) apistest.CheckConditionSucceeded(r, PodAutoscalerConditionActive, t) diff --git a/pkg/apis/autoscaling/v1alpha1/pa_types.go b/pkg/apis/autoscaling/v1alpha1/pa_types.go index 0cf64b692a9e..0c6258083216 100644 --- a/pkg/apis/autoscaling/v1alpha1/pa_types.go +++ b/pkg/apis/autoscaling/v1alpha1/pa_types.go @@ -108,6 +108,11 @@ const ( PodAutoscalerConditionActive apis.ConditionType = "Active" // PodAutoscalerConditionSKSReady is set when SKS is ready. PodAutoscalerConditionSKSReady = "SKSReady" + // PodAutoscalerConditionScaleTargetScaled is set when scaling the revision is successful. + // There are cases where there are pod failures during scaling eg. from zero but + // K8s does not properly propagate the failure to the deployment. In those cases + // progressdeadline is not enough. + PodAutoscalerConditionScaleTargetScaled apis.ConditionType = "ScaleTargetScaled" ) // PodAutoscalerStatus communicates the observed state of the PodAutoscaler (from the controller). diff --git a/pkg/apis/serving/v1/revision_lifecycle.go b/pkg/apis/serving/v1/revision_lifecycle.go index f0e54da9d872..a5fb242e5f5b 100644 --- a/pkg/apis/serving/v1/revision_lifecycle.go +++ b/pkg/apis/serving/v1/revision_lifecycle.go @@ -234,6 +234,13 @@ func (rs *RevisionStatus) PropagateAutoscalerStatus(ps *autoscalingv1alpha1.PodA case corev1.ConditionTrue: rs.MarkActiveTrue() } + + // Mark resource unavailable if we scaled back to zero, but we never achieved the required scale + // and deployment status was not updated properly byt K8s. For example due to a temporary image pull error. + if ps.IsScaleTargetInitialized() && !resUnavailable && ps.ScaleTargetNotScaledAfterFailure() { + condScaled := ps.GetCondition(autoscalingv1alpha1.PodAutoscalerConditionScaleTargetScaled) + rs.MarkResourcesAvailableFalse(condScaled.Reason, condScaled.Message) + } } // ResourceNotOwnedMessage constructs the status message if ownership on the diff --git a/pkg/reconciler/autoscaling/kpa/kpa.go b/pkg/reconciler/autoscaling/kpa/kpa.go index 3849b4427729..e295324a6a92 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa.go +++ b/pkg/reconciler/autoscaling/kpa/kpa.go @@ -114,10 +114,15 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1. if err := c.ReconcileMetric(ctx, pa, resolveScrapeTarget(ctx, pa)); err != nil { return fmt.Errorf("error reconciling Metric: %w", err) } + podCounter := resourceutil.NewPodAccessor(c.podsLister, pa.Namespace, pa.Labels[serving.RevisionLabelKey]) + + if err != nil { + return fmt.Errorf("error getting a pod for the revision: %w", err) + } // Get the appropriate current scale from the metric, and right size // the scaleTargetRef based on it. - want, err := c.scaler.scale(ctx, pa, sks, decider.Status.DesiredScale) + want, err := c.scaler.scale(ctx, pa, sks, decider.Status.DesiredScale, c.Client, &podCounter) if err != nil { return fmt.Errorf("error scaling target: %w", err) } @@ -145,7 +150,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1. } // Compare the desired and observed resources to determine our situation. - podCounter := resourceutil.NewPodAccessor(c.podsLister, pa.Namespace, pa.Labels[serving.RevisionLabelKey]) ready, notReady, pending, terminating, err := podCounter.PodCountsByState() if err != nil { return fmt.Errorf("error getting pod counts: %w", err) @@ -272,6 +276,7 @@ func computeActiveCondition(ctx context.Context, pa *autoscalingv1alpha1.PodAuto minReady := activeThreshold(ctx, pa) if pc.ready >= minReady && pa.Status.ServiceName != "" { pa.Status.MarkScaleTargetInitialized() + pa.Status.MarkWithNoScaleFailures() } switch { diff --git a/pkg/reconciler/autoscaling/kpa/kpa_test.go b/pkg/reconciler/autoscaling/kpa/kpa_test.go index da2ee9ad4807..302482aab8fd 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa_test.go +++ b/pkg/reconciler/autoscaling/kpa/kpa_test.go @@ -41,6 +41,7 @@ import ( fakepainformer "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/podautoscaler/fake" fakerevisioninformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/revision/fake" "knative.dev/serving/pkg/metrics" + pkgtesting "knative.dev/serving/pkg/testing" networkingclient "knative.dev/networking/pkg/client/injection/client" filteredinformerfactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" @@ -294,7 +295,7 @@ func TestReconcile(t *testing.T) { } activeKPAMinScale := func(g, w int32) *autoscalingv1alpha1.PodAutoscaler { return kpa( - testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, + testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(g, w), WithReachabilityReachable, withMinScale(defaultScale), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1), @@ -332,7 +333,7 @@ func TestReconcile(t *testing.T) { Name: "steady state", Key: key, Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, pkgtesting.WithScaleTargetNoScaleFailures, markScaleTargetInitialized, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultSKS, @@ -360,12 +361,12 @@ func TestReconcile(t *testing.T) { }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithTraffic, - markScaleTargetInitialized, WithPASKSReady, + markScaleTargetInitialized, WithPASKSReady, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), }, { Object: kpa(testNamespace, testRevision, WithTraffic, - markScaleTargetInitialized, WithPASKSReady, + markScaleTargetInitialized, WithPASKSReady, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), }}, @@ -414,12 +415,12 @@ func TestReconcile(t *testing.T) { Name: "create metric", Key: key, Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithTraffic, markScaleTargetInitialized, + kpa(testNamespace, testRevision, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(1, defaultScale), WithPAStatusService(testRevision)), defaultSKS, defaultDeployment, defaultReady, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, WithTraffic, markScaleTargetInitialized, + Object: kpa(testNamespace, testRevision, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(1, defaultScale), WithPASKSReady, WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1)), }}, @@ -430,7 +431,7 @@ func TestReconcile(t *testing.T) { Name: "scale up deployment", Key: key, Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultSKS, @@ -503,7 +504,7 @@ func TestReconcile(t *testing.T) { preciseReady...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, withScales(defaultScale, defaultScale), - WithPASKSNotReady(""), WithTraffic, markScaleTargetInitialized, + WithPASKSNotReady(""), WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), WithPAStatusService(testRevision), WithObservedGeneration(1)), }}, }, { @@ -566,7 +567,7 @@ func TestReconcile(t *testing.T) { }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithPASKSReady, - WithTraffic, markScaleTargetInitialized, withMinScale(2), WithPAMetricsService(privateSvc), + WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withMinScale(2), WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithReachabilityUnreachable, WithObservedGeneration(1)), }}, @@ -582,7 +583,7 @@ func TestReconcile(t *testing.T) { }, makeReadyPods(2, testNamespace, testRevision)...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithPASKSReady, - WithTraffic, markScaleTargetInitialized, withMinScale(2), WithPAMetricsService(privateSvc), + WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withMinScale(2), WithPAMetricsService(privateSvc), withScales(2, defaultScale), WithPAStatusService(testRevision), WithReachabilityReachable, WithObservedGeneration(1)), }}, @@ -598,7 +599,7 @@ func TestReconcile(t *testing.T) { }, makeReadyPods(2, testNamespace, testRevision)...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithPASKSReady, - WithTraffic, markScaleTargetInitialized, withMinScale(2), WithPAMetricsService(privateSvc), + WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withMinScale(2), WithPAMetricsService(privateSvc), withScales(2, defaultScale), WithPAStatusService(testRevision), WithReachabilityUnknown, WithObservedGeneration(1)), }}, @@ -761,7 +762,7 @@ func TestReconcile(t *testing.T) { deploy(testNamespace, testRevision), defaultReady, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, withScales(1, 0), + Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(1, 0), WithPASKSReady, WithPAMetricsService(privateSvc), WithNoTraffic(noTrafficReason, "The target is not receiving traffic."), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), @@ -777,7 +778,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, 0 /* desiredScale */, 0 /* ebc */)), Objects: []runtime.Object{ kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, - markScaleTargetInitialized, withScales(1, 1), + markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(1, 1), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1)), defaultSKS, metric(testNamespace, testRevision), @@ -797,7 +798,7 @@ func TestReconcile(t *testing.T) { deploy(testNamespace, testRevision), defaultReady, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, WithPASKSReady, WithPAMetricsService(privateSvc), + Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPASKSReady, WithPAMetricsService(privateSvc), WithNoTraffic("TimedOut", "The target could not be activated."), withScales(1, 0), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1)), @@ -873,7 +874,7 @@ func TestReconcile(t *testing.T) { minScalePatch, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: activatingKPAMinScale(underscale, markScaleTargetInitialized, WithPASKSReady), + Object: activatingKPAMinScale(underscale, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPASKSReady), }}, }, { // Scale to `minScale` and mark PA "active" @@ -968,7 +969,7 @@ func TestReconcile(t *testing.T) { -42 /* ebc */)), Objects: []runtime.Object{ kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, - markScaleTargetInitialized, WithPAMetricsService(privateSvc), + markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultProxySKS, metric(testNamespace, testRevision), @@ -981,7 +982,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, defaultScale, /* desiredScale */ -18 /* ebc */)), Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), sks(testNamespace, testRevision, WithDeployRef(deployName), WithSKSReady, @@ -1000,7 +1001,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, defaultScale, /* desiredScale */ 1 /* ebc */)), Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultProxySKS, @@ -1055,7 +1056,7 @@ func TestReconcile(t *testing.T) { }, makeReadyPods(20, testNamespace, testRevision)...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, - markScaleTargetInitialized, withScales(20, 20), withInitialScale(20), WithReachabilityReachable, + markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(20, 20), withInitialScale(20), WithReachabilityReachable, WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1), ), }}, @@ -1081,7 +1082,7 @@ func TestReconcile(t *testing.T) { }), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, + Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithNoTraffic(noTrafficReason, "The target is not receiving traffic."), withScales(0, -1), WithReachabilityReachable, WithPAMetricsService(privateSvc), WithObservedGeneration(1), @@ -1151,7 +1152,7 @@ func TestReconcile(t *testing.T) { }), }, makeReadyPods(2, testNamespace, testRevision)...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, WithTraffic, WithPASKSReady, markScaleTargetInitialized, + Object: kpa(testNamespace, testRevision, WithTraffic, WithPASKSReady, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(2, 2), WithReachabilityReachable, WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1), ), @@ -1163,7 +1164,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, defaultScale, /* desiredScale */ 1 /* ebc */)), Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultProxySKS, @@ -1179,7 +1180,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, defaultScale, /* desiredScale */ 1 /* ebc */)), Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultSKS, diff --git a/pkg/reconciler/autoscaling/kpa/scaler.go b/pkg/reconciler/autoscaling/kpa/scaler.go index daafcf746cbb..04d032420a08 100644 --- a/pkg/reconciler/autoscaling/kpa/scaler.go +++ b/pkg/reconciler/autoscaling/kpa/scaler.go @@ -19,6 +19,8 @@ package kpa import ( "context" "fmt" + v1 "knative.dev/serving/pkg/apis/serving/v1" + revresurces "knative.dev/serving/pkg/reconciler/revision/resources" "net" "net/http" "strconv" @@ -37,11 +39,14 @@ import ( "knative.dev/serving/pkg/activator" autoscalingv1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1" "knative.dev/serving/pkg/autoscaler/config/autoscalerconfig" + clientset "knative.dev/serving/pkg/client/clientset/versioned" "knative.dev/serving/pkg/reconciler/autoscaling/config" kparesources "knative.dev/serving/pkg/reconciler/autoscaling/kpa/resources" aresources "knative.dev/serving/pkg/reconciler/autoscaling/resources" "knative.dev/serving/pkg/resources" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -330,7 +335,7 @@ func (ks *scaler) applyScale(ctx context.Context, pa *autoscalingv1alpha1.PodAut } // scale attempts to scale the given PA's target reference to the desired scale. -func (ks *scaler) scale(ctx context.Context, pa *autoscalingv1alpha1.PodAutoscaler, sks *netv1alpha1.ServerlessService, desiredScale int32) (int32, error) { +func (ks *scaler) scale(ctx context.Context, pa *autoscalingv1alpha1.PodAutoscaler, sks *netv1alpha1.ServerlessService, desiredScale int32, client clientset.Interface, podCounter *resources.PodAccessor) (int32, error) { asConfig := config.FromContext(ctx).Autoscaler logger := logging.FromContext(ctx) @@ -375,6 +380,48 @@ func (ks *scaler) scale(ctx context.Context, pa *autoscalingv1alpha1.PodAutoscal return desiredScale, nil } + // Before we apply scale to zero and since we have failed to activate, check if any pod is in waiting state + // This should capture any case where we scale from zero and regular progressDeadline expiration will not be hit due to K8s limitations + // when not being in a deployment rollout. + if desiredScale == 0 && podCounter != nil { + ready, _, _, _, err := podCounter.PodCountsByState() + if err != nil { + return desiredScale, fmt.Errorf("error getting pod counts: %w", err) + } + if ready == 0 { + var pod *corev1.Pod + pod, err = podCounter.GetAnyPod() + if err != nil { + return desiredScale, fmt.Errorf("error getting a pod: %w", err) + } + checkForPodErrorsBeforeScalingDown(logger, pod, pa) + } + } + logger.Infof("Scaling from %d to %d", currentScale, desiredScale) return desiredScale, ks.applyScale(ctx, pa, desiredScale, ps) } + +func checkForPodErrorsBeforeScalingDown(logger *zap.SugaredLogger, pod *corev1.Pod, pa *autoscalingv1alpha1.PodAutoscaler) { + if pod != nil { + for _, status := range pod.Status.ContainerStatuses { + if status.Name != revresurces.QueueContainerName { + if t := status.LastTerminationState.Terminated; t != nil { + logger.Debugf("marking exiting with: %d/%s", t.ExitCode, t.Message) + if t.ExitCode == 0 && t.Message == "" { + // In cases where there is no error message, we should still provide some exit message in the status + pa.Status.MarkWithScaleFailures(v1.ExitCodeReason(t.ExitCode), + v1.RevisionContainerExitingMessage("container exited with no error")) + break + } else { + pa.Status.MarkWithScaleFailures(v1.ExitCodeReason(t.ExitCode), v1.RevisionContainerExitingMessage(t.Message)) + break + } + } else if w := status.State.Waiting; w != nil { + logger.Debugf("marking pa as failed: %s: %s", w.Reason, w.Message) + pa.Status.MarkWithScaleFailures(w.Reason, w.Message) + } + } + } + } +} diff --git a/pkg/reconciler/autoscaling/kpa/scaler_test.go b/pkg/reconciler/autoscaling/kpa/scaler_test.go index 732406a8022c..e2e55fe6339e 100644 --- a/pkg/reconciler/autoscaling/kpa/scaler_test.go +++ b/pkg/reconciler/autoscaling/kpa/scaler_test.go @@ -556,7 +556,7 @@ func TestScaler(t *testing.T) { test.configMutator(cfg) } ctx = config.ToContext(ctx, cfg) - desiredScale, err := revisionScaler.scale(ctx, pa, sks, test.scaleTo) + desiredScale, err := revisionScaler.scale(ctx, pa, sks, test.scaleTo, fakeservingclient.Get(ctx), nil) if err != nil { t.Error("Scale got an unexpected error:", err) } @@ -647,7 +647,7 @@ func TestDisableScaleToZero(t *testing.T) { conf := defaultConfig() conf.Autoscaler.EnableScaleToZero = false ctx = config.ToContext(ctx, conf) - desiredScale, err := revisionScaler.scale(ctx, pa, nil /*sks doesn't matter in this test*/, test.scaleTo) + desiredScale, err := revisionScaler.scale(ctx, pa, nil /*sks doesn't matter in this test*/, test.scaleTo, fakeservingclient.Get(ctx), nil) if err != nil { t.Error("Scale got an unexpected error:", err) } diff --git a/pkg/reconciler/revision/table_test.go b/pkg/reconciler/revision/table_test.go index 57f020700096..c1c1b810216c 100644 --- a/pkg/reconciler/revision/table_test.go +++ b/pkg/reconciler/revision/table_test.go @@ -258,7 +258,7 @@ func TestReconcile(t *testing.T) { Revision("foo", "pa-ready", WithLogURL, allUnknownConditions), pa("foo", "pa-ready", WithPASKSReady, WithTraffic, - WithScaleTargetInitialized, WithPAStatusService("new-stuff"), WithReachabilityUnknown), + WithScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPAStatusService("new-stuff"), WithReachabilityUnknown), deploy(t, "foo", "pa-ready"), image("foo", "pa-ready"), }, @@ -283,7 +283,7 @@ func TestReconcile(t *testing.T) { MarkRevisionReady, WithRevisionObservedGeneration(1)), pa("foo", "pa-not-ready", WithPAStatusService("its-not-confidential"), - WithBufferedTraffic, + WithBufferedTraffic, WithScaleTargetNoScaleFailures, WithReachabilityReachable), readyDeploy(deploy(t, "foo", "pa-not-ready")), image("foo", "pa-not-ready"), @@ -407,7 +407,7 @@ func TestReconcile(t *testing.T) { WithLogURL, MarkRevisionReady, WithRoutingState(v1.RoutingStateActive, fc)), pa("foo", "fix-mutated-pa", WithProtocolType(networking.ProtocolH2C), - WithTraffic, WithPASKSReady, WithScaleTargetInitialized, WithReachabilityReachable, + WithTraffic, WithPASKSReady, WithScaleTargetNoScaleFailures, WithScaleTargetInitialized, WithReachabilityReachable, WithPAStatusService("fix-mutated-pa")), deploy(t, "foo", "fix-mutated-pa"), image("foo", "fix-mutated-pa"), @@ -423,7 +423,7 @@ func TestReconcile(t *testing.T) { }}, WantUpdates: []clientgotesting.UpdateActionImpl{{ Object: pa("foo", "fix-mutated-pa", WithPASKSReady, - WithTraffic, WithScaleTargetInitialized, + WithTraffic, WithScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPAStatusService("fix-mutated-pa"), WithReachabilityReachable), }}, Key: "foo/fix-mutated-pa", @@ -624,7 +624,7 @@ func TestReconcile(t *testing.T) { // This signal should make our Reconcile mark the Revision as Ready. Objects: []runtime.Object{ Revision("foo", "steady-ready", WithLogURL), - pa("foo", "steady-ready", WithPASKSReady, WithTraffic, + pa("foo", "steady-ready", WithPASKSReady, WithTraffic, WithScaleTargetNoScaleFailures, WithScaleTargetInitialized, WithPAStatusService("steadier-even")), deploy(t, "foo", "steady-ready"), image("foo", "steady-ready"), diff --git a/pkg/resources/pods.go b/pkg/resources/pods.go index 9e10c01fbb18..c733a9b72813 100644 --- a/pkg/resources/pods.go +++ b/pkg/resources/pods.go @@ -44,6 +44,18 @@ func NewPodAccessor(lister corev1listers.PodLister, namespace, revisionName stri } } +// GetAnyPod returns a pod for the revision that owns the pa, if any +func (pa PodAccessor) GetAnyPod() (pod *corev1.Pod, err error) { + pods, err := pa.podsLister.List(pa.selector) + if err != nil { + return nil, err + } + if len(pods) != 0 { + return pods[0], nil + } + return nil, nil +} + // PodCountsByState returns number of pods for the revision grouped by their state, that is // of interest to knative (e.g. ignoring failed or terminated pods). func (pa PodAccessor) PodCountsByState() (ready, notReady, pending, terminating int, err error) { diff --git a/pkg/testing/functional.go b/pkg/testing/functional.go index f0ee41eb7658..904dde5939f5 100644 --- a/pkg/testing/functional.go +++ b/pkg/testing/functional.go @@ -103,6 +103,16 @@ func WithScaleTargetInitialized(pa *autoscalingv1alpha1.PodAutoscaler) { pa.Status.MarkScaleTargetInitialized() } +// WithScaleTargetNoScaleFailures updates the PA to reflect that there were no scale failures +func WithScaleTargetNoScaleFailures(pa *autoscalingv1alpha1.PodAutoscaler) { + pa.Status.MarkWithNoScaleFailures() +} + +// WithScaleTargetScaleFailures updates the PA to reflect that there were scale failures +func WithScaleTargetScaleFailures(pa *autoscalingv1alpha1.PodAutoscaler, reason, message string) { + pa.Status.MarkWithScaleFailures(reason, message) +} + // WithPAStatusService annotates PA Status with the provided service name. func WithPAStatusService(svc string) PodAutoscalerOption { return func(pa *autoscalingv1alpha1.PodAutoscaler) { diff --git a/test/e2e/autoscale.go b/test/e2e/autoscale.go index 4421736d44d4..ca55f17297c4 100644 --- a/test/e2e/autoscale.go +++ b/test/e2e/autoscale.go @@ -266,7 +266,7 @@ func SetupSvc(t *testing.T, aopts *AutoscalerOptions, topts test.Options, fopts func assertScaleDown(ctx *TestContext) { deploymentName := resourcenames.Deployment(ctx.resources.Revision) - if err := WaitForScaleToZero(ctx.t, deploymentName, ctx.clients); err != nil { + if err := WaitForScaleToZero(ctx.t, deploymentName, ctx.clients, ""); err != nil { ctx.t.Fatalf("Unable to observe the Deployment named %s scaling down: %v", deploymentName, err) } diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index dc1e4491d016..5f0222e75386 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -72,7 +72,7 @@ func autoscalerCM(clients *test.Clients) (*autoscalerconfig.Config, error) { // WaitForScaleToZero will wait for the specified deployment to scale to 0 replicas. // Will wait up to 6 times the configured ScaleToZeroGracePeriod before failing. -func WaitForScaleToZero(t *testing.T, deploymentName string, clients *test.Clients) error { +func WaitForScaleToZero(t *testing.T, deploymentName string, clients *test.Clients, overrideNamespace string) error { t.Helper() t.Logf("Waiting for %q to scale to zero", deploymentName) @@ -81,6 +81,10 @@ func WaitForScaleToZero(t *testing.T, deploymentName string, clients *test.Clien return fmt.Errorf("failed to get autoscaler configmap: %w", err) } + namespace := test.ServingFlags.TestNamespace + if overrideNamespace != "" { + namespace = overrideNamespace + } return pkgTest.WaitForDeploymentState( context.Background(), clients.KubeClient, @@ -95,7 +99,7 @@ func WaitForScaleToZero(t *testing.T, deploymentName string, clients *test.Clien nil }, "DeploymentIsScaledDown", - test.ServingFlags.TestNamespace, + namespace, cfg.ScaleToZeroGracePeriod*6, ) } diff --git a/test/e2e/resource_quota_error_test.go b/test/e2e/resource_quota_error_test.go index 4a8bfa8ea7f4..0ae0f5b8ddfe 100644 --- a/test/e2e/resource_quota_error_test.go +++ b/test/e2e/resource_quota_error_test.go @@ -129,4 +129,35 @@ func TestResourceQuotaError(t *testing.T) { if err != nil { t.Fatal("Failed to validate revision state:", err) } + + // We should check what happens after deployment scales to zero. Deployment's progress deadline should expire. + deploymentName := revisionName + "-deployment" + if err := WaitForScaleToZero(t, deploymentName, clients, "rq-test"); err != nil { + t.Fatalf("Unable to observe the Deployment named %s scaling down: %v", deploymentName, err) + } + + t.Log("When the containers are not scheduled, the revision should have error status.") + err = v1test.CheckRevisionState(clients.ServingClient, revisionName, func(r *v1.Revision) (bool, error) { + cond := r.Status.GetCondition(v1.RevisionConditionReady) + if cond != nil { + if strings.Contains(cond.Message, errorMsgQuota) && cond.IsFalse() { + return true, nil + } + // Can fail with either a progress deadline exceeded error + if cond.Reason == progressDeadlineReason { + return true, nil + } + // wait for the container creation + if cond.Reason == waitReason { + return false, nil + } + return true, fmt.Errorf("the revision %s was not marked with expected error condition (Reason=%q, Message=%q), but with (Reason=%q, Message=%q)", + revisionName, revisionReason, errorMsgQuota, cond.Reason, cond.Message) + } + return false, nil + }) + + if err != nil { + t.Fatal("Failed to validate revision state:", err) + } } diff --git a/test/ha/activator_test.go b/test/ha/activator_test.go index 800345bc00e8..34f309ff6726 100644 --- a/test/ha/activator_test.go +++ b/test/ha/activator_test.go @@ -107,7 +107,7 @@ func testActivatorHA(t *testing.T, gracePeriod *int64, slo float64) { i := 0 for name, ip := range activatorPods { t.Logf("Waiting for %s to scale to zero", namesScaleToZero.Revision) - if err := e2e.WaitForScaleToZero(t, revisionresourcenames.Deployment(resourcesScaleToZero.Revision), clients); err != nil { + if err := e2e.WaitForScaleToZero(t, revisionresourcenames.Deployment(resourcesScaleToZero.Revision), clients, ""); err != nil { t.Fatal("Failed to scale to zero:", err) } diff --git a/test/ha/autoscaler_test.go b/test/ha/autoscaler_test.go index 0a6bdc36c663..18ade5543d9f 100644 --- a/test/ha/autoscaler_test.go +++ b/test/ha/autoscaler_test.go @@ -77,7 +77,7 @@ func TestAutoscalerHA(t *testing.T) { t.Log("Got initial leader set:", leaders) t.Logf("Waiting for %s to scale to zero", names.Revision) - if err := e2e.WaitForScaleToZero(t, revisionresourcenames.Deployment(resources.Revision), clients); err != nil { + if err := e2e.WaitForScaleToZero(t, revisionresourcenames.Deployment(resources.Revision), clients, ""); err != nil { t.Fatal("Failed to scale to zero:", err) } diff --git a/test/upgrade/preupgrade.go b/test/upgrade/preupgrade.go index 8fa5efb61d23..40d0cd8f5808 100644 --- a/test/upgrade/preupgrade.go +++ b/test/upgrade/preupgrade.go @@ -85,7 +85,7 @@ func servicePreUpgradeAndScaleToZero(t *testing.T) { url := resources.Service.Status.URL.URL() assertServiceResourcesUpdated(t, clients, scaleToZeroServiceNames, url, test.PizzaPlanetText1) - if err := e2e.WaitForScaleToZero(t, revisionresourcenames.Deployment(resources.Revision), clients); err != nil { + if err := e2e.WaitForScaleToZero(t, revisionresourcenames.Deployment(resources.Revision), clients, ""); err != nil { t.Fatal("Could not scale to zero:", err) } } @@ -124,7 +124,7 @@ func initialScalePreUpgrade(t *testing.T) { if err != nil { t.Fatal("Failed to create Service:", err) } - if err = e2e.WaitForScaleToZero(t, revisionresourcenames.Deployment(resources.Revision), clients); err != nil { + if err = e2e.WaitForScaleToZero(t, revisionresourcenames.Deployment(resources.Revision), clients, ""); err != nil { t.Fatal("Could not scale to zero:", err) } } From fff55ffd8e8034c89b7e5515751d46bf852791aa Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 4 Oct 2024 17:36:51 +0300 Subject: [PATCH 2/5] lint --- pkg/reconciler/autoscaling/kpa/scaler.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/reconciler/autoscaling/kpa/scaler.go b/pkg/reconciler/autoscaling/kpa/scaler.go index 04d032420a08..9886c3d66fd4 100644 --- a/pkg/reconciler/autoscaling/kpa/scaler.go +++ b/pkg/reconciler/autoscaling/kpa/scaler.go @@ -19,8 +19,6 @@ package kpa import ( "context" "fmt" - v1 "knative.dev/serving/pkg/apis/serving/v1" - revresurces "knative.dev/serving/pkg/reconciler/revision/resources" "net" "net/http" "strconv" @@ -30,6 +28,9 @@ import ( "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/logging" + v1 "knative.dev/serving/pkg/apis/serving/v1" + revresources "knative.dev/serving/pkg/reconciler/revision/resources" + netapis "knative.dev/networking/pkg/apis/networking" netv1alpha1 "knative.dev/networking/pkg/apis/networking/v1alpha1" nethttp "knative.dev/networking/pkg/http" @@ -405,7 +406,7 @@ func (ks *scaler) scale(ctx context.Context, pa *autoscalingv1alpha1.PodAutoscal func checkForPodErrorsBeforeScalingDown(logger *zap.SugaredLogger, pod *corev1.Pod, pa *autoscalingv1alpha1.PodAutoscaler) { if pod != nil { for _, status := range pod.Status.ContainerStatuses { - if status.Name != revresurces.QueueContainerName { + if status.Name != revresources.QueueContainerName { if t := status.LastTerminationState.Terminated; t != nil { logger.Debugf("marking exiting with: %d/%s", t.ExitCode, t.Message) if t.ExitCode == 0 && t.Message == "" { From 8c18f73fbbf48f184fa6ecbd0306b31ea234425d Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 4 Oct 2024 17:55:38 +0300 Subject: [PATCH 3/5] clean up --- pkg/reconciler/autoscaling/kpa/kpa.go | 2 +- pkg/reconciler/autoscaling/kpa/kpa_test.go | 47 +++++++++---------- pkg/reconciler/autoscaling/kpa/scaler.go | 8 ++-- pkg/reconciler/autoscaling/kpa/scaler_test.go | 4 +- 4 files changed, 29 insertions(+), 32 deletions(-) diff --git a/pkg/reconciler/autoscaling/kpa/kpa.go b/pkg/reconciler/autoscaling/kpa/kpa.go index e295324a6a92..01fc5e123ca0 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa.go +++ b/pkg/reconciler/autoscaling/kpa/kpa.go @@ -122,7 +122,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *autoscalingv1alpha1. // Get the appropriate current scale from the metric, and right size // the scaleTargetRef based on it. - want, err := c.scaler.scale(ctx, pa, sks, decider.Status.DesiredScale, c.Client, &podCounter) + want, err := c.scaler.scale(ctx, pa, sks, decider.Status.DesiredScale, &podCounter) if err != nil { return fmt.Errorf("error scaling target: %w", err) } diff --git a/pkg/reconciler/autoscaling/kpa/kpa_test.go b/pkg/reconciler/autoscaling/kpa/kpa_test.go index 302482aab8fd..8b0d4e133000 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa_test.go +++ b/pkg/reconciler/autoscaling/kpa/kpa_test.go @@ -41,7 +41,6 @@ import ( fakepainformer "knative.dev/serving/pkg/client/injection/informers/autoscaling/v1alpha1/podautoscaler/fake" fakerevisioninformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/revision/fake" "knative.dev/serving/pkg/metrics" - pkgtesting "knative.dev/serving/pkg/testing" networkingclient "knative.dev/networking/pkg/client/injection/client" filteredinformerfactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" @@ -295,7 +294,7 @@ func TestReconcile(t *testing.T) { } activeKPAMinScale := func(g, w int32) *autoscalingv1alpha1.PodAutoscaler { return kpa( - testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withScales(g, w), WithReachabilityReachable, withMinScale(defaultScale), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1), @@ -333,7 +332,7 @@ func TestReconcile(t *testing.T) { Name: "steady state", Key: key, Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, pkgtesting.WithScaleTargetNoScaleFailures, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, WithScaleTargetNoScaleFailures, markScaleTargetInitialized, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultSKS, @@ -361,12 +360,12 @@ func TestReconcile(t *testing.T) { }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithTraffic, - markScaleTargetInitialized, WithPASKSReady, pkgtesting.WithScaleTargetNoScaleFailures, + markScaleTargetInitialized, WithPASKSReady, WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), }, { Object: kpa(testNamespace, testRevision, WithTraffic, - markScaleTargetInitialized, WithPASKSReady, pkgtesting.WithScaleTargetNoScaleFailures, + markScaleTargetInitialized, WithPASKSReady, WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), }}, @@ -415,12 +414,12 @@ func TestReconcile(t *testing.T) { Name: "create metric", Key: key, Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + kpa(testNamespace, testRevision, WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withScales(1, defaultScale), WithPAStatusService(testRevision)), defaultSKS, defaultDeployment, defaultReady, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + Object: kpa(testNamespace, testRevision, WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withScales(1, defaultScale), WithPASKSReady, WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1)), }}, @@ -431,7 +430,7 @@ func TestReconcile(t *testing.T) { Name: "scale up deployment", Key: key, Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultSKS, @@ -504,7 +503,7 @@ func TestReconcile(t *testing.T) { preciseReady...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, withScales(defaultScale, defaultScale), - WithPASKSNotReady(""), WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + WithPASKSNotReady(""), WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), WithPAStatusService(testRevision), WithObservedGeneration(1)), }}, }, { @@ -567,7 +566,7 @@ func TestReconcile(t *testing.T) { }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithPASKSReady, - WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withMinScale(2), WithPAMetricsService(privateSvc), + WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withMinScale(2), WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithReachabilityUnreachable, WithObservedGeneration(1)), }}, @@ -583,7 +582,7 @@ func TestReconcile(t *testing.T) { }, makeReadyPods(2, testNamespace, testRevision)...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithPASKSReady, - WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withMinScale(2), WithPAMetricsService(privateSvc), + WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withMinScale(2), WithPAMetricsService(privateSvc), withScales(2, defaultScale), WithPAStatusService(testRevision), WithReachabilityReachable, WithObservedGeneration(1)), }}, @@ -599,7 +598,7 @@ func TestReconcile(t *testing.T) { }, makeReadyPods(2, testNamespace, testRevision)...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithPASKSReady, - WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withMinScale(2), WithPAMetricsService(privateSvc), + WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withMinScale(2), WithPAMetricsService(privateSvc), withScales(2, defaultScale), WithPAStatusService(testRevision), WithReachabilityUnknown, WithObservedGeneration(1)), }}, @@ -762,7 +761,7 @@ func TestReconcile(t *testing.T) { deploy(testNamespace, testRevision), defaultReady, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(1, 0), + Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withScales(1, 0), WithPASKSReady, WithPAMetricsService(privateSvc), WithNoTraffic(noTrafficReason, "The target is not receiving traffic."), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), @@ -778,7 +777,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, 0 /* desiredScale */, 0 /* ebc */)), Objects: []runtime.Object{ kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, - markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(1, 1), + markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withScales(1, 1), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1)), defaultSKS, metric(testNamespace, testRevision), @@ -798,7 +797,7 @@ func TestReconcile(t *testing.T) { deploy(testNamespace, testRevision), defaultReady, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPASKSReady, WithPAMetricsService(privateSvc), + Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPASKSReady, WithPAMetricsService(privateSvc), WithNoTraffic("TimedOut", "The target could not be activated."), withScales(1, 0), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1)), @@ -874,7 +873,7 @@ func TestReconcile(t *testing.T) { minScalePatch, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: activatingKPAMinScale(underscale, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPASKSReady), + Object: activatingKPAMinScale(underscale, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPASKSReady), }}, }, { // Scale to `minScale` and mark PA "active" @@ -969,7 +968,7 @@ func TestReconcile(t *testing.T) { -42 /* ebc */)), Objects: []runtime.Object{ kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, - markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), + markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultProxySKS, metric(testNamespace, testRevision), @@ -982,7 +981,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, defaultScale, /* desiredScale */ -18 /* ebc */)), Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), sks(testNamespace, testRevision, WithDeployRef(deployName), WithSKSReady, @@ -1001,7 +1000,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, defaultScale, /* desiredScale */ 1 /* ebc */)), Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultProxySKS, @@ -1056,7 +1055,7 @@ func TestReconcile(t *testing.T) { }, makeReadyPods(20, testNamespace, testRevision)...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, - markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, withScales(20, 20), withInitialScale(20), WithReachabilityReachable, + markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withScales(20, 20), withInitialScale(20), WithReachabilityReachable, WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1), ), }}, @@ -1082,7 +1081,7 @@ func TestReconcile(t *testing.T) { }), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithNoTraffic(noTrafficReason, "The target is not receiving traffic."), withScales(0, -1), WithReachabilityReachable, WithPAMetricsService(privateSvc), WithObservedGeneration(1), @@ -1152,7 +1151,7 @@ func TestReconcile(t *testing.T) { }), }, makeReadyPods(2, testNamespace, testRevision)...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, WithTraffic, WithPASKSReady, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + Object: kpa(testNamespace, testRevision, WithTraffic, WithPASKSReady, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, withScales(2, 2), WithReachabilityReachable, WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1), ), @@ -1164,7 +1163,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, defaultScale, /* desiredScale */ 1 /* ebc */)), Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultProxySKS, @@ -1180,7 +1179,7 @@ func TestReconcile(t *testing.T) { decider(testNamespace, testRevision, defaultScale, /* desiredScale */ 1 /* ebc */)), Objects: []runtime.Object{ - kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, pkgtesting.WithScaleTargetNoScaleFailures, + kpa(testNamespace, testRevision, WithPASKSReady, WithTraffic, markScaleTargetInitialized, WithScaleTargetNoScaleFailures, WithPAMetricsService(privateSvc), withScales(1, defaultScale), WithPAStatusService(testRevision), WithObservedGeneration(1)), defaultSKS, diff --git a/pkg/reconciler/autoscaling/kpa/scaler.go b/pkg/reconciler/autoscaling/kpa/scaler.go index 9886c3d66fd4..4db7c7d4db63 100644 --- a/pkg/reconciler/autoscaling/kpa/scaler.go +++ b/pkg/reconciler/autoscaling/kpa/scaler.go @@ -40,7 +40,6 @@ import ( "knative.dev/serving/pkg/activator" autoscalingv1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1" "knative.dev/serving/pkg/autoscaler/config/autoscalerconfig" - clientset "knative.dev/serving/pkg/client/clientset/versioned" "knative.dev/serving/pkg/reconciler/autoscaling/config" kparesources "knative.dev/serving/pkg/reconciler/autoscaling/kpa/resources" aresources "knative.dev/serving/pkg/reconciler/autoscaling/resources" @@ -336,7 +335,7 @@ func (ks *scaler) applyScale(ctx context.Context, pa *autoscalingv1alpha1.PodAut } // scale attempts to scale the given PA's target reference to the desired scale. -func (ks *scaler) scale(ctx context.Context, pa *autoscalingv1alpha1.PodAutoscaler, sks *netv1alpha1.ServerlessService, desiredScale int32, client clientset.Interface, podCounter *resources.PodAccessor) (int32, error) { +func (ks *scaler) scale(ctx context.Context, pa *autoscalingv1alpha1.PodAutoscaler, sks *netv1alpha1.ServerlessService, desiredScale int32, podCounter *resources.PodAccessor) (int32, error) { asConfig := config.FromContext(ctx).Autoscaler logger := logging.FromContext(ctx) @@ -414,10 +413,9 @@ func checkForPodErrorsBeforeScalingDown(logger *zap.SugaredLogger, pod *corev1.P pa.Status.MarkWithScaleFailures(v1.ExitCodeReason(t.ExitCode), v1.RevisionContainerExitingMessage("container exited with no error")) break - } else { - pa.Status.MarkWithScaleFailures(v1.ExitCodeReason(t.ExitCode), v1.RevisionContainerExitingMessage(t.Message)) - break } + pa.Status.MarkWithScaleFailures(v1.ExitCodeReason(t.ExitCode), v1.RevisionContainerExitingMessage(t.Message)) + break } else if w := status.State.Waiting; w != nil { logger.Debugf("marking pa as failed: %s: %s", w.Reason, w.Message) pa.Status.MarkWithScaleFailures(w.Reason, w.Message) diff --git a/pkg/reconciler/autoscaling/kpa/scaler_test.go b/pkg/reconciler/autoscaling/kpa/scaler_test.go index e2e55fe6339e..b6e21b5c3e2d 100644 --- a/pkg/reconciler/autoscaling/kpa/scaler_test.go +++ b/pkg/reconciler/autoscaling/kpa/scaler_test.go @@ -556,7 +556,7 @@ func TestScaler(t *testing.T) { test.configMutator(cfg) } ctx = config.ToContext(ctx, cfg) - desiredScale, err := revisionScaler.scale(ctx, pa, sks, test.scaleTo, fakeservingclient.Get(ctx), nil) + desiredScale, err := revisionScaler.scale(ctx, pa, sks, test.scaleTo, nil) if err != nil { t.Error("Scale got an unexpected error:", err) } @@ -647,7 +647,7 @@ func TestDisableScaleToZero(t *testing.T) { conf := defaultConfig() conf.Autoscaler.EnableScaleToZero = false ctx = config.ToContext(ctx, conf) - desiredScale, err := revisionScaler.scale(ctx, pa, nil /*sks doesn't matter in this test*/, test.scaleTo, fakeservingclient.Get(ctx), nil) + desiredScale, err := revisionScaler.scale(ctx, pa, nil /*sks doesn't matter in this test*/, test.scaleTo, nil) if err != nil { t.Error("Scale got an unexpected error:", err) } From 39037b8799ebbf3c9c3b8f3322516333b3df40cb Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Mon, 7 Oct 2024 13:24:44 +0300 Subject: [PATCH 4/5] fixes --- pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go | 6 ++-- pkg/apis/serving/v1/revision_lifecycle.go | 14 ++++---- pkg/reconciler/autoscaling/kpa/scaler.go | 2 +- pkg/reconciler/revision/table_test.go | 32 +++++++++++++++++++ pkg/testing/functional.go | 7 ++++ pkg/testing/v1/revision.go | 6 ++++ 6 files changed, 56 insertions(+), 11 deletions(-) diff --git a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go index b4c35c188b7f..fa433a7cb2a2 100644 --- a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go +++ b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go @@ -216,9 +216,9 @@ func (pas *PodAutoscalerStatus) MarkScaleTargetInitialized() { podCondSet.Manage(pas).MarkTrue(PodAutoscalerConditionScaleTargetInitialized) } -// ScaleTargetNotScaledAfterFailure returns true if the PodAutoscaler's scale target has been -// scaled successfully. -func (pas *PodAutoscalerStatus) ScaleTargetNotScaledAfterFailure() bool { +// ScaleTargetNotScaled returns true if the PodAutoscaler's scale target has failed +// to scaled. +func (pas *PodAutoscalerStatus) ScaleTargetNotScaled() bool { return pas.GetCondition(PodAutoscalerConditionScaleTargetScaled).IsFalse() } diff --git a/pkg/apis/serving/v1/revision_lifecycle.go b/pkg/apis/serving/v1/revision_lifecycle.go index a5fb242e5f5b..6924635cc328 100644 --- a/pkg/apis/serving/v1/revision_lifecycle.go +++ b/pkg/apis/serving/v1/revision_lifecycle.go @@ -200,6 +200,13 @@ func (rs *RevisionStatus) PropagateAutoscalerStatus(ps *autoscalingv1alpha1.PodA // that implies that |service.endpoints| > 0. rs.MarkResourcesAvailableTrue() rs.MarkContainerHealthyTrue() + + // Mark resource unavailable if we are scaling back to zero, but we never achieved the required scale + // and deployment status was not updated properly by K8s. For example due to an image pull error. + if ps.ScaleTargetNotScaled() { + condScaled := ps.GetCondition(autoscalingv1alpha1.PodAutoscalerConditionScaleTargetScaled) + rs.MarkResourcesAvailableFalse(condScaled.Reason, condScaled.Message) + } } // Mark resource unavailable if we don't have a Service Name and the deployment is ready @@ -234,13 +241,6 @@ func (rs *RevisionStatus) PropagateAutoscalerStatus(ps *autoscalingv1alpha1.PodA case corev1.ConditionTrue: rs.MarkActiveTrue() } - - // Mark resource unavailable if we scaled back to zero, but we never achieved the required scale - // and deployment status was not updated properly byt K8s. For example due to a temporary image pull error. - if ps.IsScaleTargetInitialized() && !resUnavailable && ps.ScaleTargetNotScaledAfterFailure() { - condScaled := ps.GetCondition(autoscalingv1alpha1.PodAutoscalerConditionScaleTargetScaled) - rs.MarkResourcesAvailableFalse(condScaled.Reason, condScaled.Message) - } } // ResourceNotOwnedMessage constructs the status message if ownership on the diff --git a/pkg/reconciler/autoscaling/kpa/scaler.go b/pkg/reconciler/autoscaling/kpa/scaler.go index 4db7c7d4db63..ac36a045118b 100644 --- a/pkg/reconciler/autoscaling/kpa/scaler.go +++ b/pkg/reconciler/autoscaling/kpa/scaler.go @@ -388,7 +388,7 @@ func (ks *scaler) scale(ctx context.Context, pa *autoscalingv1alpha1.PodAutoscal if err != nil { return desiredScale, fmt.Errorf("error getting pod counts: %w", err) } - if ready == 0 { + if ready == 0 && currentScale > 0 { var pod *corev1.Pod pod, err = podCounter.GetAnyPod() if err != nil { diff --git a/pkg/reconciler/revision/table_test.go b/pkg/reconciler/revision/table_test.go index c1c1b810216c..4a7fdc7559d6 100644 --- a/pkg/reconciler/revision/table_test.go +++ b/pkg/reconciler/revision/table_test.go @@ -41,6 +41,7 @@ import ( tracingconfig "knative.dev/pkg/tracing/config" autoscalingv1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1" defaultconfig "knative.dev/serving/pkg/apis/config" + "knative.dev/serving/pkg/apis/serving" v1 "knative.dev/serving/pkg/apis/serving/v1" "knative.dev/serving/pkg/autoscaler/config/autoscalerconfig" servingclient "knative.dev/serving/pkg/client/injection/client" @@ -744,6 +745,37 @@ func TestReconcile(t *testing.T) { PodSpecPersistentVolumeClaim: defaultconfig.Enabled, PodSpecPersistentVolumeWrite: defaultconfig.Enabled, }}), + }, { + Name: "revision becomes not ready when PA reports a failed container", + Objects: []runtime.Object{ + Revision("foo", "container-failed", + WithLogURL, + MarkRevisionReady, + withDefaultContainerStatuses(), + WithRevisionLabel(serving.RoutingStateLabelKey, "active"), + ), + pa("foo", "container-failed", + WithPAStatusScaleTargetScaleFailures("ImagePullBackOff", "Back-off pulling image..."), + WithScaleTargetInitialized, + WithTraffic, + WithReachabilityReachable, + WithPAStatusService("something"), + ), + readyDeploy(deploy(t, "foo", "container-failed")), + image("foo", "container-failed"), + }, + Key: "foo/container-failed", + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: Revision("foo", "container-failed", + WithLogURL, + MarkInactive("ImagePullBackOff", "Back-off pulling image..."), + MarkResourcesUnavailable("ImagePullBackOff", "Back-off pulling image..."), + MarkContainerHealthy(), + withDefaultContainerStatuses(), + WithRevisionObservedGeneration(1), + WithRevisionLabel(serving.RoutingStateLabelKey, "active"), + ), + }}, }} table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, _ configmap.Watcher) controller.Reconciler { diff --git a/pkg/testing/functional.go b/pkg/testing/functional.go index 904dde5939f5..3b2a15a109ce 100644 --- a/pkg/testing/functional.go +++ b/pkg/testing/functional.go @@ -113,6 +113,13 @@ func WithScaleTargetScaleFailures(pa *autoscalingv1alpha1.PodAutoscaler, reason, pa.Status.MarkWithScaleFailures(reason, message) } +// WithPAStatusScaleTargetScaleFailures updates the PA to reflect that there were scale failures +func WithPAStatusScaleTargetScaleFailures(reason, message string) PodAutoscalerOption { + return func(pa *autoscalingv1alpha1.PodAutoscaler) { + pa.Status.MarkWithScaleFailures(reason, message) + } +} + // WithPAStatusService annotates PA Status with the provided service name. func WithPAStatusService(svc string) PodAutoscalerOption { return func(pa *autoscalingv1alpha1.PodAutoscaler) { diff --git a/pkg/testing/v1/revision.go b/pkg/testing/v1/revision.go index a475bad0aa40..491ab69422e5 100644 --- a/pkg/testing/v1/revision.go +++ b/pkg/testing/v1/revision.go @@ -158,6 +158,12 @@ func MarkContainerHealthyUnknown(reason string) RevisionOption { } } +func MarkContainerHealthy() RevisionOption { + return func(r *v1.Revision) { + r.Status.MarkContainerHealthyTrue() + } +} + // MarkProgressDeadlineExceeded calls the method of the same name on the Revision // with the message we expect the Revision Reconciler to pass. func MarkProgressDeadlineExceeded(message string) RevisionOption { From 5ae57a6338ffe0df2e0adab62876f7e3a89eb0ae Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Mon, 20 Jan 2025 16:32:27 +0200 Subject: [PATCH 5/5] lint --- test/e2e/resource_quota_error_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/e2e/resource_quota_error_test.go b/test/e2e/resource_quota_error_test.go index 0ae0f5b8ddfe..0583de4f6c10 100644 --- a/test/e2e/resource_quota_error_test.go +++ b/test/e2e/resource_quota_error_test.go @@ -156,7 +156,6 @@ func TestResourceQuotaError(t *testing.T) { } return false, nil }) - if err != nil { t.Fatal("Failed to validate revision state:", err) }