From a82d63fea8f9be1176d514ca7178af57af6d3375 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 6 Jun 2023 16:39:41 +0800 Subject: [PATCH] *: remove unnecessary interfaces (#6551) ref tikv/pd#5839 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/core/basic_cluster.go | 2 +- pkg/core/region.go | 16 ++--- pkg/dashboard/keyvisual/input/core.go | 2 +- pkg/mock/mockcluster/mockcluster.go | 19 ----- pkg/replication/replication_mode.go | 2 +- pkg/schedule/checker/merge_checker_test.go | 2 +- pkg/schedule/coordinator.go | 46 ++++++------ pkg/schedule/core/cluster_informer.go | 17 ++--- pkg/schedule/diagnostic_manager.go | 32 +++++---- pkg/schedule/filter/region_filters.go | 2 +- pkg/schedule/operator/operator_controller.go | 40 ++++++----- .../operator/operator_controller_test.go | 71 ++++++++++--------- pkg/schedule/operator/step.go | 48 ++++++------- pkg/schedule/operator/step_test.go | 2 +- pkg/schedule/prepare_checker.go | 2 +- pkg/schedule/region_scatterer_test.go | 26 +++---- .../schedulers/balance_benchmark_test.go | 4 +- pkg/schedule/schedulers/balance_leader.go | 8 +-- pkg/schedule/schedulers/balance_region.go | 8 +-- pkg/schedule/schedulers/balance_test.go | 6 +- pkg/schedule/schedulers/balance_witness.go | 6 +- pkg/schedule/schedulers/evict_leader.go | 5 +- pkg/schedule/schedulers/evict_leader_test.go | 4 +- pkg/schedule/schedulers/grant_hot_region.go | 3 +- pkg/schedule/schedulers/grant_leader.go | 5 +- pkg/schedule/schedulers/hot_region_test.go | 4 +- pkg/schedule/schedulers/init.go | 36 +++++----- pkg/schedule/schedulers/scheduler.go | 6 +- pkg/schedule/schedulers/scheduler_test.go | 6 +- pkg/storage/storage_test.go | 8 +-- plugin/scheduler_example/evict_leader.go | 4 +- server/api/region.go | 2 +- server/api/stats.go | 2 +- server/cluster/cluster.go | 52 +++----------- server/cluster/cluster_test.go | 18 ++--- server/cluster/cluster_worker.go | 2 +- server/handler.go | 10 +-- tests/server/cluster/cluster_test.go | 4 +- .../simulator/cases/import_data.go | 4 +- tools/pd-simulator/simulator/raft.go | 2 +- 40 files changed, 251 insertions(+), 287 deletions(-) diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index b21ca53316b..1c8902ca8cb 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -248,7 +248,7 @@ func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, key // RegionSetInformer provides access to a shared informer of regions. 
type RegionSetInformer interface { - GetRegionCount() int + GetTotalRegionCount() int RandFollowerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo RandLeaderRegions(storeID uint64, ranges []KeyRange) []*RegionInfo RandLearnerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo diff --git a/pkg/core/region.go b/pkg/core/region.go index c44cefa2b0d..450fab499e6 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1287,8 +1287,8 @@ func (r *RegionsInfo) GetStoreStats(storeID uint64) (leader, region, witness, le r.learners[storeID].length(), r.pendingPeers[storeID].length(), r.leaders[storeID].TotalSize(), r.getStoreRegionSizeLocked(storeID) } -// GetRegionCount gets the total count of RegionInfo of regionMap -func (r *RegionsInfo) GetRegionCount() int { +// GetTotalRegionCount gets the total count of RegionInfo of regionMap +func (r *RegionsInfo) GetTotalRegionCount() int { r.t.RLock() defer r.t.RUnlock() return len(r.regions) @@ -1482,8 +1482,8 @@ func (r *RegionInfo) GetWriteLoads() []float64 { } } -// GetRangeCount returns the number of regions that overlap with the range [startKey, endKey). -func (r *RegionsInfo) GetRangeCount(startKey, endKey []byte) int { +// GetRegionCount returns the number of regions that overlap with the range [startKey, endKey). +func (r *RegionsInfo) GetRegionCount(startKey, endKey []byte) int { r.t.RLock() defer r.t.RUnlock() start := ®ionItem{&RegionInfo{meta: &metapb.Region{StartKey: startKey}}} @@ -1505,9 +1505,9 @@ func (r *RegionsInfo) GetRangeCount(startKey, endKey []byte) int { return endIndex - startIndex + 1 } -// ScanRange scans regions intersecting [start key, end key), returns at most +// ScanRegions scans regions intersecting [start key, end key), returns at most // `limit` regions. limit <= 0 means no limit. -func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo { +func (r *RegionsInfo) ScanRegions(startKey, endKey []byte, limit int) []*RegionInfo { r.t.RLock() defer r.t.RUnlock() var res []*RegionInfo @@ -1524,9 +1524,9 @@ func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInf return res } -// ScanRangeWithIterator scans from the first region containing or behind start key, +// ScanRegionWithIterator scans from the first region containing or behind start key, // until iterator returns false. -func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(region *RegionInfo) bool) { +func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(region *RegionInfo) bool) { r.t.RLock() defer r.t.RUnlock() r.tree.scanRange(startKey, iterator) diff --git a/pkg/dashboard/keyvisual/input/core.go b/pkg/dashboard/keyvisual/input/core.go index 5b273841534..3ca5f96cd81 100644 --- a/pkg/dashboard/keyvisual/input/core.go +++ b/pkg/dashboard/keyvisual/input/core.go @@ -94,7 +94,7 @@ func clusterScan(rc *core.BasicCluster) RegionsInfo { regions := make([]*core.RegionInfo, 0, limit) for { - rs := rc.ScanRange(startKey, endKey, limit) + rs := rc.ScanRegions(startKey, endKey, limit) length := len(rs) if length == 0 { break diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 1c23301f41b..4ce88be652f 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -112,20 +112,9 @@ func (mc *Cluster) GetPersistOptions() *config.PersistOptions { // UpdateRegionsLabelLevelStats updates the label level stats for the regions. 
func (mc *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {} -// IsSchedulerExisted checks if the scheduler with name is existed or not. -func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) { return false, nil } - -// IsSchedulerDisabled checks if the scheduler with name is disabled or not. -func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil } - // CheckSchedulingAllowance checks if the cluster allows scheduling currently. func (mc *Cluster) CheckSchedulingAllowance() (bool, error) { return true, nil } -// ScanRegions scans region with start key, until number greater than limit. -func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo { - return mc.ScanRange(startKey, endKey, limit) -} - // LoadRegion puts region info without leader func (mc *Cluster) LoadRegion(regionID uint64, peerStoreIDs ...uint64) { // regions load from etcd will have no leader @@ -817,11 +806,6 @@ func (mc *Cluster) PutStoreWithLabels(id uint64, labelPairs ...string) { mc.AddLabelsStore(id, 0, labels) } -// RemoveScheduler mocks method. -func (mc *Cluster) RemoveScheduler(name string) error { - return nil -} - // MockRegionInfo returns a mock region // If leaderStoreID is zero, the regions would have no leader func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64, @@ -950,6 +934,3 @@ func (mc *Cluster) ObserveRegionsStats() { storeIDs, writeBytesRates, writeKeysRates := mc.BasicCluster.GetStoresWriteRate() mc.HotStat.ObserveRegionsStats(storeIDs, writeBytesRates, writeKeysRates) } - -// RecordOpStepWithTTL records OpStep with TTL -func (mc *Cluster) RecordOpStepWithTTL(regionID uint64) {} diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index c5074995b1f..9a00db8711c 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -609,7 +609,7 @@ func (m *ModeManager) updateProgress() { key = r.GetEndKey() } m.drSampleTotalRegion = len(sampleRegions) - m.drTotalRegion = m.cluster.GetRegionCount() + m.drTotalRegion = m.cluster.GetTotalRegionCount() return } } diff --git a/pkg/schedule/checker/merge_checker_test.go b/pkg/schedule/checker/merge_checker_test.go index 0da950cc77f..f6f2dd69868 100644 --- a/pkg/schedule/checker/merge_checker_test.go +++ b/pkg/schedule/checker/merge_checker_test.go @@ -463,7 +463,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() { mc := NewMergeChecker(suite.ctx, tc, tc.GetOpts()) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := operator.NewController(suite.ctx, tc, stream) + oc := operator.NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) regions[2] = regions[2].Clone( core.SetPeers([]*metapb.Peer{ diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 2171af8ee80..90d1cfa4a09 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -90,22 +90,23 @@ type Coordinator struct { // NewCoordinator creates a new Coordinator. 
func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator { ctx, cancel := context.WithCancel(ctx) - opController := operator.NewController(ctx, cluster, hbStreams) + opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams) schedulers := make(map[string]*scheduleController) - return &Coordinator{ - ctx: ctx, - cancel: cancel, - cluster: cluster, - prepareChecker: newPrepareChecker(), - checkers: checker.NewController(ctx, cluster, cluster.GetOpts(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController), - regionScatterer: NewRegionScatterer(ctx, cluster, opController), - regionSplitter: NewRegionSplitter(cluster, NewSplitRegionsHandler(cluster, opController)), - schedulers: schedulers, - opController: opController, - hbStreams: hbStreams, - pluginInterface: NewPluginInterface(), - diagnosticManager: newDiagnosticManager(cluster), - } + c := &Coordinator{ + ctx: ctx, + cancel: cancel, + cluster: cluster, + prepareChecker: newPrepareChecker(), + checkers: checker.NewController(ctx, cluster, cluster.GetOpts(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController), + regionScatterer: NewRegionScatterer(ctx, cluster, opController), + regionSplitter: NewRegionSplitter(cluster, NewSplitRegionsHandler(cluster, opController)), + schedulers: schedulers, + opController: opController, + hbStreams: hbStreams, + pluginInterface: NewPluginInterface(), + } + c.diagnosticManager = newDiagnosticManager(c, cluster.GetPersistOptions()) + return c } // GetWaitingRegions returns the regions in the waiting list. @@ -305,7 +306,7 @@ func (c *Coordinator) drivePushOperator() { log.Info("drive push operator has been stopped") return case <-ticker.C: - c.opController.PushOperators() + c.opController.PushOperators(c.RecordOpStepWithTTL) } } } @@ -382,7 +383,7 @@ func (c *Coordinator) Run() { log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args)) continue } - s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data))) + s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.RemoveScheduler) if err != nil { log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) continue @@ -403,7 +404,7 @@ func (c *Coordinator) Run() { continue } - s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args)) + s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args), c.RemoveScheduler) if err != nil { log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) continue @@ -452,7 +453,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) { } schedulerArgs := SchedulerArgs.(func() []string) // create and add user scheduler - s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs())) + s, err := 
schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()), c.RemoveScheduler) if err != nil { log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), errs.ZapError(err)) return @@ -724,7 +725,7 @@ func (c *Coordinator) removeOptScheduler(o *config.PersistOptions, name string) for i, schedulerCfg := range v.Schedulers { // To create a temporary scheduler is just used to get scheduler's name decoder := schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args) - tmp, err := schedulers.CreateScheduler(schedulerCfg.Type, operator.NewController(c.ctx, nil, nil), storage.NewStorageWithMemoryBackend(), decoder) + tmp, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, storage.NewStorageWithMemoryBackend(), decoder, c.RemoveScheduler) if err != nil { return err } @@ -945,6 +946,11 @@ func (c *Coordinator) GetDiagnosticResult(name string) (*DiagnosticResult, error return c.diagnosticManager.getDiagnosticResult(name) } +// RecordOpStepWithTTL records OpStep with TTL +func (c *Coordinator) RecordOpStepWithTTL(regionID uint64) { + c.GetRuleChecker().RecordRegionPromoteToNonWitness(regionID) +} + // scheduleController is used to manage a scheduler to schedulers. type scheduleController struct { schedulers.Scheduler diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go index 1af8b28046a..bcf440a587d 100644 --- a/pkg/schedule/core/cluster_informer.go +++ b/pkg/schedule/core/cluster_informer.go @@ -38,22 +38,23 @@ type ClusterInformer interface { GetAllocator() id.Allocator GetRegionLabeler() *labeler.RegionLabeler GetStorage() storage.Storage - RemoveScheduler(name string) error - AddSuspectRegions(ids ...uint64) - RecordOpStepWithTTL(regionID uint64) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) - IsSchedulerExisted(name string) (bool, error) - IsSchedulerDisabled(name string) (bool, error) CheckSchedulingAllowance() (bool, error) + AddSuspectRegions(ids ...uint64) GetPersistOptions() *config.PersistOptions } // RegionHealthCluster is an aggregate interface that wraps multiple interfaces type RegionHealthCluster interface { - core.StoreSetInformer - core.StoreSetController - core.RegionSetInformer + BasicCluster GetOpts() sc.Config GetRuleManager() *placement.RuleManager } + +// BasicCluster is an aggregate interface that wraps multiple interfaces +type BasicCluster interface { + core.StoreSetInformer + core.StoreSetController + core.RegionSetInformer +} diff --git a/pkg/schedule/diagnostic_manager.go b/pkg/schedule/diagnostic_manager.go index 9560110fea0..f6610520858 100644 --- a/pkg/schedule/diagnostic_manager.go +++ b/pkg/schedule/diagnostic_manager.go @@ -22,7 +22,7 @@ import ( "github.com/tikv/pd/pkg/cache" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/movingaverage" - sche "github.com/tikv/pd/pkg/schedule/core" + sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/schedule/schedulers" @@ -56,28 +56,30 @@ var DiagnosableSummaryFunc = map[string]plan.Summary{ } type diagnosticManager struct { - cluster sche.ClusterInformer - recorders map[string]*diagnosticRecorder + coordinator *Coordinator + config sc.Config + recorders map[string]*diagnosticRecorder } -func newDiagnosticManager(cluster sche.ClusterInformer) *diagnosticManager { +func newDiagnosticManager(coordinator *Coordinator, config sc.Config) 
*diagnosticManager { recorders := make(map[string]*diagnosticRecorder) for name := range DiagnosableSummaryFunc { - recorders[name] = newDiagnosticRecorder(name, cluster) + recorders[name] = newDiagnosticRecorder(name, coordinator, config) } return &diagnosticManager{ - cluster: cluster, - recorders: recorders, + coordinator: coordinator, + config: config, + recorders: recorders, } } func (d *diagnosticManager) getDiagnosticResult(name string) (*DiagnosticResult, error) { - if !d.cluster.GetOpts().IsDiagnosticAllowed() { + if !d.config.IsDiagnosticAllowed() { return nil, errs.ErrDiagnosticDisabled } - isSchedulerExisted, _ := d.cluster.IsSchedulerExisted(name) - isDisabled, _ := d.cluster.IsSchedulerDisabled(name) + isSchedulerExisted, _ := d.coordinator.IsSchedulerExisted(name) + isDisabled, _ := d.coordinator.IsSchedulerDisabled(name) if !isSchedulerExisted || isDisabled { ts := uint64(time.Now().Unix()) res := &DiagnosticResult{Name: name, Timestamp: ts, Status: disabled} @@ -102,20 +104,22 @@ func (d *diagnosticManager) getRecorder(name string) *diagnosticRecorder { // diagnosticRecorder is used to manage diagnostic for one scheduler. type diagnosticRecorder struct { schedulerName string - cluster sche.ClusterInformer + coordinator *Coordinator + config sc.Config summaryFunc plan.Summary results *cache.FIFO } -func newDiagnosticRecorder(name string, cluster sche.ClusterInformer) *diagnosticRecorder { +func newDiagnosticRecorder(name string, coordinator *Coordinator, config sc.Config) *diagnosticRecorder { summaryFunc, ok := DiagnosableSummaryFunc[name] if !ok { log.Error("can't find summary function", zap.String("scheduler-name", name)) return nil } return &diagnosticRecorder{ - cluster: cluster, + coordinator: coordinator, schedulerName: name, + config: config, summaryFunc: summaryFunc, results: cache.NewFIFO(maxDiagnosticResultNum), } @@ -125,7 +129,7 @@ func (d *diagnosticRecorder) isAllowed() bool { if d == nil { return false } - return d.cluster.GetOpts().IsDiagnosticAllowed() + return d.config.IsDiagnosticAllowed() } func (d *diagnosticRecorder) getLastResult() *DiagnosticResult { diff --git a/pkg/schedule/filter/region_filters.go b/pkg/schedule/filter/region_filters.go index 88fd13dc78e..e2c35b4e249 100644 --- a/pkg/schedule/filter/region_filters.go +++ b/pkg/schedule/filter/region_filters.go @@ -149,7 +149,7 @@ func (f *regionEmptyFilter) Select(region *core.RegionInfo) *plan.Status { // isEmptyRegionAllowBalance returns true if the region is not empty or the number of regions is too small. 
func isEmptyRegionAllowBalance(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool { - return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetRegionCount() < core.InitClusterRegionThreshold + return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetTotalRegionCount() < core.InitClusterRegionThreshold } type regionWitnessFilter struct { diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index ab7a39ac032..37ddb36737b 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -29,7 +29,7 @@ import ( "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" - sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/versioninfo" @@ -56,7 +56,8 @@ var ( type Controller struct { syncutil.RWMutex ctx context.Context - cluster sche.ClusterInformer + config config.Config + cluster *core.BasicCluster operators map[uint64]*Operator hbStreams *hbstream.HeartbeatStreams fastOperators *cache.TTLUint64 @@ -68,10 +69,11 @@ type Controller struct { } // NewController creates a Controller. -func NewController(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Controller { +func NewController(ctx context.Context, cluster *core.BasicCluster, config config.Config, hbStreams *hbstream.HeartbeatStreams) *Controller { return &Controller{ ctx: ctx, cluster: cluster, + config: config, operators: make(map[uint64]*Operator), hbStreams: hbStreams, fastOperators: cache.NewIDTTL(ctx, time.Minute, FastOperatorFinishTime), @@ -89,8 +91,8 @@ func (oc *Controller) Ctx() context.Context { return oc.ctx } -// GetCluster exports cluster to evict-scheduler for check store status. -func (oc *Controller) GetCluster() sche.ClusterInformer { +// GetCluster exports basic cluster to evict-scheduler for check store status. +func (oc *Controller) GetCluster() *core.BasicCluster { oc.RLock() defer oc.RUnlock() return oc.cluster @@ -102,7 +104,7 @@ func (oc *Controller) GetHBStreams() *hbstream.HeartbeatStreams { } // Dispatch is used to dispatch the operator of a region. 
-func (oc *Controller) Dispatch(region *core.RegionInfo, source string) { +func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpStepWithTTL func(regionID uint64)) { // Check existed if op := oc.GetOperator(region.GetID()); op != nil { failpoint.Inject("concurrentRemoveOperator", func() { @@ -121,7 +123,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string) { oc.SendScheduleCommand(region, step, source) case SUCCESS: if op.ContainNonWitnessStep() { - oc.cluster.RecordOpStepWithTTL(op.RegionID()) + recordOpStepWithTTL(op.RegionID()) } if oc.RemoveOperator(op) { operatorWaitCounter.WithLabelValues(op.Desc(), "promote-success").Inc() @@ -158,7 +160,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string) { } func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core.RegionInfo) bool { - err := step.CheckInProgress(oc.cluster, region) + err := step.CheckInProgress(oc.cluster, oc.config, region) if err != nil { if oc.RemoveOperator(op, zap.String("reason", err.Error())) { operatorCounter.WithLabelValues(op.Desc(), "stale").Inc() @@ -244,7 +246,7 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { } // PushOperators periodically pushes the unfinished operator to the executor(TiKV). -func (oc *Controller) PushOperators() { +func (oc *Controller) PushOperators(recordOpStepWithTTL func(regionID uint64)) { for { r, next := oc.pollNeedDispatchRegion() if !next { @@ -254,7 +256,7 @@ func (oc *Controller) PushOperators() { continue } - oc.Dispatch(r, DispatchFromNotifierQueue) + oc.Dispatch(r, DispatchFromNotifierQueue, recordOpStepWithTTL) } } @@ -416,8 +418,8 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool operatorWaitCounter.WithLabelValues(op.Desc(), "unexpected-status").Inc() return false } - if !isPromoting && oc.wopStatus.ops[op.Desc()] >= oc.cluster.GetOpts().GetSchedulerMaxWaitingOperator() { - log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.cluster.GetOpts().GetSchedulerMaxWaitingOperator())) + if !isPromoting && oc.wopStatus.ops[op.Desc()] >= oc.config.GetSchedulerMaxWaitingOperator() { + log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) operatorWaitCounter.WithLabelValues(op.Desc(), "exceed-max").Inc() return false } @@ -661,7 +663,7 @@ func (oc *Controller) SendScheduleCommand(region *core.RegionInfo, step OpStep, zap.Stringer("step", step), zap.String("source", source)) - useConfChangeV2 := versioninfo.IsFeatureSupported(oc.cluster.GetOpts().GetClusterVersion(), versioninfo.ConfChangeV2) + useConfChangeV2 := versioninfo.IsFeatureSupported(oc.config.GetClusterVersion(), versioninfo.ConfChangeV2) cmd := step.GetCmd(region, useConfChangeV2) if cmd == nil { return @@ -718,7 +720,7 @@ func (oc *Controller) OperatorCount(kind OpKind) uint64 { } // GetOpInfluence gets OpInfluence. 
-func (oc *Controller) GetOpInfluence(cluster sche.ClusterInformer) OpInfluence { +func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster) OpInfluence { influence := OpInfluence{ StoresInfluence: make(map[uint64]*StoreInfluence), } @@ -736,7 +738,7 @@ func (oc *Controller) GetOpInfluence(cluster sche.ClusterInformer) OpInfluence { } // GetFastOpInfluence get fast finish operator influence -func (oc *Controller) GetFastOpInfluence(cluster sche.ClusterInformer, influence OpInfluence) { +func (oc *Controller) GetFastOpInfluence(cluster *core.BasicCluster, influence OpInfluence) { for _, id := range oc.fastOperators.GetAllID() { value, ok := oc.fastOperators.Get(id) if !ok { @@ -751,13 +753,13 @@ func (oc *Controller) GetFastOpInfluence(cluster sche.ClusterInformer, influence } // AddOpInfluence add operator influence for cluster -func AddOpInfluence(op *Operator, influence OpInfluence, cluster sche.ClusterInformer) { +func AddOpInfluence(op *Operator, influence OpInfluence, cluster *core.BasicCluster) { region := cluster.GetRegion(op.RegionID()) op.TotalInfluence(influence, region) } // NewTotalOpInfluence creates a OpInfluence. -func NewTotalOpInfluence(operators []*Operator, cluster sche.ClusterInformer) OpInfluence { +func NewTotalOpInfluence(operators []*Operator, cluster *core.BasicCluster) OpInfluence { influence := *NewOpInfluence() for _, op := range operators { @@ -865,7 +867,7 @@ func (oc *Controller) exceedStoreLimitLocked(ops ...*Operator) bool { // getOrCreateStoreLimit is used to get or create the limit of a store. func (oc *Controller) getOrCreateStoreLimit(storeID uint64, limitType storelimit.Type) storelimit.StoreLimit { - ratePerSec := oc.cluster.GetOpts().GetStoreLimitByType(storeID, limitType) / StoreBalanceBaseTime + ratePerSec := oc.config.GetStoreLimitByType(storeID, limitType) / StoreBalanceBaseTime s := oc.cluster.GetStore(storeID) if s == nil { log.Error("invalid store ID", zap.Uint64("store-id", storeID)) @@ -873,7 +875,7 @@ func (oc *Controller) getOrCreateStoreLimit(storeID uint64, limitType storelimit } // The other limits do not need to update by config exclude StoreRateLimit. if limit, ok := s.GetStoreLimit().(*storelimit.StoreRateLimit); ok && limit.Rate(limitType) != ratePerSec { - oc.cluster.GetBasicCluster().ResetStoreLimit(storeID, limitType, ratePerSec) + oc.cluster.ResetStoreLimit(storeID, limitType, ratePerSec) } return s.GetStoreLimit() } diff --git a/pkg/schedule/operator/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go index 655197f4a2d..eb2d69db944 100644 --- a/pkg/schedule/operator/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -58,7 +58,8 @@ func (suite *operatorControllerTestSuite) TearDownSuite() { func (suite *operatorControllerTestSuite) TestCacheInfluence() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) - oc := NewController(suite.ctx, tc, nil) + bc := tc.GetBasicCluster() + oc := NewController(suite.ctx, bc, tc.GetOpts(), nil) tc.AddLeaderStore(2, 1) region := tc.AddLeaderRegion(1, 1, 2) @@ -69,20 +70,20 @@ func (suite *operatorControllerTestSuite) TestCacheInfluence() { oc.SetOperator(op) suite.True(op.Start()) influence := NewOpInfluence() - AddOpInfluence(op, *influence, tc) + AddOpInfluence(op, *influence, bc) suite.Equal(int64(-96), influence.GetStoreInfluence(2).RegionSize) // case: influence is same even if the region size changed. 
region = region.Clone(core.SetApproximateSize(100)) tc.PutRegion(region) influence1 := NewOpInfluence() - AddOpInfluence(op, *influence1, tc) + AddOpInfluence(op, *influence1, bc) suite.Equal(int64(-96), influence1.GetStoreInfluence(2).RegionSize) // case: influence is valid even if the region is removed. tc.RemoveRegion(region) influence2 := NewOpInfluence() - AddOpInfluence(op, *influence2, tc) + AddOpInfluence(op, *influence2, bc) suite.Equal(int64(-96), influence2.GetStoreInfluence(2).RegionSize) } @@ -90,7 +91,7 @@ func (suite *operatorControllerTestSuite) TestCacheInfluence() { func (suite *operatorControllerTestSuite) TestGetOpInfluence() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) - oc := NewController(suite.ctx, tc, nil) + oc := NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), nil) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 1, 2) tc.AddLeaderRegion(2, 1, 2) @@ -121,7 +122,7 @@ func (suite *operatorControllerTestSuite) TestGetOpInfluence() { case <-ctx.Done(): return default: - oc.GetOpInfluence(tc) + oc.GetOpInfluence(tc.GetBasicCluster()) } } }(suite.ctx) @@ -133,7 +134,7 @@ func (suite *operatorControllerTestSuite) TestOperatorStatus() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderRegion(1, 1, 2) @@ -155,12 +156,12 @@ func (suite *operatorControllerTestSuite) TestOperatorStatus() { op1.SetStatusReachTime(STARTED, time.Now().Add(-SlowStepWaitTime-FastStepWaitTime)) region2 = ApplyOperatorStep(region2, op2) tc.PutRegion(region2) - oc.Dispatch(region1, "test") - oc.Dispatch(region2, "test") + oc.Dispatch(region1, "test", nil) + oc.Dispatch(region2, "test", nil) suite.Equal(pdpb.OperatorStatus_TIMEOUT, oc.GetOperatorStatus(1).Status) suite.Equal(pdpb.OperatorStatus_RUNNING, oc.GetOperatorStatus(2).Status) ApplyOperator(tc, op2) - oc.Dispatch(region2, "test") + oc.Dispatch(region2, "test", nil) suite.Equal(pdpb.OperatorStatus_SUCCESS, oc.GetOperatorStatus(2).Status) } @@ -168,7 +169,7 @@ func (suite *operatorControllerTestSuite) TestFastFailOperator() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderStore(3, 0) @@ -181,18 +182,18 @@ func (suite *operatorControllerTestSuite) TestFastFailOperator() { op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, steps...) suite.True(op.Start()) oc.SetOperator(op) - oc.Dispatch(region, "test") + oc.Dispatch(region, "test", nil) suite.Equal(pdpb.OperatorStatus_RUNNING, oc.GetOperatorStatus(1).Status) // change the leader region = region.Clone(core.WithLeader(region.GetPeer(2))) - oc.Dispatch(region, DispatchFromHeartBeat) + oc.Dispatch(region, DispatchFromHeartBeat, nil) suite.Equal(CANCELED, op.Status()) suite.Nil(oc.GetOperator(region.GetID())) // transfer leader to an illegal store. 
op = NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 5}) oc.SetOperator(op) - oc.Dispatch(region, DispatchFromHeartBeat) + oc.Dispatch(region, DispatchFromHeartBeat, nil) suite.Equal(CANCELED, op.Status()) suite.Nil(oc.GetOperator(region.GetID())) } @@ -202,7 +203,7 @@ func (suite *operatorControllerTestSuite) TestFastFailWithUnhealthyStore() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderStore(3, 0) @@ -222,7 +223,7 @@ func (suite *operatorControllerTestSuite) TestCheckAddUnexpectedStatus() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) tc.AddLeaderStore(1, 0) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 2, 1) @@ -287,7 +288,7 @@ func (suite *operatorControllerTestSuite) TestConcurrentRemoveOperator() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) tc.AddLeaderStore(1, 0) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 2, 1) @@ -308,7 +309,7 @@ func (suite *operatorControllerTestSuite) TestConcurrentRemoveOperator() { var wg sync.WaitGroup wg.Add(2) go func() { - oc.Dispatch(region1, "test") + oc.Dispatch(region1, "test", nil) wg.Done() }() go func() { @@ -328,7 +329,7 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 1, 2) @@ -401,7 +402,7 @@ func (suite *operatorControllerTestSuite) TestStoreLimit() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) tc.AddLeaderStore(1, 0) tc.UpdateLeaderCount(1, 1000) tc.AddLeaderStore(2, 0) @@ -468,7 +469,7 @@ func (suite *operatorControllerTestSuite) TestStoreLimit() { func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() { cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) - controller := NewController(suite.ctx, cluster, stream) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetOpts(), stream) cluster.AddLeaderStore(1, 2) cluster.AddLeaderStore(2, 0) @@ -487,7 +488,7 @@ func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() { region := cluster.MockRegionInfo(1, 2, []uint64{1, 2}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, 
Version: 0}) - controller.Dispatch(region, DispatchFromHeartBeat) + controller.Dispatch(region, DispatchFromHeartBeat, nil) suite.Equal(uint64(0), op.ConfVerChanged(region)) suite.Equal(2, stream.MsgLength()) @@ -495,7 +496,7 @@ func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() { region = cluster.MockRegionInfo(1, 2, []uint64{2}, []uint64{}, &metapb.RegionEpoch{ConfVer: 0, Version: 0}) - controller.Dispatch(region, DispatchFromHeartBeat) + controller.Dispatch(region, DispatchFromHeartBeat, nil) suite.Equal(uint64(1), op.ConfVerChanged(region)) suite.Equal(2, stream.MsgLength()) @@ -509,7 +510,7 @@ func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() { // report region with an abnormal confver region = cluster.MockRegionInfo(1, 1, []uint64{1, 2}, []uint64{}, &metapb.RegionEpoch{ConfVer: 1, Version: 0}) - controller.Dispatch(region, DispatchFromHeartBeat) + controller.Dispatch(region, DispatchFromHeartBeat, nil) suite.Equal(uint64(0), op.ConfVerChanged(region)) // no new step suite.Equal(3, stream.MsgLength()) @@ -518,7 +519,7 @@ func (suite *operatorControllerTestSuite) TestDispatchOutdatedRegion() { func (suite *operatorControllerTestSuite) TestCalcInfluence() { cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) - controller := NewController(suite.ctx, cluster, stream) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetOpts(), stream) epoch := &metapb.RegionEpoch{ConfVer: 0, Version: 0} region := cluster.MockRegionInfo(1, 1, []uint64{2}, []uint64{}, epoch) @@ -546,7 +547,7 @@ func (suite *operatorControllerTestSuite) TestCalcInfluence() { suite.Equal(si.StepCost[storelimit.RemovePeer], expect.StepCost[storelimit.RemovePeer]) } - influence := controller.GetOpInfluence(cluster) + influence := controller.GetOpInfluence(cluster.GetBasicCluster()) check(influence, 1, &StoreInfluence{ LeaderSize: -20, LeaderCount: -1, @@ -573,7 +574,7 @@ func (suite *operatorControllerTestSuite) TestCalcInfluence() { suite.True(steps[0].IsFinish(region2)) op.Check(region2) - influence = controller.GetOpInfluence(cluster) + influence = controller.GetOpInfluence(cluster.GetBasicCluster()) check(influence, 1, &StoreInfluence{ LeaderSize: -20, LeaderCount: -1, @@ -595,7 +596,7 @@ func (suite *operatorControllerTestSuite) TestCalcInfluence() { func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions()) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) - controller := NewController(suite.ctx, cluster, stream) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetOpts(), stream) // Create a new region with epoch(0, 0) // the region has two peers with its peer id allocated incrementally. 
@@ -644,7 +645,7 @@ func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { suite.NotNil(region2.GetPendingPeers()) suite.False(steps[0].IsFinish(region2)) - controller.Dispatch(region2, DispatchFromHeartBeat) + controller.Dispatch(region2, DispatchFromHeartBeat, nil) // In this case, the conf version has been changed, but the // peer added is in pending state, the operator should not be @@ -663,7 +664,7 @@ func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { core.WithIncConfVer(), ) suite.True(steps[0].IsFinish(region3)) - controller.Dispatch(region3, DispatchFromHeartBeat) + controller.Dispatch(region3, DispatchFromHeartBeat, nil) suite.Equal(uint64(1), op.ConfVerChanged(region3)) suite.Equal(2, stream.MsgLength()) @@ -672,7 +673,7 @@ func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { core.WithIncConfVer(), ) suite.True(steps[1].IsFinish(region4)) - controller.Dispatch(region4, DispatchFromHeartBeat) + controller.Dispatch(region4, DispatchFromHeartBeat, nil) suite.Equal(uint64(2), op.ConfVerChanged(region4)) suite.Equal(3, stream.MsgLength()) @@ -681,7 +682,7 @@ func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { core.WithLeader(region4.GetStorePeer(3)), ) suite.True(steps[2].IsFinish(region5)) - controller.Dispatch(region5, DispatchFromHeartBeat) + controller.Dispatch(region5, DispatchFromHeartBeat, nil) suite.Equal(uint64(2), op.ConfVerChanged(region5)) suite.Equal(4, stream.MsgLength()) @@ -691,7 +692,7 @@ func (suite *operatorControllerTestSuite) TestDispatchUnfinishedStep() { core.WithIncConfVer(), ) suite.True(steps[3].IsFinish(region6)) - controller.Dispatch(region6, DispatchFromHeartBeat) + controller.Dispatch(region6, DispatchFromHeartBeat, nil) suite.Equal(uint64(3), op.ConfVerChanged(region6)) // The Operator has finished, so no message should be sent @@ -732,7 +733,7 @@ func (suite *operatorControllerTestSuite) TestAddWaitingOperator() { opts := mockconfig.NewTestOptions() cluster := mockcluster.NewCluster(suite.ctx, opts) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) - controller := NewController(suite.ctx, cluster, stream) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetOpts(), stream) cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) @@ -801,7 +802,7 @@ func (suite *operatorControllerTestSuite) TestInvalidStoreId() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */) - oc := NewController(suite.ctx, tc, stream) + oc := NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // If PD and store 3 are gone, PD will not have info of store 3 after recreating it. 
tc.AddRegionStore(1, 1) tc.AddRegionStore(2, 1) diff --git a/pkg/schedule/operator/step.go b/pkg/schedule/operator/step.go index e71616aa146..c23f409e097 100644 --- a/pkg/schedule/operator/step.go +++ b/pkg/schedule/operator/step.go @@ -27,7 +27,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" - sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -54,7 +54,7 @@ type OpStep interface { fmt.Stringer ConfVerChanged(region *core.RegionInfo) uint64 IsFinish(region *core.RegionInfo) bool - CheckInProgress(ci sche.ClusterInformer, region *core.RegionInfo) error + CheckInProgress(ci *core.BasicCluster, config config.Config, region *core.RegionInfo) error Influence(opInfluence OpInfluence, region *core.RegionInfo) Timeout(regionSize int64) time.Duration GetCmd(region *core.RegionInfo, useConfChangeV2 bool) *pdpb.RegionHeartbeatResponse @@ -88,7 +88,7 @@ func (tl TransferLeader) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. -func (tl TransferLeader) CheckInProgress(ci sche.ClusterInformer, region *core.RegionInfo) error { +func (tl TransferLeader) CheckInProgress(ci *core.BasicCluster, config config.Config, region *core.RegionInfo) error { errList := make([]error, 0, len(tl.ToStores)+1) for _, storeID := range append(tl.ToStores, tl.ToStore) { peer := region.GetStorePeer(tl.ToStore) @@ -100,7 +100,7 @@ func (tl TransferLeader) CheckInProgress(ci sche.ClusterInformer, region *core.R errList = append(errList, errors.New("peer already is a learner")) continue } - if err := validateStore(ci, storeID); err != nil { + if err := validateStore(ci, config, storeID); err != nil { errList = append(errList, err) continue } @@ -193,8 +193,8 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { } // CheckInProgress checks if the step is in the progress of advancing. -func (ap AddPeer) CheckInProgress(ci sche.ClusterInformer, region *core.RegionInfo) error { - if err := validateStore(ci, ap.ToStore); err != nil { +func (ap AddPeer) CheckInProgress(ci *core.BasicCluster, config config.Config, region *core.RegionInfo) error { + if err := validateStore(ci, config, ap.ToStore); err != nil { return err } peer := region.GetStorePeer(ap.ToStore) @@ -247,8 +247,8 @@ func (bw BecomeWitness) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. -func (bw BecomeWitness) CheckInProgress(ci sche.ClusterInformer, region *core.RegionInfo) error { - if err := validateStore(ci, bw.StoreID); err != nil { +func (bw BecomeWitness) CheckInProgress(ci *core.BasicCluster, config config.Config, region *core.RegionInfo) error { + if err := validateStore(ci, config, bw.StoreID); err != nil { return err } peer := region.GetStorePeer(bw.StoreID) @@ -309,8 +309,8 @@ func (bn BecomeNonWitness) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. 
-func (bn BecomeNonWitness) CheckInProgress(ci sche.ClusterInformer, region *core.RegionInfo) error { - if err := validateStore(ci, bn.StoreID); err != nil { +func (bn BecomeNonWitness) CheckInProgress(ci *core.BasicCluster, config config.Config, region *core.RegionInfo) error { + if err := validateStore(ci, config, bn.StoreID); err != nil { return err } peer := region.GetStorePeer(bn.StoreID) @@ -395,14 +395,14 @@ func (bsw BatchSwitchWitness) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. -func (bsw BatchSwitchWitness) CheckInProgress(ci sche.ClusterInformer, region *core.RegionInfo) error { +func (bsw BatchSwitchWitness) CheckInProgress(ci *core.BasicCluster, config config.Config, region *core.RegionInfo) error { for _, w := range bsw.ToWitnesses { - if err := w.CheckInProgress(ci, region); err != nil { + if err := w.CheckInProgress(ci, config, region); err != nil { return err } } for _, nw := range bsw.ToNonWitnesses { - if err := nw.CheckInProgress(ci, region); err != nil { + if err := nw.CheckInProgress(ci, config, region); err != nil { return err } } @@ -478,8 +478,8 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. -func (al AddLearner) CheckInProgress(ci sche.ClusterInformer, region *core.RegionInfo) error { - if err := validateStore(ci, al.ToStore); err != nil { +func (al AddLearner) CheckInProgress(ci *core.BasicCluster, config config.Config, region *core.RegionInfo) error { + if err := validateStore(ci, config, al.ToStore); err != nil { return err } peer := region.GetStorePeer(al.ToStore) @@ -564,7 +564,7 @@ func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. -func (pl PromoteLearner) CheckInProgress(_ sche.ClusterInformer, region *core.RegionInfo) error { +func (pl PromoteLearner) CheckInProgress(_ *core.BasicCluster, config config.Config, region *core.RegionInfo) error { peer := region.GetStorePeer(pl.ToStore) if peer.GetId() != pl.PeerID { return errors.New("peer does not exist") @@ -615,7 +615,7 @@ func (rp RemovePeer) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. -func (rp RemovePeer) CheckInProgress(_ sche.ClusterInformer, region *core.RegionInfo) error { +func (rp RemovePeer) CheckInProgress(_ *core.BasicCluster, config config.Config, region *core.RegionInfo) error { if rp.FromStore == region.GetLeader().GetStoreId() { return errors.New("cannot remove leader peer") } @@ -685,7 +685,7 @@ func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. -func (mr MergeRegion) CheckInProgress(_ sche.ClusterInformer, _ *core.RegionInfo) error { +func (mr MergeRegion) CheckInProgress(_ *core.BasicCluster, config config.Config, _ *core.RegionInfo) error { return nil } @@ -753,7 +753,7 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo } // CheckInProgress checks if the step is in the progress of advancing. 
-func (sr SplitRegion) CheckInProgress(_ sche.ClusterInformer, _ *core.RegionInfo) error { +func (sr SplitRegion) CheckInProgress(_ *core.BasicCluster, config config.Config, _ *core.RegionInfo) error { return nil } @@ -878,7 +878,7 @@ func (cpe ChangePeerV2Enter) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. -func (cpe ChangePeerV2Enter) CheckInProgress(_ sche.ClusterInformer, region *core.RegionInfo) error { +func (cpe ChangePeerV2Enter) CheckInProgress(_ *core.BasicCluster, config config.Config, region *core.RegionInfo) error { inJointState, notInJointState := false, false for _, pl := range cpe.PromoteLearners { peer := region.GetStorePeer(pl.ToStore) @@ -1007,7 +1007,7 @@ func (cpl ChangePeerV2Leave) IsFinish(region *core.RegionInfo) bool { } // CheckInProgress checks if the step is in the progress of advancing. -func (cpl ChangePeerV2Leave) CheckInProgress(_ sche.ClusterInformer, region *core.RegionInfo) error { +func (cpl ChangePeerV2Leave) CheckInProgress(_ *core.BasicCluster, config config.Config, region *core.RegionInfo) error { inJointState, notInJointState, demoteLeader := false, false, false leaderStoreID := region.GetLeader().GetStoreId() @@ -1085,12 +1085,12 @@ func (cpl ChangePeerV2Leave) GetCmd(region *core.RegionInfo, useConfChangeV2 boo } } -func validateStore(ci sche.ClusterInformer, id uint64) error { - store := ci.GetBasicCluster().GetStore(id) +func validateStore(ci *core.BasicCluster, config config.Config, id uint64) error { + store := ci.GetStore(id) if store == nil { return errors.New("target store does not exist") } - if store.DownTime() > ci.GetOpts().GetMaxStoreDownTime() { + if store.DownTime() > config.GetMaxStoreDownTime() { return errors.New("target store is down") } return nil diff --git a/pkg/schedule/operator/step_test.go b/pkg/schedule/operator/step_test.go index 3b797575149..41c18384da4 100644 --- a/pkg/schedule/operator/step_test.go +++ b/pkg/schedule/operator/step_test.go @@ -566,7 +566,7 @@ func (suite *operatorStepTestSuite) check(step OpStep, desc string, testCases [] region := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: testCase.Peers}, testCase.Peers[0]) suite.Equal(testCase.ConfVerChanged, step.ConfVerChanged(region)) suite.Equal(testCase.IsFinish, step.IsFinish(region)) - err := step.CheckInProgress(suite.cluster, region) + err := step.CheckInProgress(suite.cluster.GetBasicCluster(), suite.cluster.GetOpts(), region) testCase.CheckInProgress(err) _ = step.GetCmd(region, true) diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go index 6a8acbcf9f5..cd4b076b2d6 100644 --- a/pkg/schedule/prepare_checker.go +++ b/pkg/schedule/prepare_checker.go @@ -48,7 +48,7 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool { return true } // The number of active regions should be more than total region of all stores * collectFactor - if float64(c.GetRegionCount())*collectFactor > float64(checker.sum) { + if float64(c.GetTotalRegionCount())*collectFactor > float64(checker.sum) { return false } for _, store := range c.GetStores() { diff --git a/pkg/schedule/region_scatterer_test.go b/pkg/schedule/region_scatterer_test.go index ad0031f9cdf..ec2ab77fbe3 100644 --- a/pkg/schedule/region_scatterer_test.go +++ b/pkg/schedule/region_scatterer_test.go @@ -91,7 +91,7 @@ func scatter(re *require.Assertions, numStores, numRegions uint64, useRules bool opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := 
hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) // Add ordinary stores. @@ -148,7 +148,7 @@ func scatterSpecial(re *require.Assertions, numOrdinaryStores, numSpecialStores, opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) // Add ordinary stores. @@ -226,7 +226,7 @@ func TestStoreLimit(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add stores 1~6. for i := uint64(1); i <= 5; i++ { @@ -258,7 +258,7 @@ func TestScatterCheck(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 5 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -307,7 +307,7 @@ func TestSomeStoresFilteredScatterGroupInConcurrency(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 5 connected stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -352,7 +352,7 @@ func TestScatterGroupInConcurrency(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 5 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -424,7 +424,7 @@ func TestScatterForManyRegion(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 60 stores. for i := uint64(1); i <= 60; i++ { tc.AddRegionStore(i, 0) @@ -452,7 +452,7 @@ func TestScattersGroup(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 5 stores. 
for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -541,7 +541,7 @@ func TestRegionFromDifferentGroups(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 6 stores. storeCount := 6 for i := uint64(1); i <= uint64(storeCount); i++ { @@ -577,7 +577,7 @@ func TestRegionHasLearner(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 8 stores. voterCount := uint64(6) storeCount := uint64(8) @@ -665,7 +665,7 @@ func TestSelectedStoresTooFewPeers(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 4 stores. for i := uint64(1); i <= 4; i++ { tc.AddRegionStore(i, 0) @@ -702,7 +702,7 @@ func TestSelectedStoresTooManyPeers(t *testing.T) { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 4 stores. for i := uint64(1); i <= 5; i++ { tc.AddRegionStore(i, 0) @@ -740,7 +740,7 @@ func TestBalanceRegion(t *testing.T) { opt.SetLocationLabels([]string{"host"}) tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc, stream) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream) // Add 6 stores in 3 hosts. 
for i := uint64(2); i <= 7; i++ { tc.AddLabelsStore(i, 0, map[string]string{"host": strconv.FormatUint(i/2, 10)}) diff --git a/pkg/schedule/schedulers/balance_benchmark_test.go b/pkg/schedule/schedulers/balance_benchmark_test.go index b52b9b493a8..ace59e0caa9 100644 --- a/pkg/schedule/schedulers/balance_benchmark_test.go +++ b/pkg/schedule/schedulers/balance_benchmark_test.go @@ -46,7 +46,7 @@ func newBenchCluster(ruleEnable, labelEnable bool, tombstoneEnable bool) (contex ctx, cancel := context.WithCancel(context.Background()) opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) - oc := operator.NewController(ctx, tc, nil) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), nil) opt.GetScheduleConfig().TolerantSizeRatio = float64(storeCount) opt.SetPlacementRuleEnabled(ruleEnable) @@ -95,7 +95,7 @@ func newBenchBigCluster(storeNumInOneRack, regionNum int) (context.CancelFunc, * ctx, cancel := context.WithCancel(context.Background()) opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) - oc := operator.NewController(ctx, tc, nil) + oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), nil) opt.GetScheduleConfig().TolerantSizeRatio = float64(storeCount) opt.SetPlacementRuleEnabled(true) diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 4f331b870b9..349df14b35d 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -161,7 +161,6 @@ type balanceLeaderScheduler struct { name string conf *balanceLeaderSchedulerConfig handler http.Handler - opController *operator.Controller filters []filter.Filter counter *prometheus.CounterVec filterCounter *filter.Counter @@ -177,7 +176,6 @@ func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceL name: BalanceLeaderName, conf: conf, handler: newBalanceLeaderHandler(conf), - opController: opController, counter: balanceLeaderCounter, filterCounter: filter.NewCounter(filter.BalanceLeader.String()), } @@ -227,7 +225,7 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { } func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool { - allowed := l.opController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit() + allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit() if !allowed { operator.OperatorLimitCounter.WithLabelValues(l.GetType(), operator.OpLeader.String()).Inc() } @@ -338,7 +336,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.ClusterInformer, dryRun b balanceLeaderScheduleCounter.Inc() leaderSchedulePolicy := cluster.GetOpts().GetLeaderSchedulePolicy() - opInfluence := l.opController.GetOpInfluence(cluster) + opInfluence := l.OpController.GetOpInfluence(cluster.GetBasicCluster()) kind := constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy) solver := newSolver(basePlan, kind, cluster, opInfluence) @@ -421,7 +419,7 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s storesIDs := candidate.binarySearchStores(plan.source, plan.target) candidateUpdateStores[id] = storesIDs } - operator.AddOpInfluence(op, plan.opInfluence, plan.ClusterInformer) + operator.AddOpInfluence(op, plan.opInfluence, plan.ClusterInformer.GetBasicCluster()) for id, candidate := range candidates { for _, pos := range candidateUpdateStores[id] { candidate.resortStoreWithPos(pos) diff --git 
diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go
index 43e05863729..04c8fda59f2 100644
--- a/pkg/schedule/schedulers/balance_region.go
+++ b/pkg/schedule/schedulers/balance_region.go
@@ -58,7 +58,6 @@ type balanceRegionScheduler struct {
 	*BaseScheduler
 	*retryQuota
 	conf          *balanceRegionSchedulerConfig
-	opController  *operator.Controller
 	filters       []filter.Filter
 	counter       *prometheus.CounterVec
 	filterCounter *filter.Counter
@@ -72,7 +71,6 @@ func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceR
 		BaseScheduler: base,
 		retryQuota:    newRetryQuota(),
 		conf:          conf,
-		opController:  opController,
 		counter:       balanceRegionCounter,
 		filterCounter: filter.NewCounter(filter.BalanceRegion.String()),
 	}
@@ -116,7 +114,7 @@ func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) {
 }
 func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
-	allowed := s.opController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit()
+	allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit()
 	if !allowed {
 		operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
 	}
@@ -135,8 +133,8 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.ClusterInformer, dryRun b
 	snapshotFilter := filter.NewSnapshotSendFilter(stores, constant.Medium)
 	faultTargets := filter.SelectUnavailableTargetStores(stores, s.filters, opts, collector, s.filterCounter)
 	sourceStores := filter.SelectSourceStores(stores, s.filters, opts, collector, s.filterCounter)
-	opInfluence := s.opController.GetOpInfluence(cluster)
-	s.OpController.GetFastOpInfluence(cluster, opInfluence)
+	opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster())
+	s.OpController.GetFastOpInfluence(cluster.GetBasicCluster(), opInfluence)
 	kind := constant.NewScheduleKind(constant.RegionKind, constant.BySize)
 	solver := newSolver(basePlan, kind, cluster, opInfluence)
diff --git a/pkg/schedule/schedulers/balance_test.go b/pkg/schedule/schedulers/balance_test.go
index 4d3888308b7..8fa0294a572 100644
--- a/pkg/schedule/schedulers/balance_test.go
+++ b/pkg/schedule/schedulers/balance_test.go
@@ -52,7 +52,7 @@ func TestInfluenceAmp(t *testing.T) {
 	R := int64(96)
 	kind := constant.NewScheduleKind(constant.RegionKind, constant.BySize)
-	influence := oc.GetOpInfluence(tc)
+	influence := oc.GetOpInfluence(tc.GetBasicCluster())
 	influence.GetStoreInfluence(1).RegionSize = R
 	influence.GetStoreInfluence(2).RegionSize = -R
 	tc.SetTolerantSizeRatio(1)
@@ -150,7 +150,7 @@ func TestShouldBalance(t *testing.T) {
 		tc.SetLeaderSchedulePolicy(testCase.kind.String())
 		kind := constant.NewScheduleKind(constant.LeaderKind, testCase.kind)
 		basePlan := NewBalanceSchedulerPlan()
-		solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc))
+		solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
 		solver.source, solver.target, solver.region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
 		solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
 		re.Equal(testCase.expectedResult, solver.shouldBalance(""))
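Operator influence is now computed against the core cluster instead of the full informer, so each caller converts once and reuses the result. A condensed sketch of the pattern the schedulers above follow (the local variable is only for illustration):

    // Inside a scheduler's Schedule method; cluster is a sche.ClusterInformer.
    basic := cluster.GetBasicCluster()
    opInfluence := s.OpController.GetOpInfluence(basic)
    s.OpController.GetFastOpInfluence(basic, opInfluence)
    kind := constant.NewScheduleKind(constant.RegionKind, constant.BySize)
    solver := newSolver(basePlan, kind, cluster, opInfluence)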
@@ -164,7 +164,7 @@
 		tc.PutRegion(region)
 		kind := constant.NewScheduleKind(constant.RegionKind, testCase.kind)
 		basePlan := NewBalanceSchedulerPlan()
-		solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc))
+		solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
 		solver.source, solver.target, solver.region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
 		solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
 		re.Equal(testCase.expectedResult, solver.shouldBalance(""))
diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go
index 6547523be2f..4a9c3433963 100644
--- a/pkg/schedule/schedulers/balance_witness.go
+++ b/pkg/schedule/schedulers/balance_witness.go
@@ -146,7 +146,6 @@ type balanceWitnessScheduler struct {
 	name          string
 	conf          *balanceWitnessSchedulerConfig
 	handler       http.Handler
-	opController  *operator.Controller
 	filters       []filter.Filter
 	counter       *prometheus.CounterVec
 	filterCounter *filter.Counter
@@ -162,7 +161,6 @@ func newBalanceWitnessScheduler(opController *operator.Controller, conf *balance
 		name:          BalanceWitnessName,
 		conf:          conf,
 		handler:       newbalanceWitnessHandler(conf),
-		opController:  opController,
 		counter:       balanceWitnessCounter,
 		filterCounter: filter.NewCounter(filter.BalanceWitness.String()),
 	}
@@ -212,7 +210,7 @@ func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) {
 }
 func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.ClusterInformer) bool {
-	allowed := b.opController.OperatorCount(operator.OpWitness) < cluster.GetOpts().GetWitnessScheduleLimit()
+	allowed := b.OpController.OperatorCount(operator.OpWitness) < cluster.GetOpts().GetWitnessScheduleLimit()
 	if !allowed {
 		operator.OperatorLimitCounter.WithLabelValues(b.GetType(), operator.OpWitness.String()).Inc()
 	}
@@ -230,7 +228,7 @@ func (b *balanceWitnessScheduler) Schedule(cluster sche.ClusterInformer, dryRun
 	batch := b.conf.Batch
 	schedulerCounter.WithLabelValues(b.GetName(), "schedule").Inc()
-	opInfluence := b.opController.GetOpInfluence(cluster)
+	opInfluence := b.OpController.GetOpInfluence(cluster.GetBasicCluster())
 	kind := constant.NewScheduleKind(constant.WitnessKind, constant.ByCount)
 	solver := newSolver(basePlan, kind, cluster, opInfluence)
diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go
index 46b9d79f574..f98f81dd466 100644
--- a/pkg/schedule/schedulers/evict_leader.go
+++ b/pkg/schedule/schedulers/evict_leader.go
@@ -59,7 +59,8 @@ type evictLeaderSchedulerConfig struct {
 	mu                syncutil.RWMutex
 	storage           endpoint.ConfigStorage
 	StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
-	cluster           sche.ClusterInformer
+	cluster           *core.BasicCluster
+	removeSchedulerCb func(string) error
 }
 func (conf *evictLeaderSchedulerConfig) getStores() []uint64 {
@@ -399,7 +400,7 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
 		return
 	}
 	if last {
-		if err := handler.config.cluster.RemoveScheduler(EvictLeaderName); err != nil {
+		if err := handler.config.removeSchedulerCb(EvictLeaderName); err != nil {
 			if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
 				handler.rd.JSON(w, http.StatusNotFound, err.Error())
 			} else {
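The evict-leader config no longer needs a ClusterInformer just to unregister itself: it keeps the basic cluster for data access plus a plain callback for removal, which the HTTP handler invokes once the last store is deleted. Condensed from the hunks above:

    type evictLeaderSchedulerConfig struct {
        cluster           *core.BasicCluster
        removeSchedulerCb func(string) error
        // storage, StoreIDWithRanges, mu unchanged
    }

    // In DeleteConfig, once no stores remain:
    if err := handler.config.removeSchedulerCb(EvictLeaderName); err != nil {
        // ErrSchedulerNotFound maps to 404, anything else to 500, as before.
    }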
diff --git a/pkg/schedule/schedulers/evict_leader_test.go b/pkg/schedule/schedulers/evict_leader_test.go
index 1babb2d56d5..d804561f11c 100644
--- a/pkg/schedule/schedulers/evict_leader_test.go
+++ b/pkg/schedule/schedulers/evict_leader_test.go
@@ -41,7 +41,7 @@ func TestEvictLeader(t *testing.T) {
 	tc.AddLeaderRegion(2, 2, 1)
 	tc.AddLeaderRegion(3, 3, 1)
-	sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}))
+	sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil })
 	re.NoError(err)
 	re.True(sl.IsScheduleAllowed(tc))
 	ops, _ := sl.Schedule(tc, false)
@@ -54,7 +54,7 @@ func TestEvictLeaderWithUnhealthyPeer(t *testing.T) {
 	re := require.New(t)
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
-	sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}))
+	sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil })
 	re.NoError(err)
 	// Add stores 1, 2, 3
diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go
index fb8fc6343ca..f08b1ab2d30 100644
--- a/pkg/schedule/schedulers/grant_hot_region.go
+++ b/pkg/schedule/schedulers/grant_hot_region.go
@@ -23,6 +23,7 @@ import (
 	"github.com/gorilla/mux"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/core/constant"
 	"github.com/tikv/pd/pkg/errs"
 	sche "github.com/tikv/pd/pkg/schedule/core"
@@ -54,7 +55,7 @@ var (
 type grantHotRegionSchedulerConfig struct {
 	mu            syncutil.RWMutex
 	storage       endpoint.ConfigStorage
-	cluster       sche.ClusterInformer
+	cluster       *core.BasicCluster
 	StoreIDs      []uint64 `json:"store-id"`
 	StoreLeaderID uint64   `json:"store-leader-id"`
 }
diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go
index e21e029ec4d..f84b922f32b 100644
--- a/pkg/schedule/schedulers/grant_leader.go
+++ b/pkg/schedule/schedulers/grant_leader.go
@@ -52,7 +52,8 @@ type grantLeaderSchedulerConfig struct {
 	mu                syncutil.RWMutex
 	storage           endpoint.ConfigStorage
 	StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
-	cluster           sche.ClusterInformer
+	cluster           *core.BasicCluster
+	removeSchedulerCb func(name string) error
 }
 func (conf *grantLeaderSchedulerConfig) BuildWithArgs(args []string) error {
@@ -301,7 +302,7 @@ func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
 		return
 	}
 	if last {
-		if err := handler.config.cluster.RemoveScheduler(GrantLeaderName); err != nil {
+		if err := handler.config.removeSchedulerCb(GrantLeaderName); err != nil {
 			if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
 				handler.rd.JSON(w, http.StatusNotFound, err.Error())
 			} else {
diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go
index 3bb6df6b273..362758d4ebf 100644
--- a/pkg/schedule/schedulers/hot_region_test.go
+++ b/pkg/schedule/schedulers/hot_region_test.go
@@ -39,11 +39,11 @@ import (
 func init() {
 	schedulePeerPr = 1.0
-	RegisterScheduler(statistics.Write.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(statistics.Write.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		cfg := initHotRegionScheduleConfig()
 		return newHotWriteScheduler(opController, cfg), nil
 	})
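Whoever creates such a scheduler now supplies the removal behavior explicitly. In tests a no-op callback is enough, as in the hunks above; the server side hands in the coordinator's own RemoveScheduler (see the server/cluster and server/handler hunks later in this patch):

    // Test usage: removal is a no-op.
    sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(),
        ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil })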
-	RegisterScheduler(statistics.Read.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(statistics.Read.String(), func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		return newHotReadScheduler(opController, initHotRegionScheduleConfig()), nil
 	})
 }
diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go
index 268855ed593..8170f86cd2a 100644
--- a/pkg/schedule/schedulers/init.go
+++ b/pkg/schedule/schedulers/init.go
@@ -52,7 +52,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(BalanceLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(BalanceLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &balanceLeaderSchedulerConfig{storage: storage}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -80,7 +80,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(BalanceRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(BalanceRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &balanceRegionSchedulerConfig{}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -105,7 +105,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(BalanceWitnessType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(BalanceWitnessType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &balanceWitnessSchedulerConfig{storage: storage}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -141,12 +141,13 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage}
 		if err := decoder(conf); err != nil {
 			return nil, err
 		}
 		conf.cluster = opController.GetCluster()
+		conf.removeSchedulerCb = removeSchedulerCb[0]
 		return newEvictLeaderScheduler(opController, conf), nil
 	})
@@ -157,7 +158,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(EvictSlowStoreType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(EvictSlowStoreType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -196,7 +197,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(GrantHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(GrantHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0), storage: storage}
 		conf.cluster = opController.GetCluster()
 		if err := decoder(conf); err != nil {
@@ -212,7 +213,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(HotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(HotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := initHotRegionScheduleConfig()
 		var data map[string]interface{}
 		if err := decoder(&data); err != nil {
@@ -257,9 +258,10 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(GrantLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(GrantLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage}
 		conf.cluster = opController.GetCluster()
+		conf.removeSchedulerCb = removeSchedulerCb[0]
 		if err := decoder(conf); err != nil {
 			return nil, err
 		}
@@ -283,7 +285,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(LabelType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(LabelType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &labelSchedulerConfig{}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -308,7 +310,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(RandomMergeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(RandomMergeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &randomMergeSchedulerConfig{}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -337,7 +339,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(ScatterRangeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(ScatterRangeType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &scatterRangeSchedulerConfig{
 			storage: storage,
 		}
@@ -371,7 +373,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(ShuffleHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(ShuffleHotRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &shuffleHotRegionSchedulerConfig{Limit: uint64(1)}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -396,7 +398,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(ShuffleLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(ShuffleLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &shuffleLeaderSchedulerConfig{}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -421,7 +423,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(ShuffleRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(ShuffleRegionType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &shuffleRegionSchedulerConfig{storage: storage}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -436,7 +438,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(SplitBucketType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(SplitBucketType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := initSplitBucketConfig()
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -452,7 +454,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(TransferWitnessLeaderType, func(opController *operator.Controller, _ endpoint.ConfigStorage, _ ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(TransferWitnessLeaderType, func(opController *operator.Controller, _ endpoint.ConfigStorage, _ ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		return newTransferWitnessLeaderScheduler(opController), nil
 	})
@@ -463,7 +465,7 @@ func schedulersRegister() {
 		}
 	})
-	RegisterScheduler(EvictSlowTrendType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder) (Scheduler, error) {
+	RegisterScheduler(EvictSlowTrendType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 		conf := &evictSlowTrendSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0), evictCandidate: 0}
 		if err := decoder(conf); err != nil {
 			return nil, err
diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go
index 6cd636cf5db..9389f862f5a 100644
--- a/pkg/schedule/schedulers/scheduler.go
+++ b/pkg/schedule/schedulers/scheduler.go
@@ -89,7 +89,7 @@ func ConfigSliceDecoder(name string, args []string) ConfigDecoder {
 }
 // CreateSchedulerFunc is for creating scheduler.
-type CreateSchedulerFunc func(opController *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder) (Scheduler, error)
+type CreateSchedulerFunc func(opController *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error)
 var schedulerMap = make(map[string]CreateSchedulerFunc)
 var schedulerArgsToDecoder = make(map[string]ConfigSliceDecoderBuilder)
@@ -114,13 +114,13 @@ func RegisterSliceDecoderBuilder(typ string, builder ConfigSliceDecoderBuilder)
 }
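All registered creator functions now share the variadic removeSchedulerCb parameter; only the schedulers that can delete themselves (evict-leader, grant-leader) consume the first callback, the rest simply ignore it. A hypothetical registration showing the shape ("my-scheduler", mySchedulerConfig and newMyScheduler are illustrative names, not part of this patch):

    RegisterScheduler("my-scheduler", func(opController *operator.Controller,
        storage endpoint.ConfigStorage, decoder ConfigDecoder,
        removeSchedulerCb ...func(string) error) (Scheduler, error) {
        conf := &mySchedulerConfig{storage: storage}
        if err := decoder(conf); err != nil {
            return nil, err
        }
        // The creators above index removeSchedulerCb[0] directly, so callers of
        // CreateScheduler for such scheduler types must pass a callback.
        conf.removeSchedulerCb = removeSchedulerCb[0]
        return newMyScheduler(opController, conf), nil
    })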
 // CreateScheduler creates a scheduler with registered creator func.
-func CreateScheduler(typ string, opController *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder) (Scheduler, error) {
+func CreateScheduler(typ string, oc *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 	fn, ok := schedulerMap[typ]
 	if !ok {
 		return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ)
 	}
-	s, err := fn(opController, storage, dec)
+	s, err := fn(oc, storage, dec, removeSchedulerCb...)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go
index 42ea535b9b3..d88fdb39025 100644
--- a/pkg/schedule/schedulers/scheduler_test.go
+++ b/pkg/schedule/schedulers/scheduler_test.go
@@ -45,7 +45,7 @@ func prepareSchedulersTest(needToRunStream ...bool) (context.CancelFunc, config.
 	} else {
 		stream = hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, needToRunStream[0])
 	}
-	oc := operator.NewController(ctx, tc, stream)
+	oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetOpts(), stream)
 	return cancel, opt, tc, oc
 }
@@ -110,7 +110,7 @@ func TestRejectLeader(t *testing.T) {
 	re.Empty(ops)
 	// Can't evict leader from store2, neither.
-	el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"2"}))
+	el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"2"}), func(string) error { return nil })
 	re.NoError(err)
 	ops, _ = el.Schedule(tc, false)
 	re.Empty(ops)
@@ -136,7 +136,7 @@ func TestRemoveRejectLeader(t *testing.T) {
 	defer cancel()
 	tc.AddRegionStore(1, 0)
 	tc.AddRegionStore(2, 1)
-	el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}))
+	el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil })
 	re.NoError(err)
 	tc.DeleteStore(tc.GetStore(1))
 	succ, _ := el.(*evictLeaderScheduler).conf.removeStore(1)
diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go
index dd14036855c..77f94183403 100644
--- a/pkg/storage/storage_test.go
+++ b/pkg/storage/storage_test.go
@@ -218,7 +218,7 @@ func TestLoadRegions(t *testing.T) {
 	regions := mustSaveRegions(re, storage, n)
 	re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion))
-	re.Equal(n, cache.GetRegionCount())
+	re.Equal(n, cache.GetTotalRegionCount())
 	for _, region := range cache.GetMetaRegions() {
 		re.Equal(regions[region.GetId()], region)
 	}
@@ -255,7 +255,7 @@ func TestLoadRegionsToCache(t *testing.T) {
 	regions := mustSaveRegions(re, storage, n)
 	re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion))
-	re.Equal(n, cache.GetRegionCount())
+	re.Equal(n, cache.GetTotalRegionCount())
 	for _, region := range cache.GetMetaRegions() {
 		re.Equal(regions[region.GetId()], region)
 	}
@@ -263,7 +263,7 @@ func TestLoadRegionsToCache(t *testing.T) {
 	n = 20
 	mustSaveRegions(re, storage, n)
 	re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion))
-	re.Equal(n, cache.GetRegionCount())
+	re.Equal(n, cache.GetTotalRegionCount())
 }
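The rename splits the two counters apart: the whole-cluster total gets the longer name, while the range-bounded count keeps GetRegionCount, and ScanRange becomes ScanRegions. A short sketch against a *core.BasicCluster, following the call sites in this patch (variable and key names are illustrative):

    total := cache.GetTotalRegionCount()              // every region in the cluster
    inRange := cache.GetRegionCount(startKey, endKey) // regions overlapping [startKey, endKey)
    batch := cache.ScanRegions(startKey, endKey, 16)  // at most 16 regions from the range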
 func TestLoadRegionsExceedRangeLimit(t *testing.T) {
@@ -275,7 +275,7 @@ func TestLoadRegionsExceedRangeLimit(t *testing.T) {
 	n := 1000
 	regions := mustSaveRegions(re, storage, n)
 	re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion))
-	re.Equal(n, cache.GetRegionCount())
+	re.Equal(n, cache.GetTotalRegionCount())
 	for _, region := range cache.GetMetaRegions() {
 		re.Equal(regions[region.GetId()], region)
 	}
diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go
index 9aea0cd4c6a..70d0884d53d 100644
--- a/plugin/scheduler_example/evict_leader.go
+++ b/plugin/scheduler_example/evict_leader.go
@@ -68,7 +68,7 @@ func init() {
 		}
 	})
-	schedulers.RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedulers.ConfigDecoder) (schedulers.Scheduler, error) {
+	schedulers.RegisterScheduler(EvictLeaderType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedulers.ConfigDecoder, removeSchedulerCb ...func(string) error) (schedulers.Scheduler, error) {
 		conf := &evictLeaderSchedulerConfig{StoreIDWitRanges: make(map[uint64][]core.KeyRange), storage: storage}
 		if err := decoder(conf); err != nil {
 			return nil, err
@@ -95,7 +95,7 @@ type evictLeaderSchedulerConfig struct {
 	mu               syncutil.RWMutex
 	storage          endpoint.ConfigStorage
 	StoreIDWitRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
-	cluster          sche.ClusterInformer
+	cluster          *core.BasicCluster
 }
 func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
diff --git a/server/api/region.go b/server/api/region.go
index 39fd2d4c686..97a6acf19ac 100644
--- a/server/api/region.go
+++ b/server/api/region.go
@@ -376,7 +376,7 @@ func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) {
 // @Router /regions/count [get]
 func (h *regionsHandler) GetRegionCount(w http.ResponseWriter, r *http.Request) {
 	rc := getCluster(r)
-	count := rc.GetRegionCount()
+	count := rc.GetTotalRegionCount()
 	h.rd.JSON(w, http.StatusOK, &RegionsInfo{Count: count})
 }
diff --git a/server/api/stats.go b/server/api/stats.go
index 21d9606a56a..e8b04ba588e 100644
--- a/server/api/stats.go
+++ b/server/api/stats.go
@@ -46,7 +46,7 @@ func (h *statsHandler) GetRegionStatus(w http.ResponseWriter, r *http.Request) {
 	startKey, endKey := r.URL.Query().Get("start_key"), r.URL.Query().Get("end_key")
 	var stats *statistics.RegionStats
 	if r.URL.Query().Has("count") {
-		stats = rc.GetRangeCount([]byte(startKey), []byte(endKey))
+		stats = rc.GetRegionCount([]byte(startKey), []byte(endKey))
 	} else {
 		stats = rc.GetRegionStats([]byte(startKey), []byte(endKey))
 	}
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index fbfd483e49d..42e169b93ed 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -216,7 +216,7 @@ func (c *RaftCluster) LoadClusterStatus() (*Status, error) {
 }
 func (c *RaftCluster) isInitialized() bool {
-	if c.core.GetRegionCount() > 1 {
+	if c.core.GetTotalRegionCount() > 1 {
 		return true
 	}
 	region := c.core.GetRegionByKey(nil)
@@ -490,7 +490,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
 		return nil, err
 	}
 	log.Info("load regions",
-		zap.Int("count", c.core.GetRegionCount()),
+		zap.Int("count", c.core.GetTotalRegionCount()),
 		zap.Duration("cost", time.Since(start)),
 	)
 	for _, store := range c.GetStores() {
@@ -677,11 +677,6 @@ func (c *RaftCluster) GetRuleChecker() *checker.RuleChecker {
 	return c.coordinator.GetRuleChecker()
 }
-// RecordOpStepWithTTL records OpStep with TTL
-func (c *RaftCluster) RecordOpStepWithTTL(regionID uint64) {
-	c.GetRuleChecker().RecordRegionPromoteToNonWitness(regionID)
-}
-
 // GetSchedulers gets all schedulers.
 func (c *RaftCluster) GetSchedulers() []string {
 	return c.coordinator.GetSchedulers()
@@ -707,36 +702,11 @@ func (c *RaftCluster) PauseOrResumeScheduler(name string, t int64) error {
 	return c.coordinator.PauseOrResumeScheduler(name, t)
 }
-// IsSchedulerPaused checks if a scheduler is paused.
-func (c *RaftCluster) IsSchedulerPaused(name string) (bool, error) {
-	return c.coordinator.IsSchedulerPaused(name)
-}
-
-// IsSchedulerDisabled checks if a scheduler is disabled.
-func (c *RaftCluster) IsSchedulerDisabled(name string) (bool, error) {
-	return c.coordinator.IsSchedulerDisabled(name)
-}
-
-// IsSchedulerAllowed checks if a scheduler is allowed.
-func (c *RaftCluster) IsSchedulerAllowed(name string) (bool, error) {
-	return c.coordinator.IsSchedulerAllowed(name)
-}
-
-// IsSchedulerExisted checks if a scheduler is existed.
-func (c *RaftCluster) IsSchedulerExisted(name string) (bool, error) {
-	return c.coordinator.IsSchedulerExisted(name)
-}
-
 // PauseOrResumeChecker pauses or resumes checker.
 func (c *RaftCluster) PauseOrResumeChecker(name string, t int64) error {
 	return c.coordinator.PauseOrResumeChecker(name, t)
 }
-// IsCheckerPaused returns if checker is paused
-func (c *RaftCluster) IsCheckerPaused(name string) (bool, error) {
-	return c.coordinator.IsCheckerPaused(name)
-}
-
 // GetAllocator returns cluster's id allocator.
 func (c *RaftCluster) GetAllocator() id.Allocator {
 	return c.id
@@ -1129,7 +1099,7 @@ func (c *RaftCluster) GetPrevRegionByKey(regionKey []byte) *core.RegionInfo {
 // ScanRegions scans region with start key, until the region contains endKey, or
 // total number greater than limit.
 func (c *RaftCluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
-	return c.core.ScanRange(startKey, endKey, limit)
+	return c.core.ScanRegions(startKey, endKey, limit)
 }
 // GetRegion searches for a region by ID.
@@ -1147,9 +1117,9 @@ func (c *RaftCluster) GetRegions() []*core.RegionInfo {
 	return c.core.GetRegions()
 }
-// GetRegionCount returns total count of regions
-func (c *RaftCluster) GetRegionCount() int {
-	return c.core.GetRegionCount()
+// GetTotalRegionCount returns total count of regions
+func (c *RaftCluster) GetTotalRegionCount() int {
+	return c.core.GetTotalRegionCount()
 }
 // GetStoreRegions returns all regions' information with a given storeID.
@@ -1746,7 +1716,7 @@ func (c *RaftCluster) checkStores() {
 		storeID := store.GetID()
 		if store.IsPreparing() {
-			if store.GetUptime() >= c.opt.GetMaxStorePreparingTime() || c.GetRegionCount() < core.InitClusterRegionThreshold {
+			if store.GetUptime() >= c.opt.GetMaxStorePreparingTime() || c.GetTotalRegionCount() < core.InitClusterRegionThreshold {
 				if err := c.ReadyToServe(storeID); err != nil {
 					log.Error("change store to serving failed",
 						zap.Stringer("store", store.GetMeta()),
@@ -2243,13 +2213,13 @@ func (c *RaftCluster) PutMetaCluster(meta *metapb.Cluster) error {
 // GetRegionStats returns region statistics from cluster.
 func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.RegionStats {
-	return statistics.GetRegionStats(c.core.ScanRange(startKey, endKey, -1))
+	return statistics.GetRegionStats(c.core.ScanRegions(startKey, endKey, -1))
 }
-// GetRangeCount returns the number of regions in the range.
-func (c *RaftCluster) GetRangeCount(startKey, endKey []byte) *statistics.RegionStats {
+// GetRegionCount returns the number of regions in the range.
+func (c *RaftCluster) GetRegionCount(startKey, endKey []byte) *statistics.RegionStats {
 	stats := &statistics.RegionStats{}
-	stats.Count = c.core.GetRangeCount(startKey, endKey)
+	stats.Count = c.core.GetRegionCount(startKey, endKey)
 	return stats
 }
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 12c322e04ef..0e5c049e15b 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -74,7 +74,7 @@ func TestStoreHeartbeat(t *testing.T) {
 	for _, region := range regions {
 		re.NoError(cluster.putRegion(region))
 	}
-	re.Equal(int(n), cluster.GetRegionCount())
+	re.Equal(int(n), cluster.GetTotalRegionCount())
 	for i, store := range stores {
 		req := &pdpb.StoreHeartbeatRequest{}
@@ -320,7 +320,7 @@ func TestSetOfflineWithReplica(t *testing.T) {
 func addEvictLeaderScheduler(cluster *RaftCluster, storeID uint64) (evictScheduler schedulers.Scheduler, err error) {
 	args := []string{fmt.Sprintf("%d", storeID)}
-	evictScheduler, err = schedulers.CreateScheduler(schedulers.EvictLeaderType, cluster.GetOperatorController(), cluster.storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, args))
+	evictScheduler, err = schedulers.CreateScheduler(schedulers.EvictLeaderType, cluster.GetOperatorController(), cluster.storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, args), cluster.GetCoordinator().RemoveScheduler)
 	if err != nil {
 		return
 	}
@@ -2112,7 +2112,7 @@ func checkRegions(re *require.Assertions, cache *core.BasicCluster, regions []*c
 		}
 	}
-	re.Equal(len(regions), cache.GetRegionCount())
+	re.Equal(len(regions), cache.GetTotalRegionCount())
 	for id, count := range regionCount {
 		re.Equal(count, cache.GetStoreRegionCount(id))
 	}
@@ -2343,7 +2343,7 @@ func dispatchHeartbeat(co *schedule.Coordinator, region *core.RegionInfo, stream
 	if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone()); err != nil {
 		return err
 	}
-	co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat)
+	co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil)
 	return nil
 }
@@ -2859,12 +2859,12 @@ func TestAddScheduler(t *testing.T) {
 	batch := data["batch"].(float64)
 	re.Equal(4, int(batch))
-	gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}))
+	gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), co.RemoveScheduler)
 	re.NoError(err)
 	re.NotNil(co.AddScheduler(gls))
 	re.NotNil(co.RemoveScheduler(gls.GetName()))
-	gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}))
+	gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), co.RemoveScheduler)
 	re.NoError(err)
 	re.NoError(co.AddScheduler(gls))
@@ -2910,10 +2910,10 @@ func TestPersistScheduler(t *testing.T) {
 	oc := co.GetOperatorController()
 	storage := tc.RaftCluster.storage
-	gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}))
+	gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), co.RemoveScheduler)
 	re.NoError(err)
 	re.NoError(co.AddScheduler(gls1, "1"))
-	evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}))
+	evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), co.RemoveScheduler)
 	re.NoError(err)
 	re.NoError(co.AddScheduler(evict, "2"))
 	re.Len(co.GetSchedulers(), defaultCount+2)
@@ -3017,7 +3017,7 @@ func TestRemoveScheduler(t *testing.T) {
 	oc := co.GetOperatorController()
 	storage := tc.RaftCluster.storage
-	gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}))
+	gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), co.RemoveScheduler)
 	re.NoError(err)
 	re.NoError(co.AddScheduler(gls1, "1"))
 	re.Len(co.GetSchedulers(), defaultCount+1)
diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go
index 9f87b4501ad..cc401731a6f 100644
--- a/server/cluster/cluster_worker.go
+++ b/server/cluster/cluster_worker.go
@@ -37,7 +37,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
 		return err
 	}
-	c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat)
+	c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
 	return nil
 }
diff --git a/server/handler.go b/server/handler.go
index be730d205f1..1fc543827b2 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -114,7 +114,7 @@ func (h *Handler) IsSchedulerPaused(name string) (bool, error) {
 	if err != nil {
 		return false, err
 	}
-	return rc.IsSchedulerPaused(name)
+	return rc.GetCoordinator().IsSchedulerPaused(name)
 }
 // IsSchedulerDisabled returns whether scheduler is disabled.
@@ -123,7 +123,7 @@ func (h *Handler) IsSchedulerDisabled(name string) (bool, error) {
 	if err != nil {
 		return false, err
 	}
-	return rc.IsSchedulerDisabled(name)
+	return rc.GetCoordinator().IsSchedulerDisabled(name)
 }
 // IsSchedulerExisted returns whether scheduler is existed.
@@ -132,7 +132,7 @@ func (h *Handler) IsSchedulerExisted(name string) (bool, error) {
 	if err != nil {
 		return false, err
 	}
-	return rc.IsSchedulerExisted(name)
+	return rc.GetCoordinator().IsSchedulerExisted(name)
 }
 // GetScheduleConfig returns ScheduleConfig.
@@ -155,7 +155,7 @@ func (h *Handler) IsCheckerPaused(name string) (bool, error) {
 	if err != nil {
 		return false, err
 	}
-	return rc.IsCheckerPaused(name)
+	return rc.GetCoordinator().IsCheckerPaused(name)
 }
 // GetStores returns all stores in the cluster.
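Dispatch now takes the step-recording hook as an explicit argument instead of reaching back into the cluster, and scheduler/checker status queries go through the coordinator. The call shapes from the hunks above, side by side:

    // Production path: the worker forwards the coordinator's recorder.
    c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)

    // Test path: nothing to record, so nil is acceptable.
    co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil)

    // Status queries now go through the coordinator rather than RaftCluster wrappers.
    paused, err := rc.GetCoordinator().IsSchedulerPaused(name)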
@@ -221,7 +221,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
 		return err
 	}
-	s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args))
+	s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args), c.GetCoordinator().RemoveScheduler)
 	if err != nil {
 		return err
 	}
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index 5f02b257206..4b9f608d47f 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -861,7 +861,7 @@ func TestLoadClusterInfo(t *testing.T) {
 	for _, store := range raftCluster.GetMetaStores() {
 		re.Equal(stores[store.GetId()], store)
 	}
-	re.Equal(n, raftCluster.GetRegionCount())
+	re.Equal(n, raftCluster.GetTotalRegionCount())
 	for _, region := range raftCluster.GetMetaRegions() {
 		re.Equal(regions[region.GetId()], region)
 	}
@@ -882,7 +882,7 @@ func TestLoadClusterInfo(t *testing.T) {
 		re.NoError(testStorage.SaveRegion(region))
 	}
 	re.NoError(storage.TryLoadRegionsOnce(ctx, testStorage, raftCluster.GetBasicCluster().PutRegion))
-	re.Equal(n, raftCluster.GetRegionCount())
+	re.Equal(n, raftCluster.GetTotalRegionCount())
 }
 func TestTiFlashWithPlacementRules(t *testing.T) {
diff --git a/tools/pd-simulator/simulator/cases/import_data.go b/tools/pd-simulator/simulator/cases/import_data.go
index 546a175792b..0e7f7770a48 100644
--- a/tools/pd-simulator/simulator/cases/import_data.go
+++ b/tools/pd-simulator/simulator/cases/import_data.go
@@ -84,7 +84,7 @@ func newImportData() *Case {
 	leaderTotal := 0
 	peerTotal := 0
 	res := make([]*core.RegionInfo, 0, 100)
-	regions.ScanRangeWithIterator([]byte(table12), func(region *core.RegionInfo) bool {
+	regions.ScanRegionWithIterator([]byte(table12), func(region *core.RegionInfo) bool {
 		if bytes.Compare(region.GetEndKey(), []byte(table13)) < 0 {
 			res = append(res, regions.GetRegion(region.GetID()))
 			return true
 		}
@@ -116,7 +116,7 @@ func newImportData() *Case {
 			tablePeerLog = fmt.Sprintf("%s [store %d]:%.2f%%", tablePeerLog, storeID, float64(peerCount)/float64(peerTotal)*100)
 		}
 	}
-	regionTotal := regions.GetRegionCount()
+	regionTotal := regions.GetTotalRegionCount()
 	totalLeaderLog := fmt.Sprintf("%d leader:", regionTotal)
 	totalPeerLog := fmt.Sprintf("%d peer:", regionTotal*3)
 	isEnd := false
diff --git a/tools/pd-simulator/simulator/raft.go b/tools/pd-simulator/simulator/raft.go
index 617edd7f84c..fccf75781d3 100644
--- a/tools/pd-simulator/simulator/raft.go
+++ b/tools/pd-simulator/simulator/raft.go
@@ -291,7 +291,7 @@ func (r *RaftEngine) GetRegionByKey(regionKey []byte) *core.RegionInfo {
 func (r *RaftEngine) BootstrapRegion() *core.RegionInfo {
 	r.RLock()
 	defer r.RUnlock()
-	regions := r.regionsInfo.ScanRange(nil, nil, 1)
+	regions := r.regionsInfo.ScanRegions(nil, nil, 1)
 	if len(regions) > 0 {
 		return regions[0]
 	}
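The renamed scan helpers, as the simulator uses them above; startKey and endKey here are placeholders rather than names from the patch:

    // Walk regions starting at startKey until the callback returns false.
    regions.ScanRegionWithIterator(startKey, func(region *core.RegionInfo) bool {
        return bytes.Compare(region.GetEndKey(), endKey) < 0 // keep going while still inside the range
    })

    // Bounded scan: at most one region, starting from the beginning of the keyspace.
    first := regions.ScanRegions(nil, nil, 1)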