From a780f68377d9d2f2255be67971e6faa150c742b1 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 31 Oct 2024 10:52:52 +0100 Subject: [PATCH] Fix consumer group build error and remove unused configs Signed-off-by: Pierangelo Di Pilato --- .../100-config-kafka-descheduler.yaml | 35 -------- .../100-config-kafka-scheduler.yaml | 39 --------- .../reconciler/consumergroup/controller.go | 86 ++----------------- 3 files changed, 8 insertions(+), 152 deletions(-) delete mode 100644 control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-descheduler.yaml delete mode 100644 control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-scheduler.yaml diff --git a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-descheduler.yaml b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-descheduler.yaml deleted file mode 100644 index 5aca0a4bfc..0000000000 --- a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-descheduler.yaml +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2021 The Knative Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -apiVersion: v1 -kind: ConfigMap -metadata: - name: config-kafka-descheduler - namespace: knative-eventing - labels: - app.kubernetes.io/version: devel -data: - predicates: |+ - [] - priorities: |+ - [ - {"Name": "RemoveWithEvenPodSpreadPriority", - "Weight": 10, - "Args": "{\"MaxSkew\": 2}"}, - {"Name": "RemoveWithAvailabilityZonePriority", - "Weight": 10, - "Args": "{\"MaxSkew\": 2}"}, - {"Name": "RemoveWithHighestOrdinalPriority", - "Weight": 2} - ] diff --git a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-scheduler.yaml b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-scheduler.yaml deleted file mode 100644 index 13f0cc6bef..0000000000 --- a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-scheduler.yaml +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright 2021 The Knative Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -apiVersion: v1 -kind: ConfigMap -metadata: - name: config-kafka-scheduler - namespace: knative-eventing - labels: - app.kubernetes.io/version: devel -data: - predicates: |+ - [ - {"Name": "PodFitsResources"}, - {"Name": "NoMaxResourceCount", - "Args": "{\"NumPartitions\": 100}"} - ] - priorities: |+ - [ - {"Name": "AvailabilityZonePriority", - "Weight": 10, - "Args": "{\"MaxSkew\": 2}"}, - {"Name": "LowestOrdinalPriority", - "Weight": 2}, - {"Name": "EvenPodSpread", - "Weight": 2, - "Args": "{\"MaxSkew\": 2}"} - ] diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index 881f7e6758..9155ab2698 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -18,13 +18,11 @@ package consumergroup import ( "context" - "encoding/json" "fmt" "strings" "time" "github.com/kelseyhightower/envconfig" - "go.uber.org/multierr" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -44,7 +42,6 @@ import ( kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" - nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node" podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered" secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" "knative.dev/pkg/configmap" @@ -92,11 +89,9 @@ type envConfig struct { } type SchedulerConfig struct { - StatefulSetName string - RefreshPeriod time.Duration - Capacity int32 - SchedulerPolicy *scheduler.SchedulerPolicy - DeSchedulerPolicy *scheduler.SchedulerPolicy + StatefulSetName string + RefreshPeriod time.Duration + Capacity int32 } func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl { @@ -111,10 +106,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I dispatcherPodLister := dispatcherPodInformer.Lister() c := SchedulerConfig{ - RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second, - Capacity: env.PodCapacity, - SchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.SchedulerPolicyConfigMap), - DeSchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.DeSchedulerPolicyConfigMap), + RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second, + Capacity: env.PodCapacity, } schedulers := map[string]Scheduler{ @@ -333,11 +326,9 @@ func createKafkaScheduler(ctx context.Context, c SchedulerConfig, podLister core ctx, podLister, SchedulerConfig{ - StatefulSetName: ssName, - RefreshPeriod: c.RefreshPeriod, - Capacity: c.Capacity, - SchedulerPolicy: c.SchedulerPolicy, - DeSchedulerPolicy: c.DeSchedulerPolicy, + StatefulSetName: ssName, + RefreshPeriod: c.RefreshPeriod, + Capacity: c.Capacity, }, func() ([]scheduler.VPod, error) { consumerGroups, err := lister.List(labels.SelectorFromSet(getSelectorLabel(ssName))) @@ -380,12 +371,8 @@ func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLi ScaleCacheConfig: scheduler.ScaleCacheConfig{RefreshPeriod: statefulSetScaleCacheRefreshPeriod}, PodCapacity: c.Capacity, RefreshPeriod: c.RefreshPeriod, - SchedulerPolicy: scheduler.MAXFILLUP, - SchedPolicy: c.SchedulerPolicy, - DeschedPolicy: c.DeSchedulerPolicy, Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict, VPodLister: lister, - NodeLister: nodeinformer.Get(ctx).Lister(), PodLister: podLister.Pods(system.Namespace()), }) @@ -394,60 +381,3 @@ func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLi SchedulerConfig: c, } } - -// schedulerPolicyFromConfigMapOrFail reads predicates and priorities data from configMap -func schedulerPolicyFromConfigMapOrFail(ctx context.Context, configMapName string) *scheduler.SchedulerPolicy { - p, err := schedulerPolicyFromConfigMap(ctx, configMapName) - if err != nil { - logging.FromContext(ctx).Fatal(zap.Error(err)) - } - return p -} - -// schedulerPolicyFromConfigMap reads predicates and priorities data from configMap -func schedulerPolicyFromConfigMap(ctx context.Context, configMapName string) (*scheduler.SchedulerPolicy, error) { - policyConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, configMapName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("couldn't get scheduler policy config map %s/%s: %v", system.Namespace(), configMapName, err) - } - - logger := logging.FromContext(ctx). - Desugar(). - With(zap.String("configmap", configMapName)) - policy := &scheduler.SchedulerPolicy{} - - preds, found := policyConfigMap.Data["predicates"] - if !found { - return nil, fmt.Errorf("missing policy config map %s/%s value at key predicates", system.Namespace(), configMapName) - } - if err := json.NewDecoder(strings.NewReader(preds)).Decode(&policy.Predicates); err != nil { - return nil, fmt.Errorf("invalid policy %v: %v", preds, err) - } - - priors, found := policyConfigMap.Data["priorities"] - if !found { - return nil, fmt.Errorf("missing policy config map value at key priorities") - } - if err := json.NewDecoder(strings.NewReader(priors)).Decode(&policy.Priorities); err != nil { - return nil, fmt.Errorf("invalid policy %v: %v", preds, err) - } - - if errs := validatePolicy(policy); errs != nil { - return nil, multierr.Combine(err) - } - - logger.Info("Schedulers policy registration", zap.Any("policy", policy)) - - return policy, nil -} - -func validatePolicy(policy *scheduler.SchedulerPolicy) []error { - var validationErrors []error - - for _, priority := range policy.Priorities { - if priority.Weight < scheduler.MinWeight || priority.Weight > scheduler.MaxWeight { - validationErrors = append(validationErrors, fmt.Errorf("priority %s should have a positive weight applied to it or it has overflown", priority.Name)) - } - } - return validationErrors -}