From 5545830dfd7782ba56d802bcaca62c5abd794c50 Mon Sep 17 00:00:00 2001 From: "wangzhe.21" Date: Tue, 2 Jul 2024 20:12:32 +0800 Subject: [PATCH] add ut for loadaware scheduler plugin and npd plugin --- cmd/katalyst-controller/app/options/npd.go | 3 +- cmd/katalyst-scheduler/main.go | 2 +- go.mod | 2 +- go.sum | 4 +- .../npd/indicator-plugin/loadaware/handler.go | 24 +- .../loadaware/handler_test.go | 143 ++++ .../npd/indicator-plugin/loadaware/helper.go | 48 +- .../indicator-plugin/loadaware/helper_test.go | 68 ++ .../indicator-plugin/loadaware/loadaware.go | 70 +- .../loadaware/loadaware_test.go | 553 +++++++++++++++ .../loadaware/sorter/helper.go | 16 + .../indicator-plugin/loadaware/sorter/pod.go | 16 + .../loadaware/sorter/pod_test.go | 16 + .../loadaware/sorter/scorer.go | 16 + .../npd/indicator-plugin/loadaware/types.go | 52 +- pkg/scheduler/plugins/loadaware/cache.go | 27 +- pkg/scheduler/plugins/loadaware/cache_test.go | 19 +- pkg/scheduler/plugins/loadaware/fit.go | 16 + pkg/scheduler/plugins/loadaware/fit_test.go | 443 +++++++++++- pkg/scheduler/plugins/loadaware/handler.go | 40 +- pkg/scheduler/plugins/loadaware/helper.go | 21 +- pkg/scheduler/plugins/loadaware/plugin.go | 28 +- .../plugins/loadaware/plugin_test.go | 91 +++ pkg/scheduler/plugins/loadaware/reserve.go | 20 +- .../plugins/loadaware/reserve_test.go | 94 +++ pkg/scheduler/plugins/loadaware/score.go | 40 +- pkg/scheduler/plugins/loadaware/score_test.go | 640 ++++++++++++++++++ 27 files changed, 2373 insertions(+), 139 deletions(-) create mode 100644 pkg/controller/npd/indicator-plugin/loadaware/handler_test.go create mode 100644 pkg/controller/npd/indicator-plugin/loadaware/helper_test.go create mode 100644 pkg/controller/npd/indicator-plugin/loadaware/loadaware_test.go create mode 100644 pkg/scheduler/plugins/loadaware/plugin_test.go create mode 100644 pkg/scheduler/plugins/loadaware/reserve_test.go create mode 100644 pkg/scheduler/plugins/loadaware/score_test.go diff --git a/cmd/katalyst-controller/app/options/npd.go b/cmd/katalyst-controller/app/options/npd.go index 22dfa22445..e63e20cc8b 100644 --- a/cmd/katalyst-controller/app/options/npd.go +++ b/cmd/katalyst-controller/app/options/npd.go @@ -17,9 +17,10 @@ limitations under the License. package options import ( - cliflag "k8s.io/component-base/cli/flag" "time" + cliflag "k8s.io/component-base/cli/flag" + "github.com/kubewharf/katalyst-core/pkg/config/controller" ) diff --git a/cmd/katalyst-scheduler/main.go b/cmd/katalyst-scheduler/main.go index b7aed52947..3cb9a72253 100644 --- a/cmd/katalyst-scheduler/main.go +++ b/cmd/katalyst-scheduler/main.go @@ -17,13 +17,13 @@ limitations under the License. 
package main import ( - "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/loadaware" "os" "github.com/spf13/cobra" "k8s.io/component-base/logs" "github.com/kubewharf/katalyst-core/cmd/katalyst-scheduler/app" + "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/loadaware" "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment" "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/noderesourcetopology" "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/qosawarenoderesources" diff --git a/go.mod b/go.mod index 4a7c34c23c..bda26a81e3 100644 --- a/go.mod +++ b/go.mod @@ -161,7 +161,7 @@ require ( ) replace ( - github.com/kubewharf/katalyst-api => github.com/WangZzzhe/katalyst-api v0.0.0-20240626083651-4a90fe53af11 + github.com/kubewharf/katalyst-api => github.com/WangZzzhe/katalyst-api v0.0.0-20240719035252-ac200da4db6c k8s.io/api => k8s.io/api v0.24.6 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6 k8s.io/apimachinery => k8s.io/apimachinery v0.24.6 diff --git a/go.sum b/go.sum index 717e5ff1a8..564916d68d 100644 --- a/go.sum +++ b/go.sum @@ -84,8 +84,8 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= -github.com/WangZzzhe/katalyst-api v0.0.0-20240626083651-4a90fe53af11 h1:4RUG7QfX0hBwtHtI3Nll6F4lCP31ThYxkWIu93G6Ei4= -github.com/WangZzzhe/katalyst-api v0.0.0-20240626083651-4a90fe53af11/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= +github.com/WangZzzhe/katalyst-api v0.0.0-20240719035252-ac200da4db6c h1:/0fwVknrQEJoRKnT2H0f5xkzCdcDIH4qfNvpPn7QoH8= +github.com/WangZzzhe/katalyst-api v0.0.0-20240719035252-ac200da4db6c/go.mod h1:HHUJnOrDN5xrzKhEspq70ZJL859b09j07pMAl9ACnwU= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/pkg/controller/npd/indicator-plugin/loadaware/handler.go b/pkg/controller/npd/indicator-plugin/loadaware/handler.go index b9884af7bd..36e48181c2 100644 --- a/pkg/controller/npd/indicator-plugin/loadaware/handler.go +++ b/pkg/controller/npd/indicator-plugin/loadaware/handler.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package loadaware import ( @@ -72,7 +88,9 @@ func (p *Plugin) OnPodAdd(obj interface{}) { p.Lock() defer p.Unlock() if p.podUsageSelectorKey != "" { - if value, exist := pod.Labels[p.podUsageSelectorKey]; exist && value == p.podUsageSelectorVal { + if value, exist := pod.Labels[p.podUsageSelectorKey]; exist && + value == p.podUsageSelectorVal && + p.podUsageSelectorNamespace == pod.Namespace { klog.Info("start sync pod usage to nodeMonitor") p.enableSyncPodUsage = true } @@ -143,7 +161,9 @@ func (p *Plugin) OnPodDelete(obj interface{}) { p.Lock() defer p.Unlock() if p.podUsageSelectorVal != "" { - if value, exist := pod.Labels[p.podUsageSelectorKey]; exist && value == p.podUsageSelectorVal { + if value, exist := pod.Labels[p.podUsageSelectorKey]; exist && + value == p.podUsageSelectorVal && + p.podUsageSelectorNamespace == pod.Namespace { klog.Info("stop sync pod usage to nodeMonitor") p.enableSyncPodUsage = false } diff --git a/pkg/controller/npd/indicator-plugin/loadaware/handler_test.go b/pkg/controller/npd/indicator-plugin/loadaware/handler_test.go new file mode 100644 index 0000000000..dba9477114 --- /dev/null +++ b/pkg/controller/npd/indicator-plugin/loadaware/handler_test.go @@ -0,0 +1,143 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package loadaware + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" +) + +func TestOnNodeAdd(t *testing.T) { + t.Parallel() + + p := &Plugin{ + workers: 3, + nodePoolMap: map[int32]sets.String{}, + nodeStatDataMap: map[string]*NodeMetricData{}, + } + + testNode1 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + }, + Status: v1.NodeStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("16"), + v1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + } + + p.OnNodeAdd(testNode1) + assert.NotNil(t, p.nodeStatDataMap["testNode1"]) + assert.Equal(t, 2, len(p.nodeStatDataMap["testNode1"].TotalRes)) + + p.OnNodeDelete(testNode1) + assert.Nil(t, p.nodeStatDataMap["testNode1"]) +} + +func TestOnNodeUpdate(t *testing.T) { + t.Parallel() + + p := &Plugin{ + nodeStatDataMap: map[string]*NodeMetricData{}, + } + + testNode1 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + }, + Status: v1.NodeStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("16"), + v1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + } + + p.OnNodeUpdate(nil, testNode1) + assert.NotNil(t, p.nodeStatDataMap["testNode1"]) + assert.Equal(t, 2, len(p.nodeStatDataMap["testNode1"].TotalRes)) +} + +func TestOnPodAdd(t *testing.T) { + t.Parallel() + + p := &Plugin{ + nodeToPodsMap: map[string]map[string]struct{}{}, + podUsageSelectorKey: "app", + podUsageSelectorVal: "testPod", + podUsageSelectorNamespace: "katalyst-system", + } + + testPod1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod1", + Namespace: "katalyst-system", + Labels: map[string]string{ + "app": "testPod", + }, + }, + Spec: v1.PodSpec{ + NodeName: "testNode1", + }, + } + + p.OnPodAdd(testPod1) + assert.NotNil(t, p.nodeToPodsMap["testNode1"]) + assert.Equal(t, 1, len(p.nodeToPodsMap["testNode1"])) + + p.OnPodDelete(testPod1) + assert.Equal(t, 0, len(p.nodeToPodsMap["testNode1"])) + + p.OnPodDelete("") +} + +func TestOnPodUpdate(t *testing.T) { + t.Parallel() + + p := &Plugin{ + nodeToPodsMap: map[string]map[string]struct{}{}, + podUsageSelectorKey: "app", + podUsageSelectorVal: "testPod", + podUsageSelectorNamespace: "katalyst-system", + } + + testPod1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod1", + Namespace: "katalyst-system", + Labels: map[string]string{ + "app": "testPod", + }, + }, + Spec: v1.PodSpec{ + NodeName: "testNode1", + }, + } + + p.OnPodUpdate(nil, testPod1) + assert.NotNil(t, p.nodeToPodsMap["testNode1"]) + assert.Equal(t, 1, len(p.nodeToPodsMap["testNode1"])) + + p.OnPodUpdate(nil, "") +} diff --git a/pkg/controller/npd/indicator-plugin/loadaware/helper.go b/pkg/controller/npd/indicator-plugin/loadaware/helper.go index bdc2c0be4e..003d89a810 100644 --- a/pkg/controller/npd/indicator-plugin/loadaware/helper.go +++ b/pkg/controller/npd/indicator-plugin/loadaware/helper.go @@ -1,13 +1,31 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package loadaware import ( - "github.com/kubewharf/katalyst-core/pkg/controller/npd/indicator-plugin/loadaware/sorter" + "time" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" quotav1 "k8s.io/apiserver/pkg/quota/v1" "k8s.io/metrics/pkg/apis/metrics/v1beta1" - "time" + + "github.com/kubewharf/katalyst-core/pkg/controller/npd/indicator-plugin/loadaware/sorter" ) // getUsage transfer cpu Nano to Milli, memory Ki to Mega @@ -92,7 +110,7 @@ func refreshNodeMetricData(metricData *NodeMetricData, metricInfo *v1beta1.NodeM max1Hour := calCPUAndMemoryMax(metricData.Latest1HourCache) metricData.Max1Hour = max1Hour.DeepCopy() - //calculate 1 day max data + // calculate 1 day max data if metricData.ifCanInsertLatest1DayCache(now) { resWithTime := &ResourceListWithTime{ ResourceList: max1Hour.DeepCopy(), @@ -115,7 +133,7 @@ func refreshPodMetricData(metricData *PodMetricData, metricInfo *v1beta1.PodMetr podUsage = quotav1.Add(podUsage, containerMetrics.Usage) } metricData.LatestUsage = podUsage.DeepCopy() - //calculate 5 min avg data + // calculate 5 min avg data metricData.Latest5MinCache = append(metricData.Latest5MinCache, getUsage(podUsage)) if len(metricData.Latest5MinCache) > Avg5MinPointNumber { metricData.Latest5MinCache = metricData.Latest5MinCache[len(metricData.Latest5MinCache)-Avg5MinPointNumber:] @@ -153,25 +171,3 @@ func getTopNPodUsages(podUsages map[string]corev1.ResourceList, maxPodUsageCount } return topNPodUsages } - -func calNodeLoad(resourceName corev1.ResourceName, usage, totalRes corev1.ResourceList) int64 { - if usage == nil || totalRes == nil { - return 0 - } - used := int64(0) - total := int64(0) - if resourceName == corev1.ResourceCPU { - used = usage.Cpu().MilliValue() - total = totalRes.Cpu().MilliValue() - } else { - used = usage.Memory().Value() - total = totalRes.Memory().Value() - } - if total == 0 { - return 0 - } - if used >= total { - return 99 - } - return used * 100 / total -} diff --git a/pkg/controller/npd/indicator-plugin/loadaware/helper_test.go b/pkg/controller/npd/indicator-plugin/loadaware/helper_test.go new file mode 100644 index 0000000000..9459a1fc98 --- /dev/null +++ b/pkg/controller/npd/indicator-plugin/loadaware/helper_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package loadaware
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+)
+
+func TestGetTopNPodUsages(t *testing.T) {
+	t.Parallel()
+	podRealUsage := map[string]corev1.ResourceList{
+		"default/test-1": {
+			corev1.ResourceCPU:    resource.MustParse("80"),
+			corev1.ResourceMemory: resource.MustParse("10Gi"),
+		},
+		"default/test-2": {
+			corev1.ResourceCPU:    resource.MustParse("30"),
+			corev1.ResourceMemory: resource.MustParse("10Gi"),
+		},
+		"default/test-3": {
+			corev1.ResourceCPU:    resource.MustParse("50"),
+			corev1.ResourceMemory: resource.MustParse("10Gi"),
+		},
+		"default/test-4": {
+			corev1.ResourceCPU:    resource.MustParse("70"),
+			corev1.ResourceMemory: resource.MustParse("10Gi"),
+		},
+		"default/test-5": {
+			corev1.ResourceCPU:    resource.MustParse("10"),
+			corev1.ResourceMemory: resource.MustParse("10Gi"),
+		},
+		"default/test-6": {
+			corev1.ResourceCPU:    resource.MustParse("40"),
+			corev1.ResourceMemory: resource.MustParse("10Gi"),
+		},
+		"default/test-7": {
+			corev1.ResourceCPU:    resource.MustParse("60"),
+			corev1.ResourceMemory: resource.MustParse("10Gi"),
+		},
+	}
+	resultMap := getTopNPodUsages(podRealUsage, 3)
+	expected := []string{"default/test-1", "default/test-4", "default/test-7"}
+	assert.Equal(t, 3, len(resultMap))
+	for _, v := range expected {
+		if _, ok := resultMap[v]; !ok {
+			t.Error(fmt.Errorf("pod %s does not exist in result", v))
+		}
+	}
+}
diff --git a/pkg/controller/npd/indicator-plugin/loadaware/loadaware.go b/pkg/controller/npd/indicator-plugin/loadaware/loadaware.go
index 33b5444992..c76df08b69 100644
--- a/pkg/controller/npd/indicator-plugin/loadaware/loadaware.go
+++ b/pkg/controller/npd/indicator-plugin/loadaware/loadaware.go
@@ -1,9 +1,26 @@
+/*
+Copyright 2022 The Katalyst Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
 package loadaware
 
 import (
 	"context"
 	"fmt"
 	"hash/crc32"
+	"sort"
 	"sync"
 	"time"
 
@@ -64,7 +81,8 @@ type Plugin struct {
 }
 
 func NewLoadAwarePlugin(ctx context.Context, conf *controller.NPDConfig, extraConf interface{},
-	controlCtx *katalystbase.GenericContext, updater indicator_plugin.IndicatorUpdater) (indicator_plugin.IndicatorPlugin, error) {
+	controlCtx *katalystbase.GenericContext, updater indicator_plugin.IndicatorUpdater,
+) (indicator_plugin.IndicatorPlugin, error) {
 	p := &Plugin{
 		ctx:     ctx,
 		workers: int32(conf.Workers),
@@ -150,7 +168,7 @@ func (p *Plugin) Run() {
 	if err != nil {
 		klog.Fatalf("get all nodes from cache error, err:%v", err)
 	}
-	//init worker node pool
+	// init worker node pool
 	for _, node := range nodes {
 		bucketID := p.getBucketID(node.Name)
 		if pool, ok := p.nodePoolMap[bucketID]; !ok {
@@ -162,7 +180,7 @@ func (p *Plugin) Run() {
 
 	p.constructNodeToPodMap()
 
-	//restore npd from api server
+	// restore npd from api server
 	p.restoreNPD()
 
 	// start sync node
@@ -230,11 +248,11 @@ func (p *Plugin) restoreNPD() {
 	}
 
 	var (
-		avg15MinCache = make([]corev1.ResourceList, 0)
+		avg15MinCache = make([]*ResourceListWithTime, 0)
 		max1HourCache = make([]*ResourceListWithTime, 0)
 		max1DayCache  = make([]*ResourceListWithTime, 0)
 
-		avg15MinMap = make(map[metav1.Time]corev1.ResourceList)
+		avg15MinMap = make(map[metav1.Time]*ResourceListWithTime)
 		max1HourMap = make(map[metav1.Time]*ResourceListWithTime)
 		max1DayMap  = make(map[metav1.Time]*ResourceListWithTime)
 	)
@@ -242,9 +260,12 @@
 		for _, metricValue := range npd.Status.NodeMetrics[i].Metrics {
 			if metricValue.Window.Duration == 15*time.Minute {
 				if _, ok := avg15MinMap[metricValue.Timestamp]; !ok {
-					avg15MinMap[metricValue.Timestamp] = corev1.ResourceList{}
+					avg15MinMap[metricValue.Timestamp] = &ResourceListWithTime{
+						Ts:           metricValue.Timestamp.Unix(),
+						ResourceList: corev1.ResourceList{},
+					}
 				}
-				avg15MinMap[metricValue.Timestamp][corev1.ResourceName(metricValue.MetricName)] = metricValue.Value
+				avg15MinMap[metricValue.Timestamp].ResourceList[corev1.ResourceName(metricValue.MetricName)] = metricValue.Value
 			} else if metricValue.Window.Duration == time.Hour {
 				if _, ok := max1HourMap[metricValue.Timestamp]; !ok {
 					max1HourMap[metricValue.Timestamp] = &ResourceListWithTime{
@@ -269,15 +290,18 @@
 	for i := range avg15MinMap {
 		avg15MinCache = append(avg15MinCache, avg15MinMap[i])
 	}
+	sort.Sort(ResourceListWithTimeList(avg15MinCache))
 	for i := range max1HourMap {
 		max1HourCache = append(max1HourCache, max1HourMap[i])
 	}
+	sort.Sort(ResourceListWithTimeList(max1HourCache))
 	for i := range max1DayMap {
 		max1DayCache = append(max1DayCache, max1DayMap[i])
 	}
+	sort.Sort(ResourceListWithTimeList(max1DayCache))
 
 	p.nodeStatDataMap[npd.Name] = &NodeMetricData{
-		Latest15MinCache: avg15MinCache,
+		Latest15MinCache: ResourceListWithTimeList(avg15MinCache).ToResourceList(),
 		Latest1HourCache: max1HourCache,
 		Latest1DayCache:  max1DayCache,
 	}
@@ -628,33 +652,3 @@ func (p *Plugin) checkPodUsageRequired() {
 		}
 	}
 }
-
-func (p *Plugin) reportNodeLoadMetric() {
-	p.RLock()
-	defer p.RUnlock()
-	resourceDims := []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory}
-	for _, resourceName := range resourceDims {
-		resultMap := make(map[int64]*int64)
-		for _, data := range p.nodeStatDataMap {
-			data.lock.RLock()
-			load := calNodeLoad(resourceName, data.LatestUsage, data.TotalRes)
-			data.lock.RUnlock()
-			idx := load / 10
-			if count, ok := resultMap[idx]; !ok {
-				i := int64(1)
-				resultMap[idx] = &i
- } else { - *count++ - } - } - for idx, level := range levels { - typeTag := metrics.MetricTag{Key: metricTagType, Val: string(resourceName)} - levelTag := metrics.MetricTag{Key: metricTagLevel, Val: level} - if count, ok := resultMap[int64(idx)]; ok { - _ = p.emitter.StoreFloat64(loadAwareMetricName, float64(*count), metrics.MetricTypeNameRaw, typeTag, levelTag) - } else { - _ = p.emitter.StoreFloat64(loadAwareMetricName, 0, metrics.MetricTypeNameRaw, typeTag, levelTag) - } - } - } -} diff --git a/pkg/controller/npd/indicator-plugin/loadaware/loadaware_test.go b/pkg/controller/npd/indicator-plugin/loadaware/loadaware_test.go new file mode 100644 index 0000000000..8e98cabc14 --- /dev/null +++ b/pkg/controller/npd/indicator-plugin/loadaware/loadaware_test.go @@ -0,0 +1,553 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadaware + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v12 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + + "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/pkg/config/controller" +) + +func TestNewLoadAwarePlugin(t *testing.T) { + t.Parallel() + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + conf := &controller.NPDConfig{ + LoadAwarePluginConfig: &controller.LoadAwarePluginConfig{ + Workers: 3, + PodUsageSelectorNamespace: "katalyst-system", + PodUsageSelectorKey: "app", + PodUsageSelectorVal: "testPod", + MaxPodUsageCount: 10, + }, + } + + updater := &fakeIndicatorUpdater{} + + p, err := NewLoadAwarePlugin(context.TODO(), conf, nil, controlCtx, updater) + assert.NoError(t, err) + assert.NotNil(t, p) +} + +func TestRestoreNPD(t *testing.T) { + t.Parallel() + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p := &Plugin{ + npdLister: controlCtx.InternalInformerFactory.Node().V1alpha1().NodeProfileDescriptors().Lister(), + } + + npds := []*v1alpha1.NodeProfileDescriptor{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "testNode1", + }, + Status: v1alpha1.NodeProfileDescriptorStatus{ + NodeMetrics: []v1alpha1.ScopedNodeMetrics{ + { + Scope: "testScope", + }, + { + Scope: loadAwareMetricMetadataScope, + Metrics: makeTestMetadata(4, 8*1024*1024*1024), + }, + }, + }, + }, + } + controlCtx.StartInformer(context.TODO()) + for _, npd := range npds { + _, err = controlCtx.Client.InternalClient.NodeV1alpha1().NodeProfileDescriptors(). 
+ Create(context.TODO(), npd, v1.CreateOptions{}) + assert.NoError(t, err) + } + time.Sleep(time.Second) + p.restoreNPD() + + for _, npd := range npds { + data, ok := p.nodeStatDataMap[npd.Name] + assert.True(t, ok) + + assert.Equal(t, Avg15MinPointNumber, len(data.Latest15MinCache)) + assert.Equal(t, Max1HourPointNumber, len(data.Latest1HourCache)) + assert.Equal(t, Max1DayPointNumber, len(data.Latest1DayCache)) + } +} + +func TestConstructNodeToPodMap(t *testing.T) { + t.Parallel() + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p := &Plugin{ + nodeToPodsMap: map[string]map[string]struct{}{}, + podLister: controlCtx.KubeInformerFactory.Core().V1().Pods().Lister(), + } + + pods := []*v12.Pod{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + }, + Spec: v12.PodSpec{ + NodeName: "testNode1", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + }, + Spec: v12.PodSpec{ + NodeName: "testNode1", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "pod3", + Namespace: "default", + }, + Spec: v12.PodSpec{ + NodeName: "testNode2", + }, + }, + } + + controlCtx.StartInformer(context.TODO()) + for _, pod := range pods { + _, err = controlCtx.Client.KubeClient.CoreV1().Pods(pod.Namespace). + Create(context.TODO(), pod, v1.CreateOptions{}) + assert.NoError(t, err) + } + time.Sleep(time.Second) + + p.constructNodeToPodMap() + assert.Equal(t, 2, len(p.nodeToPodsMap)) + assert.Equal(t, 2, len(p.nodeToPodsMap["testNode1"])) +} + +func TestWorker(t *testing.T) { + t.Parallel() + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p := &Plugin{ + nodeToPodsMap: map[string]map[string]struct{}{}, + podLister: controlCtx.KubeInformerFactory.Core().V1().Pods().Lister(), + enableSyncPodUsage: false, + npdUpdater: &fakeIndicatorUpdater{}, + workers: 1, + nodePoolMap: map[int32]sets.String{ + 0: sets.NewString("node1", "node2", "node3"), + }, + nodeStatDataMap: map[string]*NodeMetricData{}, + npdLister: controlCtx.InternalInformerFactory.Node().V1alpha1().NodeProfileDescriptors().Lister(), + } + controlCtx.StartInformer(context.TODO()) + makeTestNodeStatData(p, "node1", 4, 16*1024*1024*1024) + + nodeMetrics := map[string]*v1beta1.NodeMetrics{ + "node1": { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + Timestamp: v1.Time{Time: time.Now()}, + Usage: v12.ResourceList{ + v12.ResourceCPU: resource.MustParse("4"), + v12.ResourceMemory: resource.MustParse("6Gi"), + }, + }, + "node2": { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + }, + Timestamp: v1.Time{Time: time.Now()}, + Usage: v12.ResourceList{ + v12.ResourceCPU: resource.MustParse("4"), + v12.ResourceMemory: resource.MustParse("6Gi"), + }, + }, + "node3": { + ObjectMeta: v1.ObjectMeta{ + Name: "node3", + }, + Timestamp: v1.Time{Time: time.Now()}, + Usage: v12.ResourceList{ + v12.ResourceCPU: resource.MustParse("2"), + v12.ResourceMemory: resource.MustParse("5Gi"), + }, + }, + } + + npds := []*v1alpha1.NodeProfileDescriptor{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + }, + }, + } + for i := range npds { + _, err = controlCtx.Client.InternalClient.NodeV1alpha1().NodeProfileDescriptors(). 
+ Create(context.TODO(), npds[i], v1.CreateOptions{}) + assert.NoError(t, err) + } + time.Sleep(time.Second) + + p.worker(0, nodeMetrics) +} + +func TestTransferMetaToCRStore(t *testing.T) { + t.Parallel() + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + updater := &fakeIndicatorUpdater{} + + p := &Plugin{ + npdLister: controlCtx.InternalInformerFactory.Node().V1alpha1().NodeProfileDescriptors().Lister(), + nodeStatDataMap: map[string]*NodeMetricData{}, + npdUpdater: updater, + } + makeTestNodeStatData(p, "testNode1", 16, 32*1024*1024*1024) + controlCtx.StartInformer(context.TODO()) + + npds := []*v1alpha1.NodeProfileDescriptor{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "testNode1", + }, + }, + } + for _, npd := range npds { + _, err = controlCtx.Client.InternalClient.NodeV1alpha1().NodeProfileDescriptors(). + Create(context.TODO(), npd, v1.CreateOptions{}) + assert.NoError(t, err) + } + time.Sleep(time.Second) + + p.transferMetaToCRStore() + assert.NotNil(t, updater.data["testNode1"]) + assert.Equal(t, loadAwareMetricMetadataScope, updater.data["testNode1"].NodeMetrics[0].Scope) + assert.Equal(t, 48+30+8, len(updater.data["testNode1"].NodeMetrics[0].Metrics)) +} + +func TestUpdatePodMetrics(t *testing.T) { + t.Parallel() + + p := &Plugin{} + + npdStatus := &v1alpha1.NodeProfileDescriptorStatus{ + PodMetrics: []v1alpha1.ScopedPodMetrics{}, + } + podUsage := map[string]v12.ResourceList{ + "default/testPod1": { + v12.ResourceCPU: resource.MustParse("2"), + v12.ResourceMemory: resource.MustParse("6Gi"), + }, + "default/testPod2": { + v12.ResourceCPU: resource.MustParse("3"), + v12.ResourceMemory: resource.MustParse("8Gi"), + }, + "default/testPod3": { + v12.ResourceCPU: resource.MustParse("1"), + v12.ResourceMemory: resource.MustParse("6Gi"), + }, + "default/testPod4": { + v12.ResourceCPU: resource.MustParse("2"), + v12.ResourceMemory: resource.MustParse("5Gi"), + }, + "default/testPod5": { + v12.ResourceCPU: resource.MustParse("4"), + v12.ResourceMemory: resource.MustParse("6Gi"), + }, + "default/testPod6": { + v12.ResourceCPU: resource.MustParse("7"), + v12.ResourceMemory: resource.MustParse("13Gi"), + }, + } + + p.updatePodMetrics(npdStatus, podUsage, 5) + + assert.Equal(t, 1, len(npdStatus.PodMetrics)) + assert.Equal(t, loadAwareMetricsScope, npdStatus.PodMetrics[0].Scope) + assert.Equal(t, 5, len(npdStatus.PodMetrics[0].PodMetrics)) +} + +func TestCheckPodUsageRequired(t *testing.T) { + t.Parallel() + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p := &Plugin{ + podLister: controlCtx.KubeInformerFactory.Core().V1().Pods().Lister(), + workers: 3, + podUsageSelectorNamespace: "katalyst-system", + podUsageSelectorKey: "app", + podUsageSelectorVal: "testPod", + } + controlCtx.StartInformer(context.TODO()) + + pods := []*v12.Pod{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "testPod", + Namespace: "katalyst-system", + Labels: map[string]string{ + "app": "testPod", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "testPod2", + Namespace: "default", + Labels: map[string]string{ + "app": "testPod", + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "testPod3", + Namespace: "katalyst-system", + Labels: map[string]string{ + "app": "testPod3", + }, + }, + }, + } + for _, pod := range pods { + _, err = controlCtx.Client.KubeClient.CoreV1().Pods(pod.Namespace). 
+ Create(context.TODO(), pod, v1.CreateOptions{}) + assert.NoError(t, err) + } + time.Sleep(time.Second) + + p.checkPodUsageRequired() + assert.True(t, p.enableSyncPodUsage) + + err = controlCtx.Client.KubeClient.CoreV1().Pods("katalyst-system").Delete(context.TODO(), "testPod", v1.DeleteOptions{}) + assert.NoError(t, err) + time.Sleep(time.Second) + + for i := 0; i < 10; i++ { + p.checkPodUsageRequired() + } + assert.False(t, p.enableSyncPodUsage) +} + +func TestReCleanPodData(t *testing.T) { + t.Parallel() + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p := &Plugin{ + podLister: controlCtx.KubeInformerFactory.Core().V1().Pods().Lister(), + podStatDataMap: map[string]*PodMetricData{ + "default/testPod1": nil, + "default/testPod2": nil, + "default/testPod3": nil, + }, + } + controlCtx.StartInformer(context.TODO()) + + pods := []*v12.Pod{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "testPod1", + Namespace: "default", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "testPod5", + Namespace: "default", + }, + }, + } + for _, pod := range pods { + _, err = controlCtx.Client.KubeClient.CoreV1().Pods(pod.Namespace). + Create(context.TODO(), pod, v1.CreateOptions{}) + assert.NoError(t, err) + } + time.Sleep(time.Second) + + p.reCleanPodData() + assert.Equal(t, 1, len(p.podStatDataMap)) +} + +func TestName(t *testing.T) { + t.Parallel() + p := &Plugin{} + assert.Equal(t, LoadAwarePluginName, p.Name()) +} + +func TestGetSupportedNodeMetricsScope(t *testing.T) { + t.Parallel() + p := Plugin{} + assert.Equal(t, []string{loadAwareMetricsScope, loadAwareMetricMetadataScope}, p.GetSupportedNodeMetricsScope()) + assert.Equal(t, []string{loadAwareMetricsScope}, p.GetSupportedPodMetricsScope()) +} + +func makeTestMetadata(cpu, memory int64) []v1alpha1.MetricValue { + res := make([]v1alpha1.MetricValue, 0) + now := time.Now() + rand.Seed(now.Unix()) + for i := 0; i < Avg15MinPointNumber; i++ { + res = append(res, v1alpha1.MetricValue{ + MetricName: "cpu", + Timestamp: v1.Time{Time: now}, + Window: &v1.Duration{Duration: 15 * time.Minute}, + Value: *resource.NewQuantity(rand.Int63nRange(0, cpu), resource.DecimalSI), + }) + res = append(res, v1alpha1.MetricValue{ + MetricName: "memory", + Timestamp: v1.Time{Time: now}, + Window: &v1.Duration{Duration: 15 * time.Minute}, + Value: *resource.NewQuantity(rand.Int63nRange(0, memory), resource.BinarySI), + }) + now = now.Add(time.Minute) + } + + for i := 0; i < Max1HourPointNumber; i++ { + res = append(res, v1alpha1.MetricValue{ + MetricName: "cpu", + Timestamp: v1.Time{Time: now}, + Window: &v1.Duration{Duration: time.Hour}, + Value: *resource.NewQuantity(rand.Int63nRange(0, cpu), resource.DecimalSI), + }) + res = append(res, v1alpha1.MetricValue{ + MetricName: "memory", + Timestamp: v1.Time{Time: now}, + Window: &v1.Duration{Duration: time.Hour}, + Value: *resource.NewQuantity(rand.Int63nRange(0, memory), resource.BinarySI), + }) + now = now.Add(time.Minute) + } + + for i := 0; i < Max1DayPointNumber; i++ { + res = append(res, v1alpha1.MetricValue{ + MetricName: "cpu", + Timestamp: v1.Time{Time: now}, + Window: &v1.Duration{Duration: 24 * time.Hour}, + Value: *resource.NewQuantity(rand.Int63nRange(0, cpu), resource.DecimalSI), + }) + res = append(res, v1alpha1.MetricValue{ + MetricName: "memory", + Timestamp: v1.Time{Time: now}, + Window: &v1.Duration{Duration: 24 * time.Hour}, + Value: *resource.NewQuantity(rand.Int63nRange(0, memory), resource.BinarySI), + }) + now = now.Add(time.Minute) + } + + return res +} + 
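+// makeTestNodeStatData is a test helper that seeds plugin.nodeStatDataMap[nodeName]
+// with randomly generated 15-minute, 1-hour and 1-day caches (values bounded by the
+// given cpu/memory totals), so tests run against a fully populated metric window.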
+func makeTestNodeStatData(plugin *Plugin, nodeName string, cpu, memory int64) { + if plugin.nodeStatDataMap == nil { + plugin.nodeStatDataMap = map[string]*NodeMetricData{} + } + if plugin.nodeStatDataMap[nodeName] == nil { + plugin.nodeStatDataMap[nodeName] = &NodeMetricData{} + } + now := time.Now().Add(-2 * time.Hour) + rand.Seed(now.Unix()) + + for i := 0; i < Avg15MinPointNumber; i++ { + plugin.nodeStatDataMap[nodeName].Latest15MinCache = append(plugin.nodeStatDataMap[nodeName].Latest15MinCache, v12.ResourceList{ + v12.ResourceCPU: *resource.NewQuantity(rand.Int63nRange(0, cpu), resource.DecimalSI), + v12.ResourceMemory: *resource.NewQuantity(rand.Int63nRange(0, memory), resource.BinarySI), + }) + } + + for i := 0; i < Max1HourPointNumber; i++ { + plugin.nodeStatDataMap[nodeName].Latest1HourCache = append(plugin.nodeStatDataMap[nodeName].Latest1HourCache, &ResourceListWithTime{ + Ts: now.Unix(), + ResourceList: v12.ResourceList{ + v12.ResourceCPU: *resource.NewQuantity(rand.Int63nRange(0, cpu), resource.DecimalSI), + v12.ResourceMemory: *resource.NewQuantity(rand.Int63nRange(0, memory), resource.BinarySI), + }, + }) + now = now.Add(time.Minute) + } + + for i := 0; i < Max1DayPointNumber; i++ { + plugin.nodeStatDataMap[nodeName].Latest1DayCache = append(plugin.nodeStatDataMap[nodeName].Latest1DayCache, &ResourceListWithTime{ + Ts: now.Unix(), + ResourceList: v12.ResourceList{ + v12.ResourceCPU: *resource.NewQuantity(rand.Int63nRange(0, cpu), resource.DecimalSI), + v12.ResourceMemory: *resource.NewQuantity(rand.Int63nRange(0, memory), resource.BinarySI), + }, + }) + now = now.Add(time.Minute) + } +} + +type fakeIndicatorUpdater struct { + data map[string]v1alpha1.NodeProfileDescriptorStatus +} + +func (f *fakeIndicatorUpdater) UpdateNodeMetrics(name string, scopedNodeMetrics []v1alpha1.ScopedNodeMetrics) { + if f.data == nil { + f.data = map[string]v1alpha1.NodeProfileDescriptorStatus{} + } + data, ok := f.data[name] + if !ok { + data = v1alpha1.NodeProfileDescriptorStatus{} + } + data.NodeMetrics = scopedNodeMetrics + f.data[name] = data +} + +func (f *fakeIndicatorUpdater) UpdatePodMetrics(nodeName string, scopedPodMetrics []v1alpha1.ScopedPodMetrics) { + if f.data == nil { + f.data = map[string]v1alpha1.NodeProfileDescriptorStatus{} + } + data, ok := f.data[nodeName] + if !ok { + data = v1alpha1.NodeProfileDescriptorStatus{} + } + data.PodMetrics = scopedPodMetrics + f.data[nodeName] = data +} diff --git a/pkg/controller/npd/indicator-plugin/loadaware/sorter/helper.go b/pkg/controller/npd/indicator-plugin/loadaware/sorter/helper.go index df85126c27..18676bfcdc 100644 --- a/pkg/controller/npd/indicator-plugin/loadaware/sorter/helper.go +++ b/pkg/controller/npd/indicator-plugin/loadaware/sorter/helper.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package sorter import "sort" diff --git a/pkg/controller/npd/indicator-plugin/loadaware/sorter/pod.go b/pkg/controller/npd/indicator-plugin/loadaware/sorter/pod.go index d615b96528..cc03a5accd 100644 --- a/pkg/controller/npd/indicator-plugin/loadaware/sorter/pod.go +++ b/pkg/controller/npd/indicator-plugin/loadaware/sorter/pod.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sorter import corev1 "k8s.io/api/core/v1" diff --git a/pkg/controller/npd/indicator-plugin/loadaware/sorter/pod_test.go b/pkg/controller/npd/indicator-plugin/loadaware/sorter/pod_test.go index a99f5a7456..c52a089355 100644 --- a/pkg/controller/npd/indicator-plugin/loadaware/sorter/pod_test.go +++ b/pkg/controller/npd/indicator-plugin/loadaware/sorter/pod_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sorter import ( diff --git a/pkg/controller/npd/indicator-plugin/loadaware/sorter/scorer.go b/pkg/controller/npd/indicator-plugin/loadaware/sorter/scorer.go index 36ea6260b1..0b03cc58aa 100644 --- a/pkg/controller/npd/indicator-plugin/loadaware/sorter/scorer.go +++ b/pkg/controller/npd/indicator-plugin/loadaware/sorter/scorer.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sorter import ( diff --git a/pkg/controller/npd/indicator-plugin/loadaware/types.go b/pkg/controller/npd/indicator-plugin/loadaware/types.go index 6afa123a35..155ea08247 100644 --- a/pkg/controller/npd/indicator-plugin/loadaware/types.go +++ b/pkg/controller/npd/indicator-plugin/loadaware/types.go @@ -1,9 +1,26 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package loadaware import ( - v1 "k8s.io/api/core/v1" "sync" "time" + + v1 "k8s.io/api/core/v1" ) const ( @@ -20,9 +37,6 @@ const ( LoadAwarePluginName = "loadAware" loadAwareMetricsScope = "loadAware" loadAwareMetricMetadataScope = "loadAware_metadata" - loadAwareMetricName = "node_load" - metricTagType = "type" - metricTagLevel = "level" ) var ( @@ -39,9 +53,9 @@ type NodeMetricData struct { Avg15Min v1.ResourceList Max1Hour v1.ResourceList Max1Day v1.ResourceList - Latest15MinCache []v1.ResourceList //latest 15 1min_avg_data - Latest1HourCache []*ResourceListWithTime //latest 4 15min_max_data - Latest1DayCache []*ResourceListWithTime //latest 24 1hour_max_data + Latest15MinCache []v1.ResourceList // latest 15 1min_avg_data + Latest1HourCache []*ResourceListWithTime // latest 4 15min_max_data + Latest1DayCache []*ResourceListWithTime // latest 24 1hour_max_data } func (md *NodeMetricData) ifCanInsertLatest1HourCache(now time.Time) bool { @@ -72,7 +86,7 @@ type PodMetricData struct { lock sync.RWMutex LatestUsage v1.ResourceList Avg5Min v1.ResourceList - Latest5MinCache []v1.ResourceList //latest 15 1min_avg_data + Latest5MinCache []v1.ResourceList // latest 15 1min_avg_data } // ResourceListWithTime ... @@ -80,3 +94,25 @@ type ResourceListWithTime struct { v1.ResourceList `json:"R,omitempty"` Ts int64 `json:"T,omitempty"` } + +type ResourceListWithTimeList []*ResourceListWithTime + +func (r ResourceListWithTimeList) Len() int { + return len(r) +} + +func (r ResourceListWithTimeList) Swap(i, j int) { + r[i], r[j] = r[j], r[i] +} + +func (r ResourceListWithTimeList) Less(i, j int) bool { + return r[i].Ts < r[j].Ts +} + +func (r ResourceListWithTimeList) ToResourceList() []v1.ResourceList { + res := make([]v1.ResourceList, 0) + for i := range r { + res = append(res, r[i].ResourceList) + } + return res +} diff --git a/pkg/scheduler/plugins/loadaware/cache.go b/pkg/scheduler/plugins/loadaware/cache.go index 20c9e8409c..5912aa46a5 100644 --- a/pkg/scheduler/plugins/loadaware/cache.go +++ b/pkg/scheduler/plugins/loadaware/cache.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package loadaware import ( @@ -9,14 +25,6 @@ import ( "k8s.io/klog/v2" ) -var cache *Cache - -func init() { - cache = &Cache{ - NodePodInfo: map[string]*NodeCache{}, - } -} - type SPDLister interface { GetPodPortrait(pod *v1.Pod) *ResourceUsage } @@ -75,7 +83,8 @@ func (c *Cache) ReconcilePredictUsage() { var ( nodePredictUsage = &ResourceUsage{ Cpu: make([]float64, portraitItemsLength, portraitItemsLength), - Memory: make([]float64, portraitItemsLength, portraitItemsLength)} + Memory: make([]float64, portraitItemsLength, portraitItemsLength), + } err error ) for _, podInfo := range nc.PodInfoMap { diff --git a/pkg/scheduler/plugins/loadaware/cache_test.go b/pkg/scheduler/plugins/loadaware/cache_test.go index 433b7cced3..8d172ed778 100644 --- a/pkg/scheduler/plugins/loadaware/cache_test.go +++ b/pkg/scheduler/plugins/loadaware/cache_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package loadaware import ( @@ -64,7 +80,8 @@ func TestAddPod(t *testing.T) { ObjectMeta: v12.ObjectMeta{ Name: "testPod2", UID: "testPod2", - }}) + }, + }) assert.Equal(t, 1, len(c.NodePodInfo["testNode"].PodInfoMap)) } diff --git a/pkg/scheduler/plugins/loadaware/fit.go b/pkg/scheduler/plugins/loadaware/fit.go index 2b1590a3ea..1bc8792992 100644 --- a/pkg/scheduler/plugins/loadaware/fit.go +++ b/pkg/scheduler/plugins/loadaware/fit.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package loadaware import ( diff --git a/pkg/scheduler/plugins/loadaware/fit_test.go b/pkg/scheduler/plugins/loadaware/fit_test.go index cc5880777b..684bec7f4c 100644 --- a/pkg/scheduler/plugins/loadaware/fit_test.go +++ b/pkg/scheduler/plugins/loadaware/fit_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package loadaware import ( @@ -17,13 +33,251 @@ import ( "k8s.io/metrics/pkg/apis/metrics/v1beta1" "k8s.io/utils/pointer" + v1alpha12 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" "github.com/kubewharf/katalyst-api/pkg/apis/scheduling/config" "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/consts" katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" "github.com/kubewharf/katalyst-core/pkg/config/generic" "github.com/kubewharf/katalyst-core/pkg/scheduler/util" ) +func TestFilter(t *testing.T) { + t.Parallel() + + util.SetQoSConfig(generic.NewQoSConfiguration()) + + for _, tc := range []struct { + name string + pod *v1.Pod + node *v1.Node + pods []*v1.Pod + npd *v1alpha12.NodeProfileDescriptor + portraits []*v1alpha1.ServiceProfileDescriptor + expectRes *framework.Status + }{ + { + name: "filter success", + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "pod1UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment1", + }, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("16Gi"), + }, + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("16Gi"), + }, + }, + }, + }, + }, + }, + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Capacity: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + }, + }, + pods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "pod2UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment2", + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod3", + UID: "pod3UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment3", + }, + }, + }, + }, + }, + portraits: []*v1alpha1.ServiceProfileDescriptor{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "deployment1", + Namespace: "testNs", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: rangeItems(4, 8*1024*1024*1024), + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "deployment2", + Namespace: "testNs", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: fixedItems(4, 8*1024*1024*1024), + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "deployment3", + Namespace: "testNs", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: fixedItems(8, 16*1024*1024*1024), + }, + }, + }, + }, + }, + npd: &v1alpha12.NodeProfileDescriptor{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + }, + Spec: v1alpha12.NodeProfileDescriptorSpec{}, + Status: v1alpha12.NodeProfileDescriptorStatus{ + NodeMetrics: []v1alpha12.ScopedNodeMetrics{ + { + Scope: loadAwareMetricScope, + Metrics: 
[]v1alpha12.MetricValue{ + { + MetricName: "cpu", + Value: resource.MustParse("1088m"), + Window: &metav1.Duration{Duration: 15 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("5035916Ki"), + Window: &metav1.Duration{Duration: 15 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + { + MetricName: "cpu", + Value: resource.MustParse("1090m"), + Window: &metav1.Duration{Duration: 5 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("5035916Ki"), + Window: &metav1.Duration{Duration: 5 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + }, + }, + }, + }, + }, + expectRes: nil, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(tc.node) + for _, pod := range tc.pods { + nodeInfo.AddPod(pod) + } + fw, err := runtime.NewFramework(nil, nil, + runtime.WithSnapshotSharedLister(newTestSharedLister(tc.pods, []*v1.Node{tc.node}))) + assert.NoError(t, err) + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p := &Plugin{ + handle: fw, + args: makeTestArgs(), + spdLister: controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Lister(), + npdLister: controlCtx.InternalInformerFactory.Node().V1alpha1().NodeProfileDescriptors().Lister(), + spdHasSynced: controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Informer().HasSynced, + cache: &Cache{ + NodePodInfo: map[string]*NodeCache{}, + }, + } + p.cache.SetSPDLister(p) + + for _, pr := range tc.portraits { + _, err = controlCtx.Client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(pr.Namespace). + Create(context.TODO(), pr, v12.CreateOptions{}) + assert.NoError(t, err) + } + _, err = controlCtx.Client.InternalClient.NodeV1alpha1().NodeProfileDescriptors(). + Create(context.TODO(), tc.npd, v12.CreateOptions{}) + assert.NoError(t, err) + controlCtx.StartInformer(context.TODO()) + + // wait for portrait synced + if !cache2.WaitForCacheSync(context.TODO().Done(), p.spdHasSynced) { + t.Error("wait for portrait informer synced fail") + t.FailNow() + } + + // add pod to cache + for _, pod := range tc.pods { + p.cache.addPod(tc.node.Name, pod, time.Now()) + } + + status := p.Filter(context.TODO(), nil, tc.pod, nodeInfo) + + if tc.expectRes == nil { + assert.Nil(t, status) + } else { + assert.Equal(t, tc.expectRes.Code(), status.Code()) + } + }) + } +} + func TestFitByPortrait(t *testing.T) { t.Parallel() @@ -181,8 +435,11 @@ func TestFitByPortrait(t *testing.T) { args: makeTestArgs(), spdLister: controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Lister(), spdHasSynced: controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Informer().HasSynced, + cache: &Cache{ + NodePodInfo: map[string]*NodeCache{}, + }, } - cache.SetSPDLister(p) + p.cache.SetSPDLister(p) for _, pr := range tc.portraits { _, err = controlCtx.Client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(pr.Namespace). 
@@ -199,7 +456,7 @@ func TestFitByPortrait(t *testing.T) { // add pod to cache for _, pod := range tc.pods { - cache.addPod(tc.node.Name, pod, time.Now()) + p.cache.addPod(tc.node.Name, pod, time.Now()) } status := p.fitByPortrait(tc.pod, nodeInfo) @@ -213,6 +470,169 @@ func TestFitByPortrait(t *testing.T) { } } +func TestFitByNPD(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + node *v1.Node + npd *v1alpha12.NodeProfileDescriptor + expectRes *framework.Status + }{ + { + name: "less than threshold", + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Capacity: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + }, + }, + npd: &v1alpha12.NodeProfileDescriptor{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + }, + Spec: v1alpha12.NodeProfileDescriptorSpec{}, + Status: v1alpha12.NodeProfileDescriptorStatus{ + NodeMetrics: []v1alpha12.ScopedNodeMetrics{ + { + Scope: loadAwareMetricScope, + Metrics: []v1alpha12.MetricValue{ + { + MetricName: "cpu", + Value: resource.MustParse("1088m"), + Window: &metav1.Duration{Duration: 15 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("5035916Ki"), + Window: &metav1.Duration{Duration: 15 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + { + MetricName: "cpu", + Value: resource.MustParse("1090m"), + Window: &metav1.Duration{Duration: 5 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("5035916Ki"), + Window: &metav1.Duration{Duration: 5 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + }, + }, + }, + }, + }, + expectRes: nil, + }, + { + name: "more than threshold", + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Capacity: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + }, + }, + npd: &v1alpha12.NodeProfileDescriptor{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + }, + Spec: v1alpha12.NodeProfileDescriptorSpec{}, + Status: v1alpha12.NodeProfileDescriptorStatus{ + NodeMetrics: []v1alpha12.ScopedNodeMetrics{ + { + Scope: loadAwareMetricScope, + Metrics: []v1alpha12.MetricValue{ + { + MetricName: "cpu", + Value: resource.MustParse("21088m"), + Window: &metav1.Duration{Duration: 15 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("5035916Ki"), + Window: &metav1.Duration{Duration: 15 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + { + MetricName: "cpu", + Value: resource.MustParse("1090m"), + Window: &metav1.Duration{Duration: 5 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("5035916Ki"), + Window: &metav1.Duration{Duration: 5 * time.Minute}, + Timestamp: metav1.Time{Time: time.Now()}, + }, + }, + }, + }, + }, + }, + expectRes: framework.NewStatus(framework.Unschedulable, ""), + }, + 
} { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(tc.node) + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p := &Plugin{ + args: makeTestArgs(), + npdLister: controlCtx.InternalInformerFactory.Node().V1alpha1().NodeProfileDescriptors().Lister(), + cache: &Cache{ + NodePodInfo: map[string]*NodeCache{}, + }, + } + + _, err = controlCtx.Client.InternalClient.NodeV1alpha1().NodeProfileDescriptors(). + Create(context.TODO(), tc.npd, v12.CreateOptions{}) + assert.NoError(t, err) + controlCtx.StartInformer(context.TODO()) + time.Sleep(time.Second) + + status := p.fitByNPD(nodeInfo) + + if tc.expectRes == nil { + assert.Nil(t, status) + } else { + assert.Equal(t, tc.expectRes.Code(), status.Code()) + } + }) + } +} + func fixedItems(cpu, memory int64) []v1beta1.PodMetrics { res := make([]v1beta1.PodMetrics, portraitItemsLength, portraitItemsLength) @@ -273,19 +693,16 @@ func makeTestArgs() *config.LoadAwareArgs { v1.ResourceCPU: 1, v1.ResourceMemory: 1, }, + CalculateIndicatorWeight: map[config.IndicatorType]int64{ + consts.Usage15MinAvgKey: 5, + consts.Usage1HourMaxKey: 3, + consts.Usage1DayMaxKey: 2, + }, + NodeMetricsExpiredSeconds: new(int64), + PodAnnotationLoadAwareEnable: new(string), } - args.PodAnnotationLoadAwareEnable = new(string) + *args.NodeMetricsExpiredSeconds = 300 *args.PodAnnotationLoadAwareEnable = "" return args } - -func TestTTT(t *testing.T) { - a := resource.MustParse("4") - cpu := a.MilliValue() - t.Log(cpu) - - a = resource.MustParse("165m") - cpu = a.MilliValue() - t.Log(cpu) -} diff --git a/pkg/scheduler/plugins/loadaware/handler.go b/pkg/scheduler/plugins/loadaware/handler.go index a1a7004a55..a7e260bc5a 100644 --- a/pkg/scheduler/plugins/loadaware/handler.go +++ b/pkg/scheduler/plugins/loadaware/handler.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
 package loadaware
 
 import (
@@ -19,7 +35,7 @@ const (
 	LoadAwareSPDHandler = "LoadAwareSPDHandler"
 )
 
-func RegisterPodHandler() {
+func (p *Plugin) registerPodHandler() {
 	eventhandlers.RegisterEventHandler(
 		LoadAwarePodHandler,
 		func(informerFactory informers.SharedInformerFactory, _ externalversions.SharedInformerFactory) {
@@ -30,9 +46,9 @@ func RegisterPodHandler() {
 						return true
 					},
 					Handler: toolcache.ResourceEventHandlerFuncs{
-						AddFunc:    OnAdd,
-						UpdateFunc: OnUpdate,
-						DeleteFunc: OnDelete,
+						AddFunc:    p.OnAdd,
+						UpdateFunc: p.OnUpdate,
+						DeleteFunc: p.OnDelete,
 					},
 				},
 			)
@@ -58,7 +74,7 @@ func (p *Plugin) registerSPDHandler() {
 	)
 }
 
-func OnAdd(obj interface{}) {
+func (p *Plugin) OnAdd(obj interface{}) {
 	pod, ok := obj.(*v1.Pod)
 	if !ok {
 		klog.Warningf("transfer obj to pod fail")
@@ -73,27 +89,27 @@ func OnAdd(obj interface{}) {
 		startTime = pod.Status.StartTime.Time
 	}
 
-	cache.addPod(nodeName, pod, startTime)
+	p.cache.addPod(nodeName, pod, startTime)
 }
 
-func OnUpdate(oldObj, newObj interface{}) {
+func (p *Plugin) OnUpdate(oldObj, newObj interface{}) {
 	pod, ok := newObj.(*v1.Pod)
 	if !ok {
 		return
 	}
 
 	if v1pod.IsPodTerminal(pod) {
-		cache.removePod(pod.Spec.NodeName, pod)
+		p.cache.removePod(pod.Spec.NodeName, pod)
 	} else {
-		//pod delete and pod may merge a update event
+		// a pod delete event and a pod add event may be merged into a single update event
 		assignTime := time.Now()
 		if pod.Status.StartTime != nil {
 			assignTime = pod.Status.StartTime.Time
 		}
-		cache.addPod(pod.Spec.NodeName, pod, assignTime)
+		p.cache.addPod(pod.Spec.NodeName, pod, assignTime)
 	}
 }
 
-func OnDelete(obj interface{}) {
+func (p *Plugin) OnDelete(obj interface{}) {
 	var pod *v1.Pod
 	switch t := obj.(type) {
 	case *v1.Pod:
@@ -107,5 +123,5 @@ func OnDelete(obj interface{}) {
 	default:
 		return
 	}
-	cache.removePod(pod.Spec.NodeName, pod)
+	p.cache.removePod(pod.Spec.NodeName, pod)
 }
diff --git a/pkg/scheduler/plugins/loadaware/helper.go b/pkg/scheduler/plugins/loadaware/helper.go
index 7f4c00d4bb..db1ca98bd6 100644
--- a/pkg/scheduler/plugins/loadaware/helper.go
+++ b/pkg/scheduler/plugins/loadaware/helper.go
@@ -1,3 +1,19 @@
+/*
+Copyright 2022 The Katalyst Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + package loadaware import ( @@ -8,9 +24,9 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/kubewharf/katalyst-core/pkg/util/native" - "k8s.io/kubernetes/pkg/scheduler/framework" ) type Item struct { @@ -29,14 +45,13 @@ func (it Items) Swap(i, j int) { } func (it Items) Less(i, j int) bool { - location, err := time.LoadLocation("Asia/Shanghai") if err != nil { location = time.Local } // sort sample timestamp hour houri := it[i].Timestamp.In(location).Hour() - hourj := it[i].Timestamp.In(location).Hour() + hourj := it[j].Timestamp.In(location).Hour() return houri < hourj } diff --git a/pkg/scheduler/plugins/loadaware/plugin.go b/pkg/scheduler/plugins/loadaware/plugin.go index c1da0cd544..fddb243001 100644 --- a/pkg/scheduler/plugins/loadaware/plugin.go +++ b/pkg/scheduler/plugins/loadaware/plugin.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package loadaware import ( @@ -49,6 +65,7 @@ type Plugin struct { npdLister listers.NodeProfileDescriptorLister spdLister workloadlisters.ServiceProfileDescriptorLister spdHasSynced toolscache.InformerSynced + cache *Cache } func NewPlugin(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { @@ -65,13 +82,16 @@ func NewPlugin(args runtime.Object, handle framework.Handle) (framework.Plugin, p := &Plugin{ handle: handle, args: pluginArgs, + cache: &Cache{ + NodePodInfo: map[string]*NodeCache{}, + }, } p.registerNPDHandler() p.registerSPDHandler() - RegisterPodHandler() + p.registerPodHandler() if p.enablePortrait() { - cache.SetSPDLister(p) + p.cache.SetSPDLister(p) } go func() { @@ -80,7 +100,7 @@ func NewPlugin(args runtime.Object, handle framework.Handle) (framework.Plugin, klog.Warningf("portrait has not synced, skip") return } - cache.ReconcilePredictUsage() + p.cache.ReconcilePredictUsage() }, time.Hour, context.TODO().Done()) }() @@ -240,7 +260,7 @@ func (p *Plugin) portraitByRequest(pod *v1.Pod) *ResourceUsage { } func (p *Plugin) getNodePredictUsage(pod *v1.Pod, nodeName string) (*ResourceUsage, error) { - nodePredictUsage := cache.GetNodePredictUsage(nodeName) + nodePredictUsage := p.cache.GetNodePredictUsage(nodeName) klog.V(6).Infof("node %v predict usage cpu: %v, memory: %v", nodeName, nodePredictUsage.Cpu, nodePredictUsage.Memory) podPredictUsage := p.GetPodPortrait(pod) diff --git a/pkg/scheduler/plugins/loadaware/plugin_test.go b/pkg/scheduler/plugins/loadaware/plugin_test.go new file mode 100644 index 0000000000..e3a6aff6b0 --- /dev/null +++ b/pkg/scheduler/plugins/loadaware/plugin_test.go @@ -0,0 +1,91 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadaware + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kubewharf/katalyst-api/pkg/apis/scheduling/config" +) + +func TestIsLoadAwareEnabled(t *testing.T) { + t.Parallel() + + p := &Plugin{ + args: &config.LoadAwareArgs{ + PodAnnotationLoadAwareEnable: new(string), + }, + } + *p.args.PodAnnotationLoadAwareEnable = "" + + testpod := &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Annotations: map[string]string{ + "loadAwareEnable": "false", + }, + }, + } + + assert.True(t, p.IsLoadAwareEnabled(testpod)) + + *p.args.PodAnnotationLoadAwareEnable = "loadAwareEnable" + assert.False(t, p.IsLoadAwareEnabled(testpod)) + + testpod.Annotations["loadAwareEnable"] = "true" + assert.True(t, p.IsLoadAwareEnabled(testpod)) +} + +func TestPortraitByRequest(t *testing.T) { + t.Parallel() + + p := Plugin{ + args: &config.LoadAwareArgs{ + PodAnnotationLoadAwareEnable: new(string), + }, + } + + testpod := &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "testPod", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + } + + resourceUsage := p.portraitByRequest(testpod) + assert.Equal(t, len(resourceUsage.Cpu), portraitItemsLength) + assert.Equal(t, len(resourceUsage.Memory), portraitItemsLength) + + assert.Equal(t, resourceUsage.Cpu[0], 4000.0) + assert.Equal(t, resourceUsage.Memory[0], float64(8*1024*1024*1024)) +} diff --git a/pkg/scheduler/plugins/loadaware/reserve.go b/pkg/scheduler/plugins/loadaware/reserve.go index 76e59d6d14..38f4e23ed2 100644 --- a/pkg/scheduler/plugins/loadaware/reserve.go +++ b/pkg/scheduler/plugins/loadaware/reserve.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package loadaware import ( @@ -9,10 +25,10 @@ import ( ) func (p *Plugin) Reserve(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { - cache.addPod(nodeName, pod, time.Now()) + p.cache.addPod(nodeName, pod, time.Now()) return nil } func (p *Plugin) Unreserve(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) { - cache.removePod(nodeName, pod) + p.cache.removePod(nodeName, pod) } diff --git a/pkg/scheduler/plugins/loadaware/reserve_test.go b/pkg/scheduler/plugins/loadaware/reserve_test.go new file mode 100644 index 0000000000..fc11372b75 --- /dev/null +++ b/pkg/scheduler/plugins/loadaware/reserve_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadaware + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + cache2 "k8s.io/client-go/tools/cache" + + "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" +) + +func TestReserve(t *testing.T) { + t.Parallel() + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p := &Plugin{ + args: makeTestArgs(), + spdLister: controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Lister(), + spdHasSynced: controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Informer().HasSynced, + cache: &Cache{ + NodePodInfo: map[string]*NodeCache{}, + }, + } + p.cache.SetSPDLister(p) + + testPod := &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "testPod", + Namespace: "default", + OwnerReferences: []v12.OwnerReference{ + { + Name: "reserveDeployment1", + Kind: "Deployment", + }, + }, + }, + } + testNode := "testReserveNode" + testSPD := &v1alpha1.ServiceProfileDescriptor{ + ObjectMeta: v12.ObjectMeta{ + Name: "reserveDeployment1", + Namespace: "default", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: fixedItems(4, 8*1024*1024*1024), + }, + }, + }, + } + _, err = controlCtx.Client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(testSPD.GetNamespace()). 
+		Create(context.TODO(), testSPD, v12.CreateOptions{})
+	assert.NoError(t, err)
+	controlCtx.StartInformer(context.TODO())
+	// wait for the portrait (SPD) informer to sync
+	if !cache2.WaitForCacheSync(context.TODO().Done(), p.spdHasSynced) {
+		t.Error("failed to wait for portrait informer to sync")
+		t.FailNow()
+	}
+
+	_ = p.Reserve(context.TODO(), nil, testPod, testNode)
+	resourceUsage := p.cache.GetNodePredictUsage(testNode)
+	assert.Equal(t, portraitItemsLength, len(resourceUsage.Cpu))
+	assert.Equal(t, portraitItemsLength, len(resourceUsage.Memory))
+	assert.NotZero(t, resourceUsage.Cpu[0])
+
+	p.Unreserve(context.TODO(), nil, testPod, testNode)
+	resourceUsage = p.cache.GetNodePredictUsage(testNode)
+	assert.Zero(t, resourceUsage.Cpu[0])
+}
diff --git a/pkg/scheduler/plugins/loadaware/score.go b/pkg/scheduler/plugins/loadaware/score.go
index dd189dc6ea..07297f1ab4 100644
--- a/pkg/scheduler/plugins/loadaware/score.go
+++ b/pkg/scheduler/plugins/loadaware/score.go
@@ -1,12 +1,29 @@
+/*
+Copyright 2022 The Katalyst Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
 package loadaware
 
 import (
 	"context"
 	"fmt"
-	"k8s.io/klog/v2"
 	"math"
 	"time"
 
+	"k8s.io/klog/v2"
+
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -19,13 +36,6 @@ import (
 	"github.com/kubewharf/katalyst-api/pkg/consts"
 )
 
-const (
-	metric5Min  = "avg_5min"
-	metric15Min = "avg_15min"
-	metric1Hour = "max_1hour"
-	metric1Day  = "max_1day"
-)
-
 func (p *Plugin) ScoreExtensions() framework.ScoreExtensions {
 	return nil
 }
@@ -64,11 +74,11 @@ func (p *Plugin) scoreByNPD(pod *v1.Pod, nodeName string) (int64, *framework.Sta
 
 	loadAwareUsage := p.getLoadAwareResourceList(npd)
 
-	//estimated the recent assign pod usage
+	// estimate the usage of recently assigned pods
 	estimatedUsed := estimatedPodUsed(pod, p.args.ResourceToWeightMap, p.args.ResourceToScalingFactorMap)
 	estimatedAssignedPodUsage := p.estimatedAssignedPodUsage(nodeName, timeStamp)
 	finalEstimatedUsed := quotav1.Add(estimatedUsed, estimatedAssignedPodUsage)
-	//add estimated usage to avg_15min_usage
+	// add the estimated usage to avg_15min_usage
 	finalNodeUsedOfIndicators := make(map[config.IndicatorType]v1.ResourceList)
 	for indicator := range p.args.CalculateIndicatorWeight {
 		if loadAwareUsage != nil {
@@ -99,9 +109,7 @@ func (p *Plugin) scoreByPortrait(pod *v1.Pod, nodeName string) (int64, *framewor
 		return framework.MinNodeScore, nil
 	}
 
-	var (
-		scoreSum, weightSum int64
-	)
+	var scoreSum, weightSum int64
 
 	for _, resourceName := range []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory} {
 		targetUsage, ok := p.args.ResourceToTargetMap[resourceName]
@@ -155,9 +163,9 @@ func (p *Plugin) estimatedAssignedPodUsage(nodeName string, updateTime time.Time
 		estimatedUsed = make(map[v1.ResourceName]int64)
 		result       = v1.ResourceList{}
 	)
-	cache.RLock()
-	nodeCache, ok := cache.NodePodInfo[nodeName]
-	cache.RUnlock()
+	p.cache.RLock()
+	nodeCache, ok := p.cache.NodePodInfo[nodeName]
+	p.cache.RUnlock()
 	if !ok {
 		return result
 	}
diff --git 
a/pkg/scheduler/plugins/loadaware/score_test.go b/pkg/scheduler/plugins/loadaware/score_test.go new file mode 100644 index 0000000000..ce4ba355e6 --- /dev/null +++ b/pkg/scheduler/plugins/loadaware/score_test.go @@ -0,0 +1,640 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadaware + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + cache2 "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + + v1alpha12 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" + katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/scheduler/util" +) + +func TestTargetLoadPacking(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + targetRatio float64 + usageRatio float64 + expectErr bool + expectRes int64 + }{ + { + name: "less than target", + targetRatio: 50, + usageRatio: 10, + expectErr: false, + expectRes: 60, + }, + { + name: "greater than target", + targetRatio: 50, + usageRatio: 60, + expectErr: false, + expectRes: 40, + }, + { + name: "zero target", + targetRatio: 0, + usageRatio: 10, + expectErr: true, + expectRes: 0, + }, + { + name: "target greater than 100", + targetRatio: 200, + usageRatio: 10, + expectErr: true, + expectRes: 0, + }, + { + name: "usage less than 0", + targetRatio: 50, + usageRatio: -1, + expectErr: false, + expectRes: 50, + }, + { + name: "usage greater than 100", + targetRatio: 50, + usageRatio: 101, + expectErr: false, + expectRes: 0, + }, + { + name: "low usage", + targetRatio: 30, + usageRatio: 0.1, + expectErr: false, + expectRes: 30, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + res, err := targetLoadPacking(tc.targetRatio, tc.usageRatio) + if !tc.expectErr { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + assert.Equal(t, tc.expectRes, res) + }) + } +} + +func TestScore(t *testing.T) { + t.Parallel() + + util.SetQoSConfig(generic.NewQoSConfiguration()) + + for _, tc := range []struct { + name string + pod *v1.Pod + lowNode *v1.Node + highNode *v1.Node + lowNodePods []*v1.Pod + highNodePods []*v1.Pod + spd []*v1alpha1.ServiceProfileDescriptor + npd []*v1alpha12.NodeProfileDescriptor + enablePortrait bool + }{ + { + name: "enablePortrait", + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "pod1UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment1", + }, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("8"), + 
v1.ResourceMemory: resource.MustParse("16Gi"), + }, + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("16Gi"), + }, + }, + }, + }, + }, + }, + lowNode: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Capacity: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + }, + }, + highNode: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node2", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Capacity: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + }, + }, + lowNodePods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "pod2UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment3", + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod3", + UID: "pod3UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment3", + }, + }, + }, + }, + }, + highNodePods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod5", + UID: "pod5UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment2", + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod4", + UID: "pod4UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment2", + }, + }, + }, + }, + }, + spd: []*v1alpha1.ServiceProfileDescriptor{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "deployment1", + Namespace: "testNs", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: rangeItems(4, 8*1024*1024*1024), + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "deployment2", + Namespace: "testNs", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: fixedItems(4, 8*1024*1024*1024), + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "deployment3", + Namespace: "testNs", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: fixedItems(8, 16*1024*1024*1024), + }, + }, + }, + }, + }, + npd: []*v1alpha12.NodeProfileDescriptor{}, + enablePortrait: true, + }, + { + name: "unablePortrait", + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "pod1UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment1", + }, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("16Gi"), + }, + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("16Gi"), + }, + }, + }, + }, + 
}, + }, + lowNode: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Capacity: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + }, + }, + highNode: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node2", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Capacity: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("64Gi"), + }, + }, + }, + lowNodePods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "pod2UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment3", + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod3", + UID: "pod3UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment3", + }, + }, + }, + }, + }, + highNodePods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod5", + UID: "pod5UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment2", + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod4", + UID: "pod4UID", + Namespace: "testNs", + OwnerReferences: []v12.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment2", + }, + }, + }, + }, + }, + spd: []*v1alpha1.ServiceProfileDescriptor{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "deployment1", + Namespace: "testNs", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: rangeItems(4, 8*1024*1024*1024), + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "deployment2", + Namespace: "testNs", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: fixedItems(4, 8*1024*1024*1024), + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "deployment3", + Namespace: "testNs", + }, + Status: v1alpha1.ServiceProfileDescriptorStatus{ + AggMetrics: []v1alpha1.AggPodMetrics{ + { + Scope: spdPortraitScope, + Items: fixedItems(8, 16*1024*1024*1024), + }, + }, + }, + }, + }, + npd: []*v1alpha12.NodeProfileDescriptor{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + }, + Status: v1alpha12.NodeProfileDescriptorStatus{ + NodeMetrics: []v1alpha12.ScopedNodeMetrics{ + { + Scope: loadAwareMetricScope, + Metrics: []v1alpha12.MetricValue{ + { + MetricName: "cpu", + Value: resource.MustParse("9088m"), + Window: &v12.Duration{Duration: 15 * time.Minute}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("9035916Ki"), + Window: &v12.Duration{Duration: 15 * time.Minute}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "cpu", + Value: resource.MustParse("10090m"), + Window: &v12.Duration{Duration: time.Hour}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("9035916Ki"), + Window: &v12.Duration{Duration: time.Hour}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "cpu", + Value: 
resource.MustParse("12088m"), + Window: &v12.Duration{Duration: 24 * time.Hour}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("9035916Ki"), + Window: &v12.Duration{Duration: 24 * time.Hour}, + Timestamp: v12.Time{Time: time.Now()}, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "node2", + }, + Status: v1alpha12.NodeProfileDescriptorStatus{ + NodeMetrics: []v1alpha12.ScopedNodeMetrics{ + { + Scope: loadAwareMetricScope, + Metrics: []v1alpha12.MetricValue{ + { + MetricName: "cpu", + Value: resource.MustParse("1088m"), + Window: &v12.Duration{Duration: 15 * time.Minute}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("5035916Ki"), + Window: &v12.Duration{Duration: 15 * time.Minute}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "cpu", + Value: resource.MustParse("1090m"), + Window: &v12.Duration{Duration: time.Hour}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("5035916Ki"), + Window: &v12.Duration{Duration: time.Hour}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "cpu", + Value: resource.MustParse("1088m"), + Window: &v12.Duration{Duration: 24 * time.Hour}, + Timestamp: v12.Time{Time: time.Now()}, + }, + { + MetricName: "memory", + Value: resource.MustParse("5035916Ki"), + Window: &v12.Duration{Duration: 24 * time.Hour}, + Timestamp: v12.Time{Time: time.Now()}, + }, + }, + }, + }, + }, + }, + }, + enablePortrait: false, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + lowNodeInfo := framework.NewNodeInfo() + lowNodeInfo.SetNode(tc.lowNode) + for _, pod := range tc.lowNodePods { + lowNodeInfo.AddPod(pod) + } + + highNodeInfo := framework.NewNodeInfo() + highNodeInfo.SetNode(tc.highNode) + for _, pod := range tc.highNodePods { + highNodeInfo.AddPod(pod) + } + + fw, err := runtime.NewFramework(nil, nil, + runtime.WithSnapshotSharedLister(newTestSharedLister(nil, []*v1.Node{tc.lowNode, tc.highNode}))) + assert.NoError(t, err) + + controlCtx, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + + p := &Plugin{ + handle: fw, + args: makeTestArgs(), + spdLister: controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Lister(), + npdLister: controlCtx.InternalInformerFactory.Node().V1alpha1().NodeProfileDescriptors().Lister(), + spdHasSynced: controlCtx.InternalInformerFactory.Workload().V1alpha1().ServiceProfileDescriptors().Informer().HasSynced, + cache: &Cache{ + NodePodInfo: map[string]*NodeCache{}, + }, + } + *p.args.EnablePortrait = tc.enablePortrait + p.cache.SetSPDLister(p) + + for _, pr := range tc.spd { + _, err = controlCtx.Client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(pr.Namespace). + Create(context.TODO(), pr, v12.CreateOptions{}) + assert.NoError(t, err) + } + for _, n := range tc.npd { + _, err = controlCtx.Client.InternalClient.NodeV1alpha1().NodeProfileDescriptors(). 
+				Create(context.TODO(), n, v12.CreateOptions{})
+				assert.NoError(t, err)
+			}
+
+			controlCtx.StartInformer(context.TODO())
+
+			// wait for the portrait (SPD) informer to sync
+			if !cache2.WaitForCacheSync(context.TODO().Done(), p.spdHasSynced) {
+				t.Error("failed to wait for portrait informer to sync")
+				t.FailNow()
+			}
+
+			// add pods to the plugin cache
+			for _, pod := range tc.lowNodePods {
+				p.cache.addPod(tc.lowNode.Name, pod, time.Now())
+			}
+			for _, pod := range tc.highNodePods {
+				p.cache.addPod(tc.highNode.Name, pod, time.Now())
+			}
+
+			lowScore, stat := p.Score(context.TODO(), nil, tc.pod, tc.lowNode.Name)
+			assert.Nil(t, stat)
+			assert.NotZero(t, lowScore)
+
+			highScore, stat := p.Score(context.TODO(), nil, tc.pod, tc.highNode.Name)
+			assert.Nil(t, stat)
+			assert.NotZero(t, highScore)
+
+			assert.Greater(t, highScore, lowScore)
+		})
+	}
+}
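Reviewer note: the expectations in TestTargetLoadPacking appear consistent with a Trimaran-style target-load-packing curve: the score rises linearly from the target value at 0% usage up to 100 at the target utilization, then falls linearly back to 0 at 100% usage. The sketch below is inferred purely from the test table; it is not the implementation in score.go (which this patch does not modify), and the percentage semantics and the (0, 100) bounds check are assumptions.

package main

import "fmt"

// targetLoadPackingSketch reproduces the scores expected by TestTargetLoadPacking.
// Both ratios are assumed to be percentages in [0, 100].
func targetLoadPackingSketch(targetRatio, usageRatio float64) (int64, error) {
	if targetRatio <= 0 || targetRatio >= 100 {
		// matches the "zero target" and "target greater than 100" error cases
		return 0, fmt.Errorf("target ratio %v out of range (0, 100)", targetRatio)
	}
	if usageRatio < 0 {
		usageRatio = 0 // clamp, so the "usage less than 0" case scores the target value
	}
	if usageRatio > 100 {
		return 0, nil // overloaded node, matches the "usage greater than 100" case
	}
	if usageRatio <= targetRatio {
		// rises from targetRatio at 0% usage to 100 at the target utilization
		return int64((100-targetRatio)*usageRatio/targetRatio + targetRatio), nil
	}
	// falls from 100 at the target utilization to 0 at 100% usage
	return int64(targetRatio * (100 - usageRatio) / (100 - targetRatio)), nil
}

func main() {
	for _, c := range [][2]float64{{50, 10}, {50, 60}, {30, 0.1}} {
		score, err := targetLoadPackingSketch(c[0], c[1])
		fmt.Println(c, score, err) // prints 60, 40, 30 — the values the test table expects
	}
}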