Skip to content

Commit

Permalink
Remove scheduler waits to speed up recovery time (#8200)
Browse files Browse the repository at this point in the history
Currently, the scheduler and autoscaler are single-threaded and use
a lock to prevent multiple scheduling and autoscaling decisions
from happening in parallel. This is not a problem for our use
cases; however, the multiple `wait`s currently present are slowing
down recovery time.

From my testing, if I delete and recreate the Kafka control plane
and data plane, without this patch it takes 1 hour to recover when there
are 400 triggers, or 20 minutes when there are 100 triggers; with the
patch it is nearly immediate (only 2-3 minutes with 400 triggers).

- Remove `wait`s from state builder and autoscaler
- Add additional debug logs
- Use the logger provided through the context as opposed to global loggers
  in each individual component to preserve `knative/pkg` resource-aware
  log keys.

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi committed Sep 23, 2024
1 parent 17ab884 commit 3060ca8
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 160 deletions.
8 changes: 4 additions & 4 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) erro
// Scheduler is responsible for placing VPods into real Kubernetes pods
type Scheduler interface {
// Schedule computes the new set of placements for vpod.
Schedule(vpod VPod) ([]duckv1alpha1.Placement, error)
Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)
}

// SchedulerFunc type is an adapter to allow the use of
// ordinary functions as Schedulers. If f is a function
// with the appropriate signature, SchedulerFunc(f) is a
// Scheduler that calls f.
type SchedulerFunc func(vpod VPod) ([]duckv1alpha1.Placement, error)
type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)

// Schedule implements the Scheduler interface.
func (f SchedulerFunc) Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) {
return f(vpod)
func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) {
return f(ctx, vpod)
}

// VPod represents virtual replicas placed into real Kubernetes pods
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package scheduler

import (
"context"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -28,12 +29,12 @@ func TestSchedulerFuncSchedule(t *testing.T) {

called := 0

var s Scheduler = SchedulerFunc(func(vpod VPod) ([]duckv1alpha1.Placement, error) {
var s Scheduler = SchedulerFunc(func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) {
called++
return nil, nil
})

_, err := s.Schedule(nil)
_, err := s.Schedule(context.Background(), nil)
require.Nil(t, err)
require.Equal(t, 1, called)
}
9 changes: 5 additions & 4 deletions pkg/scheduler/state/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"

"knative.dev/eventing/pkg/scheduler"
)

Expand Down Expand Up @@ -55,10 +56,10 @@ func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool {
var zoneName string
var err error
for _, podID := range feasiblePods {
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID))
return err == nil, nil
})
zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID))
if err != nil {
continue
}
zoneMap[zoneName] = struct{}{}
}
return len(zoneMap) == int(states.NumZones)
Expand Down
95 changes: 44 additions & 51 deletions pkg/scheduler/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@ import (
"errors"
"math"
"strconv"
"time"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/listers/core/v1"

"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
Expand All @@ -42,7 +39,7 @@ type StateAccessor interface {
// State returns the current state (snapshot) about placed vpods
// Take into account reserved vreplicas and update `reserved` to reflect
// the current state.
State(reserved map[types.NamespacedName]map[string]int32) (*State, error)
State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error)
}

// state provides information about the current scheduling of all vpods
Expand Down Expand Up @@ -152,8 +149,6 @@ func (s *State) IsSchedulablePod(ordinal int32) bool {

// stateBuilder reconstructs the state from scratch by listing vpods
type stateBuilder struct {
ctx context.Context
logger *zap.SugaredLogger
vpodLister scheduler.VPodLister
capacity int32
schedulerPolicy scheduler.SchedulerPolicyType
Expand All @@ -166,11 +161,9 @@ type stateBuilder struct {
}

// NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested
func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {
func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {

return &stateBuilder{
ctx: ctx,
logger: logging.FromContext(ctx),
vpodLister: lister,
capacity: podCapacity,
schedulerPolicy: schedulerPolicy,
Expand All @@ -183,15 +176,18 @@ func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister sche
}
}

func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) {
func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) {
vpods, err := s.vpodLister()
if err != nil {
return nil, err
}

scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{})
logger := logging.FromContext(ctx).With("subcomponent", "statebuilder")
ctx = logging.WithLogger(ctx, logger)

scale, err := s.statefulSetCache.GetScale(ctx, s.statefulSetName, metav1.GetOptions{})
if err != nil {
s.logger.Infow("failed to get statefulset", zap.Error(err))
logger.Infow("failed to get statefulset", zap.Error(err))
return nil, err
}

Expand Down Expand Up @@ -235,36 +231,35 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}

for podId := int32(0); podId < scale.Spec.Replicas && s.podLister != nil; podId++ {
var pod *v1.Pod
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
pod, err = s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId))
return err == nil, nil
})

if pod != nil {
if isPodUnschedulable(pod) {
// Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
continue
}

node, err := s.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
return nil, err
}
pod, err := s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId))
if err != nil {
logger.Warnw("Failed to get pod", zap.Int32("ordinal", podId), zap.Error(err))
continue
}
if isPodUnschedulable(pod) {
// Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
logger.Debugw("Pod is unschedulable", zap.Any("pod", pod))
continue
}

if isNodeUnschedulable(node) {
// Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node.
continue
}
node, err := s.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
return nil, err
}

// Pod has no annotation or not annotated as unschedulable and
// not on an unschedulable node, so add to feasible
schedulablePods.Insert(podId)
if isNodeUnschedulable(node) {
// Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node.
logger.Debugw("Pod is on an unschedulable node", zap.Any("pod", node))
continue
}

// Pod has no annotation or not annotated as unschedulable and
// not on an unschedulable node, so add to feasible
schedulablePods.Insert(podId)
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
Expand All @@ -286,15 +281,14 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
// Account for reserved vreplicas
vreplicas = withReserved(vpod.GetKey(), podName, vreplicas, reserved)

free, last = s.updateFreeCapacity(free, last, podName, vreplicas)
free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas)

withPlacement[vpod.GetKey()][podName] = true

var pod *v1.Pod
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
pod, err = s.podLister.Get(podName)
return err == nil, nil
})
pod, err := s.podLister.Get(podName)
if err != nil {
logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err))
}

if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) {
nodeName := pod.Spec.NodeName //node name for this pod
Expand All @@ -315,11 +309,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
continue
}

var pod *v1.Pod
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
pod, err = s.podLister.Get(podName)
return err == nil, nil
})
pod, err := s.podLister.Get(podName)
if err != nil {
logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err))
}

if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) {
nodeName := pod.Spec.NodeName //node name for this pod
Expand All @@ -330,15 +323,15 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}
}

free, last = s.updateFreeCapacity(free, last, podName, rvreplicas)
free, last = s.updateFreeCapacity(logger, free, last, podName, rvreplicas)
}
}

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)),
SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}

s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved)))
logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved)))

return state, nil
}
Expand All @@ -350,7 +343,7 @@ func pendingFromVPod(vpod scheduler.VPod) int32 {
return int32(math.Max(float64(0), float64(expected-scheduled)))
}

func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)

Expand All @@ -359,7 +352,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri
// Assert the pod is not overcommitted
if free[ordinal] < 0 {
// This should not happen anymore. Log as an error but do not interrupt the current scheduling.
s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,8 @@ func TestStateBuilder(t *testing.T) {

scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5})

stateBuilder := NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
state, err := stateBuilder.State(tc.reserved)
stateBuilder := NewStateBuilder(sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
state, err := stateBuilder.State(ctx, tc.reserved)
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down
Loading

0 comments on commit 3060ca8

Please sign in to comment.