From 1621eb0c11fb609574754233e8edec8af6e86744 Mon Sep 17 00:00:00 2001
From: Knative Automation
Date: Tue, 14 Jan 2025 11:26:02 -0500
Subject: [PATCH] upgrade to latest dependencies (#4213)

bumping knative.dev/eventing 7da3cee...96ab579:
bumping knative.dev/hack 05b2fb3...30344ae:
> 30344ae Export KO_FLAGS for consuming scripts (#402)

Signed-off-by: Knative Automation
---
 go.mod                                        |  4 +-
 go.sum                                        |  8 +--
 .../eventing/pkg/scheduler/scheduler.go       |  2 -
 .../eventing/pkg/scheduler/state/state.go     | 56 +++++++++++--------
 .../pkg/scheduler/statefulset/autoscaler.go   | 18 ++++--
 vendor/knative.dev/hack/release.sh            |  2 +-
 vendor/modules.txt                            |  4 +-
 7 files changed, 55 insertions(+), 39 deletions(-)

diff --git a/go.mod b/go.mod
index 2ae73faed6..42a649053d 100644
--- a/go.mod
+++ b/go.mod
@@ -35,8 +35,8 @@ require (
 	k8s.io/apiserver v0.30.3
 	k8s.io/client-go v0.30.3
 	k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
-	knative.dev/eventing v0.43.4-0.20241219143011-7da3cee603c5
-	knative.dev/hack v0.0.0-20241010131451-05b2fb30cb4d
+	knative.dev/eventing v0.43.3
+	knative.dev/hack v0.0.0-20250114120502-30344aeba756
 	knative.dev/pkg v0.0.0-20241021183759-9b9d535af5ad
 	knative.dev/reconciler-test v0.0.0-20241015093232-09111f0f1364
 	sigs.k8s.io/controller-runtime v0.12.3
diff --git a/go.sum b/go.sum
index e7f4438bfd..cbe8180ee8 100644
--- a/go.sum
+++ b/go.sum
@@ -1214,10 +1214,10 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
 k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
-knative.dev/eventing v0.43.4-0.20241219143011-7da3cee603c5 h1:R1j048Vx6XeMvVTHbPirDsovpUAk4SNWKhBU703l6rk=
-knative.dev/eventing v0.43.4-0.20241219143011-7da3cee603c5/go.mod h1:pdrF+bEUfRkNn9ifWXS7DoVj5W31gA5KQVd8iwplXUo=
-knative.dev/hack v0.0.0-20241010131451-05b2fb30cb4d h1:aCfX7kwkvgGxXXGbso5tLqdwQmzBkJ9d+EIRwksKTvk=
-knative.dev/hack v0.0.0-20241010131451-05b2fb30cb4d/go.mod h1:R0ritgYtjLDO9527h5vb5X6gfvt5LCrJ55BNbVDsWiY=
+knative.dev/eventing v0.43.3 h1:xt10rMoe6t1hFPPrnsR5NYe/vWRXQqLgRK8lS6n5mU8=
+knative.dev/eventing v0.43.3/go.mod h1:pdrF+bEUfRkNn9ifWXS7DoVj5W31gA5KQVd8iwplXUo=
+knative.dev/hack v0.0.0-20250114120502-30344aeba756 h1:WOZy3XeC4frTOCHmmPjPj70ojyeAjO8MtNfO02nMq0w=
+knative.dev/hack v0.0.0-20250114120502-30344aeba756/go.mod h1:R0ritgYtjLDO9527h5vb5X6gfvt5LCrJ55BNbVDsWiY=
 knative.dev/pkg v0.0.0-20241021183759-9b9d535af5ad h1:Nrjtr2H168rJeamH4QdyLMV1lEKHejNhaj1ymgQMfLk=
 knative.dev/pkg v0.0.0-20241021183759-9b9d535af5ad/go.mod h1:StJI72GWcm/iErmk4RqFJiOo8RLbVqPbHxUqeVwAzeo=
 knative.dev/reconciler-test v0.0.0-20241015093232-09111f0f1364 h1:DIc+vbaFKOSGktPXJ1MaXIXoDjlmUIXQkHiZaPcYGbQ=
diff --git a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go
index 0dd4f2b6c8..62dcf163d2 100644
--- a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go
+++ b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go
@@ -42,8 +42,6 @@ type VPodLister func() ([]VPod, error)
 // Evictor allows for vreplicas to be evicted.
 // For instance, the evictor is used by the statefulset scheduler to
 // move vreplicas to pod with a lower ordinal.
-//
-// pod might be `nil`.
type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) error // Scheduler is responsible for placing VPods into real Kubernetes pods diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go index 4f3ed65979..9d4503b915 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go @@ -44,14 +44,15 @@ type StateAccessor interface { // It is used by for the scheduler and the autoscaler type State struct { // free tracks the free capacity of each pod. - // - // Including pods that might not exist anymore, it reflects the free capacity determined by - // placements in the vpod status. FreeCap []int32 // schedulable pods tracks the pods that aren't being evicted. SchedulablePods []int32 + // LastOrdinal is the ordinal index corresponding to the last statefulset replica + // with placed vpods. + LastOrdinal int32 + // Pod capacity. Capacity int32 @@ -142,10 +143,14 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) { return nil, err } - freeCap := make([]int32, 0) + free := make([]int32, 0) pending := make(map[types.NamespacedName]int32, 4) expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods)) schedulablePods := sets.NewInt32() + last := int32(-1) + + // keep track of (vpod key, podname) pairs with existing placements + withPlacement := make(map[types.NamespacedName]map[string]bool) podSpread := make(map[types.NamespacedName]map[string]int32) @@ -167,7 +172,7 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) { } for _, p := range schedulablePods.List() { - freeCap = s.updateFreeCapacity(logger, freeCap, PodNameFromOrdinal(s.statefulSetName, p), 0) + free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) } // Getting current state from existing placements for all vpods @@ -177,13 +182,16 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) { pending[vpod.GetKey()] = pendingFromVPod(vpod) expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas() + withPlacement[vpod.GetKey()] = make(map[string]bool) podSpread[vpod.GetKey()] = make(map[string]int32) for i := 0; i < len(ps); i++ { podName := ps[i].PodName vreplicas := ps[i].VReplicas - freeCap = s.updateFreeCapacity(logger, freeCap, podName, vreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas) + + withPlacement[vpod.GetKey()][podName] = true pod, err := s.podLister.Get(podName) if err != nil { @@ -196,17 +204,8 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) { } } - state := &State{ - FreeCap: freeCap, - SchedulablePods: schedulablePods.List(), - Capacity: s.capacity, - Replicas: scale.Spec.Replicas, - StatefulSetName: s.statefulSetName, - PodLister: s.podLister, - PodSpread: podSpread, - Pending: pending, - ExpectedVReplicaByVPod: expectedVReplicasByVPod, - } + state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, StatefulSetName: s.statefulSetName, PodLister: s.podLister, + PodSpread: podSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} logger.Infow("cluster state info", zap.Any("state", state)) @@ -220,19 +219,23 @@ func pendingFromVPod(vpod scheduler.VPod) int32 { return int32(math.Max(float64(0), float64(expected-scheduled))) } -func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, 
freeCap []int32, podName string, vreplicas int32) []int32 { +func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { ordinal := OrdinalFromPodName(podName) - freeCap = grow(freeCap, ordinal, s.capacity) + free = grow(free, ordinal, s.capacity) - freeCap[ordinal] -= vreplicas + free[ordinal] -= vreplicas // Assert the pod is not overcommitted - if overcommit := freeCap[ordinal]; overcommit < 0 { + if free[ordinal] < 0 { // This should not happen anymore. Log as an error but do not interrupt the current scheduling. - logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("overcommit", overcommit)) + logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + } + + if ordinal > last { + last = ordinal } - return freeCap + return free, last } func (s *State) TotalPending() int32 { @@ -280,16 +283,23 @@ func (s *State) MarshalJSON() ([]byte, error) { type S struct { FreeCap []int32 `json:"freeCap"` SchedulablePods []int32 `json:"schedulablePods"` + LastOrdinal int32 `json:"lastOrdinal"` Capacity int32 `json:"capacity"` Replicas int32 `json:"replicas"` + NumZones int32 `json:"numZones"` + NumNodes int32 `json:"numNodes"` + NodeToZoneMap map[string]string `json:"nodeToZoneMap"` StatefulSetName string `json:"statefulSetName"` PodSpread map[string]map[string]int32 `json:"podSpread"` + NodeSpread map[string]map[string]int32 `json:"nodeSpread"` + ZoneSpread map[string]map[string]int32 `json:"zoneSpread"` Pending map[string]int32 `json:"pending"` } sj := S{ FreeCap: s.FreeCap, SchedulablePods: s.SchedulablePods, + LastOrdinal: s.LastOrdinal, Capacity: s.Capacity, Replicas: s.Replicas, StatefulSetName: s.StatefulSetName, diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go index 653ec12f15..8b61ca4a83 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go @@ -26,7 +26,6 @@ import ( "go.uber.org/zap" v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/integer" @@ -251,8 +250,17 @@ func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State) error { zap.Any("state", s), ) - // Determine if there are vpods that need compaction - if s.Replicas != int32(len(s.FreeCap)) { + // when there is only one pod there is nothing to move or number of pods is just enough! 
+ if s.LastOrdinal < 1 || len(s.SchedulablePods) <= 1 { + return nil + } + + // Determine if there is enough free capacity to + // move all vreplicas placed in the last pod to pods with a lower ordinal + freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal) + usedInLastPod := s.Capacity - s.Free(s.LastOrdinal) + + if freeCapacity >= usedInLastPod { a.lastCompactAttempt = time.Now() err := a.compact(s) if err != nil { @@ -277,9 +285,9 @@ func (a *autoscaler) compact(s *st.State) error { for i := len(placements) - 1; i >= 0; i-- { //start from the last placement ordinal := st.OrdinalFromPodName(placements[i].PodName) - if ordinal >= s.Replicas { + if ordinal == s.LastOrdinal { pod, err = s.PodLister.Get(placements[i].PodName) - if err != nil && !apierrors.IsNotFound(err) { + if err != nil { return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err) } diff --git a/vendor/knative.dev/hack/release.sh b/vendor/knative.dev/hack/release.sh index 450c0671b1..a378a5f805 100644 --- a/vendor/knative.dev/hack/release.sh +++ b/vendor/knative.dev/hack/release.sh @@ -75,7 +75,7 @@ RELEASE_NOTES="" RELEASE_BRANCH="" RELEASE_GCS_BUCKET="knative-nightly/${REPO_NAME}" RELEASE_DIR="" -KO_FLAGS="-P --platform=all" +export KO_FLAGS="-P --platform=all" VALIDATION_TESTS="./test/presubmit-tests.sh" ARTIFACTS_TO_PUBLISH="" FROM_NIGHTLY_RELEASE="" diff --git a/vendor/modules.txt b/vendor/modules.txt index dadd5ae00f..46ef6dc4fd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1167,7 +1167,7 @@ k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/strings/slices k8s.io/utils/trace -# knative.dev/eventing v0.43.4-0.20241219143011-7da3cee603c5 +# knative.dev/eventing v0.43.3 ## explicit; go 1.22.0 knative.dev/eventing/cmd/event_display knative.dev/eventing/cmd/heartbeats @@ -1342,7 +1342,7 @@ knative.dev/eventing/test/upgrade/prober/wathola/fetcher knative.dev/eventing/test/upgrade/prober/wathola/forwarder knative.dev/eventing/test/upgrade/prober/wathola/receiver knative.dev/eventing/test/upgrade/prober/wathola/sender -# knative.dev/hack v0.0.0-20241010131451-05b2fb30cb4d +# knative.dev/hack v0.0.0-20250114120502-30344aeba756 ## explicit; go 1.21 knative.dev/hack # knative.dev/pkg v0.0.0-20241021183759-9b9d535af5ad
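Reviewer note (not part of the patch): the state.go hunk above restores the LastOrdinal bookkeeping by threading a `last` accumulator through updateFreeCapacity. Below is a minimal, self-contained Go sketch of that bookkeeping; grow, ordinalFromPodName, and the plain fmt logging are simplified stand-ins for the scheduler's helpers, not the vendored code itself.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// grow extends free so that index ordinal is valid, initializing any
// new slots to the full per-pod capacity.
func grow(free []int32, ordinal int32, capacity int32) []int32 {
	for int32(len(free)) <= ordinal {
		free = append(free, capacity)
	}
	return free
}

// ordinalFromPodName is a simplified stand-in for the scheduler's
// OrdinalFromPodName helper ("statefulset-3" -> 3).
func ordinalFromPodName(podName string) int32 {
	n, _ := strconv.Atoi(podName[strings.LastIndex(podName, "-")+1:])
	return int32(n)
}

// updateFreeCapacity mirrors the shape of the restored function: it
// subtracts the placed vreplicas from the pod's free capacity and
// returns the updated slice together with the highest ordinal seen.
func updateFreeCapacity(free []int32, last int32, podName string, vreplicas, capacity int32) ([]int32, int32) {
	ordinal := ordinalFromPodName(podName)
	free = grow(free, ordinal, capacity)
	free[ordinal] -= vreplicas

	// Overcommit should not happen; the vendored code logs a warning
	// here and keeps scheduling rather than failing.
	if free[ordinal] < 0 {
		fmt.Printf("pod %s is overcommitted (free=%d)\n", podName, free[ordinal])
	}

	if ordinal > last {
		last = ordinal
	}
	return free, last
}

func main() {
	free, last := []int32{}, int32(-1)
	free, last = updateFreeCapacity(free, last, "statefulset-0", 4, 10)
	free, last = updateFreeCapacity(free, last, "statefulset-2", 7, 10)
	fmt.Println(free, last) // [6 10 3] 2
}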
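A companion sketch for the autoscaler.go hunk: the restored mayCompact gate triggers compaction only when the pods with lower ordinals have enough free capacity to absorb everything placed on the last pod. This assumes a uniform per-pod capacity and folds the State's FreeCapacity/Free helpers into plain slice arithmetic; in the vendored code the same comparison is freeCapacity >= usedInLastPod.

package main

import "fmt"

// mayCompact reports whether all vreplicas on the last pod could be
// absorbed by the pods with lower ordinals.
func mayCompact(freeCap []int32, capacity, lastOrdinal int32) bool {
	// With at most one pod there is nothing to move (the vendored code
	// also bails out when len(s.SchedulablePods) <= 1).
	if lastOrdinal < 1 {
		return false
	}

	// Free capacity on every pod except the last one.
	var free int32
	for i := int32(0); i < lastOrdinal; i++ {
		free += freeCap[i]
	}

	// Vreplicas currently placed on the last pod.
	used := capacity - freeCap[lastOrdinal]
	return free >= used
}

func main() {
	// Pods 0 and 1 have 4 free slots combined; pod 2 holds 3 vreplicas.
	fmt.Println(mayCompact([]int32{1, 3, 7}, 10, 2)) // true
	// Pod 2 holds 8 vreplicas but only 4 slots are free elsewhere.
	fmt.Println(mayCompact([]int32{1, 3, 2}, 10, 2)) // false
}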