Skip to content

Commit

Permalink
cherry pick #2946 to release-4.0 (#3100)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ryan Leung <[email protected]>
Co-authored-by: ShuNing <[email protected]>
Co-authored-by: Ti Prow Robot <[email protected]>
  • Loading branch information
4 people authored Jan 5, 2021
1 parent 60a24ab commit e5be7fd
Show file tree
Hide file tree
Showing 15 changed files with 79 additions and 36 deletions.
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ type evictLeaderScheduler struct {
// out of a store.
func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *evictLeaderSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
&filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
}

base := schedulers.NewBaseScheduler(opController)
Expand Down
6 changes: 3 additions & 3 deletions server/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (c *RuleChecker) allowLeader(fit *placement.RegionFit, peer *metapb.Peer) b
if s == nil {
return false
}
stateFilter := filter.StoreStateFilter{ActionScope: "rule-checker", TransferLeader: true}
stateFilter := &filter.StoreStateFilter{ActionScope: "rule-checker", TransferLeader: true}
if !stateFilter.Target(c.cluster, s) {
return false
}
Expand All @@ -184,7 +184,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, fit *placement.
return nil, nil
}
stores := getRuleFitStores(c.cluster, rf)
s := selector.NewReplicaSelector(stores, rf.Rule.LocationLabels, filter.StoreStateFilter{ActionScope: "rule-checker", MoveRegion: true})
s := selector.NewReplicaSelector(stores, rf.Rule.LocationLabels, &filter.StoreStateFilter{ActionScope: "rule-checker", MoveRegion: true})
oldPeerStore := s.SelectSource(c.cluster, stores)
if oldPeerStore == nil {
return nil, nil
Expand Down Expand Up @@ -257,7 +257,7 @@ func (c *RuleChecker) isOfflinePeer(region *core.RegionInfo, peer *metapb.Peer)
// SelectStoreToAddPeerByRule selects a store to add peer in order to fit the placement rule.
func SelectStoreToAddPeerByRule(scope string, cluster opt.Cluster, region *core.RegionInfo, rf *placement.RuleFit, filters ...filter.Filter) *core.StoreInfo {
fs := []filter.Filter{
filter.StoreStateFilter{ActionScope: scope, MoveRegion: true},
&filter.StoreStateFilter{ActionScope: scope, MoveRegion: true},
filter.NewStorageThresholdFilter(scope),
filter.NewLabelConstaintFilter(scope, rf.Rule.LabelConstraints),
filter.NewExcludedFilter(scope, nil, region.GetStoreIds()),
Expand Down
79 changes: 61 additions & 18 deletions server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,30 +373,44 @@ type StoreStateFilter struct {
TransferLeader bool
// Set true if the schedule involves any move region operation.
MoveRegion bool
// Reason is used to distinguish the reason of store state filter
Reason string
}

// Scope returns the scheduler or the checker which the filter acts on.
func (f StoreStateFilter) Scope() string {
func (f *StoreStateFilter) Scope() string {
return f.ActionScope
}

// Type returns the type of the Filter.
func (f StoreStateFilter) Type() string {
return "store-state-filter"
func (f *StoreStateFilter) Type() string {
return fmt.Sprintf("store-state-%s-filter", f.Reason)
}

// Source returns true when the store can be selected as the schedule
// source.
func (f StoreStateFilter) Source(opt opt.Options, store *core.StoreInfo) bool {
if store.IsTombstone() ||
store.DownTime() > opt.GetMaxStoreDownTime() {
func (f *StoreStateFilter) Source(opts opt.Options, store *core.StoreInfo) bool {
if store.IsTombstone() {
f.Reason = "tombstone"
return false
}
if f.TransferLeader && (store.IsDisconnected() || store.IsBlocked()) {

if store.DownTime() > opts.GetMaxStoreDownTime() {
f.Reason = "down"
return false
}
if f.TransferLeader {
if store.IsDisconnected() {
f.Reason = "disconnected"
return false
}
if store.IsBlocked() {
f.Reason = "blocked"
return false
}
}

if f.MoveRegion && !f.filterMoveRegion(opt, true, store) {
if f.MoveRegion && !f.filterMoveRegion(opts, true, store) {
return false
}
return true
Expand All @@ -405,22 +419,43 @@ func (f StoreStateFilter) Source(opt opt.Options, store *core.StoreInfo) bool {
// Target returns true when the store can be selected as the schedule
// target.
func (f StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
if store.IsTombstone() ||
store.IsOffline() ||
store.DownTime() > opts.GetMaxStoreDownTime() {
if store.IsTombstone() {
f.Reason = "tombstone"
return false
}
if f.TransferLeader &&
(store.IsDisconnected() ||
store.IsBlocked() ||
store.IsBusy() ||
opts.CheckLabelProperty(opt.RejectLeader, store.GetLabels())) {

if store.DownTime() > opts.GetMaxStoreDownTime() {
f.Reason = "down"
return false
}

if store.IsOffline() {
f.Reason = "offline"
return false
}
if f.TransferLeader {
if store.IsDisconnected() {
f.Reason = "disconnected"
return false
}
if store.IsBlocked() {
f.Reason = "blocked"
return false
}
if store.IsBusy() {
f.Reason = "busy"
return false
}
if opts.CheckLabelProperty(opt.RejectLeader, store.GetLabels()) {
f.Reason = "reject-leader"
return false
}
}

if f.MoveRegion {
// Only the target side considers pending peers, because more pending peers indicate a slower disk.
if opts.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opts.GetMaxPendingPeerCount()) {
f.Reason = "too-many-pending-peer"
return false
}

Expand All @@ -431,18 +466,26 @@ func (f StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
return true
}

func (f StoreStateFilter) filterMoveRegion(opt opt.Options, isSource bool, store *core.StoreInfo) bool {
func (f *StoreStateFilter) filterMoveRegion(opt opt.Options, isSource bool, store *core.StoreInfo) bool {
if store.IsBusy() {
f.Reason = "busy"
return false
}

if isSource && !store.IsAvailable(storelimit.RemovePeer) {
f.Reason = "exceed-remove-limit"
return false
}

if (isSource && !store.IsAvailable(storelimit.RemovePeer)) || (!isSource && !store.IsAvailable(storelimit.AddPeer)) {
if !isSource && !store.IsAvailable(storelimit.AddPeer) {
f.Reason = "exceed-add-limit"
return false
}

if uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount() {
f.Reason = "too-many-snapshot"
return false
}
return true
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/operator/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (b *Builder) allowLeader(peer *metapb.Peer) bool {
if store == nil {
return false
}
stateFilter := filter.StoreStateFilter{ActionScope: "operator-builder", TransferLeader: true}
stateFilter := &filter.StoreStateFilter{ActionScope: "operator-builder", TransferLeader: true}
if !stateFilter.Target(b.cluster, store) {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions server/schedule/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type engineContext struct {
}

func newEngineContext(filters ...filter.Filter) engineContext {
filters = append(filters, filter.StoreStateFilter{ActionScope: regionScatterName})
filters = append(filters, &filter.StoreStateFilter{ActionScope: regionScatterName})
return engineContext{
filters: filters,
selectedPeer: newSelectedStores(true),
Expand Down Expand Up @@ -278,7 +278,7 @@ func (r *RegionScatterer) selectPeerToReplace(group string, stores map[uint64]*c
func (r *RegionScatterer) collectAvailableStores(group string, region *core.RegionInfo, context engineContext) map[uint64]*core.StoreInfo {
filters := []filter.Filter{
filter.NewExcludedFilter(r.name, nil, region.GetStoreIds()),
filter.StoreStateFilter{ActionScope: r.name, MoveRegion: true},
&filter.StoreStateFilter{ActionScope: r.name, MoveRegion: true},
}
filters = append(filters, context.filters...)
filters = append(filters, context.selectedPeer.newFilters(r.name, group)...)
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/adjacent_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (a *adjacentState) len() int {
// on each store.
func newBalanceAdjacentRegionScheduler(opController *schedule.OperatorController, conf *balanceAdjacentRegionConfig) schedule.Scheduler {
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: AdjacentRegionName, TransferLeader: true, MoveRegion: true},
&filter.StoreStateFilter{ActionScope: AdjacentRegionName, TransferLeader: true, MoveRegion: true},
filter.NewSpecialUseFilter(AdjacentRegionName),
}
base := NewBaseScheduler(opController)
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *
option(s)
}
s.filters = []filter.Filter{
filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true},
&filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true},
filter.NewSpecialUseFilter(s.GetName()),
}
return s
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *
setOption(scheduler)
}
scheduler.filters = []filter.Filter{
filter.StoreStateFilter{ActionScope: scheduler.GetName(), MoveRegion: true},
&filter.StoreStateFilter{ActionScope: scheduler.GetName(), MoveRegion: true},
filter.NewSpecialUseFilter(scheduler.GetName()),
}
return scheduler
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type evictLeaderScheduler struct {
// out of a store.
func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *evictLeaderSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
&filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
}

base := NewBaseScheduler(opController)
Expand Down
4 changes: 2 additions & 2 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
}

filters = []filter.Filter{
filter.StoreStateFilter{ActionScope: bs.sche.GetName(), MoveRegion: true},
&filter.StoreStateFilter{ActionScope: bs.sche.GetName(), MoveRegion: true},
filter.NewExcludedFilter(bs.sche.GetName(), bs.cur.region.GetStoreIds(), bs.cur.region.GetStoreIds()),
filter.NewConnectedFilter(bs.sche.GetName()),
filter.NewSpecialUseFilter(bs.sche.GetName(), filter.SpecialUseHotRegion),
Expand All @@ -788,7 +788,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {

case transferLeader:
filters = []filter.Filter{
filter.StoreStateFilter{ActionScope: bs.sche.GetName(), TransferLeader: true},
&filter.StoreStateFilter{ActionScope: bs.sche.GetName(), TransferLeader: true},
filter.NewConnectedFilter(bs.sche.GetName()),
filter.NewSpecialUseFilter(bs.sche.GetName(), filter.SpecialUseHotRegion),
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type labelScheduler struct {
// the store with the specific label.
func newLabelScheduler(opController *schedule.OperatorController, conf *labelSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true},
&filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true},
}
kind := core.NewScheduleKind(core.LeaderKind, core.ByCount)
return &labelScheduler{
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/random_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type randomMergeScheduler struct {
// then merges them.
func newRandomMergeScheduler(opController *schedule.OperatorController, conf *randomMergeSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: conf.Name, MoveRegion: true},
&filter.StoreStateFilter{ActionScope: conf.Name, MoveRegion: true},
}
base := NewBaseScheduler(opController)
return &randomMergeScheduler{
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/shuffle_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *shuffleHotRegionScheduler) randomSchedule(cluster opt.Cluster, loadDeta
}

filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true},
&filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true},
filter.NewExcludedFilter(s.GetName(), srcRegion.GetStoreIds(), srcRegion.GetStoreIds()),
scoreGuard,
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/shuffle_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type shuffleLeaderScheduler struct {
// between stores.
func newShuffleLeaderScheduler(opController *schedule.OperatorController, conf *shuffleLeaderSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: conf.Name, TransferLeader: true},
&filter.StoreStateFilter{ActionScope: conf.Name, TransferLeader: true},
filter.NewSpecialUseFilter(conf.Name),
}
base := NewBaseScheduler(opController)
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/shuffle_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type shuffleRegionScheduler struct {
// between stores.
func newShuffleRegionScheduler(opController *schedule.OperatorController, conf *shuffleRegionSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: ShuffleRegionName, MoveRegion: true},
&filter.StoreStateFilter{ActionScope: ShuffleRegionName, MoveRegion: true},
filter.NewSpecialUseFilter(ShuffleRegionName),
}
base := NewBaseScheduler(opController)
Expand Down

0 comments on commit e5be7fd

Please sign in to comment.