
Merge pull request kubewharf#645 from nightmeng/dev/optimize-lw-healthcheck

fix(advisor): optimize lw health check
xu282934741 authored Jul 16, 2024
2 parents 7263851 + 0b75b6d commit c6f9ea9
Showing 7 changed files with 468 additions and 14 deletions.
4 changes: 2 additions & 2 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
@@ -308,8 +308,8 @@ func (cra *cpuResourceAdvisor) updateWithIsolationGuardian(tryIsolation bool) error {
         klog.Infof("[qosaware-cpu] notify cpu server: %+v", calculationResult)
         return nil
     default:
-        klog.Errorf("[qosaware-cpu] channel is full")
-        return fmt.Errorf("calculation result channel is full")
+        klog.Warningf("[qosaware-cpu] channel is full, drop the new advice")
+        return nil
     }
 }

4 changes: 2 additions & 2 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go
@@ -180,8 +180,8 @@ func (ra *memoryResourceAdvisor) sendAdvices() error {
         general.Infof("notify memory server: %+v", result)
         return nil
     default:
-        general.Errorf("channel is full")
-        return fmt.Errorf("memory advice channel is full")
+        klog.Warningf("[qosaware-memory] channel is full, drop the new advice")
+        return nil
     }
 }

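Both advisor hunks above make the same behavioral change: a full advice channel is no longer an error that propagates to the caller; the new advice is dropped with a warning, which is presumably safe because the advisor recomputes and resends advice every period. A minimal sketch of this non-blocking send-or-drop pattern (the advice type and messages are illustrative, not katalyst's):

package main

import "log"

type advice struct{ poolSize int }

// trySend performs a non-blocking send: if the buffered channel is full,
// the new advice is dropped with a warning instead of returning an error.
func trySend(ch chan advice, a advice) {
    select {
    case ch <- a:
        log.Printf("notify server: %+v", a)
    default:
        log.Printf("channel is full, drop the new advice")
    }
}

func main() {
    ch := make(chan advice, 1)
    trySend(ch, advice{poolSize: 4}) // buffer has room: delivered
    trySend(ch, advice{poolSize: 8}) // buffer full: dropped, no error
}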
15 changes: 14 additions & 1 deletion pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go
@@ -75,7 +75,8 @@ func (cs *cpuServer) RegisterAdvisorServer() {
 
 func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvisor_ListAndWatchServer) error {
     _ = cs.emitter.StoreInt64(cs.genMetricsName(metricServerLWCalled), int64(cs.period.Seconds()), metrics.MetricTypeNameCount)
-    general.RegisterHeartbeatCheck(cpuServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
+    general.RegisterTemporaryHeartbeatCheck(cpuServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
+    defer general.UnregisterTemporaryHeartbeatCheck(cpuServerHealthCheckName)
 
     if !cs.getCheckpointCalled {
         if err := cs.startToGetCheckpointFromCPUPlugin(); err != nil {
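The hunk above scopes the health check to the stream: the check is installed when a client calls ListAndWatch and removed by the deferred unregister when the stream ends, so a client that disconnects normally no longer leaves a stale, eventually-failing check behind. A self-contained sketch of that register/defer-unregister lifecycle, using a toy registry (katalyst's general package has its own implementation; these names and signatures are stand-ins):

package main

import (
    "fmt"
    "sync"
    "time"
)

// toy heartbeat registry; katalyst's general package is the real one.
type registry struct {
    mu     sync.Mutex
    checks map[string]time.Duration
}

func (r *registry) registerTemporary(name string, toleration time.Duration) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.checks[name] = toleration // check participates in healthz while present
}

func (r *registry) unregisterTemporary(name string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    delete(r.checks, name) // no stale check survives the stream
}

// listAndWatch scopes the health check to the lifetime of the stream.
func listAndWatch(r *registry) {
    r.registerTemporary("cpu-server-lw", 30*time.Second)
    defer r.unregisterTemporary("cpu-server-lw")
    // ... push advice until the client disconnects ...
}

func main() {
    r := &registry{checks: map[string]time.Duration{}}
    listAndWatch(r)
    fmt.Println("checks registered after stream ended:", len(r.checks)) // 0
}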
@@ -91,6 +92,18 @@ func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvisor_ListAndWatchServer) error {
         return fmt.Errorf("recvCh convert failed")
     }
 
+    maxDropLength := len(recvCh)
+    klog.Infof("[qosaware-server-cpu] drop all old cpu advices in channel (max: %d)", maxDropLength)
+    for i := 0; i < maxDropLength; i++ {
+        select {
+        case <-recvCh:
+        default:
+            klog.Infof("[qosaware-server-cpu] all old cpu advices in channel is dropped (count: %d)", i)
+            break
+        }
+    }
+
+    klog.Infof("[qosaware-server-cpu] start to push cpu advices")
     for {
         select {
         case <-server.Context().Done():
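The second hunk drains whatever accumulated in the buffered channel before the push loop starts, so a newly connected client receives only advice produced after it connected; memory_server.go below applies the same pattern. One Go subtlety in the committed loop: break inside a select exits only the select, not the for; the loop still terminates because it is bounded by maxDropLength, captured once up front. A standalone sketch of the drain (names are illustrative):

package main

import "fmt"

type result struct{ seq int }

// drainBacklog discards everything currently buffered in ch without
// blocking. Bounding the loop by the buffer length captured up front
// guarantees termination even though `break` only exits the select.
func drainBacklog(ch chan result) {
    maxDrop := len(ch)
    for i := 0; i < maxDrop; i++ {
        select {
        case old := <-ch:
            fmt.Printf("dropped stale result %d\n", old.seq)
        default:
            break // channel already empty; exits the select only
        }
    }
}

func main() {
    ch := make(chan result, 2)
    ch <- result{seq: 1}
    ch <- result{seq: 2}
    drainBacklog(ch)
    fmt.Println("remaining:", len(ch)) // 0: the next receive sees only fresh results
}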
205 changes: 197 additions & 8 deletions pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go
@@ -98,6 +98,33 @@ func newTestCPUServer(t *testing.T, podList []*v1.Pod) *cpuServer {
     return cpuServer
 }
 
+func newTestCPUServerWithChanBuffer(t *testing.T, podList []*v1.Pod) *cpuServer {
+    recvCh := make(chan types.InternalCPUCalculationResult, 1)
+    sendCh := make(chan types.TriggerInfo, 1)
+    conf := generateTestConfiguration(t)
+
+    metricsFetcher := metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})
+    metaCache, err := metacache.NewMetaCacheImp(conf, metricspool.DummyMetricsEmitterPool{}, metricsFetcher)
+    require.NoError(t, err)
+    require.NotNil(t, metaCache)
+
+    metaServer := &metaserver.MetaServer{
+        MetaAgent: &agent.MetaAgent{
+            PodFetcher: &pod.PodFetcherStub{
+                PodList: podList,
+            },
+        },
+    }
+
+    cpuServer, err := NewCPUServer(recvCh, sendCh, conf, metaCache, metaServer, metrics.DummyMetrics{})
+    require.NoError(t, err)
+    require.NotNil(t, cpuServer)
+
+    cpuServer.getCheckpointCalled = true
+
+    return cpuServer
+}
+
 func TestCPUServerStartAndStop(t *testing.T) {
     t.Parallel()
 
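The new helper differs from newTestCPUServer only in giving recvCh a buffer of capacity 1, which is what lets the test further down stage a stale advice in the channel before ListAndWatch is running. A two-line illustration of why the buffer matters (plain Go, no project types):

package main

import "fmt"

func main() {
    ch := make(chan int, 1) // capacity 1, as in newTestCPUServerWithChanBuffer
    ch <- 42                // succeeds with no receiver attached: the advice is "staged"
    fmt.Println(len(ch))    // 1: the stale value waits in the buffer
}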
@@ -259,17 +286,17 @@ func DeepCopyResponse(response *cpuadvisor.ListAndWatchResponse) (*cpuadvisor.ListAndWatchResponse, error) {
     return copyResponse, nil
 }
 
+type ContainerInfo struct {
+    request        *advisorsvc.ContainerMetadata
+    podInfo        *v1.Pod
+    allocationInfo *cpuadvisor.AllocationInfo
+    isolated       bool
+    regions        sets.String
+}
+
 func TestCPUServerListAndWatch(t *testing.T) {
     t.Parallel()
 
-    type ContainerInfo struct {
-        request        *advisorsvc.ContainerMetadata
-        podInfo        *v1.Pod
-        allocationInfo *cpuadvisor.AllocationInfo
-        isolated       bool
-        regions        sets.String
-    }
-
     tests := []struct {
         name      string
         provision types.InternalCPUCalculationResult
@@ -1708,3 +1735,165 @@ func TestConcurrencyGetCheckpointAndAddContainer(t *testing.T) {
     time.Sleep(10 * time.Second)
     cancel()
 }
+
+func TestCPUServerDropOldAdvice(t *testing.T) {
+    t.Parallel()
+
+    cpuServer := newTestCPUServerWithChanBuffer(t, []*v1.Pod{})
+    s := &mockCPUServerService_ListAndWatchServer{ResultsChan: make(chan *cpuadvisor.ListAndWatchResponse)}
+    stop := make(chan struct{})
+    recvCh := cpuServer.recvCh.(chan types.InternalCPUCalculationResult)
+    recvCh <- types.InternalCPUCalculationResult{}
+    go func() {
+        err := cpuServer.ListAndWatch(&advisorsvc.Empty{}, s)
+        assert.NoError(t, err, "failed to LW cpu server")
+        stop <- struct{}{}
+    }()
+    provision := types.InternalCPUCalculationResult{
+        TimeStamp: time.Now(),
+        PoolEntries: map[string]map[int]int{
+            state.PoolNameReclaim: {
+                0: 4,
+                1: 8,
+            },
+        },
+    }
+    infos := []*ContainerInfo{
+        {
+            request: &advisorsvc.ContainerMetadata{
+                PodUid:        "pod1",
+                ContainerName: "c1",
+                Annotations: map[string]string{
+                    consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable,
+                },
+                QosLevel: consts.PodAnnotationQoSLevelDedicatedCores,
+            },
+            podInfo: &v1.Pod{
+                ObjectMeta: metav1.ObjectMeta{
+                    Namespace: "default",
+                    Name:      "pod1",
+                    UID:       "pod1",
+                    Annotations: map[string]string{
+                        consts.PodAnnotationQoSLevelKey:          consts.PodAnnotationQoSLevelDedicatedCores,
+                        consts.PodAnnotationMemoryEnhancementKey: "{\"numa_exclusive\":true}",
+                    },
+                },
+                Spec: v1.PodSpec{
+                    Containers: []v1.Container{
+                        {
+                            Name: "c1",
+                        },
+                    },
+                },
+            },
+            allocationInfo: &cpuadvisor.AllocationInfo{
+                OwnerPoolName: state.PoolNameDedicated,
+                TopologyAwareAssignments: map[uint64]string{
+                    0: "0-3",
+                    1: "24-47",
+                },
+            },
+        },
+    }
+    for _, info := range infos {
+        assert.NoError(t, cpuServer.addContainer(info.request))
+        assert.NoError(t, cpuServer.updateContainerInfo(info.request.PodUid, info.request.ContainerName, info.podInfo, info.allocationInfo))
+
+        nodeInfo, _ := cpuServer.metaCache.GetContainerInfo(info.request.PodUid, info.request.ContainerName)
+        nodeInfo.Isolated = info.isolated
+        if info.regions.Len() > 0 {
+            nodeInfo.RegionNames = info.regions
+        }
+        assert.NoError(t, cpuServer.metaCache.SetContainerInfo(info.request.PodUid, info.request.ContainerName, nodeInfo))
+    }
+
+    recvCh <- provision
+    res := <-s.ResultsChan
+    close(cpuServer.stopCh)
+    <-stop
+    copyres, err := DeepCopyResponse(res)
+    assert.NoError(t, err)
+    wantRes := &cpuadvisor.ListAndWatchResponse{
+        Entries: map[string]*cpuadvisor.CalculationEntries{
+            state.PoolNameReclaim: {
+                Entries: map[string]*cpuadvisor.CalculationInfo{
+                    "": {
+                        OwnerPoolName: state.PoolNameReclaim,
+                        CalculationResultsByNumas: map[int64]*cpuadvisor.NumaCalculationResult{
+                            0: {
+                                Blocks: []*cpuadvisor.Block{
+                                    {
+                                        Result: 4,
+                                        OverlapTargets: []*cpuadvisor.OverlapTarget{
+                                            {
+                                                OverlapTargetPodUid:        "pod1",
+                                                OverlapTargetContainerName: "c1",
+                                                OverlapType:                cpuadvisor.OverlapType_OverlapWithPod,
+                                            },
+                                        },
+                                    },
+                                },
+                            },
+                            1: {
+                                Blocks: []*cpuadvisor.Block{
+                                    {
+                                        Result: 8,
+                                        OverlapTargets: []*cpuadvisor.OverlapTarget{
+                                            {
+                                                OverlapTargetPodUid:        "pod1",
+                                                OverlapTargetContainerName: "c1",
+                                                OverlapType:                cpuadvisor.OverlapType_OverlapWithPod,
+                                            },
+                                        },
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+            },
+            "pod1": {
+                Entries: map[string]*cpuadvisor.CalculationInfo{
+                    "c1": {
+                        OwnerPoolName: state.PoolNameDedicated,
+                        CalculationResultsByNumas: map[int64]*cpuadvisor.NumaCalculationResult{
+                            0: {
+                                Blocks: []*cpuadvisor.Block{
+                                    {
+                                        Result: 4,
+                                        OverlapTargets: []*cpuadvisor.OverlapTarget{
+                                            {
+                                                OverlapTargetPoolName: state.PoolNameReclaim,
+                                                OverlapType:           cpuadvisor.OverlapType_OverlapWithPool,
+                                            },
+                                        },
+                                    },
+                                },
+                            },
+                            1: {
+                                Blocks: []*cpuadvisor.Block{
+                                    {
+                                        Result:         16,
+                                        OverlapTargets: nil,
+                                    },
+                                    {
+                                        Result: 8,
+                                        OverlapTargets: []*cpuadvisor.OverlapTarget{
+                                            {
+                                                OverlapTargetPoolName: state.PoolNameReclaim,
+                                                OverlapType:           cpuadvisor.OverlapType_OverlapWithPool,
+                                            },
+                                        },
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+            },
+        },
+    }
+    if !reflect.DeepEqual(copyres, wantRes) {
+        t.Errorf("ListAndWatch()\ngot = %+v, \nwant= %+v", general.ToString(copyres), general.ToString(wantRes))
+    }
+}
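The test stages an empty (stale) InternalCPUCalculationResult in the buffered channel before starting ListAndWatch, then sends a real provision; because the server drains the backlog on connect, the single response read from ResultsChan must match the fresh provision only. The end-to-end flow it exercises, reduced to plain channels (illustrative types):

package main

import "fmt"

type advice struct{ fresh bool }

func main() {
    ch := make(chan advice, 1)
    ch <- advice{fresh: false} // stale advice staged before the watcher connects

    // watcher connects and drains the backlog first
    for i, n := 0, len(ch); i < n; i++ {
        <-ch
    }

    ch <- advice{fresh: true} // advice produced after the watcher connected
    fmt.Println((<-ch).fresh) // true: only the fresh advice is delivered
}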
15 changes: 14 additions & 1 deletion pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go
@@ -140,14 +140,27 @@ func (ms *memoryServer) listContainers() error {
 
 func (ms *memoryServer) ListAndWatch(_ *advisorsvc.Empty, server advisorsvc.AdvisorService_ListAndWatchServer) error {
     _ = ms.emitter.StoreInt64(ms.genMetricsName(metricServerLWCalled), int64(ms.period.Seconds()), metrics.MetricTypeNameCount)
-    general.RegisterHeartbeatCheck(memoryServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
+    general.RegisterTemporaryHeartbeatCheck(memoryServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
+    defer general.UnregisterTemporaryHeartbeatCheck(memoryServerHealthCheckName)
 
     recvCh, ok := ms.recvCh.(chan types.InternalMemoryCalculationResult)
     if !ok {
         return fmt.Errorf("recvCh convert failed")
     }
     ms.listAndWatchCalled = true
 
+    maxDropLength := len(recvCh)
+    klog.Infof("[qosaware-server-memory] drop all old memory advices in channel (max: %d)", maxDropLength)
+    for i := 0; i < maxDropLength; i++ {
+        select {
+        case <-recvCh:
+        default:
+            klog.Infof("[qosaware-server-memory] all old memory advice in channel is dropped (max: %d)", i)
+            break
+        }
+    }
+
+    klog.Infof("[qosaware-server-memory] start to push memory advice")
     for {
         select {
         case <-server.Context().Done():
(The remaining 2 of the 7 changed files are not shown.)
