Skip to content

Commit

Permalink
Fix Issues Arising from Confusing Weighted Costs (#3758)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* code review comment

Signed-off-by: Chris Martin <[email protected]>

---------

Signed-off-by: Chris Martin <[email protected]>
Co-authored-by: Chris Martin <[email protected]>
  • Loading branch information
d80tb7 and d80tb7 authored Jun 28, 2024
1 parent 09e1a12 commit 481b2de
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 14 deletions.
4 changes: 2 additions & 2 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (sctx *SchedulingContext) GetQueue(queue string) (fairness.Queue, bool) {
func (sctx *SchedulingContext) TotalCost() float64 {
var rv float64
for _, qctx := range sctx.QueueSchedulingContexts {
rv += sctx.FairnessCostProvider.CostFromQueue(qctx)
rv += sctx.FairnessCostProvider.UnweightedCostFromQueue(qctx)
}
return rv
}
Expand All @@ -187,7 +187,7 @@ func (sctx *SchedulingContext) UpdateFairShares() {
for queueName, qctx := range sctx.QueueSchedulingContexts {
cappedShare := 1.0
if !sctx.TotalResources.IsZero() {
cappedShare = sctx.FairnessCostProvider.CostFromAllocationAndWeight(qctx.Demand, qctx.Weight) * qctx.Weight
cappedShare = sctx.FairnessCostProvider.UnweightedCostFromAllocation(qctx.Demand)
}
queueInfos = append(queueInfos, &queueInfo{
queueName: queueName,
Expand Down
22 changes: 16 additions & 6 deletions internal/scheduler/fairness/fairness.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ type Queue interface {

// FairnessCostProvider captures algorithms to compute the cost of an allocation.
type FairnessCostProvider interface {
CostFromQueue(queue Queue) float64
CostFromAllocationAndWeight(allocation schedulerobjects.ResourceList, weight float64) float64
UnweightedCostFromQueue(queue Queue) float64
UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64
WeightedCostFromQueue(queue Queue) float64
WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64
}

type DominantResourceFairness struct {
Expand All @@ -42,11 +44,19 @@ func NewDominantResourceFairness(totalResources schedulerobjects.ResourceList, r
}, nil
}

func (f *DominantResourceFairness) CostFromQueue(queue Queue) float64 {
return f.CostFromAllocationAndWeight(queue.GetAllocation(), queue.GetWeight())
func (f *DominantResourceFairness) WeightedCostFromQueue(queue Queue) float64 {
return f.UnweightedCostFromQueue(queue) / queue.GetWeight()
}

func (f *DominantResourceFairness) CostFromAllocationAndWeight(allocation schedulerobjects.ResourceList, weight float64) float64 {
func (f *DominantResourceFairness) UnweightedCostFromQueue(queue Queue) float64 {
return f.UnweightedCostFromAllocation(queue.GetAllocation())
}

func (f *DominantResourceFairness) WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64 {
return f.UnweightedCostFromAllocation(allocation) / weight
}

func (f *DominantResourceFairness) UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64 {
var cost float64
for _, t := range f.resourcesToConsider {
capacity := f.totalResources.Get(t)
Expand All @@ -60,5 +70,5 @@ func (f *DominantResourceFairness) CostFromAllocationAndWeight(allocation schedu
cost = tcost
}
}
return cost / weight
return cost
}
6 changes: 3 additions & 3 deletions internal/scheduler/fairness/fairness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ func TestDominantResourceFairness(t *testing.T) {
assert.Equal(
t,
tc.expectedCost,
f.CostFromAllocationAndWeight(tc.allocation, tc.weight),
f.WeightedCostFromAllocation(tc.allocation, tc.weight),
)
assert.Equal(
t,
f.CostFromAllocationAndWeight(tc.allocation, tc.weight),
f.CostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}),
f.WeightedCostFromAllocation(tc.allocation, tc.weight),
f.WeightedCostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}),
)
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
return false
}
if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.Queue()]; ok {
actualShare := sch.schedulingContext.FairnessCostProvider.CostFromQueue(qctx) / totalCost
actualShare := sch.schedulingContext.FairnessCostProvider.UnweightedCostFromQueue(qctx) / totalCost
fairShare := qctx.FairShare
if sch.useAdjustedFairShareProtection {
fairShare = math.Max(qctx.AdjustedFairShare, fairShare)
Expand Down
31 changes: 31 additions & 0 deletions internal/scheduler/preempting_queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,37 @@ func TestPreemptingQueueScheduler(t *testing.T) {
"D": 1,
},
},
"ProtectedFractionOfFairShare non equal weights": {
SchedulingConfig: testfixtures.WithProtectedFractionOfFairShareConfig(
1.0,
testfixtures.TestSchedulingConfig(),
),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
JobsByQueue: map[string][]*jobdb.Job{
"A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass2NonPreemptible, 24),
"B": testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 8),
},
ExpectedScheduledIndices: map[string][]int{
"A": testfixtures.IntRange(0, 23),
"B": testfixtures.IntRange(0, 7),
},
},
{
// D submits one more job. No preemption occurs because B is below adjusted fair share
JobsByQueue: map[string][]*jobdb.Job{
"C": testfixtures.N1Cpu4GiJobs("C", testfixtures.PriorityClass0, 1),
},
},
{}, // Empty round to make sure nothing changes.
},
PriorityFactorByQueue: map[string]float64{
"A": 1,
"B": 2,
"C": 1,
},
},
"DominantResourceFairness": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSc
it.buffer.Zero()
it.buffer.Add(queue.GetAllocation())
it.buffer.Add(gctx.TotalResourceRequests)
return it.fairnessCostProvider.CostFromAllocationAndWeight(it.buffer, queue.GetWeight()), nil
return it.fairnessCostProvider.WeightedCostFromAllocation(it.buffer, queue.GetWeight()), nil
}

// Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job.
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (metrics *SchedulerMetrics) calculateQueuePoolMetrics(schedulingContexts []

for queue, queueContext := range schedContext.QueueSchedulingContexts {
key := queuePoolKey{queue: queue, pool: pool}
actualShare := schedContext.FairnessCostProvider.CostFromQueue(queueContext) / totalCost
actualShare := schedContext.FairnessCostProvider.UnweightedCostFromQueue(queueContext) / totalCost
result[key] = queuePoolData{
numberOfJobsConsidered: len(queueContext.UnsuccessfulJobSchedulingContexts) + len(queueContext.SuccessfulJobSchedulingContexts),
fairShare: queueContext.FairShare,
Expand Down

0 comments on commit 481b2de

Please sign in to comment.