From 02597ce89f2c4a59b384e4d628be4c517a996b87 Mon Sep 17 00:00:00 2001 From: "wangzhe.21" Date: Tue, 11 Jun 2024 17:21:53 +0800 Subject: [PATCH] skip pod when getTopologyHints --- pkg/agent/orm/manager.go | 31 +++++++--- pkg/agent/orm/manager_test.go | 99 +++++++++++++++++++++++++++++++ pkg/agent/orm/resourceprovider.go | 5 +- 3 files changed, 123 insertions(+), 12 deletions(-) diff --git a/pkg/agent/orm/manager.go b/pkg/agent/orm/manager.go index f1d9156f2..9ee2c844d 100644 --- a/pkg/agent/orm/manager.go +++ b/pkg/agent/orm/manager.go @@ -212,6 +212,15 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map return nil } + skipPod, err := isSkippedPod(pod, m.qosConfig) + if err != nil { + klog.Errorf("[ORM] check skip pod fail for pod: %v, err: %v", pod.Name, err) + return nil + } + if skipPod { + return nil + } + podUID := string(pod.UID) contName := container.Name containerType, containerIndex, err := GetContainerTypeAndIndex(pod, container) @@ -415,13 +424,12 @@ func (m *ManagerImpl) processDeletePod(podUID string) error { func (m *ManagerImpl) addContainer(pod *v1.Pod, container *v1.Container) error { klog.V(5).Infof("[ORM] addContainer, pod: %v, container: %v", pod.Name, container.Name) - systemCores, err := isPodKatalystQoSLevelSystemCores(m.qosConfig, pod) + skipPod, err := isSkippedPod(pod, m.qosConfig) if err != nil { klog.Errorf("[ORM] check pod %s qos level fail: %v", pod.Name, err) return err } - - if native.CheckDaemonPod(pod) && !systemCores { + if skipPod { klog.Infof("[ORM] skip pod: %s/%s, container: %s resource allocation", pod.Namespace, pod.Name, container.Name) return nil @@ -563,14 +571,15 @@ func (m *ManagerImpl) reconcile() { if pod == nil { continue } - systemCores, err := isPodKatalystQoSLevelSystemCores(m.qosConfig, pod) + skipPod, err := isSkippedPod(pod, m.qosConfig) if err != nil { klog.Errorf("[ORM] check pod %s qos level fail: %v", pod.Name, err) + continue } - - if native.CheckDaemonPod(pod) && !systemCores { + if skipPod { continue } + for _, container := range pod.Spec.Containers { needsReAllocate := false @@ -732,13 +741,17 @@ func isSkippedContainer(pod *v1.Pod, container *v1.Container) bool { return containerType == pluginapi.ContainerType_INIT } -func isPodKatalystQoSLevelSystemCores(qosConfig *generic.QoSConfiguration, pod *v1.Pod) (bool, error) { - qosLevel, err := qosConfig.GetQoSLevelForPod(pod) +func isSkippedPod(pod *v1.Pod, qosConfig *generic.QoSConfiguration) (bool, error) { + systemCores, err := qosConfig.CheckSystemQoSForPod(pod) if err != nil { + klog.Errorf("[ORM] check pod %s qos level fail: %v", pod.Name, err) return false, err } - return qosLevel == pluginapi.KatalystQoSLevelSystemCores, nil + if native.CheckDaemonPod(pod) && !systemCores { + return true, nil + } + return false, nil } func ParseListOfTopologyHints(hintsList *pluginapi.ListOfTopologyHints) []topology.TopologyHint { diff --git a/pkg/agent/orm/manager_test.go b/pkg/agent/orm/manager_test.go index c34b6905c..abc8885dd 100644 --- a/pkg/agent/orm/manager_test.go +++ b/pkg/agent/orm/manager_test.go @@ -35,6 +35,7 @@ import ( pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "github.com/kubewharf/katalyst-api/pkg/consts" katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" "github.com/kubewharf/katalyst-core/pkg/agent/orm/endpoint" @@ -278,6 +279,104 @@ func TestIsSkippedContainer(t *testing.T) { } } +func TestIsSkippedPod(t *testing.T) { + t.Parallel() + + testCases := []struct { + Name string + Pod *v1.Pod + Expected bool + ExpectErr bool + }{ + { + Name: "daemon and shared", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "DaemonSet", + }, + }, + }, + }, + ExpectErr: false, + Expected: true, + }, + { + Name: "daemon and system", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSystemCores, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "DaemonSet", + }, + }, + }, + }, + ExpectErr: false, + Expected: false, + }, + { + Name: "deployment", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Deployment", + }, + }, + }, + }, + ExpectErr: false, + Expected: false, + }, + { + Name: "fail", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: "unknow value", + }, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "DaemonSet", + }, + }, + }, + }, + ExpectErr: true, + Expected: false, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + t.Parallel() + qosCfg := generic.NewQoSConfiguration() + skip, err := isSkippedPod(tc.Pod, qosCfg) + if tc.ExpectErr { + assert.Error(t, err) + } else { + assert.Equal(t, tc.Expected, skip) + } + }) + } +} + func TestGetMappedResourceName(t *testing.T) { t.Parallel() diff --git a/pkg/agent/orm/resourceprovider.go b/pkg/agent/orm/resourceprovider.go index fd356d501..f9e389069 100644 --- a/pkg/agent/orm/resourceprovider.go +++ b/pkg/agent/orm/resourceprovider.go @@ -29,7 +29,6 @@ import ( maputil "k8s.io/kubernetes/pkg/util/maps" "github.com/kubewharf/katalyst-core/pkg/metrics" - "github.com/kubewharf/katalyst-core/pkg/util/native" ) func (m *ManagerImpl) GetTopologyAwareResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.TopologyAwareResource { @@ -131,12 +130,12 @@ func (m *ManagerImpl) getTopologyAwareResources(pod *v1.Pod, container *v1.Conta err := fmt.Errorf("GetTopologyAwareResources got nil pod: %v or container: %v", pod, container) return nil, err } - systemCores, err := isPodKatalystQoSLevelSystemCores(m.qosConfig, pod) + skipPod, err := isSkippedPod(pod, m.qosConfig) if err != nil { err = fmt.Errorf("[ORM] check pod %s qos level fail: %v", pod.Name, err) return nil, err } - if native.CheckDaemonPod(pod) && !systemCores { + if skipPod { klog.V(5).Infof("[ORM] skip pod: %s, container: %v", pod.Name, container.Name) return nil, nil }