diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
index eb52aa389..0b3163430 100644
--- a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
+++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
@@ -308,8 +308,8 @@ func (cra *cpuResourceAdvisor) updateWithIsolationGuardian(tryIsolation bool) er
 		klog.Infof("[qosaware-cpu] notify cpu server: %+v", calculationResult)
 		return nil
 	default:
-		klog.Errorf("[qosaware-cpu] channel is full")
-		return fmt.Errorf("calculation result channel is full")
+		klog.Warningf("[qosaware-cpu] channel is full, drop the new advice")
+		return nil
 	}
 }
diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go
index 1edd8b5db..1f03f7c7b 100644
--- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go
+++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go
@@ -180,8 +180,8 @@ func (ra *memoryResourceAdvisor) sendAdvices() error {
 		general.Infof("notify memory server: %+v", result)
 		return nil
 	default:
-		general.Errorf("channel is full")
-		return fmt.Errorf("memory advice channel is full")
+		general.Warningf("[qosaware-memory] channel is full, drop the new advice")
+		return nil
 	}
 }
diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go
index 1b52c27dc..637b0c25a 100644
--- a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go
+++ b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go
@@ -75,7 +75,8 @@ func (cs *cpuServer) RegisterAdvisorServer() {
 func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvisor_ListAndWatchServer) error {
 	_ = cs.emitter.StoreInt64(cs.genMetricsName(metricServerLWCalled), int64(cs.period.Seconds()), metrics.MetricTypeNameCount)
 
-	general.RegisterHeartbeatCheck(cpuServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
+	general.RegisterTemporaryHeartbeatCheck(cpuServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
+	defer general.UnregisterTemporaryHeartbeatCheck(cpuServerHealthCheckName)
 
 	if !cs.getCheckpointCalled {
 		if err := cs.startToGetCheckpointFromCPUPlugin(); err != nil {
@@ -91,6 +92,20 @@
 		return fmt.Errorf("recvCh convert failed")
 	}
 
+	// drop stale advices buffered while no client was connected so that the new client starts from fresh results
+	maxDropLength := len(recvCh)
+	klog.Infof("[qosaware-server-cpu] drop all old cpu advices in channel (max: %d)", maxDropLength)
+dropLoop:
+	for i := 0; i < maxDropLength; i++ {
+		select {
+		case <-recvCh:
+		default:
+			klog.Infof("[qosaware-server-cpu] all old cpu advices in channel are dropped (count: %d)", i)
+			break dropLoop
+		}
+	}
+
+	klog.Infof("[qosaware-server-cpu] start to push cpu advices")
 	for {
 		select {
 		case <-server.Context().Done():
diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go
index fc6adbecc..d3a42cde0 100644
--- a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go
+++ b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go
@@ -98,6 +98,33 @@ func newTestCPUServer(t *testing.T, podList []*v1.Pod) *cpuServer {
 	return cpuServer
 }
 
+func newTestCPUServerWithChanBuffer(t *testing.T, podList []*v1.Pod) *cpuServer {
+	recvCh := make(chan types.InternalCPUCalculationResult, 1)
+	sendCh := make(chan types.TriggerInfo, 1)
+	conf := generateTestConfiguration(t)
+
+	metricsFetcher := metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})
+	metaCache, err := metacache.NewMetaCacheImp(conf, metricspool.DummyMetricsEmitterPool{}, metricsFetcher)
+	require.NoError(t, err)
+	require.NotNil(t, metaCache)
+
+	metaServer := &metaserver.MetaServer{
+		MetaAgent: &agent.MetaAgent{
+			PodFetcher: &pod.PodFetcherStub{
+				PodList: podList,
+			},
+		},
+	}
+
+	cpuServer, err := NewCPUServer(recvCh, sendCh, conf, metaCache, metaServer, metrics.DummyMetrics{})
+	require.NoError(t, err)
+	require.NotNil(t, cpuServer)
+
+	cpuServer.getCheckpointCalled = true
+
+	return cpuServer
+}
+
 func TestCPUServerStartAndStop(t *testing.T) {
 	t.Parallel()
 
@@ -259,17 +286,17 @@ func DeepCopyResponse(response *cpuadvisor.ListAndWatchResponse) (*cpuadvisor.Li
 	return copyResponse, nil
 }
 
+type ContainerInfo struct {
+	request        *advisorsvc.ContainerMetadata
+	podInfo        *v1.Pod
+	allocationInfo *cpuadvisor.AllocationInfo
+	isolated       bool
+	regions        sets.String
+}
+
 func TestCPUServerListAndWatch(t *testing.T) {
 	t.Parallel()
 
-	type ContainerInfo struct {
-		request        *advisorsvc.ContainerMetadata
-		podInfo        *v1.Pod
-		allocationInfo *cpuadvisor.AllocationInfo
-		isolated       bool
-		regions        sets.String
-	}
-
 	tests := []struct {
 		name      string
 		provision types.InternalCPUCalculationResult
@@ -1708,3 +1735,165 @@ func TestConcurrencyGetCheckpointAndAddContainer(t *testing.T) {
 	time.Sleep(10 * time.Second)
 	cancel()
 }
+
+func TestCPUServerDropOldAdvice(t *testing.T) {
+	t.Parallel()
+
+	cpuServer := newTestCPUServerWithChanBuffer(t, []*v1.Pod{})
+	s := &mockCPUServerService_ListAndWatchServer{ResultsChan: make(chan *cpuadvisor.ListAndWatchResponse)}
+	stop := make(chan struct{})
+	recvCh := cpuServer.recvCh.(chan types.InternalCPUCalculationResult)
+	recvCh <- types.InternalCPUCalculationResult{}
+	go func() {
+		err := cpuServer.ListAndWatch(&advisorsvc.Empty{}, s)
+		assert.NoError(t, err, "failed to LW cpu server")
+		stop <- struct{}{}
+	}()
+	provision := types.InternalCPUCalculationResult{
+		TimeStamp: time.Now(),
+		PoolEntries: map[string]map[int]int{
+			state.PoolNameReclaim: {
+				0: 4,
+				1: 8,
+			},
+		},
+	}
+	infos := []*ContainerInfo{
+		{
+			request: &advisorsvc.ContainerMetadata{
+				PodUid:        "pod1",
+				ContainerName: "c1",
+				Annotations: map[string]string{
+					consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable,
+				},
+				QosLevel: consts.PodAnnotationQoSLevelDedicatedCores,
+			},
+			podInfo: &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: "default",
+					Name:      "pod1",
+					UID:       "pod1",
+					Annotations: map[string]string{
+						consts.PodAnnotationQoSLevelKey:          consts.PodAnnotationQoSLevelDedicatedCores,
+						consts.PodAnnotationMemoryEnhancementKey: "{\"numa_exclusive\":true}",
+					},
+				},
+				Spec: v1.PodSpec{
+					Containers: []v1.Container{
+						{
+							Name: "c1",
+						},
+					},
+				},
+			},
+			allocationInfo: &cpuadvisor.AllocationInfo{
+				OwnerPoolName: state.PoolNameDedicated,
+				TopologyAwareAssignments: map[uint64]string{
+					0: "0-3",
+					1: "24-47",
+				},
+			},
+		},
+	}
+	for _, info := range infos {
+		assert.NoError(t, cpuServer.addContainer(info.request))
+		assert.NoError(t, cpuServer.updateContainerInfo(info.request.PodUid, info.request.ContainerName, info.podInfo, info.allocationInfo))
+
+		nodeInfo, _ := cpuServer.metaCache.GetContainerInfo(info.request.PodUid, info.request.ContainerName)
+		nodeInfo.Isolated = info.isolated
+		if info.regions.Len() > 0 {
+			nodeInfo.RegionNames = info.regions
+		}
+		assert.NoError(t, cpuServer.metaCache.SetContainerInfo(info.request.PodUid, info.request.ContainerName, nodeInfo))
+	}
+
+	recvCh <- provision
+	res := <-s.ResultsChan
+	close(cpuServer.stopCh)
+	<-stop
+	copyres, err := DeepCopyResponse(res)
+	assert.NoError(t, err)
+	wantRes := &cpuadvisor.ListAndWatchResponse{
+		Entries: map[string]*cpuadvisor.CalculationEntries{
+			state.PoolNameReclaim: {
+				Entries: map[string]*cpuadvisor.CalculationInfo{
+					"": {
+						OwnerPoolName: state.PoolNameReclaim,
+						CalculationResultsByNumas: map[int64]*cpuadvisor.NumaCalculationResult{
+							0: {
+								Blocks: []*cpuadvisor.Block{
+									{
+										Result: 4,
+										OverlapTargets: []*cpuadvisor.OverlapTarget{
+											{
+												OverlapTargetPodUid:        "pod1",
+												OverlapTargetContainerName: "c1",
+												OverlapType:                cpuadvisor.OverlapType_OverlapWithPod,
+											},
+										},
+									},
+								},
+							},
+							1: {
+								Blocks: []*cpuadvisor.Block{
+									{
+										Result: 8,
+										OverlapTargets: []*cpuadvisor.OverlapTarget{
+											{
+												OverlapTargetPodUid:        "pod1",
+												OverlapTargetContainerName: "c1",
+												OverlapType:                cpuadvisor.OverlapType_OverlapWithPod,
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			"pod1": {
+				Entries: map[string]*cpuadvisor.CalculationInfo{
+					"c1": {
+						OwnerPoolName: state.PoolNameDedicated,
+						CalculationResultsByNumas: map[int64]*cpuadvisor.NumaCalculationResult{
+							0: {
+								Blocks: []*cpuadvisor.Block{
+									{
+										Result: 4,
+										OverlapTargets: []*cpuadvisor.OverlapTarget{
+											{
+												OverlapTargetPoolName: state.PoolNameReclaim,
+												OverlapType:           cpuadvisor.OverlapType_OverlapWithPool,
+											},
+										},
+									},
+								},
+							},
+							1: {
+								Blocks: []*cpuadvisor.Block{
+									{
+										Result:         16,
+										OverlapTargets: nil,
+									},
+									{
+										Result: 8,
+										OverlapTargets: []*cpuadvisor.OverlapTarget{
+											{
+												OverlapTargetPoolName: state.PoolNameReclaim,
+												OverlapType:           cpuadvisor.OverlapType_OverlapWithPool,
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	if !reflect.DeepEqual(copyres, wantRes) {
+		t.Errorf("ListAndWatch()\ngot = %+v, \nwant= %+v", general.ToString(copyres), general.ToString(wantRes))
+	}
+}
diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go
index f5aa46edb..1ccec9b5d 100644
--- a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go
+++ b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go
@@ -140,7 +140,8 @@ func (ms *memoryServer) listContainers() error {
 func (ms *memoryServer) ListAndWatch(_ *advisorsvc.Empty, server advisorsvc.AdvisorService_ListAndWatchServer) error {
 	_ = ms.emitter.StoreInt64(ms.genMetricsName(metricServerLWCalled), int64(ms.period.Seconds()), metrics.MetricTypeNameCount)
 
-	general.RegisterHeartbeatCheck(memoryServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
+	general.RegisterTemporaryHeartbeatCheck(memoryServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
+	defer general.UnregisterTemporaryHeartbeatCheck(memoryServerHealthCheckName)
 
 	recvCh, ok := ms.recvCh.(chan types.InternalMemoryCalculationResult)
 	if !ok {
@@ -148,6 +149,20 @@
 	}
 	ms.listAndWatchCalled = true
 
+	// drop stale advices buffered while no client was connected so that the new client starts from fresh results
+	maxDropLength := len(recvCh)
+	klog.Infof("[qosaware-server-memory] drop all old memory advices in channel (max: %d)", maxDropLength)
+dropLoop:
+	for i := 0; i < maxDropLength; i++ {
+		select {
+		case <-recvCh:
+		default:
+			klog.Infof("[qosaware-server-memory] all old memory advices in channel are dropped (count: %d)", i)
+			break dropLoop
+		}
+	}
+
+	klog.Infof("[qosaware-server-memory] start to push memory advice")
 	for {
 		select {
 		case <-server.Context().Done():
diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server_test.go b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server_test.go
index 0cea72e8a..6cf3caa9e 100644
--- a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server_test.go
+++ b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server_test.go
@@ -103,6 +103,33 @@ func newTestMemoryServer(t *testing.T, podList []*v1.Pod) *memoryServer {
 	return memoryServer
 }
 
+func newTestMemoryServerWithChannelBuffer(t *testing.T, podList []*v1.Pod) *memoryServer {
+	recvCh := make(chan types.InternalMemoryCalculationResult, 1)
+	sendCh := make(chan types.TriggerInfo, 1)
+	conf := generateTestMemoryAdvisorConfiguration(t)
+
+	metricsFetcher := metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})
+	metaCache, err := metacache.NewMetaCacheImp(conf, metricspool.DummyMetricsEmitterPool{}, metricsFetcher)
+	require.NoError(t, err)
+	require.NotNil(t, metaCache)
+
+	metaServer := &metaserver.MetaServer{
+		MetaAgent: &agent.MetaAgent{
+			PodFetcher: &pod.PodFetcherStub{
+				PodList: podList,
+			},
+		},
+	}
+
+	memoryServer, err := NewMemoryServer(recvCh, sendCh, conf, metaCache, metaServer, metrics.DummyMetrics{})
+	require.NoError(t, err)
+	require.NotNil(t, memoryServer)
+
+	memoryServer.listAndWatchCalled = true
+
+	return memoryServer
+}
+
 type MockQRMServiceServer struct {
 	containers []*advisorsvc.ContainerMetadata
 	listErr    error
@@ -325,3 +352,105 @@ func TestMemoryServerListAndWatch(t *testing.T) {
 		})
 	}
 }
+
+func TestMemoryServerDropOldAdvice(t *testing.T) {
+	t.Parallel()
+
+	cs := newTestMemoryServerWithChannelBuffer(t, []*v1.Pod{})
+	s := &mockMemoryServerService_ListAndWatchServer{ResultsChan: make(chan *advisorsvc.ListAndWatchResponse)}
+	recvCh := cs.recvCh.(chan types.InternalMemoryCalculationResult)
+	recvCh <- types.InternalMemoryCalculationResult{}
+
+	stop := make(chan struct{})
+	go func() {
+		err := cs.ListAndWatch(&advisorsvc.Empty{}, s)
+		assert.NoError(t, err, "failed to LW memory server")
+		stop <- struct{}{}
+	}()
+	recvCh <- types.InternalMemoryCalculationResult{
+		TimeStamp: time.Now(),
+		ContainerEntries: []types.ContainerMemoryAdvices{
+			{
+				PodUID:        "pod1",
+				ContainerName: "c1",
+				Values:        map[string]string{"k1": "v1"},
+			},
+			{
+				PodUID:        "pod1",
+				ContainerName: "c1",
+				Values:        map[string]string{"k2": "v2"},
+			},
+			{
+				PodUID:        "pod2",
+				ContainerName: "c1",
+				Values:        map[string]string{"k1": "v1"},
+			},
+			{
+				PodUID:        "pod2",
+				ContainerName: "c2",
+				Values:        map[string]string{"k2": "v2"},
+			},
+		},
+		ExtraEntries: []types.ExtraMemoryAdvices{
+			{
+				CgroupPath: "/kubepods/burstable",
+				Values:     map[string]string{"k1": "v1"},
+			},
+			{
+				CgroupPath: "/kubepods/burstable",
+				Values:     map[string]string{"k2": "v2"},
+			},
+			{
+				CgroupPath: "/kubepods/besteffort",
+				Values:     map[string]string{"k1": "v1"},
+			},
+		},
+	}
+	res := <-s.ResultsChan
+	close(cs.stopCh)
+	<-stop
+	wantRes := &advisorsvc.ListAndWatchResponse{
+		PodEntries: map[string]*advisorsvc.CalculationEntries{
+			"pod1": {
+				ContainerEntries: map[string]*advisorsvc.CalculationInfo{
+					"c1": {
+						CalculationResult: &advisorsvc.CalculationResult{
+							Values: map[string]string{"k1": "v1", "k2": "v2"},
+						},
+					},
+				},
+			},
+			"pod2": {
+				ContainerEntries: map[string]*advisorsvc.CalculationInfo{
+					"c1": {
+						CalculationResult: &advisorsvc.CalculationResult{
+							Values: map[string]string{"k1": "v1"},
+						},
+					},
+					"c2": {
+						CalculationResult: &advisorsvc.CalculationResult{
+							Values: map[string]string{"k2": "v2"},
+						},
+					},
+				},
+			},
+		},
+		ExtraEntries: []*advisorsvc.CalculationInfo{
+			{
+				CgroupPath: "/kubepods/burstable",
+				CalculationResult: &advisorsvc.CalculationResult{
+					Values: map[string]string{"k1": "v1", "k2": "v2"},
+				},
+			},
+			{
+				CgroupPath: "/kubepods/besteffort",
+				CalculationResult: &advisorsvc.CalculationResult{
+					Values: map[string]string{"k1": "v1"},
+				},
+			},
+		},
+	}
+	if !reflect.DeepEqual(res, wantRes) {
+		t.Errorf("ListAndWatch()\ngot = %+v, \nwant= %+v", res, wantRes)
+	}
+}
diff --git a/pkg/util/general/healthz.go b/pkg/util/general/healthz.go
index 42b5aa0a9..904be9054 100644
--- a/pkg/util/general/healthz.go
+++ b/pkg/util/general/healthz.go
@@ -20,6 +20,8 @@ import (
 	"fmt"
 	"sync"
 	"time"
+
+	"k8s.io/klog/v2"
 )
 
 var (
@@ -60,6 +62,8 @@ type healthzCheckStatus struct {
 	// is failed.
 	AutoRecoverPeriod time.Duration `json:"autoRecoverPeriod"`
 	mutex sync.RWMutex
+	temporary bool
+	count int
 }
 
 func (h *healthzCheckStatus) update(state HealthzCheckState, message string) {
@@ -102,6 +106,13 @@ type HealthzCheckFunc func() (healthzCheckStatus, error)
 func RegisterHeartbeatCheck(name string, timeout time.Duration, initState HealthzCheckState, tolerationPeriod time.Duration) {
 	healthzCheckLock.Lock()
 	defer healthzCheckLock.Unlock()
+	origin, ok := healthzCheckMap[HealthzCheckName(name)]
+	if ok {
+		if origin.Mode != HealthzCheckModeHeartBeat {
+			klog.Errorf("RegisterHeartbeatCheck does not allow changing the mode of an existing check (name: %s)", name)
+		}
+		return
+	}
 
 	healthzCheckMap[HealthzCheckName(name)] = &healthzCheckStatus{
 		State:            initState,
@@ -110,21 +121,93 @@ func RegisterHeartbeatCheck(name string, timeout time.Duration, initState Health
 		Message:          InitMessage,
 		LastUpdateTime:   time.Now(),
 		TimeoutPeriod:    timeout,
 		TolerationPeriod: tolerationPeriod,
 		Mode:             HealthzCheckModeHeartBeat,
+		temporary:        false,
 	}
 }
 
+func RegisterTemporaryHeartbeatCheck(name string, timeout time.Duration, initState HealthzCheckState, tolerationPeriod time.Duration) {
+	healthzCheckLock.Lock()
+	defer healthzCheckLock.Unlock()
+
+	origin, ok := healthzCheckMap[HealthzCheckName(name)]
+	if ok {
+		if !origin.temporary {
+			klog.Errorf("RegisterTemporaryHeartbeatCheck is not allowed to override the non-temporary health check (name: %s)", name)
+			return
+		}
+		origin.count++
+		klog.Infof("request to register temporary heartbeat check (name: %s, count: %d)", name, origin.count)
+		return
+	}
+
+	klog.Infof("request to register temporary heartbeat check (name: %s)", name)
+	healthzCheckMap[HealthzCheckName(name)] = &healthzCheckStatus{
+		State:            initState,
+		Message:          InitMessage,
+		LastUpdateTime:   time.Now(),
+		TimeoutPeriod:    timeout,
+		TolerationPeriod: tolerationPeriod,
+		Mode:             HealthzCheckModeHeartBeat,
+		temporary:        true,
+		count:            1,
+	}
+}
+
+func UnregisterTemporaryHeartbeatCheck(name string) {
+	unregisterHealthCheck(name, HealthzCheckModeHeartBeat)
+}
+
 func RegisterReportCheck(name string, autoRecoverPeriod time.Duration) {
 	healthzCheckLock.Lock()
 	defer healthzCheckLock.Unlock()
+	origin, ok := healthzCheckMap[HealthzCheckName(name)]
+	if ok {
+		if origin.Mode != HealthzCheckModeReport {
+			klog.Errorf("RegisterReportCheck does not allow changing the mode of an existing check (name: %s)", name)
+		}
+		return
+	}
+
+	healthzCheckMap[HealthzCheckName(name)] = &healthzCheckStatus{
+		State:             HealthzCheckStateReady,
+		Message:           InitMessage,
+		AutoRecoverPeriod: autoRecoverPeriod,
+		Mode:              HealthzCheckModeReport,
+		temporary:         false,
+	}
+}
+
+func RegisterTemporaryReportCheck(name string, autoRecoverPeriod time.Duration) {
+	healthzCheckLock.Lock()
+	defer healthzCheckLock.Unlock()
+
+	origin, ok := healthzCheckMap[HealthzCheckName(name)]
+	if ok {
+		if !origin.temporary {
+			klog.Errorf("RegisterTemporaryReportCheck is not allowed to override the non-temporary health check (name: %s)", name)
+			return
+		}
+		origin.count++
+		klog.Infof("request to register temporary report check (name: %s, count: %d)", name, origin.count)
+		return
+	}
+
+	klog.Infof("request to register temporary report check (name: %s)", name)
 	healthzCheckMap[HealthzCheckName(name)] = &healthzCheckStatus{
 		State:             HealthzCheckStateReady,
 		Message:           InitMessage,
 		AutoRecoverPeriod: autoRecoverPeriod,
 		Mode:              HealthzCheckModeReport,
+		temporary:         true,
+		count:             1,
 	}
 }
 
+func UnregisterTemporaryReportCheck(name string) {
+	unregisterHealthCheck(name, HealthzCheckModeReport)
+}
+
 func UpdateHealthzStateByError(name string, err error) error {
 	if err != nil {
 		return UpdateHealthzState(name, HealthzCheckStateNotReady, err.Error())
@@ -186,3 +269,30 @@ func GetRegisterReadinessCheckResult() map[HealthzCheckName]HealthzCheckResult {
 	}
 	return results
 }
+
+func unregisterHealthCheck(name string, mode HealthzCheckMode) {
+	healthzCheckLock.Lock()
+	defer healthzCheckLock.Unlock()
+
+	current, ok := healthzCheckMap[HealthzCheckName(name)]
+	if !ok {
+		return
+	}
+
+	if !current.temporary {
+		klog.Warningf("reject unregistering non-temporary health check (name: %s, mode: %s)", name, current.Mode)
+		return
+	}
+
+	if current.Mode != mode {
+		klog.Warningf("reject unregistering health check (name: %s) with unmatched mode %s (current mode: %s)", name, mode, current.Mode)
+		return
+	}
+
+	current.count--
+	if current.count == 0 {
+		delete(healthzCheckMap, HealthzCheckName(name))
+	} else {
+		klog.Infof("unregister temporary health check (name: %s, mode: %s, count: %d)", name, current.Mode, current.count)
+	}
+}
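
Note on the drain loops in cpu_server.go and memory_server.go: in Go, a bare break inside a select only exits the select statement, not the surrounding for loop, which is why the loops above use a labeled break. A minimal standalone sketch of the pattern (illustrative only, not part of the patch; the int channel merely stands in for the advisors' buffered advice channels):

package main

import "fmt"

func main() {
	ch := make(chan int, 3) // stands in for the buffered advice channel
	ch <- 1
	ch <- 2

	// Try up to cap(ch) non-blocking receives to drop stale items.
drainLoop:
	for i := 0; i < cap(ch); i++ {
		select {
		case v := <-ch:
			fmt.Println("dropped stale advice:", v)
		default:
			// A bare "break" here would only exit the select, and the loop
			// would spin through its remaining iterations re-hitting this
			// case; the label makes it exit the for loop as intended.
			fmt.Println("channel drained after", i, "drops")
			break drainLoop
		}
	}
}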