scheduler: preserve allocations enriched during placement as 'informational' #24960

Draft · wants to merge 3 commits into base: main

Changes from 1 commit
21 changes: 16 additions & 5 deletions scheduler/reconcile.go
@@ -462,8 +462,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted, migrate, lost, disconnecting, reconnecting, ignore, expiring := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
desiredChanges.Ignore += uint64(len(ignore))

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
// Determine what set of terminal allocations need to be rescheduled and
// make sure we don't discard allocations that carry important information
// for future reschedules or deployments
untainted, rescheduleNow, rescheduleLater, informational := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)

// If there are allocations reconnecting we need to reconcile them and
// their replacements first because there is specific logic when deciding
@@ -509,7 +511,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// the reschedule policy won't be enabled and the lost allocations
// wont be rescheduled, and PreventRescheduleOnLost is ignored.
if tg.GetDisconnectLostTimeout() != 0 {
untaintedDisconnecting, rescheduleDisconnecting, laterDisconnecting := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)
untaintedDisconnecting, rescheduleDisconnecting, laterDisconnecting, _ := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)

rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
untainted = untainted.union(untaintedDisconnecting)
@@ -591,7 +593,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// * An alloc was lost
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying, informational)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -798,7 +800,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un
// Placements will meet or exceed group count.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
isCanarying bool) []allocPlaceResult {
isCanarying bool, informational []*structs.Allocation) []allocPlaceResult {

// Add rescheduled placement results
var place []allocPlaceResult
@@ -843,9 +845,18 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
// Add remaining placement results
if existing < group.Count {
for _, name := range nameIndex.Next(uint(group.Count - existing)) {

// if there are any informational allocs, pop one and add it as previousAlloc
// to our new placement
var prevAlloc *structs.Allocation
if len(informational) > 0 {
prevAlloc, informational = informational[len(informational)-1], informational[:len(informational)-1]
}

place = append(place, allocPlaceResult{
name: name,
taskGroup: group,
previousAlloc: prevAlloc,
downgradeNonCanary: isCanarying,
})
}
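The change in computePlacements is a pop-from-the-end pattern: each new placement consumes at most one informational alloc and records it as its previous allocation, so information such as host volume IDs travels with the replacement. Below is a minimal, runnable sketch of just that pattern; `alloc` and `placeResult` are simplified stand-ins for Nomad's `structs.Allocation` and `allocPlaceResult`, not the real types.

```go
package main

import "fmt"

// alloc is a simplified stand-in for structs.Allocation; only the fields this
// sketch needs are included.
type alloc struct {
	ID            string
	HostVolumeIDs []string
}

// placeResult is a simplified stand-in for allocPlaceResult.
type placeResult struct {
	name     string
	previous *alloc // carries information (e.g. host volume IDs) into the new placement
}

// computePlacements mirrors the pop-and-attach pattern: each new placement
// takes one informational alloc off the end of the slice, if any remain.
func computePlacements(names []string, informational []*alloc) []placeResult {
	var place []placeResult
	for _, name := range names {
		var prevAlloc *alloc
		if len(informational) > 0 {
			prevAlloc, informational = informational[len(informational)-1], informational[:len(informational)-1]
		}
		place = append(place, placeResult{name: name, previous: prevAlloc})
	}
	return place
}

func main() {
	info := []*alloc{{ID: "old-alloc", HostVolumeIDs: []string{"hostVolumeID1"}}}
	for _, p := range computePlacements([]string{"web[0]", "web[1]"}, info) {
		if p.previous != nil {
			fmt.Printf("%s inherits %v from %s\n", p.name, p.previous.HostVolumeIDs, p.previous.ID)
		} else {
			fmt.Printf("%s has no informational alloc to inherit from\n", p.name)
		}
	}
}
```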
33 changes: 25 additions & 8 deletions scheduler/reconcile_util.go
@@ -218,8 +218,8 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet {
return from
}

// filterByTainted takes a set of tainted nodes and filters the allocation set
// into the following groups:
// filterByTainted takes a set of tainted nodes and partitions the allocation
// set into the following groups:
// 1. Those that exist on untainted nodes
// 2. Those exist on nodes that are draining
// 3. Those that exist on lost nodes or have expired
@@ -385,14 +385,25 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
return
}

// filterByRescheduleable filters the allocation set to return the set of allocations that are either
// untainted or a set of allocations that must be rescheduled now. Allocations that can be rescheduled
// at a future time are also returned so that we can create follow up evaluations for them. Allocs are
// skipped or considered untainted according to logic defined in shouldFilter method.
func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment) (allocSet, allocSet, []*delayedRescheduleInfo) {
// filterByRescheduleable filters the allocation set to return the set of
// allocations that are either untainted or a set of allocations that must be
// rescheduled now. Allocations that can be rescheduled at a future time are
// also returned so that we can create follow up evaluations for them. Allocs
// are skipped or considered untainted according to logic defined in
// shouldFilter method.
//
// filterByRescheduleable returns an extra slice of allocations as its last
// output: these allocs are "informational." They will not be rescheduled now
// or later, but they carry important information for future allocations that
// might get rescheduled. An example of such allocations comes from stateful
// deployments: allocs that require particular host volume IDs.
func (a allocSet) filterByRescheduleable(
isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment,
) (allocSet, allocSet, []*delayedRescheduleInfo, []*structs.Allocation) {
untainted := make(map[string]*structs.Allocation)
rescheduleNow := make(map[string]*structs.Allocation)
rescheduleLater := []*delayedRescheduleInfo{}
informational := []*structs.Allocation{}

for _, alloc := range a {
// Ignore disconnecting allocs that are already unknown. This can happen
@@ -411,6 +422,12 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time
continue
}

// Allocations with host volume IDs can be ignored, but we must keep
// the information they carry for future migrated allocs
if len(alloc.HostVolumeIDs) > 0 {
informational = append(informational, alloc)
}

isUntainted, ignore := shouldFilter(alloc, isBatch)
if isUntainted && !isDisconnecting {
untainted[alloc.ID] = alloc
@@ -436,7 +453,7 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time
}

}
return untainted, rescheduleNow, rescheduleLater
return untainted, rescheduleNow, rescheduleLater, informational
}

// shouldFilter returns whether the alloc should be ignored or considered untainted.
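The fourth return value is the new part of the filterByRescheduleable contract: allocations that will not be placed again themselves, but whose host volume IDs must survive for future placements. The sketch below illustrates that contract only; the `alloc` type is a hypothetical stand-in, the real eligibility rules (shouldFilter, reschedule policy, deployment state) are collapsed into a status switch, and the rescheduleLater output is omitted.

```go
package main

import "fmt"

// alloc is a hypothetical stand-in for structs.Allocation.
type alloc struct {
	ID            string
	ClientStatus  string // "running", "failed", "complete", ...
	HostVolumeIDs []string
}

// filterByRescheduleable sketches the partitioning contract: allocs are split
// into untainted and reschedule-now sets, and any alloc carrying host volume
// IDs is additionally recorded as informational, even when the alloc itself
// drops out of both working sets.
func filterByRescheduleable(all map[string]*alloc) (untainted, rescheduleNow map[string]*alloc, informational []*alloc) {
	untainted = map[string]*alloc{}
	rescheduleNow = map[string]*alloc{}
	informational = []*alloc{}

	for _, a := range all {
		if len(a.HostVolumeIDs) > 0 {
			// Keep the information this alloc carries for future placements.
			informational = append(informational, a)
		}
		switch a.ClientStatus {
		case "running":
			untainted[a.ID] = a
		case "failed":
			rescheduleNow[a.ID] = a
		default:
			// complete, lost, etc. fall out of both working sets.
		}
	}
	return untainted, rescheduleNow, informational
}

func main() {
	all := map[string]*alloc{
		"sticky": {ID: "sticky", ClientStatus: "complete", HostVolumeIDs: []string{"hostVolumeID1"}},
		"web":    {ID: "web", ClientStatus: "running"},
	}
	untainted, now, info := filterByRescheduleable(all)
	fmt.Printf("untainted=%d rescheduleNow=%d informational=%d\n", len(untainted), len(now), len(info))
	// Prints: untainted=1 rescheduleNow=0 informational=1
}
```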
90 changes: 64 additions & 26 deletions scheduler/reconcile_util_test.go
@@ -1875,6 +1875,8 @@ func TestAllocSet_filterByRescheduleable(t *testing.T) {
}
testJob.TaskGroups[0] = rescheduleTG

stickyDeploymentJob := mock.Job()

now := time.Now()

rt := &structs.RescheduleTracker{
@@ -1895,9 +1897,10 @@
deployment *structs.Deployment

// expected results
untainted allocSet
resNow allocSet
resLater []*delayedRescheduleInfo
untainted allocSet
resNow allocSet
resLater []*delayedRescheduleInfo
informational []*structs.Allocation
}

testCases := []testCase{
@@ -1921,8 +1924,9 @@
TaskGroup: "noRescheduleTG",
},
},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},
{
name: "batch ignore unknown disconnecting allocs",
@@ -1935,9 +1939,10 @@
Job: testJob,
},
},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},
{
name: "batch disconnecting allocation reschedule",
@@ -1962,7 +1967,8 @@
RescheduleTracker: rt,
},
},
resLater: []*delayedRescheduleInfo{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},

{
@@ -1991,8 +1997,9 @@
"task": {State: structs.TaskStateDead, Failed: false}},
},
},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},
{
name: "service disconnecting allocation no reschedule",
@@ -2014,8 +2021,9 @@
TaskGroup: "noRescheduleTG",
},
},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},
{
name: "service disconnecting allocation reschedule",
@@ -2040,7 +2048,8 @@
RescheduleTracker: rt,
},
},
resLater: []*delayedRescheduleInfo{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},
{
name: "service ignore unknown disconnecting allocs",
@@ -2053,9 +2062,10 @@
Job: testJob,
},
},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},
{
name: "service previously rescheduled alloc should not reschedule",
@@ -2070,9 +2080,10 @@
TaskGroup: "rescheduleTG",
},
},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},
{
name: "service complete should be ignored",
@@ -2087,9 +2098,10 @@
TaskGroup: "rescheduleTG",
},
},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},
{
name: "service running allocation no reschedule",
@@ -2111,18 +2123,44 @@
TaskGroup: "noRescheduleTG",
},
},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{},
},
{
name: "allocation with sticky volumes",
isDisconnecting: false,
isBatch: false,
all: allocSet{
"sticky": {
ID: "sticky",
DesiredStatus: structs.AllocDesiredStatusStop,
ClientStatus: structs.AllocClientStatusComplete,
Job: stickyDeploymentJob,
HostVolumeIDs: []string{"hostVolumeID1"},
},
},
untainted: allocSet{},
resNow: allocSet{},
resLater: []*delayedRescheduleInfo{},
informational: []*structs.Allocation{{
ID: "sticky",
DesiredStatus: structs.AllocDesiredStatusStop,
ClientStatus: structs.AllocClientStatusComplete,
Job: stickyDeploymentJob,
HostVolumeIDs: []string{"hostVolumeID1"},
}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
untainted, resNow, resLater := tc.all.filterByRescheduleable(tc.isBatch,
untainted, resNow, resLater, informational := tc.all.filterByRescheduleable(tc.isBatch,
tc.isDisconnecting, now, "evailID", tc.deployment)
must.Eq(t, tc.untainted, untainted, must.Sprintf("with-nodes: untainted"))
must.Eq(t, tc.resNow, resNow, must.Sprintf("with-nodes: reschedule-now"))
must.Eq(t, tc.resLater, resLater, must.Sprintf("with-nodes: rescheduleLater"))
must.Eq(t, tc.informational, informational, must.Sprintf("with-nodes: informational"))
})
}
}