diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 88f7ec7a13eed..44a2aed084ef8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -204,7 +204,7 @@ type priorityLevelState struct { // reached through this pointer is mutable. pl *flowcontrol.PriorityLevelConfiguration - // qsCompleter holds the QueueSetCompleter derived from `pl` + // qsCompleter holds the QueueSetCompleter derived from `config` // and `queues`. qsCompleter fq.QueueSetCompleter @@ -255,12 +255,12 @@ type priorityLevelState struct { type seatDemandStats struct { avg float64 stdDev float64 - highWatermark int + highWatermark float64 smoothed float64 } func (stats *seatDemandStats) update(obs fq.IntegratorResults) { - stats.highWatermark = int(math.Round(obs.Max)) + stats.highWatermark = obs.Max if obs.Duration <= 0 { return } @@ -398,63 +398,38 @@ func (cfgCtlr *configController) updateBorrowing() { func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) { items := make([]allocProblemItem, 0, len(plStates)) - nonExemptPLNames := make([]string, 0, len(plStates)) - idxOfNonExempt := map[string]int{} - cclOfExempt := map[string]int{} - var minCLSum, minCurrentCLSum int - remainingServerCL := cfgCtlr.nominalCLSum + plNames := make([]string, 0, len(plStates)) for plName, plState := range plStates { obs := plState.seatDemandIntegrator.Reset() plState.seatDemandStats.update(obs) - var minCurrentCL int - if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt { - minCurrentCL = max(plState.minCL, plState.seatDemandStats.highWatermark) - cclOfExempt[plName] = minCurrentCL - remainingServerCL -= minCurrentCL - } else { - // Lower bound on this priority level's adjusted concurreny limit is the lesser of: - // - its seat demamd high watermark over the last adjustment period, and - // - its configured concurrency limit. - // BUT: we do not want this to be lower than the lower bound from configuration. - // See KEP-1040 for a more detailed explanation. - minCurrentCL = max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark)) - idxOfNonExempt[plName] = len(items) - nonExemptPLNames = append(nonExemptPLNames, plName) - items = append(items, allocProblemItem{ - lowerBound: float64(minCurrentCL), - upperBound: float64(plState.maxCL), - target: math.Max(float64(minCurrentCL), plState.seatDemandStats.smoothed), - }) - } - minCLSum += plState.minCL - minCurrentCLSum += minCurrentCL + // Lower bound on this priority level's adjusted concurreny limit is the lesser of: + // - its seat demamd high watermark over the last adjustment period, and + // - its configured concurrency limit. + // BUT: we do not want this to be lower than the lower bound from configuration. + // See KEP-1040 for a more detailed explanation. + minCurrentCL := math.Max(float64(plState.minCL), math.Min(float64(plState.nominalCL), plState.seatDemandStats.highWatermark)) + plNames = append(plNames, plName) + items = append(items, allocProblemItem{ + lowerBound: minCurrentCL, + upperBound: float64(plState.maxCL), + target: math.Max(minCurrentCL, plState.seatDemandStats.smoothed), + }) } if len(items) == 0 && cfgCtlr.nominalCLSum > 0 { klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates) return } - var allocs []float64 - var shareFrac, fairFrac float64 - var err error - if remainingServerCL <= minCLSum { - metrics.SetFairFrac(0) - } else if remainingServerCL <= minCurrentCLSum { - shareFrac = float64(remainingServerCL-minCLSum) / float64(minCurrentCLSum-minCLSum) - metrics.SetFairFrac(0) - } else { - allocs, fairFrac, err = computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items) - if err != nil { - klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", nonExemptPLNames, "items", items) - allocs = make([]float64, len(items)) - for idx, plName := range nonExemptPLNames { - plState := plStates[plName] - allocs[idx] = float64(plState.currentCL) - } + allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items) + if err != nil { + klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", plNames, "items", items) + allocs = make([]float64, len(items)) + for idx, plName := range plNames { + plState := plStates[plName] + allocs[idx] = float64(plState.currentCL) } - metrics.SetFairFrac(float64(fairFrac)) } - for plName, plState := range plStates { - idx, isNonExempt := idxOfNonExempt[plName] + for idx, plName := range plNames { + plState := plStates[plName] if setCompleters { qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues, plState.pl, plState.reqsGaugePair, plState.execSeatsObs, @@ -465,20 +440,10 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta } plState.qsCompleter = qsCompleter } - var currentCL int - if !isNonExempt { - currentCL = cclOfExempt[plName] - } else if remainingServerCL <= minCLSum { - currentCL = plState.minCL - } else if remainingServerCL <= minCurrentCLSum { - minCurrentCL := max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark)) - currentCL = plState.minCL + int(math.Round(float64(minCurrentCL-plState.minCL)*shareFrac)) - } else { - currentCL = int(math.Round(float64(allocs[idx]))) - } + currentCL := int(math.Round(float64(allocs[idx]))) relChange := relDiff(float64(currentCL), float64(plState.currentCL)) plState.currentCL = currentCL - metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, float64(plState.seatDemandStats.highWatermark), plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL) + metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL) logLevel := klog.Level(4) if relChange >= 0.05 { logLevel = 2 @@ -493,6 +458,7 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil) plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator}) } + metrics.SetFairFrac(float64(fairFrac)) } // runWorker is the logic of the one and only worker goroutine. We