Skip to content

Commit

Permalink
pkg: use minimum interface for schedulers (#6553)
Browse files Browse the repository at this point in the history
ref #5839, ref #6551

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx authored Jun 9, 2023
1 parent f290e66 commit 181e613
Show file tree
Hide file tree
Showing 36 changed files with 162 additions and 162 deletions.
3 changes: 0 additions & 3 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,6 @@ func (mc *Cluster) GetPersistOptions() *config.PersistOptions {
// UpdateRegionsLabelLevelStats updates the label level stats for the regions.
func (mc *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}

// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
func (mc *Cluster) CheckSchedulingAllowance() (bool, error) { return true, nil }

// LoadRegion puts region info without leader
func (mc *Cluster) LoadRegion(regionID uint64, peerStoreIDs ...uint64) {
// regions load from etcd will have no leader
Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ var (
// MergeChecker ensures region to merge with adjacent region when size is small
type MergeChecker struct {
PauseController
cluster sche.ClusterInformer
cluster sche.ScheduleCluster
conf config.Config
splitCache *cache.TTLUint64
startTime time.Time // it's used to judge whether server recently start.
}

// NewMergeChecker creates a merge checker.
func NewMergeChecker(ctx context.Context, cluster sche.ClusterInformer, conf config.Config) *MergeChecker {
func NewMergeChecker(ctx context.Context, cluster sche.ScheduleCluster, conf config.Config) *MergeChecker {
splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval())
return &MergeChecker{
cluster: cluster,
Expand Down Expand Up @@ -250,7 +250,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
}

// AllowMerge returns true if two regions can be merged according to the key type.
func AllowMerge(cluster sche.ClusterInformer, region, adjacent *core.RegionInfo) bool {
func AllowMerge(cluster sche.ScheduleCluster, region, adjacent *core.RegionInfo) bool {
var start, end []byte
if bytes.Equal(region.GetEndKey(), adjacent.GetStartKey()) && len(region.GetEndKey()) != 0 {
start, end = region.GetStartKey(), adjacent.GetEndKey()
Expand Down Expand Up @@ -306,7 +306,7 @@ func isTableIDSame(region, adjacent *core.RegionInfo) bool {
// Check whether there is a peer of the adjacent region on an offline store,
// while the source region has no peer on it. This is to prevent from bringing
// any other peer into an offline store to slow down the offline process.
func checkPeerStore(cluster sche.ClusterInformer, region, adjacent *core.RegionInfo) bool {
func checkPeerStore(cluster sche.ScheduleCluster, region, adjacent *core.RegionInfo) bool {
regionStoreIDs := region.GetStoreIDs()
for _, peer := range adjacent.GetPeers() {
storeID := peer.GetStoreId()
Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func IsSchedulerRegistered(name string) bool {

// Config is the interface that wraps the Config related methods.
type Config interface {
IsSchedulingHalted() bool

GetReplicaScheduleLimit() uint64
GetRegionScheduleLimit() uint64
GetMergeScheduleLimit() uint64
Expand Down
10 changes: 5 additions & 5 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ func (c *Coordinator) RecordOpStepWithTTL(regionID uint64) {
// scheduleController is used to manage a scheduler to schedulers.
type scheduleController struct {
schedulers.Scheduler
cluster sche.ClusterInformer
cluster sche.ScheduleCluster
opController *operator.Controller
nextInterval time.Duration
ctx context.Context
Expand Down Expand Up @@ -1056,7 +1056,7 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
}

func (s *scheduleController) isSchedulingHalted() bool {
return s.cluster.GetPersistOptions().IsSchedulingHalted()
return s.cluster.GetOpts().IsSchedulingHalted()
}

// isPaused returns if a scheduler is paused.
Expand Down Expand Up @@ -1127,7 +1127,7 @@ func (c *Coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) {

// cacheCluster include cache info to improve the performance.
type cacheCluster struct {
sche.ClusterInformer
sche.ScheduleCluster
stores []*core.StoreInfo
}

Expand All @@ -1137,9 +1137,9 @@ func (c *cacheCluster) GetStores() []*core.StoreInfo {
}

// newCacheCluster constructor for cache
func newCacheCluster(c sche.ClusterInformer) *cacheCluster {
func newCacheCluster(c sche.ScheduleCluster) *cacheCluster {
return &cacheCluster{
ClusterInformer: c,
ScheduleCluster: c,
stores: c.GetStores(),
}
}
21 changes: 11 additions & 10 deletions pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,28 @@ import (

// ClusterInformer provides the necessary information of a cluster.
type ClusterInformer interface {
RegionHealthCluster
statistics.RegionStatInformer
statistics.StoreStatInformer
buckets.BucketStatInformer
ScheduleCluster

GetBasicCluster() *core.BasicCluster
GetStoreConfig() sc.StoreConfig
GetAllocator() id.Allocator
GetRegionLabeler() *labeler.RegionLabeler
GetStorage() storage.Storage
UpdateRegionsLabelLevelStats(regions []*core.RegionInfo)
AddSuspectRegions(ids ...uint64)
GetPersistOptions() *config.PersistOptions
}

// RegionHealthCluster is an aggregate interface that wraps multiple interfaces
type RegionHealthCluster interface {
// ScheduleCluster is an aggregate interface that wraps multiple interfaces for schedulers use
type ScheduleCluster interface {
BasicCluster

statistics.StoreStatInformer
statistics.RegionStatInformer
buckets.BucketStatInformer

GetOpts() sc.Config
GetRuleManager() *placement.RuleManager
GetRegionLabeler() *labeler.RegionLabeler
GetBasicCluster() *core.BasicCluster
GetStoreConfig() sc.StoreConfig
GetAllocator() id.Allocator
}

// BasicCluster is an aggregate interface that wraps multiple interfaces
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/filter/healthy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ func hasDownPeers(region *core.RegionInfo) bool {
// IsRegionReplicated checks if a region is fully replicated. When placement
// rules is enabled, its peers should fit corresponding rules. When placement
// rules is disabled, it should have enough replicas and no any learner peer.
func IsRegionReplicated(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool {
func IsRegionReplicated(cluster sche.ScheduleCluster, region *core.RegionInfo) bool {
if cluster.GetOpts().IsPlacementRulesEnabled() {
return isRegionPlacementRuleSatisfied(cluster, region)
}
return isRegionReplicasSatisfied(cluster, region)
}

func isRegionPlacementRuleSatisfied(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool {
func isRegionPlacementRuleSatisfied(cluster sche.ScheduleCluster, region *core.RegionInfo) bool {
return cluster.GetRuleManager().FitRegion(cluster, region).IsSatisfied()
}

func isRegionReplicasSatisfied(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool {
func isRegionReplicasSatisfied(cluster sche.ScheduleCluster, region *core.RegionInfo) bool {
return len(region.GetLearners()) == 0 && len(region.GetPeers()) == cluster.GetOpts().GetMaxReplicas()
}
10 changes: 5 additions & 5 deletions pkg/schedule/filter/region_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ func (f *regionDownFilter) Select(region *core.RegionInfo) *plan.Status {

// RegionReplicatedFilter filters all unreplicated regions.
type RegionReplicatedFilter struct {
cluster sche.RegionHealthCluster
cluster sche.ScheduleCluster
fit *placement.RegionFit
}

// NewRegionReplicatedFilter creates a RegionFilter that filters all unreplicated regions.
func NewRegionReplicatedFilter(cluster sche.RegionHealthCluster) RegionFilter {
func NewRegionReplicatedFilter(cluster sche.ScheduleCluster) RegionFilter {
return &RegionReplicatedFilter{cluster: cluster}
}

Expand All @@ -132,11 +132,11 @@ func (f *RegionReplicatedFilter) Select(region *core.RegionInfo) *plan.Status {
}

type regionEmptyFilter struct {
cluster sche.RegionHealthCluster
cluster sche.ScheduleCluster
}

// NewRegionEmptyFilter returns creates a RegionFilter that filters all empty regions.
func NewRegionEmptyFilter(cluster sche.RegionHealthCluster) RegionFilter {
func NewRegionEmptyFilter(cluster sche.ScheduleCluster) RegionFilter {
return &regionEmptyFilter{cluster: cluster}
}

Expand All @@ -148,7 +148,7 @@ func (f *regionEmptyFilter) Select(region *core.RegionInfo) *plan.Status {
}

// isEmptyRegionAllowBalance returns true if the region is not empty or the number of regions is too small.
func isEmptyRegionAllowBalance(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool {
func isEmptyRegionAllowBalance(cluster sche.ScheduleCluster, region *core.RegionInfo) bool {
return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetTotalRegionCount() < core.InitClusterRegionThreshold
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/operator/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
// according to various constraints.
type Builder struct {
// basic info
sche.ClusterInformer
sche.ScheduleCluster
desc string
regionID uint64
regionEpoch *metapb.RegionEpoch
Expand Down Expand Up @@ -92,10 +92,10 @@ func SkipPlacementRulesCheck(b *Builder) {
}

// NewBuilder creates a Builder.
func NewBuilder(desc string, ci sche.ClusterInformer, region *core.RegionInfo, opts ...BuilderOption) *Builder {
func NewBuilder(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, opts ...BuilderOption) *Builder {
b := &Builder{
desc: desc,
ClusterInformer: ci,
ScheduleCluster: ci,
regionID: region.GetID(),
regionEpoch: region.GetRegionEpoch(),
approximateSize: region.GetApproximateSize(),
Expand Down
32 changes: 16 additions & 16 deletions pkg/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,51 +31,51 @@ import (
)

// CreateAddPeerOperator creates an operator that adds a new peer.
func CreateAddPeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer, kind OpKind) (*Operator, error) {
func CreateAddPeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer, kind OpKind) (*Operator, error) {
return NewBuilder(desc, ci, region).
AddPeer(peer).
Build(kind)
}

// CreateDemoteVoterOperator creates an operator that demotes a voter
func CreateDemoteVoterOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
func CreateDemoteVoterOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
DemoteVoter(peer.GetStoreId()).
Build(0)
}

// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
func CreatePromoteLearnerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
PromoteLearner(peer.GetStoreId()).
Build(0)
}

// CreateRemovePeerOperator creates an operator that removes a peer from region.
func CreateRemovePeerOperator(desc string, ci sche.ClusterInformer, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
func CreateRemovePeerOperator(desc string, ci sche.ScheduleCluster, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
return NewBuilder(desc, ci, region).
RemovePeer(storeID).
Build(kind)
}

// CreateTransferLeaderOperator creates an operator that transfers the leader from a source store to a target store.
func CreateTransferLeaderOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, targetStoreIDs []uint64, kind OpKind) (*Operator, error) {
func CreateTransferLeaderOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, targetStoreIDs []uint64, kind OpKind) (*Operator, error) {
return NewBuilder(desc, ci, region, SkipOriginJointStateCheck).
SetLeader(targetStoreID).
SetLeaders(targetStoreIDs).
Build(kind)
}

// CreateForceTransferLeaderOperator creates an operator that transfers the leader from a source store to a target store forcible.
func CreateForceTransferLeaderOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, kind OpKind) (*Operator, error) {
func CreateForceTransferLeaderOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, kind OpKind) (*Operator, error) {
return NewBuilder(desc, ci, region, SkipOriginJointStateCheck, SkipPlacementRulesCheck).
SetLeader(targetStoreID).
EnableForceTargetLeader().
Build(kind)
}

// CreateMoveRegionOperator creates an operator that moves a region to specified stores.
func CreateMoveRegionOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, kind OpKind, roles map[uint64]placement.PeerRoleType) (*Operator, error) {
func CreateMoveRegionOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, kind OpKind, roles map[uint64]placement.PeerRoleType) (*Operator, error) {
// construct the peers from roles
oldPeers := region.GetPeers()
peers := make(map[uint64]*metapb.Peer)
Expand All @@ -97,23 +97,23 @@ func CreateMoveRegionOperator(desc string, ci sche.ClusterInformer, region *core
}

// CreateMovePeerOperator creates an operator that replaces an old peer with a new peer.
func CreateMovePeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) {
func CreateMovePeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
RemovePeer(oldStore).
AddPeer(peer).
Build(kind)
}

// CreateMoveWitnessOperator creates an operator that replaces an old witness with a new witness.
func CreateMoveWitnessOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64) (*Operator, error) {
func CreateMoveWitnessOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64) (*Operator, error) {
return NewBuilder(desc, ci, region).
BecomeNonWitness(sourceStoreID).
BecomeWitness(targetStoreID).
Build(OpWitness)
}

// CreateReplaceLeaderPeerOperator creates an operator that replaces an old peer with a new peer, and move leader from old store firstly.
func CreateReplaceLeaderPeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer, leader *metapb.Peer) (*Operator, error) {
func CreateReplaceLeaderPeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer, leader *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
RemovePeer(oldStore).
AddPeer(peer).
Expand All @@ -122,7 +122,7 @@ func CreateReplaceLeaderPeerOperator(desc string, ci sche.ClusterInformer, regio
}

// CreateMoveLeaderOperator creates an operator that replaces an old leader with a new leader.
func CreateMoveLeaderOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) {
func CreateMoveLeaderOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
RemovePeer(oldStore).
AddPeer(peer).
Expand Down Expand Up @@ -157,7 +157,7 @@ func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind
}

// CreateMergeRegionOperator creates an operator that merge two region into one.
func CreateMergeRegionOperator(desc string, ci sche.ClusterInformer, source *core.RegionInfo, target *core.RegionInfo, kind OpKind) ([]*Operator, error) {
func CreateMergeRegionOperator(desc string, ci sche.ScheduleCluster, source *core.RegionInfo, target *core.RegionInfo, kind OpKind) ([]*Operator, error) {
if core.IsInJointState(source.GetPeers()...) || core.IsInJointState(target.GetPeers()...) {
return nil, errors.Errorf("cannot merge regions which are in joint state")
}
Expand Down Expand Up @@ -215,7 +215,7 @@ func isRegionMatch(a, b *core.RegionInfo) bool {
}

// CreateScatterRegionOperator creates an operator that scatters the specified region.
func CreateScatterRegionOperator(desc string, ci sche.ClusterInformer, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) (*Operator, error) {
func CreateScatterRegionOperator(desc string, ci sche.ScheduleCluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) (*Operator, error) {
// randomly pick a leader.
var ids []uint64
for id, peer := range targetPeers {
Expand Down Expand Up @@ -243,7 +243,7 @@ func CreateScatterRegionOperator(desc string, ci sche.ClusterInformer, origin *c
const OpDescLeaveJointState = "leave-joint-state"

// CreateLeaveJointStateOperator creates an operator that let region leave joint state.
func CreateLeaveJointStateOperator(desc string, ci sche.ClusterInformer, origin *core.RegionInfo) (*Operator, error) {
func CreateLeaveJointStateOperator(desc string, ci sche.ScheduleCluster, origin *core.RegionInfo) (*Operator, error) {
b := NewBuilder(desc, ci, origin, SkipOriginJointStateCheck, SkipPlacementRulesCheck)

if b.err == nil && !core.IsInJointState(origin.GetPeers()...) {
Expand Down Expand Up @@ -303,14 +303,14 @@ func CreateLeaveJointStateOperator(desc string, ci sche.ClusterInformer, origin
}

// CreateWitnessPeerOperator creates an operator that set a follower or learner peer with witness
func CreateWitnessPeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
func CreateWitnessPeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
BecomeWitness(peer.GetStoreId()).
Build(OpWitness)
}

// CreateNonWitnessPeerOperator creates an operator that set a peer with non-witness
func CreateNonWitnessPeerOperator(desc string, ci sche.ClusterInformer, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
func CreateNonWitnessPeerOperator(desc string, ci sche.ScheduleCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
BecomeNonWitness(peer.GetStoreId()).
Build(OpWitness)
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(l.conf)
}

func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(l.GetType(), operator.OpLeader.String()).Inc()
Expand Down Expand Up @@ -324,7 +324,7 @@ func (cs *candidateStores) resortStoreWithPos(pos int) {
}
}

func (l *balanceLeaderScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) {
func (l *balanceLeaderScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
l.conf.mu.RLock()
defer l.conf.mu.RUnlock()
basePlan := NewBalanceSchedulerPlan()
Expand Down Expand Up @@ -419,7 +419,7 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s
storesIDs := candidate.binarySearchStores(plan.source, plan.target)
candidateUpdateStores[id] = storesIDs
}
operator.AddOpInfluence(op, plan.opInfluence, plan.ClusterInformer.GetBasicCluster())
operator.AddOpInfluence(op, plan.opInfluence, plan.ScheduleCluster.GetBasicCluster())
for id, candidate := range candidates {
for _, pos := range candidateUpdateStores[id] {
candidate.resortStoreWithPos(pos)
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) bool {
allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
}
return allowed
}

func (s *balanceRegionScheduler) Schedule(cluster sche.ClusterInformer, dryRun bool) ([]*operator.Operator, []plan.Plan) {
func (s *balanceRegionScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
basePlan := NewBalanceSchedulerPlan()
var collector *plan.Collector
if dryRun {
Expand Down
Loading

0 comments on commit 181e613

Please sign in to comment.