diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index 83fac8389b..ec230a3d4d 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -449,7 +449,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr return cg.MarkScheduleConsumerFailed("Schedule", err) } - placements, err := statefulSetScheduler.Schedule(cg) + placements, err := statefulSetScheduler.Schedule(ctx, cg) if err != nil { return cg.MarkScheduleConsumerFailed("Schedule", err) } diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index c83bd6613e..37985e0706 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -61,12 +61,6 @@ import ( kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake" ) -type SchedulerFunc func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) - -func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { - return f(vpod) -} - const ( testSchedulerKey = "scheduler" noTestScheduler = "no-scheduler" @@ -102,7 +96,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -189,7 +183,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -307,7 +301,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -402,7 +396,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -528,7 +522,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -702,7 +696,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -877,7 +871,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1034,7 +1028,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, }, nil @@ -1121,7 +1115,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1208,7 +1202,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1303,7 +1297,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 2}, @@ -1426,7 +1420,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 2}, @@ -1533,7 +1527,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1630,7 +1624,7 @@ func TestReconcileKind(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, io.EOF }), }, @@ -1758,7 +1752,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) { }, Key: ConsumerGroupTestKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return []eventingduckv1alpha1.Placement{ {PodName: "p1", VReplicas: 1}, {PodName: "p2", VReplicas: 1}, @@ -1922,7 +1916,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), }, @@ -1991,7 +1985,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), }, @@ -2117,7 +2111,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), }, @@ -2163,7 +2157,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrUnknownTopicOrPartition, @@ -2210,7 +2204,7 @@ func TestFinalizeKind(t *testing.T) { }, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrGroupIDNotFound, @@ -2258,7 +2252,7 @@ func TestFinalizeKind(t *testing.T) { WantErr: true, Key: testKey, OtherTestData: map[string]interface{}{ - testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { return nil, nil }), kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrClusterAuthorizationFailed, diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index 83bf44bc3f..881f7e6758 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/storage/names" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing" @@ -106,6 +107,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I logger.Panicf("unable to process required environment variables: %v", err) } + dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr) + dispatcherPodLister := dispatcherPodInformer.Lister() + c := SchedulerConfig{ RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second, Capacity: env.PodCapacity, @@ -114,13 +118,11 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I } schedulers := map[string]Scheduler{ - KafkaSourceScheduler: createKafkaScheduler(ctx, c, kafkainternals.SourceStatefulSetName), - KafkaTriggerScheduler: createKafkaScheduler(ctx, c, kafkainternals.BrokerStatefulSetName), - KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName), + KafkaSourceScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.SourceStatefulSetName), + KafkaTriggerScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.BrokerStatefulSetName), + KafkaChannelScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.ChannelStatefulSetName), } - dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr) - r := &Reconciler{ SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok }, ConsumerLister: consumer.Get(ctx).Lister(), @@ -325,10 +327,11 @@ func enqueueConsumerGroupFromConsumer(enqueue func(name types.NamespacedName)) f } } -func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string) Scheduler { +func createKafkaScheduler(ctx context.Context, c SchedulerConfig, podLister corelisters.PodLister, ssName string) Scheduler { lister := consumergroup.Get(ctx).Lister() return createStatefulSetScheduler( ctx, + podLister, SchedulerConfig{ StatefulSetName: ssName, RefreshPeriod: c.RefreshPeriod, @@ -370,7 +373,7 @@ func getSelectorLabel(ssName string) map[string]string { return selectorLabel } -func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister scheduler.VPodLister) Scheduler { +func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLister, c SchedulerConfig, lister scheduler.VPodLister) Scheduler { ss, _ := statefulsetscheduler.New(ctx, &statefulsetscheduler.Config{ StatefulSetNamespace: system.Namespace(), StatefulSetName: c.StatefulSetName, @@ -383,6 +386,7 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict, VPodLister: lister, NodeLister: nodeinformer.Get(ctx).Lister(), + PodLister: podLister.Pods(system.Namespace()), }) return Scheduler{ diff --git a/go.mod b/go.mod index b49ccb1413..913956f058 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( k8s.io/apiserver v0.29.2 k8s.io/client-go v0.29.2 k8s.io/utils v0.0.0-20240102154912-e7106e64919e - knative.dev/eventing v0.42.0 + knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70 knative.dev/hack v0.0.0-20240704013904-b9799599afcf knative.dev/pkg v0.0.0-20240716082220-4355f0c73608 knative.dev/reconciler-test v0.0.0-20240716134925-00d94f40c470 diff --git a/go.sum b/go.sum index d6f185826e..e1a222d1b4 100644 --- a/go.sum +++ b/go.sum @@ -1213,8 +1213,8 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -knative.dev/eventing v0.42.0 h1:pbPPhV4JlgpHBZxLBhJTUf+4HuZe5y/zlkOGHZfvtZ0= -knative.dev/eventing v0.42.0/go.mod h1:hW5BMYcihtCelT9pqaMtK8gmNOo1ybxcigjBY+/fU+k= +knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70 h1:Cf6YhPrDySVcyIqHcvBonCQpyt0hlEYJuIF9pF5zIVo= +knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70/go.mod h1:hW5BMYcihtCelT9pqaMtK8gmNOo1ybxcigjBY+/fU+k= knative.dev/hack v0.0.0-20240704013904-b9799599afcf h1:n92FmZRywgtHso7pFAku7CW0qvRAs1hXtMQqO0R6eiE= knative.dev/hack v0.0.0-20240704013904-b9799599afcf/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q= knative.dev/pkg v0.0.0-20240716082220-4355f0c73608 h1:BOiRzcnRS9Z5ruxlCiS/K1/Hb5bUN0X4W3xCegdcYQE= diff --git a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go index 88e470d8b4..a9ca7b1d5a 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go @@ -92,18 +92,18 @@ type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) erro // Scheduler is responsible for placing VPods into real Kubernetes pods type Scheduler interface { // Schedule computes the new set of placements for vpod. - Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) + Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) } // SchedulerFunc type is an adapter to allow the use of // ordinary functions as Schedulers. If f is a function // with the appropriate signature, SchedulerFunc(f) is a // Scheduler that calls f. -type SchedulerFunc func(vpod VPod) ([]duckv1alpha1.Placement, error) +type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) // Schedule implements the Scheduler interface. -func (f SchedulerFunc) Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) { - return f(vpod) +func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) { + return f(ctx, vpod) } // VPod represents virtual replicas placed into real Kubernetes pods diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go b/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go index 5ec66b2156..ad3a5aaf76 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/eventing/pkg/scheduler" ) @@ -55,10 +56,10 @@ func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool { var zoneName string var err error for _, podID := range feasiblePods { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) - return err == nil, nil - }) + zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) + if err != nil { + continue + } zoneMap[zoneName] = struct{}{} } return len(zoneMap) == int(states.NumZones) diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go index aa84ca996f..44069babe9 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go @@ -22,7 +22,6 @@ import ( "errors" "math" "strconv" - "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" @@ -30,9 +29,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" corev1 "k8s.io/client-go/listers/core/v1" - "knative.dev/pkg/logging" "knative.dev/eventing/pkg/scheduler" @@ -42,7 +39,7 @@ type StateAccessor interface { // State returns the current state (snapshot) about placed vpods // Take into account reserved vreplicas and update `reserved` to reflect // the current state. - State(reserved map[types.NamespacedName]map[string]int32) (*State, error) + State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) } // state provides information about the current scheduling of all vpods @@ -152,8 +149,6 @@ func (s *State) IsSchedulablePod(ordinal int32) bool { // stateBuilder reconstruct the state from scratch, by listing vpods type stateBuilder struct { - ctx context.Context - logger *zap.SugaredLogger vpodLister scheduler.VPodLister capacity int32 schedulerPolicy scheduler.SchedulerPolicyType @@ -166,11 +161,9 @@ type stateBuilder struct { } // NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested -func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { +func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { return &stateBuilder{ - ctx: ctx, - logger: logging.FromContext(ctx), vpodLister: lister, capacity: podCapacity, schedulerPolicy: schedulerPolicy, @@ -183,15 +176,18 @@ func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister sche } } -func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) { +func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) { vpods, err := s.vpodLister() if err != nil { return nil, err } - scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{}) + logger := logging.FromContext(ctx).With("subcomponent", "statebuilder") + ctx = logging.WithLogger(ctx, logger) + + scale, err := s.statefulSetCache.GetScale(ctx, s.statefulSetName, metav1.GetOptions{}) if err != nil { - s.logger.Infow("failed to get statefulset", zap.Error(err)) + logger.Infow("failed to get statefulset", zap.Error(err)) return nil, err } @@ -235,36 +231,35 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } for podId := int32(0); podId < scale.Spec.Replicas && s.podLister != nil; podId++ { - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) - return err == nil, nil - }) - - if pod != nil { - if isPodUnschedulable(pod) { - // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. - continue - } - - node, err := s.nodeLister.Get(pod.Spec.NodeName) - if err != nil { - return nil, err - } + pod, err := s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) + if err != nil { + logger.Warnw("Failed to get pod", zap.Int32("ordinal", podId), zap.Error(err)) + continue + } + if isPodUnschedulable(pod) { + // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. + logger.Debugw("Pod is unschedulable", zap.Any("pod", pod)) + continue + } - if isNodeUnschedulable(node) { - // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. - continue - } + node, err := s.nodeLister.Get(pod.Spec.NodeName) + if err != nil { + return nil, err + } - // Pod has no annotation or not annotated as unschedulable and - // not on an unschedulable node, so add to feasible - schedulablePods.Insert(podId) + if isNodeUnschedulable(node) { + // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. + logger.Debugw("Pod is on an unschedulable node", zap.Any("pod", node)) + continue } + + // Pod has no annotation or not annotated as unschedulable and + // not on an unschedulable node, so add to feasible + schedulablePods.Insert(podId) } for _, p := range schedulablePods.List() { - free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) + free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) } // Getting current state from existing placements for all vpods @@ -286,15 +281,14 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) // Account for reserved vreplicas vreplicas = withReserved(vpod.GetKey(), podName, vreplicas, reserved) - free, last = s.updateFreeCapacity(free, last, podName, vreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas) withPlacement[vpod.GetKey()][podName] = true - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -315,11 +309,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) continue } - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -330,7 +323,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } } - free, last = s.updateFreeCapacity(free, last, podName, rvreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, rvreplicas) } } @@ -338,7 +331,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister, PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} - s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) + logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) return state, nil } @@ -350,7 +343,7 @@ func pendingFromVPod(vpod scheduler.VPod) int32 { return int32(math.Max(float64(0), float64(expected-scheduled))) } -func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { +func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { ordinal := OrdinalFromPodName(podName) free = grow(free, ordinal, s.capacity) @@ -359,7 +352,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri // Assert the pod is not overcommitted if free[ordinal] < 0 { // This should not happen anymore. Log as an error but do not interrupt the current scheduling. - s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) } if ordinal > last { diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go index 296feb16f2..3245dabc16 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go @@ -18,6 +18,7 @@ package statefulset import ( "context" + "fmt" "math" "sync" "sync/atomic" @@ -27,10 +28,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "knative.dev/pkg/reconciler" - "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" "knative.dev/eventing/pkg/scheduler" st "knative.dev/eventing/pkg/scheduler/state" @@ -58,9 +57,8 @@ type autoscaler struct { statefulSetCache *scheduler.ScaleCache statefulSetName string vpodLister scheduler.VPodLister - logger *zap.SugaredLogger stateAccessor st.StateAccessor - trigger chan struct{} + trigger chan context.Context evictor scheduler.Evictor // capacity is the total number of virtual replicas available per pod. @@ -68,7 +66,9 @@ type autoscaler struct { // refreshPeriod is how often the autoscaler tries to scale down the statefulset refreshPeriod time.Duration - lock sync.Locker + // retryPeriod is how often the autoscaler retry failed autoscale operations + retryPeriod time.Duration + lock sync.Locker // isLeader signals whether a given autoscaler instance is leader or not. // The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a @@ -104,17 +104,17 @@ func (a *autoscaler) Demote(b reconciler.Bucket) { } } -func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { - return &autoscaler{ - logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")), +func newAutoscaler(cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { + a := &autoscaler{ statefulSetCache: statefulSetCache, statefulSetName: cfg.StatefulSetName, vpodLister: cfg.VPodLister, stateAccessor: stateAccessor, evictor: cfg.Evictor, - trigger: make(chan struct{}, 1), + trigger: make(chan context.Context, 1), capacity: cfg.PodCapacity, refreshPeriod: cfg.RefreshPeriod, + retryPeriod: cfg.RetryPeriod, lock: new(sync.Mutex), isLeader: atomic.Bool{}, getReserved: cfg.getReserved, @@ -124,25 +124,38 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces Add(-cfg.RefreshPeriod). Add(-time.Minute), } + + if a.retryPeriod == 0 { + a.retryPeriod = time.Second + } + + return a } func (a *autoscaler) Start(ctx context.Context) { attemptScaleDown := false for { + autoscaleCtx := ctx select { case <-ctx.Done(): return case <-time.After(a.refreshPeriod): - a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) + logging.FromContext(ctx).Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = true - case <-a.trigger: - a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) + case autoscaleCtx = <-a.trigger: + logging.FromContext(autoscaleCtx).Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = false } // Retry a few times, just so that we don't have to wait for the next beat when // a transient error occurs - a.syncAutoscale(ctx, attemptScaleDown) + if err := a.syncAutoscale(autoscaleCtx, attemptScaleDown); err != nil { + logging.FromContext(autoscaleCtx).Errorw("Failed to sync autoscale", zap.Error(err)) + go func() { + time.Sleep(a.retryPeriod) + a.Autoscale(ctx) // Use top-level context for background retries + }() + } } } @@ -150,10 +163,10 @@ func (a *autoscaler) Autoscale(ctx context.Context) { select { // We trigger the autoscaler asynchronously by using the channel so that the scale down refresh // period is reset. - case a.trigger <- struct{}{}: + case a.trigger <- ctx: default: // We don't want to block if the channel's buffer is full, it will be triggered eventually. - + logging.FromContext(ctx).Debugw("Skipping autoscale since autoscale is in progress") } } @@ -161,36 +174,34 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) e a.lock.Lock() defer a.lock.Unlock() - var lastErr error - wait.Poll(500*time.Millisecond, 5*time.Second, func() (bool, error) { - err := a.doautoscale(ctx, attemptScaleDown) - if err != nil { - logging.FromContext(ctx).Errorw("Failed to autoscale", zap.Error(err)) - } - lastErr = err - return err == nil, nil - }) - return lastErr + if err := a.doautoscale(ctx, attemptScaleDown); err != nil { + return fmt.Errorf("failed to do autoscale: %w", err) + } + return nil } func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) error { if !a.isLeader.Load() { return nil } - state, err := a.stateAccessor.State(a.getReserved()) + + logger := logging.FromContext(ctx).With("component", "autoscaler") + ctx = logging.WithLogger(ctx, logger) + + state, err := a.stateAccessor.State(ctx, a.getReserved()) if err != nil { - a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) + logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return err } scale, err := a.statefulSetCache.GetScale(ctx, a.statefulSetName, metav1.GetOptions{}) if err != nil { // skip a beat - a.logger.Infow("failed to get scale subresource", zap.Error(err)) + logger.Infow("failed to get scale subresource", zap.Error(err)) return err } - a.logger.Debugw("checking adapter capacity", + logger.Debugw("checking adapter capacity", zap.Int32("replicas", scale.Spec.Replicas), zap.Any("state", state)) @@ -234,43 +245,43 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err if newreplicas != scale.Spec.Replicas { scale.Spec.Replicas = newreplicas - a.logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) + logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) _, err = a.statefulSetCache.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{}) if err != nil { - a.logger.Errorw("updating scale subresource failed", zap.Error(err)) + logger.Errorw("updating scale subresource failed", zap.Error(err)) return err } } else if attemptScaleDown { // since the number of replicas hasn't changed and time has approached to scale down, // take the opportunity to compact the vreplicas - a.mayCompact(state, scaleUpFactor) + return a.mayCompact(logger, state, scaleUpFactor) } return nil } -func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { +func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State, scaleUpFactor int32) error { // This avoids a too aggressive scale down by adding a "grace period" based on the refresh // period nextAttempt := a.lastCompactAttempt.Add(a.refreshPeriod) if time.Now().Before(nextAttempt) { - a.logger.Debugw("Compact was retried before refresh period", + logger.Debugw("Compact was retried before refresh period", zap.Time("lastCompactAttempt", a.lastCompactAttempt), zap.Time("nextAttempt", nextAttempt), zap.String("refreshPeriod", a.refreshPeriod.String()), ) - return + return nil } - a.logger.Debugw("Trying to compact and scale down", + logger.Debugw("Trying to compact and scale down", zap.Int32("scaleUpFactor", scaleUpFactor), zap.Any("state", s), ) // when there is only one pod there is nothing to move or number of pods is just enough! if s.LastOrdinal < 1 || len(s.SchedulablePods) <= int(scaleUpFactor) { - return + return nil } if s.SchedulerPolicy == scheduler.MAXFILLUP { @@ -283,7 +294,7 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } @@ -303,10 +314,11 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } } + return nil } func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { @@ -323,16 +335,14 @@ func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { ordinal := st.OrdinalFromPodName(placements[i].PodName) if ordinal == s.LastOrdinal-j { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - if s.PodLister != nil { - pod, err = s.PodLister.Get(placements[i].PodName) - } - return err == nil, nil - }) + pod, err = s.PodLister.Get(placements[i].PodName) + if err != nil { + return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err) + } err = a.evictor(pod, vpod, &placements[i]) if err != nil { - return err + return fmt.Errorf("failed to evict pod %s: %w", pod.Name, err) } } } diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go index 931aca1550..6995d6ff45 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go @@ -28,18 +28,16 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/utils/integer" + "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" - statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset" "knative.dev/pkg/controller" - "knative.dev/pkg/logging" - - podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/scheduler" @@ -69,6 +67,8 @@ type Config struct { PodCapacity int32 `json:"podCapacity"` // Autoscaler refresh period RefreshPeriod time.Duration `json:"refreshPeriod"` + // Autoscaler retry period + RetryPeriod time.Duration `json:"retryPeriod"` SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` @@ -78,6 +78,8 @@ type Config struct { VPodLister scheduler.VPodLister `json:"-"` NodeLister corev1listers.NodeLister `json:"-"` + // Pod lister for statefulset: StatefulSetNamespace / StatefulSetName + PodLister corev1listers.PodNamespaceLister `json:"-"` // getReserved returns reserved replicas getReserved GetReserved @@ -85,19 +87,20 @@ type Config struct { func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { - podInformer := podinformer.Get(ctx) - podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace) + if cfg.PodLister == nil { + return nil, fmt.Errorf("Config.PodLister is required") + } scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig) - stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister, scaleCache) + stateAccessor := st.NewStateBuilder(cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) var getReserved GetReserved cfg.getReserved = func() map[types.NamespacedName]map[string]int32 { return getReserved() } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) var wg sync.WaitGroup wg.Add(1) @@ -106,7 +109,7 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { autoscaler.Start(ctx) }() - s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister) + s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler) getReserved = s.Reserved wg.Done() @@ -125,12 +128,9 @@ func (p Pending) Total() int32 { // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods type StatefulSetScheduler struct { - ctx context.Context - logger *zap.SugaredLogger statefulSetName string statefulSetNamespace string statefulSetClient clientappsv1.StatefulSetInterface - podLister corev1listers.PodNamespaceLister vpodLister scheduler.VPodLister lock sync.Locker stateAccessor st.StateAccessor @@ -168,16 +168,12 @@ func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) { func newStatefulSetScheduler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, - autoscaler Autoscaler, - podlister corev1listers.PodNamespaceLister) *StatefulSetScheduler { + autoscaler Autoscaler) *StatefulSetScheduler { scheduler := &StatefulSetScheduler{ - ctx: ctx, - logger: logging.FromContext(ctx), statefulSetNamespace: cfg.StatefulSetNamespace, statefulSetName: cfg.StatefulSetName, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), - podLister: podlister, vpodLister: cfg.VPodLister, lock: new(sync.Mutex), stateAccessor: stateAccessor, @@ -186,22 +182,38 @@ func newStatefulSetScheduler(ctx context.Context, } // Monitor our statefulset - statefulsetInformer := statefulsetinformer.Get(ctx) - statefulsetInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), - Handler: controller.HandleAll(scheduler.updateStatefulset), - }) + c := kubeclient.Get(ctx) + sif := informers.NewSharedInformerFactoryWithOptions(c, + controller.GetResyncPeriod(ctx), + informers.WithNamespace(cfg.StatefulSetNamespace), + ) + + sif.Apps().V1().StatefulSets().Informer(). + AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), + Handler: controller.HandleAll(func(i interface{}) { + scheduler.updateStatefulset(ctx, i) + }), + }) + + sif.Start(ctx.Done()) + _ = sif.WaitForCacheSync(ctx.Done()) + + go func() { + <-ctx.Done() + sif.Shutdown() + }() return scheduler } -func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { +func (s *StatefulSetScheduler) Schedule(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { s.lock.Lock() defer s.lock.Unlock() s.reservedMu.Lock() defer s.reservedMu.Unlock() - placements, err := s.scheduleVPod(vpod) + placements, err := s.scheduleVPod(ctx, vpod) if placements == nil { return placements, err } @@ -216,11 +228,13 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla return placements, err } -func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { - logger := s.logger.With("key", vpod.GetKey(), zap.String("component", "scheduler")) +func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { + logger := logging.FromContext(ctx).With("key", vpod.GetKey(), zap.String("component", "scheduler")) + ctx = logging.WithLogger(ctx, logger) + // Get the current placements state // Quite an expensive operation but safe and simple. - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Debug("error while refreshing scheduler state (will retry)", zap.Error(err)) return nil, err @@ -258,13 +272,15 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Handle overcommitted pods. - if state.FreeCap[ordinal] < 0 { + if state.Free(ordinal) < 0 { // vr > free => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 // vr = free => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 // vr < free => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 overcommit := -state.FreeCap[ordinal] + logger.Debugw("overcommit", zap.Any("overcommit", overcommit), zap.Any("placement", p)) + if p.VReplicas >= overcommit { state.SetFree(ordinal, 0) state.Pending[vpod.GetKey()] += overcommit @@ -301,7 +317,9 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 if state.SchedulerPolicy != "" { // Need less => scale down if tr > vpod.GetVReplicas() { - logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) @@ -311,15 +329,19 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Need more => scale up - logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) } else { //Predicates and priorities must be used for scheduling // Need less => scale down if tr > vpod.GetVReplicas() && state.DeschedPolicy != nil { - logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements = s.removeReplicasWithPolicy(vpod, tr-vpod.GetVReplicas(), placements) + logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements = s.removeReplicasWithPolicy(ctx, vpod, tr-vpod.GetVReplicas(), placements) // Do not trigger the autoscaler to avoid unnecessary churn @@ -331,8 +353,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Need more => scale up // rebalancing needed for all vreps most likely since there are pending vreps from previous reconciliation // can fall here when vreps scaled up or after eviction - logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements, left = s.rebalanceReplicasWithPolicy(vpod, vpod.GetVReplicas(), placements) + logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements, left = s.rebalanceReplicasWithPolicy(ctx, vpod, vpod.GetVReplicas(), placements) } } @@ -343,10 +367,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Trigger the autoscaler if s.autoscaler != nil { logger.Infow("Awaiting autoscaler", zap.Any("placement", placements), zap.Int32("left", left)) - s.autoscaler.Autoscale(s.ctx) + s.autoscaler.Autoscale(ctx) } - if state.SchedPolicy != nil { + if state.SchedulerPolicy == "" && state.SchedPolicy != nil { logger.Info("reverting to previous placements") s.reservePlacements(vpod, existingPlacements) // rebalancing doesn't care about new placements since all vreps will be re-placed return existingPlacements, s.notEnoughPodReplicas(left) // requeue to wait for the autoscaler to do its job @@ -368,25 +392,25 @@ func toJSONable(pending map[types.NamespacedName]int32) map[string]int32 { return r } -func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { +func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { s.makeZeroPlacements(vpod, placements) - placements, diff = s.addReplicasWithPolicy(vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list + placements, diff = s.addReplicasWithPolicy(ctx, vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list return placements, diff } -func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { - logger := s.logger.Named("remove replicas with policy") +func (s *StatefulSetScheduler) removeReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { + logger := logging.FromContext(ctx).Named("remove replicas with policy") numVreps := diff for i := int32(0); i < numVreps; i++ { //deschedule one vreplica at a time - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return placements } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.DeschedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.DeschedPolicy) feasiblePods = s.removePodsNotInPlacement(vpod, feasiblePods) if len(feasiblePods) == 1 { //nothing to score, remove vrep from that pod placementPodID := feasiblePods[0] @@ -397,7 +421,7 @@ func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, dif continue } - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.DeschedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.DeschedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -443,13 +467,13 @@ func (s *StatefulSetScheduler) removeSelectionFromPlacements(placementPodID int3 return newPlacements } -func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { - logger := s.logger.Named("add replicas with policy") +func (s *StatefulSetScheduler) addReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { + logger := logging.FromContext(ctx).Named("add replicas with policy") numVreps := diff for i := int32(0); i < numVreps; i++ { //schedule one vreplica at a time (find most suitable pod placement satisying predicates with high score) // Get the current placements state - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return placements, diff @@ -462,7 +486,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i break //end the iteration for all vreps since there are not pods } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.SchedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.SchedPolicy) if len(feasiblePods) == 0 { //no pods available to schedule this vreplica logger.Info("no feasible pods available to schedule this vreplica") s.reservePlacements(vpod, placements) @@ -480,7 +504,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i continue } */ - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.SchedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.SchedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -555,7 +579,7 @@ func (s *StatefulSetScheduler) removePodsNotInPlacement(vpod scheduler.VPod, fea // prioritizePods prioritizes the pods by running the score plugins, which return a score for each pod. // The scores from each plugin are added together to make the score for that pod. func (s *StatefulSetScheduler) prioritizePods(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PodScoreList, error) { - logger := s.logger.Named("prioritize all feasible pods") + logger := logging.FromContext(ctx).Named("prioritize all feasible pods") // If no priority configs are provided, then all pods will have a score of one result := make(st.PodScoreList, 0, len(feasiblePods)) @@ -618,7 +642,7 @@ func (s *StatefulSetScheduler) selectPod(podScoreList st.PodScoreList) (int32, e // If any of these plugins doesn't return "Success", the pod is not suitable for placing the vrep. // Meanwhile, the failure message and status are set for the given pod. func (s *StatefulSetScheduler) RunFilterPlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, podID int32, policy *scheduler.SchedulerPolicy) st.PluginToStatus { - logger := s.logger.Named("run all filter plugins") + logger := logging.FromContext(ctx).Named("run all filter plugins") statuses := make(st.PluginToStatus) for _, plugin := range policy.Predicates { @@ -651,7 +675,7 @@ func (s *StatefulSetScheduler) runFilterPlugin(ctx context.Context, pl st.Filter // RunScorePlugins runs the set of configured scoring plugins. It returns a list that stores for each scoring plugin name the corresponding PodScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns a non-success status. func (s *StatefulSetScheduler) RunScorePlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PluginToPodScores, *st.Status) { - logger := s.logger.Named("run all score plugins") + logger := logging.FromContext(ctx).Named("run all score plugins") pluginToPodScores := make(st.PluginToPodScores, len(policy.Priorities)) for _, plugin := range policy.Priorities { @@ -764,10 +788,11 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, diff int32, placeme return newPlacements, diff } -func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { +func (s *StatefulSetScheduler) updateStatefulset(ctx context.Context, obj interface{}) { statefulset, ok := obj.(*appsv1.StatefulSet) if !ok { - s.logger.Fatalw("expected a Statefulset object", zap.Any("object", obj)) + logging.FromContext(ctx).Warnw("expected a Statefulset object", zap.Any("object", obj)) + return } s.lock.Lock() @@ -777,7 +802,7 @@ func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { s.replicas = 1 } else if s.replicas != *statefulset.Spec.Replicas { s.replicas = *statefulset.Spec.Replicas - s.logger.Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) + logging.FromContext(ctx).Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 390d93b83f..97ba0c93ac 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1142,7 +1142,7 @@ k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/strings/slices k8s.io/utils/trace -# knative.dev/eventing v0.42.0 +# knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70 ## explicit; go 1.22 knative.dev/eventing/cmd/event_display knative.dev/eventing/cmd/heartbeats