Skip to content

Commit

Permalink
feat: support allocating dedicated cores without exclusive binding, and reclaim on a NUMA node
Browse files Browse the repository at this point in the history
  • Loading branch information
WangZzzhe committed Jun 11, 2024
1 parent 480f2d7 commit 8f4600f
Show file tree
Hide file tree
Showing 13 changed files with 1,191 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
maputil "k8s.io/kubernetes/pkg/util/maps"

Expand All @@ -36,10 +37,12 @@ import (
advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/process"
"github.com/kubewharf/katalyst-core/pkg/util/qos"
)

const (
Expand Down Expand Up @@ -284,7 +287,7 @@ func (p *DynamicPolicy) allocateByCPUAdvisor(resp *advisorapi.ListAndWatchRespon

// generateBlockCPUSet generates BlockCPUSet from cpu-advisor response
// and the logic contains three main steps
// 1. handle blocks for static pools
// 1. handle blocks for static pools and pods(dedicated_cores with numa_binding without numa_exclusive)
// 2. handle blocks with specified NUMA ids (probably be blocks for
// numa_binding dedicated_cores containers and reclaimed_cores containers colocated with them)
// 3. handle blocks without specified NUMA id (probably be blocks for
Expand Down Expand Up @@ -322,6 +325,44 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
availableCPUs = availableCPUs.Difference(blockCPUSet[blockID])
}

// walk through static pods to reuse cpuset if exists.
qosConf := generic.NewQoSConfiguration()
for podUID, containerEntries := range p.state.GetPodEntries() {
if containerEntries.IsPoolEntry() {
continue
}
if containerEntries.GetMainContainerEntry().QoSLevel != consts.PodAnnotationQoSLevelDedicatedCores {
continue
}
pod, err := p.metaServer.GetPod(context.Background(), podUID)
if err != nil {
err = fmt.Errorf("getPod %s fail: %v", podUID, err)
return nil, err
}
if !qos.IsPodNumaBinding(qosConf, pod) {
continue
}
if qos.IsPodNumaExclusive(qosConf, pod) {
continue
}

for containerName, allocationInfo := range containerEntries {
for numaID, cpuset := range allocationInfo.TopologyAwareAssignments {
blocks, ok := resp.GeEntryNUMABlocks(podUID, containerName, int64(numaID))
if !ok || len(blocks) != 1 {
klog.Error(err)
return nil, fmt.Errorf("blocks of pod %v container %v numaID %v is invalid", pod.Name, containerName, numaID)
}

blockID := blocks[0].BlockId
blockCPUSet[blockID] = cpuset.Clone()
availableCPUs = availableCPUs.Difference(blockCPUSet[blockID])

klog.V(6).Infof("pod %v container %v numaId %v reuse cpuset %v", pod.Name, containerName, numaID, blockCPUSet[blockID])
}
}
}

// walk through all blocks with specified NUMA ids
// for each block, add them into blockCPUSet (if not exist) and renew availableCPUs
for numaID, blocks := range numaToBlocks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
Expand Down Expand Up @@ -597,7 +598,11 @@ func (p *DynamicPolicy) adjustPoolsAndIsolatedEntries(poolsQuantityMap map[strin
) error {
availableCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckDedicatedNUMABinding)

poolsCPUSet, isolatedCPUSet, err := p.generatePoolsAndIsolation(poolsQuantityMap, isolatedQuantityMap, availableCPUs)
// availableCPUs only for reclaimed pool, all unused cpu on dedicated without exclusive NUMA
reclaimedAvailableCPUs := machineState.GetMatchedAvailableCPUSet(p.reservedCPUs, state.CheckDedicatedNUMABindingWithoutNUMAExclusive, state.CheckDedicatedNUMABindingWithoutNUMAExclusive)
klog.V(6).Infof("adjustPoolsAndIsolatedEntries availableCPUs: %v, reclaimedAvailableCPUs: %v", availableCPUs.String(), reclaimedAvailableCPUs.String())

poolsCPUSet, isolatedCPUSet, err := p.generatePoolsAndIsolation(poolsQuantityMap, isolatedQuantityMap, availableCPUs, reclaimedAvailableCPUs)
if err != nil {
return fmt.Errorf("generatePoolsAndIsolation failed with error: %v", err)
}
Expand Down Expand Up @@ -642,7 +647,7 @@ func (p *DynamicPolicy) reclaimOverlapNUMABinding(poolsCPUSet map[string]machine
}

for _, allocationInfo := range containerEntries {
if !(allocationInfo != nil && state.CheckDedicatedNUMABinding(allocationInfo) && allocationInfo.CheckMainContainer()) {
if !(allocationInfo != nil && state.CheckDedicatedNUMABindingWithNUMAExclusive(allocationInfo) && allocationInfo.CheckMainContainer()) {
continue
} else if allocationInfo.RampUp {
general.Infof("dedicated numa_binding pod: %s/%s container: %s is in ramp up, not to overlap reclaim pool with it",
Expand Down Expand Up @@ -862,7 +867,7 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine
// 2. use the left cores to allocate among different pools
// 3. apportion to other pools if reclaimed is disabled
func (p *DynamicPolicy) generatePoolsAndIsolation(poolsQuantityMap map[string]int,
isolatedQuantityMap map[string]map[string]int, availableCPUs machine.CPUSet) (poolsCPUSet map[string]machine.CPUSet,
isolatedQuantityMap map[string]map[string]int, availableCPUs machine.CPUSet, reclaimAvailableCPUS machine.CPUSet) (poolsCPUSet map[string]machine.CPUSet,
isolatedCPUSet map[string]map[string]machine.CPUSet, err error,
) {
// clear pool map with zero quantity
Expand Down Expand Up @@ -995,6 +1000,9 @@ func (p *DynamicPolicy) generatePoolsAndIsolation(poolsQuantityMap map[string]in
}

poolsCPUSet[state.PoolNameReclaim] = poolsCPUSet[state.PoolNameReclaim].Union(availableCPUs)
// join reclaim pool with all unused cpu on dedicated without exclusive NUMA
// final cpuSet include reservedResourceForAllocated will be calculated by advisor soon if cpu advisor is enabled
poolsCPUSet[state.PoolNameReclaim] = poolsCPUSet[state.PoolNameReclaim].Union(reclaimAvailableCPUS)
if poolsCPUSet[state.PoolNameReclaim].IsEmpty() {
// for reclaimed pool, we must make them exist when the node isn't in hybrid mode even if cause overlap
allAvailableCPUs := p.machineInfo.CPUDetails.CPUs().Difference(p.reservedCPUs)
Expand Down
Loading

0 comments on commit 8f4600f

Please sign in to comment.