From b5c430da41ddc04578be95a9de2a3accea25e2c0 Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Wed, 18 Dec 2024 11:42:44 +0800 Subject: [PATCH] qrm supports shared_cores with numa binding hint optimizer --- .../app/options/qrm/cpu_plugin.go | 35 +- .../dynamicpolicy/hintoptimizer/interface.go | 33 ++ .../hintoptimizer/serviceprofile/optimizer.go | 452 ++++++++++++++++++ .../serviceprofile/optimizer_test.go | 442 +++++++++++++++++ .../qrm-plugins/cpu/dynamicpolicy/policy.go | 9 + .../cpu/dynamicpolicy/policy_hint_handlers.go | 7 + .../cpu/dynamicpolicy/policy_test.go | 22 +- .../plugin/qosaware/resource/helper/helper.go | 7 +- pkg/config/agent/qrm/cpu_plugin.go | 10 + .../store/data/types/aggragated.go | 4 +- pkg/metaserver/spd/fetcher.go | 5 +- pkg/metaserver/spd/manager.go | 4 +- pkg/metaserver/spd/util.go | 28 ++ pkg/metaserver/spd/util_test.go | 91 ++++ pkg/util/native/resources.go | 31 ++ 15 files changed, 1154 insertions(+), 26 deletions(-) create mode 100644 pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/interface.go create mode 100644 pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/serviceprofile/optimizer.go create mode 100644 pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/serviceprofile/optimizer_test.go diff --git a/cmd/katalyst-agent/app/options/qrm/cpu_plugin.go b/cmd/katalyst-agent/app/options/qrm/cpu_plugin.go index f07503cff..6b2294116 100644 --- a/cmd/katalyst-agent/app/options/qrm/cpu_plugin.go +++ b/cmd/katalyst-agent/app/options/qrm/cpu_plugin.go @@ -17,11 +17,13 @@ limitations under the License. 
package qrm import ( + v1 "k8s.io/api/core/v1" cliflag "k8s.io/component-base/cli/flag" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts" qrmconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/util/native" ) type CPUOptions struct { @@ -34,13 +36,15 @@ type CPUOptions struct { } type CPUDynamicPolicyOptions struct { - EnableCPUAdvisor bool - EnableCPUPressureEviction bool - LoadPressureEvictionSkipPools []string - EnableSyncingCPUIdle bool - EnableCPUIdle bool - CPUNUMAHintPreferPolicy string - CPUNUMAHintPreferLowThreshold float64 + EnableCPUAdvisor bool + EnableCPUPressureEviction bool + LoadPressureEvictionSkipPools []string + EnableSyncingCPUIdle bool + EnableCPUIdle bool + CPUNUMAHintPreferPolicy string + CPUNUMAHintPreferLowThreshold float64 + EnableSharedCoresNUMABindingHintOptimizer bool + SharedCoresNUMABindingHintOptimizerOptions ServiceProfileHintOptimizerOptions } type CPUNativePolicyOptions struct { @@ -48,6 +52,10 @@ type CPUNativePolicyOptions struct { CPUAllocationOption string } +type ServiceProfileHintOptimizerOptions struct { + ResourceWeights native.ResourceThreshold +} + func NewCPUOptions() *CPUOptions { return &CPUOptions{ PolicyName: "dynamic", @@ -65,6 +73,13 @@ func NewCPUOptions() *CPUOptions { commonstate.PoolNameFallback, commonstate.PoolNameReserve, }, + EnableSharedCoresNUMABindingHintOptimizer: false, + SharedCoresNUMABindingHintOptimizerOptions: ServiceProfileHintOptimizerOptions{ + ResourceWeights: native.ResourceThreshold{ + v1.ResourceCPU: 1., + v1.ResourceMemory: 1., + }, + }, }, CPUNativePolicyOptions: CPUNativePolicyOptions{ EnableFullPhysicalCPUsOnly: false, @@ -103,6 +118,10 @@ func (o *CPUOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.BoolVar(&o.EnableFullPhysicalCPUsOnly, "enable-full-physical-cpus-only", o.EnableFullPhysicalCPUsOnly, "if set true, we will enable 
extra allocation restrictions to "+ "avoid different containers to possibly end up on the same core.") + fs.BoolVar(&o.EnableSharedCoresNUMABindingHintOptimizer, "enable-shared-cores-numa-binding-hint-optimizer", + o.EnableSharedCoresNUMABindingHintOptimizer, "if set true, we will enable shared cores numa binding hint optimizer") + fs.Var(&o.SharedCoresNUMABindingHintOptimizerOptions.ResourceWeights, "shared-cores-numa-binding-hint-optimizer-resource-weights", + "it indicates resource weights for shared cores numa binding hint optimizer") } func (o *CPUOptions) ApplyTo(conf *qrmconfig.CPUQRMPluginConfig) error { @@ -118,5 +137,7 @@ func (o *CPUOptions) ApplyTo(conf *qrmconfig.CPUQRMPluginConfig) error { conf.CPUAllocationOption = o.CPUAllocationOption conf.CPUNUMAHintPreferPolicy = o.CPUNUMAHintPreferPolicy conf.CPUNUMAHintPreferLowThreshold = o.CPUNUMAHintPreferLowThreshold + conf.EnableSharedCoresNUMABindingHintOptimizer = o.EnableSharedCoresNUMABindingHintOptimizer + conf.SharedCoresNUMABindingHintOptimizerConfig.ResourceWeights = o.SharedCoresNUMABindingHintOptimizerOptions.ResourceWeights return nil } diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/interface.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/interface.go new file mode 100644 index 000000000..208863593 --- /dev/null +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/interface.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hintoptimizer + +import ( + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" +) + +type HintOptimizer interface { + OptimizeHints(*pluginapi.ResourceRequest, []*pluginapi.TopologyHint, state.NUMANodeMap) error +} + +type DummyHintOptimizer struct{} + +func (d *DummyHintOptimizer) OptimizeHints(*pluginapi.ResourceRequest, []*pluginapi.TopologyHint, state.NUMANodeMap) error { + return nil +} diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/serviceprofile/optimizer.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/serviceprofile/optimizer.go new file mode 100644 index 000000000..5f15a5ff5 --- /dev/null +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/serviceprofile/optimizer.go @@ -0,0 +1,452 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package serviceprofile + +import ( + "context" + "fmt" + "math" + "sort" + "time" + + pkgerrors "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + agentpod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metaserver/spd" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +const ( + defaultQuantitiesSize = 24 + + // MaxNUMAScore is the maximum score a numa is expected to return. + MaxNUMAScore float64 = 100. + + // MinNUMAScore is the minimum score a numa is expected to return. 
+ MinNUMAScore float64 = 0 +) + +const ( + metricNameServiceProfileOptimizeHintsFailed = "service_profile_optimize_hints_failed" + metricNameServiceProfileOptimizeHintsSuccess = "service_profile_optimize_hints_success" + metricNameServiceProfileOptimizeHintsLatency = "service_profile_optimize_hints_latency" +) + +type NUMAScore struct { + Score float64 + ID int +} + +type GetRequestFunc func(podUID string) (resource.Quantity, error) + +type NormalizeNUMAScoresFunc func([]NUMAScore) error + +type resourceFuncs struct { + normalizeScoreFunc NormalizeNUMAScoresFunc + defaultRequestFunc GetRequestFunc +} + +type serviceProfileHintOptimizer struct { + emitter metrics.MetricEmitter + profilingManager spd.ServiceProfilingManager + podFetcher agentpod.PodFetcher + resourceWeight map[v1.ResourceName]float64 + resourceFuncs map[v1.ResourceName]resourceFuncs + defaultQuantitiesSize int +} + +func NewServiceProfileHintOptimizer( + emitter metrics.MetricEmitter, + metaServer *metaserver.MetaServer, + conf qrm.ServiceProfileHintOptimizerConfig, +) hintoptimizer.HintOptimizer { + p := &serviceProfileHintOptimizer{ + emitter: emitter, + profilingManager: metaServer, + podFetcher: metaServer, + resourceWeight: conf.ResourceWeights, + defaultQuantitiesSize: defaultQuantitiesSize, + } + + p.resourceFuncs = map[v1.ResourceName]resourceFuncs{ + v1.ResourceCPU: { + defaultRequestFunc: p.getCPURequest, + normalizeScoreFunc: spreadNormalizeScore, + }, + v1.ResourceMemory: { + defaultRequestFunc: p.getMemoryRequest, + normalizeScoreFunc: spreadNormalizeScore, + }, + } + + return p +} + +func (p *serviceProfileHintOptimizer) OptimizeHints( + req *pluginapi.ResourceRequest, + hints []*pluginapi.TopologyHint, + machineState state.NUMANodeMap, +) error { + now := time.Now() + defer func() { + _ = p.emitter.StoreInt64(metricNameServiceProfileOptimizeHintsLatency, time.Since(now).Milliseconds(), + metrics.MetricTypeNameRaw, + ) + }() + + err := p.optimizeHints(req, hints, machineState) + if err 
!= nil { + general.Warningf("optimize hints failed with error: %v", err) + _ = p.emitter.StoreInt64(metricNameServiceProfileOptimizeHintsFailed, 1, + metrics.MetricTypeNameRaw, + p.generateRequestMetricTags(req)..., + ) + } + + return nil +} + +func (p *serviceProfileHintOptimizer) optimizeHints( + req *pluginapi.ResourceRequest, + hints []*pluginapi.TopologyHint, + machineState state.NUMANodeMap, +) error { + if len(hints) == 0 { + return nil + } + + general.Infof("%s/%s origin hints: %v", req.PodNamespace, req.PodName, hints) + + // Extract NUMA node IDs from the hints. Each hint should map to exactly one NUMA node. + numaNodes, err := p.getNUMANodes(hints) + if err != nil { + return err + } + + numaScores, err := p.score(req, numaNodes, machineState) + if err != nil { + if spd.IsSPDNameNotFound(err) { + // SPD name isn't found, skip optimization. + general.Infof("%v, skip optimization", err) + return nil + } + return err + } + + // Sort hints by aggregated NUMA scores in descending order. + sort.Slice(hints, func(i, j int) bool { + return numaScores[int(hints[i].Nodes[0])] > numaScores[int(hints[j].Nodes[0])] + }) + + // Mark the highest-ranked hint as preferred, together with any hints whose score ties with it, + // and mark all remaining hints as non-preferred. 
+ hints[0].Preferred = true + maxScore := numaScores[int(hints[0].Nodes[0])] + for i := 1; i < len(hints); i++ { + if numaScores[int(hints[i].Nodes[0])] == maxScore { + hints[i].Preferred = true + } else { + hints[i].Preferred = false + } + } + + _ = p.emitter.StoreInt64(metricNameServiceProfileOptimizeHintsSuccess, 1, + metrics.MetricTypeNameRaw, + p.generateRequestMetricTags(req)..., + ) + + general.Infof("optimize hints %s/%s successfully, optimized hints: %v", req.PodNamespace, req.PodName, hints) + return nil +} + +func (p *serviceProfileHintOptimizer) generateRequestMetricTags(req *pluginapi.ResourceRequest) []metrics.MetricTag { + return []metrics.MetricTag{ + { + Key: "podName", + Val: req.PodName, + }, + { + Key: "podNamespace", + Val: req.PodNamespace, + }, + } +} + +// score returns a map of NUMA node IDs to their aggregated scores. +func (p *serviceProfileHintOptimizer) score( + req *pluginapi.ResourceRequest, + numaNodes []int, + machineState state.NUMANodeMap, +) (map[int]float64, error) { + numaScores := make(map[int]float64, len(numaNodes)) + for resourceName, weight := range p.resourceWeight { + if weight == 0 { + continue + } + + scores, err := p.getNUMANodeScores(req, numaNodes, machineState, resourceName) + if err != nil { + return nil, err + } + + general.Infof("%s/%s scores for resource %s before normalize: %v", req.PodNamespace, req.PodName, resourceName, scores) + + if funcs, ok := p.resourceFuncs[resourceName]; ok && funcs.normalizeScoreFunc != nil { + err = funcs.normalizeScoreFunc(scores) + if err != nil { + return nil, err + } + } + + general.Infof("%s/%s scores for resource %s after normalize: %v", req.PodNamespace, req.PodName, resourceName, scores) + + for _, score := range scores { + numaScores[score.ID] += score.Score * weight + } + } + + general.Infof("%s/%s aggregated scores: %v", req.PodNamespace, req.PodName, numaScores) + + return numaScores, nil +} + +func (p *serviceProfileHintOptimizer) getNUMANodes(hints 
[]*pluginapi.TopologyHint) ([]int, error) { + numaNodes := make([]int, len(hints)) + for i, hint := range hints { + if len(hint.Nodes) != 1 { + return nil, fmt.Errorf("hint %d has invalid node count", i) + } + numaNodes[i] = int(hint.Nodes[0]) + } + return numaNodes, nil +} + +func (p *serviceProfileHintOptimizer) getNUMANodeScores( + req *pluginapi.ResourceRequest, + numaNodes []int, + machineState state.NUMANodeMap, + name v1.ResourceName, +) ([]NUMAScore, error) { + request, err := p.getContainerServiceProfileRequest(req, name) + if err != nil { + return nil, pkgerrors.Wrapf(err, "getting %s/%s service profile request", req.PodNamespace, req.PodName) + } + + // Calculate scores for each NUMA node. + var scores []NUMAScore + for _, numaID := range numaNodes { + // Get the current allocated pod service profile state for the NUMA node. + profileState, err := p.getNUMAAllocatedServiceProfileState(machineState, name, numaID) + if err != nil { + return nil, err + } + + err = native.AddQuantities(profileState, request) + if err != nil { + return nil, fmt.Errorf("failed to add quantities: %v", err) + } + + // Compute the NUMA node score based on the aggregated state. + scores = append(scores, NUMAScore{ + Score: native.AggregateMaxQuantities(profileState).AsApproximateFloat64(), + ID: numaID, + }) + } + + return scores, nil +} + +func spreadNormalizeScore(scores []NUMAScore) error { + // Determine the minimum and maximum scores among the NUMA nodes. + minScore, maxScore := math.MaxFloat64, 0.0 + for _, score := range scores { + if score.Score < minScore { + minScore = score.Score + } + if score.Score > maxScore { + maxScore = score.Score + } + } + + // Handle the edge case where all scores are zero. + if maxScore == 0 { + for i := range scores { + scores[i].Score = MaxNUMAScore + } + return nil + } + + // Normalize scores to a range of [MinNUMAScore, MaxNUMAScore], And invert the scores. 
+ // This is done to ensure that the NUMA node with the lowest raw score receives the highest score, + // and the NUMA node with the highest raw score receives the lowest score. + for i := range scores { + s := scores[i].Score + scores[i].Score = MaxNUMAScore * (maxScore + minScore - s) / maxScore + } + return nil +} + +func (p *serviceProfileHintOptimizer) getContainerServiceProfileRequest( + req *pluginapi.ResourceRequest, + name v1.ResourceName, +) ([]resource.Quantity, error) { + if req == nil { + return nil, fmt.Errorf("request is nil") + } + + // Fetch the service profile request from the MetaServer using the provided metadata. + request, err := spd.GetContainerServiceProfileRequest(p.profilingManager, metav1.ObjectMeta{ + UID: types.UID(req.PodUid), + Namespace: req.PodNamespace, + Name: req.PodName, + Labels: req.Labels, + Annotations: req.Annotations, + }, name) + if err != nil && !errors.IsNotFound(err) { + // Log non-"not found" errors. Errors like "SPD name not found" indicate that the pod explicitly does not require + // profiling for optimization. In such cases, return the error to skip hint optimization. + general.Warningf("GetContainerServiceProfileRequest for pod: %s/%s, container: %s failed with error: %v", + req.PodNamespace, req.PodName, req.ContainerName, err) + return nil, err + } + + return p.getServiceProfileRequestWithDefault(req.PodUid, request, name) +} + +func (p *serviceProfileHintOptimizer) getContainerServiceProfileState( + allocationInfo *state.AllocationInfo, + name v1.ResourceName, +) ([]resource.Quantity, error) { + if allocationInfo == nil { + return nil, fmt.Errorf("allocationInfo is nil") + } + + // Fetch the service profile state from the MetaServer using allocation metadata. 
+ request, err := spd.GetContainerServiceProfileRequest(p.profilingManager, metav1.ObjectMeta{ + UID: types.UID(allocationInfo.PodUid), + Namespace: allocationInfo.PodNamespace, + Name: allocationInfo.PodName, + Labels: allocationInfo.Labels, + Annotations: allocationInfo.Annotations, + }, name) + if err != nil && !spd.IsSPDNameOrResourceNotFound(err) { + // Log non-"spd name or resource not found" errors. Missing SPD name or resource is expected in some cases, + // such as existing allocations without a corresponding service profile. + general.Warningf("GetContainerServiceProfileRequest for pod: %s/%s, container: %s failed with error: %v", + allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, err) + return nil, err + } + + return p.getServiceProfileRequestWithDefault(allocationInfo.PodUid, request, name) +} + +func (p *serviceProfileHintOptimizer) getServiceProfileRequestWithDefault(podUID string, request []resource.Quantity, name v1.ResourceName) ([]resource.Quantity, error) { + // If the request is unavailable, fall back to the default request function. 
+ if request == nil && p.resourceFuncs[name].defaultRequestFunc != nil { + quantity, err := p.resourceFuncs[name].defaultRequestFunc(podUID) + if err != nil { + return nil, err + } + request = native.DuplicateQuantities(quantity, p.defaultQuantitiesSize) + } + return request, nil +} + +func (p *serviceProfileHintOptimizer) getNUMAAllocatedServiceProfileState( + machineState state.NUMANodeMap, + name v1.ResourceName, + numaID int, +) ([]resource.Quantity, error) { + if machineState == nil { + return nil, fmt.Errorf("machineState is nil") + } else if machineState[numaID] == nil { + return nil, fmt.Errorf("machineState for NUMA node %d is nil", numaID) + } + + profileState := native.DuplicateQuantities(resource.Quantity{}, p.defaultQuantitiesSize) + for _, entries := range machineState[numaID].PodEntries { + for _, allocationInfo := range entries { + if !(allocationInfo.CheckSharedNUMABinding() && allocationInfo.CheckMainContainer()) { + continue + } + + request, err := p.getContainerServiceProfileState(allocationInfo, name) + if err != nil { + return nil, pkgerrors.Wrapf(err, "getting %s/%s service profile state", allocationInfo.PodNamespace, allocationInfo.PodName) + } + + err = native.AddQuantities(profileState, request) + if err != nil { + return nil, err + } + } + } + + return profileState, nil +} + +func (p *serviceProfileHintOptimizer) getCPURequest(podUID string) (resource.Quantity, error) { + var ( + err error + pod *v1.Pod + ) + + ctx := context.Background() + pod, err = p.podFetcher.GetPod(ctx, podUID) + if err != nil { + ctx = context.WithValue(ctx, agentpod.BypassCacheKey, agentpod.BypassCacheTrue) + pod, err = p.podFetcher.GetPod(ctx, podUID) + if err != nil { + return resource.Quantity{}, err + } + } + + quantity := native.CPUQuantityGetter()(native.SumUpPodRequestResources(pod)) + return quantity, nil +} + +func (p *serviceProfileHintOptimizer) getMemoryRequest(podUID string) (resource.Quantity, error) { + var ( + err error + pod *v1.Pod + ) + + ctx := 
context.Background() + pod, err = p.podFetcher.GetPod(ctx, podUID) + if err != nil { + ctx = context.WithValue(ctx, agentpod.BypassCacheKey, agentpod.BypassCacheTrue) + pod, err = p.podFetcher.GetPod(ctx, podUID) + if err != nil { + return resource.Quantity{}, err + } + } + + quantity := native.MemoryQuantityGetter()(native.SumUpPodRequestResources(pod)) + return quantity, nil +} diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/serviceprofile/optimizer_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/serviceprofile/optimizer_test.go new file mode 100644 index 000000000..c48124756 --- /dev/null +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/serviceprofile/optimizer_test.go @@ -0,0 +1,442 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package serviceprofile + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + + workloadapi "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + agentpod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" + "github.com/kubewharf/katalyst-core/pkg/metaserver/spd" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +func Test_spreadNormalizeScore(t *testing.T) { + t.Parallel() + type args struct { + scores []NUMAScore + } + tests := []struct { + name string + args args + want []NUMAScore + wantErr bool + }{ + { + name: "test1", + args: args{ + scores: []NUMAScore{ + { + Score: 10, + ID: 0, + }, + { + Score: 200, + ID: 1, + }, + { + Score: 400, + ID: 2, + }, + }, + }, + want: []NUMAScore{ + { + Score: 100, + ID: 0, + }, + { + Score: 52.5, + ID: 1, + }, + { + Score: 2.5, + ID: 2, + }, + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + if err := spreadNormalizeScore(tt.args.scores); (err != nil) != tt.wantErr { + t.Errorf("spreadNormalizeScore() error = %v, wantErr %v", err, tt.wantErr) + } + assert.Equal(t, tt.want, tt.args.scores) + }) + } +} + +func TestNewServiceProfileHintOptimizer(t *testing.T) { + t.Parallel() + type args struct { + metaServer *metaserver.MetaServer + conf qrm.ServiceProfileHintOptimizerConfig + } + tests := []struct { + name string + args args + }{ + { + name: "test1", + args: args{ + 
metaServer: &metaserver.MetaServer{}, + conf: qrm.ServiceProfileHintOptimizerConfig{}, + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + o := NewServiceProfileHintOptimizer(metrics.DummyMetrics{}, tt.args.metaServer, tt.args.conf) + assert.NotNil(t, o) + }) + } +} + +func Test_serviceProfileHintOptimizer_OptimizeHints(t *testing.T) { + t.Parallel() + type fields struct { + profilingManager spd.ServiceProfilingManager + podFetcher agentpod.PodFetcher + resourceWeight map[v1.ResourceName]float64 + resourceFuncs map[v1.ResourceName]resourceFuncs + } + type args struct { + req *pluginapi.ResourceRequest + hints []*pluginapi.TopologyHint + machineState state.NUMANodeMap + } + tests := []struct { + name string + fields fields + args args + want []*pluginapi.TopologyHint + wantErr assert.ErrorAssertionFunc + }{ + { + name: "test1", + fields: fields{ + profilingManager: spd.NewServiceProfilingManager(spd.DummySPDFetcher{ + SPD: &workloadapi.ServiceProfileDescriptor{ + Status: workloadapi.ServiceProfileDescriptorStatus{ + AggMetrics: []workloadapi.AggPodMetrics{ + { + Aggregator: workloadapi.Avg, + Items: []v1beta1.PodMetrics{ + { + Timestamp: metav1.NewTime(time.Date(1970, 0, 0, 0, 0, 0, 0, time.UTC)), + Window: metav1.Duration{Duration: time.Hour}, + Containers: []v1beta1.ContainerMetrics{ + { + Name: "container-1", + Usage: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("10"), + }, + }, + { + Name: "container-2", + Usage: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("10"), + }, + }, + }, + }, + { + Timestamp: metav1.NewTime(time.Date(1970, 0, 0, 1, 0, 0, 0, time.UTC)), + Window: metav1.Duration{Duration: time.Hour}, + Containers: []v1beta1.ContainerMetrics{ + { + Name: "container-1", + Usage: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("20"), + }, + }, + { + Name: "container-2", + Usage: 
map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("20"), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }), + resourceFuncs: map[v1.ResourceName]resourceFuncs{ + v1.ResourceCPU: { + defaultRequestFunc: func(podUID string) (resource.Quantity, error) { + return resource.MustParse("10"), nil + }, + normalizeScoreFunc: spreadNormalizeScore, + }, + v1.ResourceMemory: { + defaultRequestFunc: func(podUID string) (resource.Quantity, error) { + return resource.MustParse("10Gi"), nil + }, + normalizeScoreFunc: spreadNormalizeScore, + }, + }, + resourceWeight: map[v1.ResourceName]float64{ + v1.ResourceCPU: 1., + v1.ResourceMemory: 1., + }, + }, + args: args{ + req: &pluginapi.ResourceRequest{}, + hints: []*pluginapi.TopologyHint{ + { + Nodes: []uint64{0}, + }, + { + Nodes: []uint64{1}, + }, + { + Nodes: []uint64{2}, + }, + }, + machineState: state.NUMANodeMap{ + 0: &state.NUMANodeState{ + PodEntries: state.PodEntries{ + "pod-1": { + "container-1": &state.AllocationInfo{ + AllocationMeta: commonstate.AllocationMeta{ + PodUid: "pod-1", + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerName: "container-1", + QoSLevel: apiconsts.PodAnnotationQoSLevelSharedCores, + Annotations: map[string]string{ + apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelSharedCores, + apiconsts.PodAnnotationMemoryEnhancementNumaBinding: apiconsts.PodAnnotationMemoryEnhancementNumaBindingEnable, + }, + }, + }, + }, + }, + }, + 1: &state.NUMANodeState{ + PodEntries: state.PodEntries{ + "pod-2": { + "container-2": &state.AllocationInfo{ + AllocationMeta: commonstate.AllocationMeta{ + PodUid: "pod-2", + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerName: "container-2", + QoSLevel: apiconsts.PodAnnotationQoSLevelSharedCores, + Annotations: map[string]string{ + apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelSharedCores, + apiconsts.PodAnnotationMemoryEnhancementNumaBinding: 
apiconsts.PodAnnotationMemoryEnhancementNumaBindingEnable, + }, + }, + }, + }, + "pod-3": { + "container-2": &state.AllocationInfo{ + AllocationMeta: commonstate.AllocationMeta{ + PodUid: "pod-3", + ContainerType: pluginapi.ContainerType_MAIN.String(), + ContainerName: "container-3", + QoSLevel: apiconsts.PodAnnotationQoSLevelSharedCores, + Annotations: map[string]string{ + apiconsts.PodAnnotationQoSLevelKey: apiconsts.PodAnnotationQoSLevelSharedCores, + apiconsts.PodAnnotationMemoryEnhancementNumaBinding: apiconsts.PodAnnotationMemoryEnhancementNumaBindingEnable, + }, + }, + }, + }, + }, + }, + 2: &state.NUMANodeState{}, + }, + }, + want: []*pluginapi.TopologyHint{ + { + Nodes: []uint64{2}, + Preferred: true, + }, + { + Nodes: []uint64{0}, + }, + { + Nodes: []uint64{1}, + }, + }, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + p := &serviceProfileHintOptimizer{ + emitter: metrics.DummyMetrics{}, + profilingManager: tt.fields.profilingManager, + podFetcher: tt.fields.podFetcher, + resourceWeight: tt.fields.resourceWeight, + resourceFuncs: tt.fields.resourceFuncs, + defaultQuantitiesSize: 2, + } + tt.wantErr(t, p.OptimizeHints(tt.args.req, tt.args.hints, tt.args.machineState), fmt.Sprintf("OptimizeHints(%v, %v, %v)", tt.args.req, tt.args.hints, tt.args.machineState)) + assert.Equal(t, tt.want, tt.args.hints) + }) + } +} + +func Test_serviceProfileHintOptimizer_getCPURequest(t *testing.T) { + t.Parallel() + type fields struct { + podFetcher agentpod.PodFetcher + } + type args struct { + podUID string + } + tests := []struct { + name string + fields fields + args args + want resource.Quantity + wantErr assert.ErrorAssertionFunc + }{ + { + name: "test1", + fields: fields{ + podFetcher: &agentpod.PodFetcherStub{ + PodList: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "pod-1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-1", + Resources: 
v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + }, + }, + }, + }, + }, + }, + }, + }, + }, + args: args{ + podUID: "pod-1", + }, + want: resource.MustParse("10"), + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + p := &serviceProfileHintOptimizer{ + podFetcher: tt.fields.podFetcher, + } + got, err := p.getCPURequest(tt.args.podUID) + if !tt.wantErr(t, err, fmt.Sprintf("getCPURequest(%v)", tt.args.podUID)) { + return + } + assert.Equalf(t, tt.want, got, "getCPURequest(%v)", tt.args.podUID) + }) + } +} + +func Test_serviceProfileHintOptimizer_getMemoryRequest(t *testing.T) { + t.Parallel() + type fields struct { + podFetcher agentpod.PodFetcher + } + type args struct { + podUID string + } + tests := []struct { + name string + fields fields + args args + want resource.Quantity + wantErr assert.ErrorAssertionFunc + }{ + { + name: "test1", + fields: fields{ + podFetcher: &agentpod.PodFetcherStub{ + PodList: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "pod-1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-1", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: resource.MustParse("10Gi"), + }, + }, + }, + }, + }, + }, + }, + }, + }, + args: args{ + podUID: "pod-1", + }, + want: resource.MustParse("10Gi"), + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + p := &serviceProfileHintOptimizer{ + podFetcher: tt.fields.podFetcher, + } + got, err := p.getMemoryRequest(tt.args.podUID) + if !tt.wantErr(t, err, fmt.Sprintf("getMemoryRequest(%v)", tt.args.podUID)) { + return + } + assert.Equalf(t, tt.want, got, "getMemoryRequest(%v)", tt.args.podUID) + }) + } +} diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go index 
0bd2d22e6..a418c531f 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go @@ -40,6 +40,8 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator" advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpueviction" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer/serviceprofile" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator" cpuutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util" @@ -123,6 +125,8 @@ type DynamicPolicy struct { reservedReclaimedCPUsSize int reservedReclaimedCPUSet machine.CPUSet reservedReclaimedTopologyAwareAssignments map[int]machine.CPUSet + + sharedCoresNUMABindingHintOptimizer hintoptimizer.HintOptimizer } func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration, @@ -201,6 +205,11 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration reservedReclaimedCPUsSize: general.Max(reservedReclaimedCPUsSize, agentCtx.KatalystMachineInfo.NumNUMANodes), } + policyImplement.sharedCoresNUMABindingHintOptimizer = &hintoptimizer.DummyHintOptimizer{} + if conf.EnableSharedCoresNUMABindingHintOptimizer { + policyImplement.sharedCoresNUMABindingHintOptimizer = serviceprofile.NewServiceProfileHintOptimizer(wrappedEmitter, agentCtx.MetaServer, conf.SharedCoresNUMABindingHintOptimizerConfig) + } + // register allocation behaviors for pods with different QoS level policyImplement.allocationHandlers = map[string]util.AllocationHandler{ consts.PodAnnotationQoSLevelSharedCores: 
policyImplement.sharedCoresAllocationHandler, diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go index 49fb4cb20..782ba23ac 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go @@ -941,6 +941,13 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(request float64, return nil, errNoAvailableCPUHints } + // optimize hints by shared_cores numa_binding hint optimizer + // TODO: refactor the HintOptimizer interface to support multiple plugins + err = p.sharedCoresNUMABindingHintOptimizer.OptimizeHints(req, hints.Hints, machineState) + if err != nil { + return nil, err + } + return map[string]*pluginapi.ListOfTopologyHints{ string(v1.ResourceCPU): hints, }, nil diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index cf8d0ef60..023fbc12c 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -41,6 +41,7 @@ import ( cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator" advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/hintoptimizer" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" @@ -120,16 +121,17 @@ func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, st dynamicConfig := dynamic.NewDynamicAgentConfiguration() policyImplement := &DynamicPolicy{ - machineInfo: machineInfo, - 
qosConfig: qosConfig, - dynamicConfig: dynamicConfig, - state: stateImpl, - advisorValidator: validator.NewCPUAdvisorValidator(stateImpl, machineInfo), - reservedReclaimedCPUsSize: general.Max(reservedReclaimedCPUsSize, topology.NumNUMANodes), - reservedCPUs: reservedCPUs, - enableReclaimNUMABinding: true, - emitter: metrics.DummyMetrics{}, - podDebugAnnoKeys: []string{podDebugAnnoKey}, + machineInfo: machineInfo, + qosConfig: qosConfig, + dynamicConfig: dynamicConfig, + state: stateImpl, + advisorValidator: validator.NewCPUAdvisorValidator(stateImpl, machineInfo), + reservedReclaimedCPUsSize: general.Max(reservedReclaimedCPUsSize, topology.NumNUMANodes), + reservedCPUs: reservedCPUs, + enableReclaimNUMABinding: true, + emitter: metrics.DummyMetrics{}, + podDebugAnnoKeys: []string{podDebugAnnoKey}, + sharedCoresNUMABindingHintOptimizer: &hintoptimizer.DummyHintOptimizer{}, } // register allocation behaviors for pods with different QoS level diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/helper.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/helper.go index 7d6c2df84..3df338254 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/helper.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/helper.go @@ -21,7 +21,6 @@ import ( "fmt" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" @@ -51,7 +50,7 @@ func PodEnableReclaim(ctx context.Context, metaServer *metaserver.MetaServer, // get current service performance level of the pod pLevel, err := metaServer.ServiceBusinessPerformanceLevel(ctx, pod.ObjectMeta) - if err != nil && !errors.IsNotFound(err) { + if err != nil && !spd.IsSPDNameOrResourceNotFound(err) { return false, err } else if err != nil { return true, nil @@ -63,8 +62,10 @@ func PodEnableReclaim(ctx context.Context, metaServer *metaserver.MetaServer, // check whether current pod is service baseline 
baseline, err := metaServer.ServiceBaseline(ctx, pod.ObjectMeta) - if err != nil { + if err != nil && !spd.IsSPDNameOrResourceNotFound(err) { return false, err + } else if err != nil { + return true, nil } else if baseline { // if pod is baseline, it can not be reclaimed general.InfoS("pod is regarded as baseline, reclaim disabled", "podUID", podUID) diff --git a/pkg/config/agent/qrm/cpu_plugin.go b/pkg/config/agent/qrm/cpu_plugin.go index d421ec704..1558af2ff 100644 --- a/pkg/config/agent/qrm/cpu_plugin.go +++ b/pkg/config/agent/qrm/cpu_plugin.go @@ -16,6 +16,8 @@ limitations under the License. package qrm +import v1 "k8s.io/api/core/v1" + type CPUQRMPluginConfig struct { // PolicyName is used to switch between several strategies PolicyName string @@ -45,6 +47,14 @@ type CPUDynamicPolicyConfig struct { // CPUNUMAHintPreferLowThreshold indicates threshold to apply CPUNUMAHintPreferPolicy dynamically, // and it's working when CPUNUMAHintPreferPolicy is set to dynamic_packing CPUNUMAHintPreferLowThreshold float64 + // EnableSharedCoresNUMABindingHintOptimizer is set to enable shared cores numa binding hint optimizer + EnableSharedCoresNUMABindingHintOptimizer bool + // SharedCoresNUMABindingHintOptimizerConfig is used to control service profile hint optimizer + SharedCoresNUMABindingHintOptimizerConfig ServiceProfileHintOptimizerConfig +} + +type ServiceProfileHintOptimizerConfig struct { + ResourceWeights map[v1.ResourceName]float64 } type CPUNativePolicyConfig struct { diff --git a/pkg/custom-metric/store/data/types/aggragated.go b/pkg/custom-metric/store/data/types/aggragated.go index 72af23888..1ace7e083 100644 --- a/pkg/custom-metric/store/data/types/aggragated.go +++ b/pkg/custom-metric/store/data/types/aggragated.go @@ -18,12 +18,12 @@ package types import ( "bytes" - "math/big" "strings" "k8s.io/apimachinery/pkg/api/resource" "github.com/kubewharf/katalyst-api/pkg/metric" + "github.com/kubewharf/katalyst-core/pkg/util/native" ) var validAggregatorSuffixList = 
[]string{ @@ -58,7 +58,7 @@ func (a *AggregatedItem) DeepCopy() Item { } func (a *AggregatedItem) GetQuantity() resource.Quantity { - return resource.MustParse(big.NewFloat(a.Value).String()) + return native.GetFloat64Quantity(a.Value) } func (a *AggregatedItem) GetCount() *int64 { return &a.Count } diff --git a/pkg/metaserver/spd/fetcher.go b/pkg/metaserver/spd/fetcher.go index 7d9d7bb44..69c3234ee 100644 --- a/pkg/metaserver/spd/fetcher.go +++ b/pkg/metaserver/spd/fetcher.go @@ -22,6 +22,7 @@ import ( "sync" "time" + pkgerrors "github.com/pkg/errors" "go.uber.org/atomic" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -55,6 +56,8 @@ const ( metricsNameDeleteCache = "spd_manager_delete_cache" ) +var SPDNameNotFoundError = pkgerrors.New("get SPD name not found") + type GetPodSPDNameFunc func(_ metav1.ObjectMeta) (string, error) type SPDFetcher interface { @@ -127,7 +130,7 @@ func (s *spdFetcher) GetSPD(ctx context.Context, podMeta metav1.ObjectMeta) (*wo spdName, err := s.getPodSPDNameFunc(podMeta) if err != nil { general.Warningf("get spd for pod (%v/%v) err %v", podMeta.Namespace, podMeta.Name, err) - return nil, errors.NewNotFound(workloadapis.Resource(workloadapis.ResourceNameServiceProfileDescriptors), fmt.Sprintf("for pod(%v/%v)", podMeta.Namespace, podMeta.Name)) + return nil, SPDNameNotFoundError } spdNamespace := podMeta.GetNamespace() diff --git a/pkg/metaserver/spd/manager.go b/pkg/metaserver/spd/manager.go index b71f3a351..f502f7b3e 100644 --- a/pkg/metaserver/spd/manager.go +++ b/pkg/metaserver/spd/manager.go @@ -208,10 +208,8 @@ func (m *serviceProfilingManager) ServiceExtendedIndicator(ctx context.Context, func (m *serviceProfilingManager) ServiceBaseline(ctx context.Context, podMeta metav1.ObjectMeta) (bool, error) { spd, err := m.fetcher.GetSPD(ctx, podMeta) - if err != nil && !errors.IsNotFound(err) { + if err != nil { return false, err - } else if err != nil { - return false, nil } baselineSentinel, err := 
util.GetSPDBaselineSentinel(spd) diff --git a/pkg/metaserver/spd/util.go b/pkg/metaserver/spd/util.go index 35a1cbdcf..f7ef11a35 100644 --- a/pkg/metaserver/spd/util.go +++ b/pkg/metaserver/spd/util.go @@ -19,7 +19,10 @@ package spd import ( "context" + pkgerrors "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" workloadapi "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1" @@ -48,3 +51,28 @@ func GetContainerMemoryBandwidthRequest(profilingManager ServiceProfilingManager return mbwRequest, nil } + +// GetContainerServiceProfileRequest gets the aggregated service profile metrics of the given resource for the pod; it returns nil without error when the lookup reports not-found. +func GetContainerServiceProfileRequest(profilingManager ServiceProfilingManager, podMeta metav1.ObjectMeta, + name v1.ResourceName, +) ([]resource.Quantity, error) { + metrics, err := profilingManager.ServiceAggregateMetrics(context.Background(), podMeta, name, + false, workloadapi.Avg, workloadapi.Sum) + if err != nil && !errors.IsNotFound(err) { + return nil, err + } else if err == nil && metrics != nil { + return metrics, nil + } + + return nil, nil +} + +// IsSPDNameOrResourceNotFound returns true if the given error is caused by SPD name not found or SPD not found. +func IsSPDNameOrResourceNotFound(err error) bool { + return errors.IsNotFound(err) || pkgerrors.Is(err, SPDNameNotFoundError) +} + +// IsSPDNameNotFound returns true if the given error is caused by SPD name not found.
+func IsSPDNameNotFound(err error) bool { + return pkgerrors.Is(err, SPDNameNotFoundError) +} diff --git a/pkg/metaserver/spd/util_test.go b/pkg/metaserver/spd/util_test.go index dab93e906..358b0df45 100644 --- a/pkg/metaserver/spd/util_test.go +++ b/pkg/metaserver/spd/util_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -117,3 +118,93 @@ func TestGetContainerMemoryBandwidthRequest(t *testing.T) { }) } } + +func TestGetContainerServiceProfileRequest(t *testing.T) { + t.Parallel() + + type args struct { + profilingManager ServiceProfilingManager + podMeta metav1.ObjectMeta + resourceName v1.ResourceName + } + tests := []struct { + name string + args args + want []resource.Quantity + wantErr bool + }{ + { + name: "test two container", + args: args{ + profilingManager: NewServiceProfilingManager(DummySPDFetcher{ + SPD: &workloadapi.ServiceProfileDescriptor{ + Status: workloadapi.ServiceProfileDescriptorStatus{ + AggMetrics: []workloadapi.AggPodMetrics{ + { + Aggregator: workloadapi.Avg, + Items: []v1beta1.PodMetrics{ + { + Timestamp: metav1.NewTime(time.Date(1970, 0, 0, 0, 0, 0, 0, time.UTC)), + Window: metav1.Duration{Duration: time.Hour}, + Containers: []v1beta1.ContainerMetrics{ + { + Name: "container-1", + Usage: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("10"), + }, + }, + { + Name: "container-2", + Usage: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("10"), + }, + }, + }, + }, + { + Timestamp: metav1.NewTime(time.Date(1970, 0, 0, 1, 0, 0, 0, time.UTC)), + Window: metav1.Duration{Duration: time.Hour}, + Containers: []v1beta1.ContainerMetrics{ + { + Name: "container-1", + Usage: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("20"), + }, + }, + { + Name: "container-2", + Usage: map[v1.ResourceName]resource.Quantity{ + 
v1.ResourceCPU: resource.MustParse("20"), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }), + podMeta: metav1.ObjectMeta{}, + resourceName: v1.ResourceCPU, + }, + want: []resource.Quantity{ + resource.MustParse("20"), + resource.MustParse("40"), + }, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := GetContainerServiceProfileRequest(tt.args.profilingManager, tt.args.podMeta, tt.args.resourceName) + if (err != nil) != tt.wantErr { + t.Errorf("GetContainerServiceProfileRequest() error = %v, wantErr %v", err, tt.wantErr) + return + } + assert.Equal(t, len(got), len(tt.want)) + }) + } +} diff --git a/pkg/util/native/resources.go b/pkg/util/native/resources.go index b62aa8c07..29c9fd8e2 100644 --- a/pkg/util/native/resources.go +++ b/pkg/util/native/resources.go @@ -18,6 +18,7 @@ package native import ( "fmt" + "math/big" "sort" "strconv" "strings" @@ -331,6 +332,10 @@ func MultiplyQuantity(quantity resource.Quantity, y float64) resource.Quantity { return *resource.NewQuantity(value, quantity.Format) } +func GetFloat64Quantity(value float64) resource.Quantity { + return resource.MustParse(big.NewFloat(value).String()) +} + // AggregateSumQuantities get the sum of quantities func AggregateSumQuantities(quantities []resource.Quantity) *resource.Quantity { var res *resource.Quantity @@ -368,3 +373,29 @@ func AggregateMaxQuantities(quantities []resource.Quantity) *resource.Quantity { } return res } + +// AddQuantities adds quantitiesB into quantitiesA element-wise in place; a nil quantitiesB is a no-op, and an error is returned if the two lengths differ.
+func AddQuantities(quantitiesA, quantitiesB []resource.Quantity) error { + if quantitiesB == nil { + return nil + } + + if len(quantitiesA) != len(quantitiesB) { + return fmt.Errorf("length of quantitiesA and quantitiesB are not equal, %d vs %d", + len(quantitiesA), len(quantitiesB)) + } + + for i := range quantitiesB { + quantitiesA[i].Add(quantitiesB[i]) + } + return nil +} + +// DuplicateQuantities returns a slice of the given length in which every element is a deep copy of quantity. +func DuplicateQuantities(quantity resource.Quantity, length int) []resource.Quantity { + res := make([]resource.Quantity, 0, length) + for i := 0; i < length; i++ { + res = append(res, quantity.DeepCopy()) + } + return res +}