diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 16d673674b4..94a2a3ddd89 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -67,7 +67,7 @@ var ( ) type balanceLeaderSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage Ranges []core.KeyRange `json:"ranges"` // Batch is used to generate multiple operators by one scheduling @@ -75,8 +75,8 @@ type balanceLeaderSchedulerConfig struct { } func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, interface{}) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() oldc, _ := json.Marshal(conf) @@ -108,8 +108,8 @@ func (conf *balanceLeaderSchedulerConfig) validate() bool { } func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() ranges := make([]core.KeyRange, len(conf.Ranges)) copy(ranges, conf.Ranges) return &balanceLeaderSchedulerConfig{ @@ -209,14 +209,14 @@ func (l *balanceLeaderScheduler) GetType() string { } func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { - l.conf.mu.RLock() - defer l.conf.mu.RUnlock() + l.conf.RLock() + defer l.conf.RUnlock() return EncodeConfig(l.conf) } func (l *balanceLeaderScheduler) ReloadConfig() error { - l.conf.mu.Lock() - defer l.conf.mu.Unlock() + l.conf.Lock() + defer l.conf.Unlock() cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName()) if err != nil { return err @@ -334,8 +334,8 @@ func (cs *candidateStores) resortStoreWithPos(pos int) { } func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - l.conf.mu.RLock() - defer l.conf.mu.RUnlock() + l.conf.RLock() + defer l.conf.RUnlock() basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { diff --git 
a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index 1343600af06..1cef3a4615b 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -51,6 +51,7 @@ var ( type balanceRegionSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. } type balanceRegionScheduler struct { diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index e44770f0845..b6371633e50 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -53,7 +53,7 @@ const ( ) type balanceWitnessSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage Ranges []core.KeyRange `json:"ranges"` // Batch is used to generate multiple operators by one scheduling @@ -61,8 +61,8 @@ type balanceWitnessSchedulerConfig struct { } func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, interface{}) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() oldc, _ := json.Marshal(conf) @@ -94,8 +94,8 @@ func (conf *balanceWitnessSchedulerConfig) validate() bool { } func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() ranges := make([]core.KeyRange, len(conf.Ranges)) copy(ranges, conf.Ranges) return &balanceWitnessSchedulerConfig{ @@ -204,14 +204,14 @@ func (b *balanceWitnessScheduler) GetType() string { } func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { - b.conf.mu.RLock() - defer b.conf.mu.RUnlock() + b.conf.RLock() + defer b.conf.RUnlock() return EncodeConfig(b.conf) } func (b *balanceWitnessScheduler) ReloadConfig() error { - b.conf.mu.Lock() - defer b.conf.mu.Unlock() + b.conf.Lock() + 
defer b.conf.Unlock() cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName()) if err != nil { return err @@ -237,8 +237,8 @@ func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste } func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - b.conf.mu.RLock() - defer b.conf.mu.RUnlock() + b.conf.RLock() + defer b.conf.RUnlock() basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 332002043a3..879aa9869b3 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -56,7 +56,7 @@ var ( ) type evictLeaderSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` cluster *core.BasicCluster @@ -64,8 +64,8 @@ type evictLeaderSchedulerConfig struct { } func (conf *evictLeaderSchedulerConfig) getStores() []uint64 { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() stores := make([]uint64, 0, len(conf.StoreIDWithRanges)) for storeID := range conf.StoreIDWithRanges { stores = append(stores, storeID) @@ -86,15 +86,15 @@ func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error { if err != nil { return err } - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.StoreIDWithRanges[id] = ranges return nil } func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() storeIDWithRanges := make(map[uint64][]core.KeyRange) for id, ranges := range conf.StoreIDWithRanges { storeIDWithRanges[id] = append(storeIDWithRanges[id], ranges...) 
@@ -106,8 +106,8 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { func (conf *evictLeaderSchedulerConfig) Persist() error { name := conf.getSchedulerName() - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") @@ -123,8 +123,8 @@ func (conf *evictLeaderSchedulerConfig) getSchedulerName() string { } func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() ranges := conf.StoreIDWithRanges[id] res := make([]string, 0, len(ranges)*2) for index := range ranges { @@ -134,8 +134,8 @@ func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { } func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() _, exists := conf.StoreIDWithRanges[id] succ, last = false, false if exists { @@ -148,15 +148,15 @@ func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last } func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.cluster.PauseLeaderTransfer(id) conf.StoreIDWithRanges[id] = keyRange } func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() if ranges, exist := conf.StoreIDWithRanges[id]; exist { return ranges } @@ -199,14 +199,14 @@ func (s *evictLeaderScheduler) GetType() string { } func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() return EncodeConfig(s.conf) } func (s *evictLeaderScheduler) ReloadConfig() error { - s.conf.mu.Lock() - defer 
s.conf.mu.Unlock() + s.conf.Lock() + defer s.conf.Unlock() cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) if err != nil { return err @@ -223,25 +223,9 @@ func (s *evictLeaderScheduler) ReloadConfig() error { return nil } -// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer. -func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint64][]core.KeyRange) { - for id := range old { - if _, ok := new[id]; ok { - continue - } - cluster.ResumeLeaderTransfer(id) - } - for id := range new { - if _, ok := old[id]; ok { - continue - } - cluster.PauseLeaderTransfer(id) - } -} - func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() var res error for id := range s.conf.StoreIDWithRanges { if err := cluster.PauseLeaderTransfer(id); err != nil { @@ -252,8 +236,8 @@ func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro } func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() for id := range s.conf.StoreIDWithRanges { cluster.ResumeLeaderTransfer(id) } @@ -382,15 +366,15 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R idFloat, ok := input["store_id"].(float64) if ok { id = (uint64)(idFloat) - handler.config.mu.RLock() + handler.config.RLock() if _, exists = handler.config.StoreIDWithRanges[id]; !exists { if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { - handler.config.mu.RUnlock() + handler.config.RUnlock() handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } } - handler.config.mu.RUnlock() + handler.config.RUnlock() args = append(args, strconv.FormatUint(id, 10)) } diff --git a/pkg/schedule/schedulers/evict_slow_store.go 
b/pkg/schedule/schedulers/evict_slow_store.go index a88b4bffffb..713920828cc 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -15,6 +15,10 @@ package schedulers import ( + "net/http" + "time" + + "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -23,6 +27,9 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/unrolled/render" "go.uber.org/zap" ) @@ -40,12 +47,35 @@ const ( var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule") type evictSlowStoreSchedulerConfig struct { - storage endpoint.ConfigStorage - EvictedStores []uint64 `json:"evict-stores"` + syncutil.RWMutex + cluster *core.BasicCluster + storage endpoint.ConfigStorage + // Last timestamp of the chosen slow store for eviction. + lastSlowStoreCaptureTS time.Time + // Duration gap for recovering the candidate, unit: s. 
+ RecoveryDurationGap uint64 `json:"recovery-duration"` + EvictedStores []uint64 `json:"evict-stores"` +} + +func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig { + return &evictSlowStoreSchedulerConfig{ + storage: storage, + lastSlowStoreCaptureTS: time.Time{}, + RecoveryDurationGap: defaultRecoveryDurationGap, + EvictedStores: make([]uint64, 0), + } } -func (conf *evictSlowStoreSchedulerConfig) Persist() error { - name := conf.getSchedulerName() +func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig { + conf.RLock() + defer conf.RUnlock() + return &evictSlowStoreSchedulerConfig{ + RecoveryDurationGap: conf.RecoveryDurationGap, + } +} + +func (conf *evictSlowStoreSchedulerConfig) persistLocked() error { + name := EvictSlowStoreName data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") @@ -56,11 +86,9 @@ func (conf *evictSlowStoreSchedulerConfig) Persist() error { return conf.storage.SaveSchedulerConfig(name, data) } -func (conf *evictSlowStoreSchedulerConfig) getSchedulerName() string { - return EvictSlowStoreName -} - func (conf *evictSlowStoreSchedulerConfig) getStores() []uint64 { + conf.RLock() + defer conf.RUnlock() return conf.EvictedStores } @@ -72,29 +100,96 @@ func (conf *evictSlowStoreSchedulerConfig) getKeyRangesByID(id uint64) []core.Ke } func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 { - if len(conf.EvictedStores) == 0 { + if len(conf.getStores()) == 0 { return 0 } - return conf.EvictedStores[0] + return conf.getStores()[0] +} + +// readyForRecovery checks whether the last captured candidate is ready for recovery. 
+func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool { + conf.RLock() + defer conf.RUnlock() + recoveryDurationGap := conf.RecoveryDurationGap + failpoint.Inject("transientRecoveryGap", func() { + recoveryDurationGap = 0 + }) + return uint64(time.Since(conf.lastSlowStoreCaptureTS).Seconds()) >= recoveryDurationGap } func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error { + conf.Lock() + defer conf.Unlock() conf.EvictedStores = []uint64{id} - return conf.Persist() + conf.lastSlowStoreCaptureTS = time.Now() + return conf.persistLocked() } func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) { oldID = conf.evictStore() + conf.Lock() + defer conf.Unlock() if oldID > 0 { conf.EvictedStores = []uint64{} - err = conf.Persist() + conf.lastSlowStoreCaptureTS = time.Time{} + err = conf.persistLocked() } return } +type evictSlowStoreHandler struct { + rd *render.Render + config *evictSlowStoreSchedulerConfig +} + +func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handler { + h := &evictSlowStoreHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + return router +} + +func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) + if !ok { + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) + return + } + handler.config.Lock() + defer handler.config.Unlock() + prevRecoveryDurationGap := handler.config.RecoveryDurationGap + recoveryDurationGap := uint64(recoveryDurationGapFloat) + 
handler.config.RecoveryDurationGap = recoveryDurationGap + if err := handler.config.persistLocked(); err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.config.RecoveryDurationGap = prevRecoveryDurationGap + return + } + log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) + handler.rd.JSON(w, http.StatusOK, nil) +} + +func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, r *http.Request) { + conf := handler.config.Clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + type evictSlowStoreScheduler struct { *BaseScheduler - conf *evictSlowStoreSchedulerConfig + conf *evictSlowStoreSchedulerConfig + handler http.Handler +} + +func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } func (s *evictSlowStoreScheduler) GetName() string { @@ -109,6 +204,34 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *evictSlowStoreScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &evictSlowStoreSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + old := make(map[uint64]struct{}) + for _, id := range s.conf.EvictedStores { + old[id] = struct{}{} + } + new := make(map[uint64]struct{}) + for _, id := range newCfg.EvictedStores { + new[id] = struct{}{} + } + pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) + s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap + s.conf.EvictedStores = newCfg.EvictedStores + return nil +} + func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictStore := s.conf.evictStore() if evictStore != 0 { @@ -168,7 +291,7 
@@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // slow node next time. log.Info("slow store has been removed", zap.Uint64("store-id", store.GetID())) - } else if store.GetSlowScore() <= slowStoreRecoverThreshold { + } else if store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery() { log.Info("slow store has been recovered", zap.Uint64("store-id", store.GetID())) } else { @@ -211,11 +334,10 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores. func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler { - base := NewBaseScheduler(opController) - - s := &evictSlowStoreScheduler{ - BaseScheduler: base, + handler := newEvictSlowStoreHandler(conf) + return &evictSlowStoreScheduler{ + BaseScheduler: NewBaseScheduler(opController), conf: conf, + handler: handler, } - return s } diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index 5cb9a359cde..11cd69e60f7 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -67,6 +67,7 @@ func (suite *evictSlowStoreTestSuite) TearDownTest() { } func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap", "return(true)")) storeInfo := suite.tc.GetStore(1) newStoreInfo := storeInfo.Clone(func(store *core.StoreInfo) { store.GetStoreStats().SlowScore = 100 @@ -113,6 +114,8 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { suite.NoError(err) suite.Equal(es2.conf.EvictedStores, persistValue.EvictedStores) suite.Zero(persistValue.evictStore()) + suite.True(persistValue.readyForRecovery()) + 
suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap")) } func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { @@ -124,6 +127,7 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { es2.conf.setStoreAndPersist(1) suite.Equal(uint64(1), es2.conf.evictStore()) + suite.False(es2.conf.readyForRecovery()) // prepare with evict store. suite.es.PrepareConfig(suite.tc) } diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 1d83e231c5b..53e096baec7 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -15,9 +15,11 @@ package schedulers import ( + "net/http" "strconv" "time" + "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -26,6 +28,9 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/unrolled/render" "go.uber.org/zap" ) @@ -36,17 +41,52 @@ const ( EvictSlowTrendType = "evict-slow-trend" ) -type evictSlowTrendSchedulerConfig struct { - storage endpoint.ConfigStorage - evictCandidate uint64 - candidateCaptureTime time.Time +const ( + alterEpsilon = 1e-9 + minReCheckDurationGap = 120 // default gap for re-check the slow node, unit: s + defaultRecoveryDurationGap = 600 // default gap for recovery, unit: s. +) + +type slowCandidate struct { + storeID uint64 + captureTS time.Time + recoverTS time.Time +} +type evictSlowTrendSchedulerConfig struct { + syncutil.RWMutex + cluster *core.BasicCluster + storage endpoint.ConfigStorage + // Candidate for eviction in current tick. + evictCandidate slowCandidate + // Last chosen candidate for eviction. + lastEvictCandidate slowCandidate + // Duration gap for recovering the candidate, unit: s. 
+ RecoveryDurationGap uint64 `json:"recovery-duration"` // Only evict one store for now EvictedStores []uint64 `json:"evict-by-trend-stores"` } -func (conf *evictSlowTrendSchedulerConfig) Persist() error { - name := conf.getSchedulerName() +func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowTrendSchedulerConfig { + return &evictSlowTrendSchedulerConfig{ + storage: storage, + evictCandidate: slowCandidate{}, + lastEvictCandidate: slowCandidate{}, + RecoveryDurationGap: defaultRecoveryDurationGap, + EvictedStores: make([]uint64, 0), + } +} + +func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig { + conf.RLock() + defer conf.RUnlock() + return &evictSlowTrendSchedulerConfig{ + RecoveryDurationGap: conf.RecoveryDurationGap, + } +} + +func (conf *evictSlowTrendSchedulerConfig) persistLocked() error { + name := EvictSlowTrendName data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") @@ -57,11 +97,9 @@ func (conf *evictSlowTrendSchedulerConfig) Persist() error { return conf.storage.SaveSchedulerConfig(name, data) } -func (conf *evictSlowTrendSchedulerConfig) getSchedulerName() string { - return EvictSlowTrendName -} - func (conf *evictSlowTrendSchedulerConfig) getStores() []uint64 { + conf.RLock() + defer conf.RUnlock() return conf.EvictedStores } @@ -72,39 +110,100 @@ func (conf *evictSlowTrendSchedulerConfig) getKeyRangesByID(id uint64) []core.Ke return []core.KeyRange{core.NewKeyRange("", "")} } +func (conf *evictSlowTrendSchedulerConfig) hasEvictedStores() bool { + conf.RLock() + defer conf.RUnlock() + return len(conf.EvictedStores) > 0 +} + func (conf *evictSlowTrendSchedulerConfig) evictedStore() uint64 { - if len(conf.EvictedStores) == 0 { + if !conf.hasEvictedStores() { return 0 } + conf.RLock() + defer conf.RUnlock() + // If a candidate passes all checks and proved to be slow, it will be + // recorded in `conf.EvictedStores`, and 
`conf.lastEvictCandidate` will record + // the captured timestamp of this store. return conf.EvictedStores[0] } func (conf *evictSlowTrendSchedulerConfig) candidate() uint64 { - return conf.evictCandidate + conf.RLock() + defer conf.RUnlock() + return conf.evictCandidate.storeID } func (conf *evictSlowTrendSchedulerConfig) captureTS() time.Time { - return conf.candidateCaptureTime + conf.RLock() + defer conf.RUnlock() + return conf.evictCandidate.captureTS } func (conf *evictSlowTrendSchedulerConfig) candidateCapturedSecs() uint64 { - return uint64(time.Since(conf.candidateCaptureTime).Seconds()) + conf.RLock() + defer conf.RUnlock() + return DurationSinceAsSecs(conf.evictCandidate.captureTS) +} + +func (conf *evictSlowTrendSchedulerConfig) lastCapturedCandidate() *slowCandidate { + conf.RLock() + defer conf.RUnlock() + return &conf.lastEvictCandidate +} + +func (conf *evictSlowTrendSchedulerConfig) lastCandidateCapturedSecs() uint64 { + return DurationSinceAsSecs(conf.lastEvictCandidate.captureTS) +} + +// readyForRecovery checks whether the last captured candidate is ready for recovery. 
+func (conf *evictSlowTrendSchedulerConfig) readyForRecovery() bool { + conf.RLock() + defer conf.RUnlock() + recoveryDurationGap := conf.RecoveryDurationGap + failpoint.Inject("transientRecoveryGap", func() { + recoveryDurationGap = 0 + }) + return conf.lastCandidateCapturedSecs() >= recoveryDurationGap } func (conf *evictSlowTrendSchedulerConfig) captureCandidate(id uint64) { - conf.evictCandidate = id - conf.candidateCaptureTime = time.Now() + conf.Lock() + defer conf.Unlock() + conf.evictCandidate = slowCandidate{ + storeID: id, + captureTS: time.Now(), + recoverTS: time.Now(), + } + if conf.lastEvictCandidate == (slowCandidate{}) { + conf.lastEvictCandidate = conf.evictCandidate + } } -func (conf *evictSlowTrendSchedulerConfig) popCandidate() uint64 { - id := conf.evictCandidate - conf.evictCandidate = 0 +func (conf *evictSlowTrendSchedulerConfig) popCandidate(updLast bool) uint64 { + conf.Lock() + defer conf.Unlock() + id := conf.evictCandidate.storeID + if updLast { + conf.lastEvictCandidate = conf.evictCandidate + } + conf.evictCandidate = slowCandidate{} return id } +func (conf *evictSlowTrendSchedulerConfig) markCandidateRecovered() { + conf.Lock() + defer conf.Unlock() + if conf.lastEvictCandidate != (slowCandidate{}) { + conf.lastEvictCandidate.recoverTS = time.Now() + } +} + func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error { + conf.Lock() + defer conf.Unlock() conf.EvictedStores = []uint64{id} - return conf.Persist() + return conf.persistLocked() } func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.SchedulerCluster) (oldID uint64, err error) { @@ -118,13 +217,78 @@ func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.Schedule address = store.GetAddress() } storeSlowTrendEvictedStatusGauge.WithLabelValues(address, strconv.FormatUint(oldID, 10)).Set(0) + conf.Lock() + defer conf.Unlock() conf.EvictedStores = []uint64{} - return oldID, conf.Persist() + return oldID, conf.persistLocked() 
+} + +type evictSlowTrendHandler struct { + rd *render.Render + config *evictSlowTrendSchedulerConfig +} + +func newEvictSlowTrendHandler(config *evictSlowTrendSchedulerConfig) http.Handler { + h := &evictSlowTrendHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + return router +} + +func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) + if !ok { + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) + return + } + handler.config.Lock() + defer handler.config.Unlock() + prevRecoveryDurationGap := handler.config.RecoveryDurationGap + recoveryDurationGap := uint64(recoveryDurationGapFloat) + handler.config.RecoveryDurationGap = recoveryDurationGap + if err := handler.config.persistLocked(); err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.config.RecoveryDurationGap = prevRecoveryDurationGap + return + } + log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) + handler.rd.JSON(w, http.StatusOK, nil) +} + +func (handler *evictSlowTrendHandler) ListConfig(w http.ResponseWriter, r *http.Request) { + conf := handler.config.Clone() + handler.rd.JSON(w, http.StatusOK, conf) } type evictSlowTrendScheduler struct { *BaseScheduler - conf *evictSlowTrendSchedulerConfig + conf *evictSlowTrendSchedulerConfig + handler http.Handler +} + +func (s *evictSlowTrendScheduler) GetNextInterval(interval time.Duration) time.Duration { + var 
growthType intervalGrowthType + // If it already found a slow node as candidate, the next interval should be shorter + // to make the next scheduling as soon as possible. This adjustment will decrease the + // response time, as heartbeats from other nodes will be received and updated more quickly. + if s.conf.hasEvictedStores() { + growthType = zeroGrowth + } else { + growthType = exponentialGrowth + } + return intervalGrow(s.GetMinInterval(), MaxScheduleInterval, growthType) +} + +func (s *evictSlowTrendScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } func (s *evictSlowTrendScheduler) GetName() string { @@ -139,6 +303,34 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *evictSlowTrendScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &evictSlowTrendSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + old := make(map[uint64]struct{}) + for _, id := range s.conf.EvictedStores { + old[id] = struct{}{} + } + new := make(map[uint64]struct{}) + for _, id := range newCfg.EvictedStores { + new[id] = struct{}{} + } + pauseAndResumeLeaderTransfer(s.conf.cluster, old, new) + s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap + s.conf.EvictedStores = newCfg.EvictedStores + return nil +} + func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictedStoreID := s.conf.evictedStore() if evictedStoreID == 0 { @@ -166,6 +358,8 @@ func (s *evictSlowTrendScheduler) cleanupEvictLeader(cluster sche.SchedulerClust log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", evictedStoreID)) } if evictedStoreID != 0 { + // Assertion: evictedStoreID == s.conf.lastEvictCandidate.storeID + 
s.conf.markCandidateRecovered() cluster.SlowTrendRecovered(evictedStoreID) } } @@ -200,15 +394,13 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun if store == nil || store.IsRemoved() { // Previous slow store had been removed, remove the scheduler and check // slow node next time. - log.Info("store evicted by slow trend has been removed", - zap.Uint64("store-id", store.GetID())) - storeSlowTrendActionStatusGauge.WithLabelValues("evict.stop:removed").Inc() - } else if checkStoreCanRecover(cluster, store) { - log.Info("store evicted by slow trend has been recovered", - zap.Uint64("store-id", store.GetID())) - storeSlowTrendActionStatusGauge.WithLabelValues("evict.stop:recovered").Inc() + log.Info("store evicted by slow trend has been removed", zap.Uint64("store-id", store.GetID())) + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_removed").Inc() + } else if checkStoreCanRecover(cluster, store) && s.conf.readyForRecovery() { + log.Info("store evicted by slow trend has been recovered", zap.Uint64("store-id", store.GetID())) + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_recovered").Inc() } else { - storeSlowTrendActionStatusGauge.WithLabelValues("evict.continue").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "continue").Inc() return s.scheduleEvictLeader(cluster), nil } s.cleanupEvictLeader(cluster) @@ -217,34 +409,32 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun candFreshCaptured := false if s.conf.candidate() == 0 { - candidate := chooseEvictCandidate(cluster) + candidate := chooseEvictCandidate(cluster, s.conf.lastCapturedCandidate()) if candidate != nil { - storeSlowTrendActionStatusGauge.WithLabelValues("cand.captured").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "captured").Inc() s.conf.captureCandidate(candidate.GetID()) candFreshCaptured = true } } else { - 
storeSlowTrendActionStatusGauge.WithLabelValues("cand.continue").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "continue").Inc() } + slowStoreID := s.conf.candidate() if slowStoreID == 0 { - storeSlowTrendActionStatusGauge.WithLabelValues("cand.none").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none").Inc() return ops, nil } slowStore := cluster.GetStore(slowStoreID) if !candFreshCaptured && checkStoreFasterThanOthers(cluster, slowStore) { - s.conf.popCandidate() - log.Info("slow store candidate by trend has been cancel", - zap.Uint64("store-id", slowStoreID)) - storeSlowTrendActionStatusGauge.WithLabelValues("cand.cancel:too-faster").Inc() + s.conf.popCandidate(false) + log.Info("slow store candidate by trend has been cancel", zap.Uint64("store-id", slowStoreID)) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "canceled_too_faster").Inc() return ops, nil } - slowStoreRecordTS := s.conf.captureTS() - if !checkStoresAreUpdated(cluster, slowStoreID, slowStoreRecordTS) { - log.Info("slow store candidate waiting for other stores to update heartbeats", - zap.Uint64("store-id", slowStoreID)) - storeSlowTrendActionStatusGauge.WithLabelValues("cand.wait").Inc() + if slowStoreRecordTS := s.conf.captureTS(); !checkStoresAreUpdated(cluster, slowStoreID, slowStoreRecordTS) { + log.Info("slow store candidate waiting for other stores to update heartbeats", zap.Uint64("store-id", slowStoreID)) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "wait").Inc() return ops, nil } @@ -252,30 +442,36 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun log.Info("detected slow store by trend, start to evict leaders", zap.Uint64("store-id", slowStoreID), zap.Uint64("candidate-captured-secs", candCapturedSecs)) - storeSlowTrendMiscGauge.WithLabelValues("cand.captured.secs").Set(float64(candCapturedSecs)) - err := s.prepareEvictLeader(cluster, s.conf.popCandidate()) - if err != nil { + 
storeSlowTrendMiscGauge.WithLabelValues("candidate", "captured_secs").Set(float64(candCapturedSecs)) + if err := s.prepareEvictLeader(cluster, s.conf.popCandidate(true)); err != nil { log.Info("prepare for evicting leader by slow trend failed", zap.Error(err), zap.Uint64("store-id", slowStoreID)) - storeSlowTrendActionStatusGauge.WithLabelValues("evict.prepare.err").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "prepare_err").Inc() return ops, nil } - storeSlowTrendActionStatusGauge.WithLabelValues("evict.start").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("evict", "start").Inc() return s.scheduleEvictLeader(cluster), nil } func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler { + handler := newEvictSlowTrendHandler(conf) return &evictSlowTrendScheduler{ BaseScheduler: NewBaseScheduler(opController), conf: conf, + handler: handler, } } -func chooseEvictCandidate(cluster sche.SchedulerCluster) (slowStore *core.StoreInfo) { +func chooseEvictCandidate(cluster sche.SchedulerCluster, lastEvictCandidate *slowCandidate) (slowStore *core.StoreInfo) { + isRaftKV2 := cluster.GetStoreConfig().IsRaftKV2() + failpoint.Inject("mockRaftKV2", func() { + isRaftKV2 = true + }) stores := cluster.GetStores() if len(stores) < 3 { - storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:too-few").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_few").Inc() return } + var candidates []*core.StoreInfo var affectedStoreCount int for _, store := range stores { @@ -285,47 +481,68 @@ func chooseEvictCandidate(cluster sche.SchedulerCluster) (slowStore *core.StoreI if !(store.IsPreparing() || store.IsServing()) { continue } - slowTrend := store.GetSlowTrend() - if slowTrend != nil && slowTrend.CauseRate > alterEpsilon && slowTrend.ResultRate < -alterEpsilon { - candidates = append(candidates, store) - storeSlowTrendActionStatusGauge.WithLabelValues("cand.add").Inc() - 
log.Info("evict-slow-trend-scheduler pre-canptured candidate", - zap.Uint64("store-id", store.GetID()), - zap.Float64("cause-rate", slowTrend.CauseRate), - zap.Float64("result-rate", slowTrend.ResultRate), - zap.Float64("cause-value", slowTrend.CauseValue), - zap.Float64("result-value", slowTrend.ResultValue)) - } - if slowTrend != nil && slowTrend.ResultRate < -alterEpsilon { - affectedStoreCount += 1 + if slowTrend := store.GetSlowTrend(); slowTrend != nil { + if slowTrend.ResultRate < -alterEpsilon { + affectedStoreCount += 1 + } + // For the cases of disk io jitters. + // Normally, if there are jitters on disk io or network io, the slow store must have a descending + // trend on QPS and ascending trend on duration. So, the slowTrend must match the following pattern. + if slowTrend.CauseRate > alterEpsilon && slowTrend.ResultRate < -alterEpsilon { + candidates = append(candidates, store) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc() + log.Info("evict-slow-trend-scheduler pre-captured candidate", + zap.Uint64("store-id", store.GetID()), + zap.Float64("cause-rate", slowTrend.CauseRate), + zap.Float64("result-rate", slowTrend.ResultRate), + zap.Float64("cause-value", slowTrend.CauseValue), + zap.Float64("result-value", slowTrend.ResultValue)) + } else if isRaftKV2 && slowTrend.CauseRate > alterEpsilon { + // Meanwhile, if the store was previously experiencing slowness in the `Duration` dimension, it should + // re-check whether this node is still encountering network I/O-related jitters. And if this node matches + // the last identified candidate, it indicates that the node is still being affected by delays in network I/O, + // and consequently, it should be re-designated as slow once more. + // Prerequisite: `raft-kv2` engine has the ability to perceive the slow trend on network io jitters. + // TODO: maybe make it compatible with `raft-kv` later.
+ if lastEvictCandidate != nil && lastEvictCandidate.storeID == store.GetID() && DurationSinceAsSecs(lastEvictCandidate.recoverTS) <= minReCheckDurationGap { + candidates = append(candidates, store) + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc() + log.Info("evict-slow-trend-scheduler pre-captured candidate in raft-kv2 cluster", + zap.Uint64("store-id", store.GetID()), + zap.Float64("cause-rate", slowTrend.CauseRate), + zap.Float64("result-rate", slowTrend.ResultRate), + zap.Float64("cause-value", slowTrend.CauseValue), + zap.Float64("result-value", slowTrend.ResultValue)) + } + } } } if len(candidates) == 0 { - storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:no-fit").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_no_fit").Inc() return } - // TODO: Calculate to judge if one store is way slower than the others if len(candidates) > 1 { - storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:too-many").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_many").Inc() return } + store := candidates[0] affectedStoreThreshold := int(float64(len(stores)) * cluster.GetSchedulerConfig().GetSlowStoreEvictingAffectedStoreRatioThreshold()) if affectedStoreCount < affectedStoreThreshold { log.Info("evict-slow-trend-scheduler failed to confirm candidate: it only affect a few stores", zap.Uint64("store-id", store.GetID())) - storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:affect-a-few").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_affect_a_few").Inc() return } if !checkStoreSlowerThanOthers(cluster, store) { log.Info("evict-slow-trend-scheduler failed to confirm candidate: it's not slower than others", zap.Uint64("store-id", store.GetID())) - storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:not-slower").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_not_slower").Inc() return } - 
storeSlowTrendActionStatusGauge.WithLabelValues("cand.add").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc() log.Info("evict-slow-trend-scheduler captured candidate", zap.Uint64("store-id", store.GetID())) return store } @@ -350,12 +567,12 @@ func checkStoresAreUpdated(cluster sche.SchedulerCluster, slowStoreID uint64, sl updatedStores += 1 continue } - if slowStoreRecordTS.Before(store.GetLastHeartbeatTS()) { + if slowStoreRecordTS.Compare(store.GetLastHeartbeatTS()) <= 0 { updatedStores += 1 } } - storeSlowTrendMiscGauge.WithLabelValues("stores.check-updated:count").Set(float64(updatedStores)) - storeSlowTrendMiscGauge.WithLabelValues("stores.check-updated:expected").Set(float64(expected)) + storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_count").Set(float64(updatedStores)) + storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_expected").Set(float64(expected)) return updatedStores >= expected } @@ -364,7 +581,7 @@ func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.Stor expected := (len(stores)*2 + 1) / 3 targetSlowTrend := target.GetSlowTrend() if targetSlowTrend == nil { - storeSlowTrendActionStatusGauge.WithLabelValues("cand.check-slower:no-data").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_slower_no_data").Inc() return false } slowerThanStoresNum := 0 @@ -385,8 +602,8 @@ func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.Stor slowerThanStoresNum += 1 } } - storeSlowTrendMiscGauge.WithLabelValues("store.check-slower:count").Set(float64(slowerThanStoresNum)) - storeSlowTrendMiscGauge.WithLabelValues("store.check-slower:expected").Set(float64(expected)) + storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_count").Set(float64(slowerThanStoresNum)) + storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_expected").Set(float64(expected)) return slowerThanStoresNum >= expected } @@ -418,7 +635,7 @@ func 
checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.Stor expected := (len(stores) + 1) / 2 targetSlowTrend := target.GetSlowTrend() if targetSlowTrend == nil { - storeSlowTrendActionStatusGauge.WithLabelValues("cand.check-faster:no-data").Inc() + storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_faster_no_data").Inc() return false } fasterThanStores := 0 @@ -433,15 +650,18 @@ func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.Stor continue } slowTrend := store.GetSlowTrend() - // Greater `CuaseValue` means slower + // Greater `CauseValue` means slower if slowTrend != nil && targetSlowTrend.CauseValue <= slowTrend.CauseValue*1.1 && slowTrend.CauseValue > alterEpsilon && targetSlowTrend.CauseValue > alterEpsilon { fasterThanStores += 1 } } - storeSlowTrendMiscGauge.WithLabelValues("store.check-faster:count").Set(float64(fasterThanStores)) - storeSlowTrendMiscGauge.WithLabelValues("store.check-faster:expected").Set(float64(expected)) + storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_count").Set(float64(fasterThanStores)) + storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_expected").Set(float64(expected)) return fasterThanStores >= expected } -const alterEpsilon = 1e-9 +// DurationSinceAsSecs returns the duration gap since the given startTS, unit: s. 
+func DurationSinceAsSecs(startTS time.Time) uint64 { + return uint64(time.Since(startTS).Seconds()) +} diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 5a68da069b8..6ab689ea5d4 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -54,7 +54,7 @@ var ( ) type grantHotRegionSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage cluster *core.BasicCluster StoreIDs []uint64 `json:"store-id"` @@ -62,8 +62,8 @@ type grantHotRegionSchedulerConfig struct { } func (conf *grantHotRegionSchedulerConfig) setStore(leaderID uint64, peers []uint64) bool { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() ret := slice.AnyOf(peers, func(i int) bool { return leaderID == peers[i] }) @@ -75,20 +75,20 @@ func (conf *grantHotRegionSchedulerConfig) setStore(leaderID uint64, peers []uin } func (conf *grantHotRegionSchedulerConfig) GetStoreLeaderID() uint64 { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return conf.StoreLeaderID } func (conf *grantHotRegionSchedulerConfig) SetStoreLeaderID(id uint64) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.StoreLeaderID = id } func (conf *grantHotRegionSchedulerConfig) Clone() *grantHotRegionSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() newStoreIDs := make([]uint64, len(conf.StoreIDs)) copy(newStoreIDs, conf.StoreIDs) return &grantHotRegionSchedulerConfig{ @@ -99,8 +99,8 @@ func (conf *grantHotRegionSchedulerConfig) Clone() *grantHotRegionSchedulerConfi func (conf *grantHotRegionSchedulerConfig) Persist() error { name := conf.getSchedulerName() - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() data, err := EncodeConfig(conf) if err != nil { return err @@ -113,8 +113,8 @@ func (conf 
*grantHotRegionSchedulerConfig) getSchedulerName() string { } func (conf *grantHotRegionSchedulerConfig) has(storeID uint64) bool { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return slice.AnyOf(conf.StoreIDs, func(i int) bool { return storeID == conf.StoreIDs[i] }) @@ -151,6 +151,25 @@ func (s *grantHotRegionScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *grantHotRegionScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &grantHotRegionSchedulerConfig{} + if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + s.conf.StoreIDs = newCfg.StoreIDs + s.conf.StoreLeaderID = newCfg.StoreLeaderID + return nil +} + // IsScheduleAllowed returns whether the scheduler is allowed to schedule. // TODO it should check if there is any scheduler such as evict or hot region scheduler func (s *grantHotRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 84f830f368b..47e14af4902 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -49,7 +49,7 @@ var ( ) type grantLeaderSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` cluster *core.BasicCluster @@ -69,15 +69,15 @@ func (conf *grantLeaderSchedulerConfig) BuildWithArgs(args []string) error { if err != nil { return err } - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.StoreIDWithRanges[id] = ranges return nil } func (conf *grantLeaderSchedulerConfig) Clone() *grantLeaderSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + 
defer conf.RUnlock() newStoreIDWithRanges := make(map[uint64][]core.KeyRange) for k, v := range conf.StoreIDWithRanges { newStoreIDWithRanges[k] = v @@ -89,8 +89,8 @@ func (conf *grantLeaderSchedulerConfig) Clone() *grantLeaderSchedulerConfig { func (conf *grantLeaderSchedulerConfig) Persist() error { name := conf.getSchedulerName() - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() data, err := EncodeConfig(conf) if err != nil { return err @@ -103,8 +103,8 @@ func (conf *grantLeaderSchedulerConfig) getSchedulerName() string { } func (conf *grantLeaderSchedulerConfig) getRanges(id uint64) []string { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() ranges := conf.StoreIDWithRanges[id] res := make([]string, 0, len(ranges)*2) for index := range ranges { @@ -114,8 +114,8 @@ func (conf *grantLeaderSchedulerConfig) getRanges(id uint64) []string { } func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() _, exists := conf.StoreIDWithRanges[id] succ, last = false, false if exists { @@ -128,15 +128,15 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last } func (conf *grantLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.cluster.PauseLeaderTransfer(id) conf.StoreIDWithRanges[id] = keyRange } func (conf *grantLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() if ranges, exist := conf.StoreIDWithRanges[id]; exist { return ranges } @@ -179,8 +179,8 @@ func (s *grantLeaderScheduler) EncodeConfig() ([]byte, error) { } func (s *grantLeaderScheduler) ReloadConfig() error { - s.conf.mu.Lock() - defer s.conf.mu.Unlock() + s.conf.Lock() + defer s.conf.Unlock() cfgData, err := 
s.conf.storage.LoadSchedulerConfig(s.GetName()) if err != nil { return err @@ -198,8 +198,8 @@ func (s *grantLeaderScheduler) ReloadConfig() error { } func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() var res error for id := range s.conf.StoreIDWithRanges { if err := cluster.PauseLeaderTransfer(id); err != nil { @@ -210,8 +210,8 @@ func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro } func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() for id := range s.conf.StoreIDWithRanges { cluster.ResumeLeaderTransfer(id) } @@ -227,8 +227,8 @@ func (s *grantLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *grantLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { grantLeaderCounter.Inc() - s.conf.mu.RLock() - defer s.conf.mu.RUnlock() + s.conf.RLock() + defer s.conf.RUnlock() ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWithRanges)) pendingFilter := filter.NewRegionPendingFilter() downFilter := filter.NewRegionDownFilter() @@ -268,15 +268,15 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R idFloat, ok := input["store_id"].(float64) if ok { id = (uint64)(idFloat) - handler.config.mu.RLock() + handler.config.RLock() if _, exists = handler.config.StoreIDWithRanges[id]; !exists { if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { - handler.config.mu.RUnlock() + handler.config.RUnlock() handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } } - handler.config.mu.RUnlock() + handler.config.RUnlock() args = append(args, strconv.FormatUint(id, 10)) } diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 
5ad56a0055f..a55c326aabf 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -256,6 +256,44 @@ func (h *hotScheduler) EncodeConfig() ([]byte, error) { return h.conf.EncodeConfig() } +func (h *hotScheduler) ReloadConfig() error { + h.conf.Lock() + defer h.conf.Unlock() + cfgData, err := h.conf.storage.LoadSchedulerConfig(h.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &hotRegionSchedulerConfig{} + if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + h.conf.MinHotByteRate = newCfg.MinHotByteRate + h.conf.MinHotKeyRate = newCfg.MinHotKeyRate + h.conf.MinHotQueryRate = newCfg.MinHotQueryRate + h.conf.MaxZombieRounds = newCfg.MaxZombieRounds + h.conf.MaxPeerNum = newCfg.MaxPeerNum + h.conf.ByteRateRankStepRatio = newCfg.ByteRateRankStepRatio + h.conf.KeyRateRankStepRatio = newCfg.KeyRateRankStepRatio + h.conf.QueryRateRankStepRatio = newCfg.QueryRateRankStepRatio + h.conf.CountRankStepRatio = newCfg.CountRankStepRatio + h.conf.GreatDecRatio = newCfg.GreatDecRatio + h.conf.MinorDecRatio = newCfg.MinorDecRatio + h.conf.SrcToleranceRatio = newCfg.SrcToleranceRatio + h.conf.DstToleranceRatio = newCfg.DstToleranceRatio + h.conf.WriteLeaderPriorities = newCfg.WriteLeaderPriorities + h.conf.WritePeerPriorities = newCfg.WritePeerPriorities + h.conf.ReadPriorities = newCfg.ReadPriorities + h.conf.StrictPickingStore = newCfg.StrictPickingStore + h.conf.EnableForTiFlash = newCfg.EnableForTiFlash + h.conf.RankFormulaVersion = newCfg.RankFormulaVersion + h.conf.ForbidRWType = newCfg.ForbidRWType + h.conf.SplitThresholds = newCfg.SplitThresholds + return nil +} + func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.conf.ServeHTTP(w, r) } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 8170f86cd2a..b2293819753 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ 
-163,6 +163,7 @@ func schedulersRegister() { if err := decoder(conf); err != nil { return nil, err } + conf.cluster = opController.GetCluster() return newEvictSlowStoreScheduler(opController, conf), nil }) @@ -378,6 +379,7 @@ func schedulersRegister() { if err := decoder(conf); err != nil { return nil, err } + conf.storage = storage return newShuffleHotRegionScheduler(opController, conf), nil }) @@ -466,7 +468,7 @@ func schedulersRegister() { }) RegisterScheduler(EvictSlowTrendType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) { - conf := &evictSlowTrendSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0), evictCandidate: 0} + conf := initEvictSlowTrendSchedulerConfig(storage) if err := decoder(conf); err != nil { return nil, err } diff --git a/pkg/schedule/schedulers/label.go b/pkg/schedule/schedulers/label.go index 62a1100d16b..90310bcf10e 100644 --- a/pkg/schedule/schedulers/label.go +++ b/pkg/schedule/schedulers/label.go @@ -46,6 +46,7 @@ var ( type labelSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. } type labelScheduler struct { diff --git a/pkg/schedule/schedulers/random_merge.go b/pkg/schedule/schedulers/random_merge.go index a621b595198..44bb5081ef9 100644 --- a/pkg/schedule/schedulers/random_merge.go +++ b/pkg/schedule/schedulers/random_merge.go @@ -48,6 +48,7 @@ var ( type randomMergeSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. 
} type randomMergeScheduler struct { diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index e301b4c6e76..1bc6eafb58e 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -49,7 +49,7 @@ var ( ) type scatterRangeSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage RangeName string `json:"range-name"` StartKey string `json:"start-key"` @@ -60,8 +60,8 @@ func (conf *scatterRangeSchedulerConfig) BuildWithArgs(args []string) error { if len(args) != 3 { return errs.ErrSchedulerConfig.FastGenByArgs("ranges and name") } - conf.mu.Lock() - defer conf.mu.Unlock() + conf.Lock() + defer conf.Unlock() conf.RangeName = args[0] conf.StartKey = args[1] @@ -70,8 +70,8 @@ func (conf *scatterRangeSchedulerConfig) BuildWithArgs(args []string) error { } func (conf *scatterRangeSchedulerConfig) Clone() *scatterRangeSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return &scatterRangeSchedulerConfig{ StartKey: conf.StartKey, EndKey: conf.EndKey, @@ -81,8 +81,8 @@ func (conf *scatterRangeSchedulerConfig) Clone() *scatterRangeSchedulerConfig { func (conf *scatterRangeSchedulerConfig) Persist() error { name := conf.getSchedulerName() - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() data, err := EncodeConfig(conf) if err != nil { return err @@ -91,26 +91,26 @@ func (conf *scatterRangeSchedulerConfig) Persist() error { } func (conf *scatterRangeSchedulerConfig) GetRangeName() string { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return conf.RangeName } func (conf *scatterRangeSchedulerConfig) GetStartKey() []byte { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return []byte(conf.StartKey) } func (conf *scatterRangeSchedulerConfig) GetEndKey() []byte { - conf.mu.RLock() - defer conf.mu.RUnlock() + 
conf.RLock() + defer conf.RUnlock() return []byte(conf.EndKey) } func (conf *scatterRangeSchedulerConfig) getSchedulerName() string { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return fmt.Sprintf("scatter-range-%s", conf.RangeName) } @@ -161,14 +161,14 @@ func (l *scatterRangeScheduler) GetType() string { } func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { - l.config.mu.RLock() - defer l.config.mu.RUnlock() + l.config.RLock() + defer l.config.RUnlock() return EncodeConfig(l.config) } func (l *scatterRangeScheduler) ReloadConfig() error { - l.config.mu.Lock() - defer l.config.mu.Unlock() + l.config.Lock() + defer l.config.Unlock() cfgData, err := l.config.storage.LoadSchedulerConfig(l.GetName()) if err != nil { return err @@ -176,7 +176,14 @@ func (l *scatterRangeScheduler) ReloadConfig() error { if len(cfgData) == 0 { return nil } - return DecodeConfig([]byte(cfgData), l.config) + newCfg := &scatterRangeSchedulerConfig{} + if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + l.config.RangeName = newCfg.RangeName + l.config.StartKey = newCfg.StartKey + l.config.EndKey = newCfg.EndKey + return nil } func (l *scatterRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index d5264b90428..6ad6656fd18 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -15,6 +15,9 @@ package schedulers import ( + "net/http" + + "github.com/gorilla/mux" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core/constant" @@ -24,6 +27,10 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" 
+ "github.com/unrolled/render" "go.uber.org/zap" ) @@ -42,8 +49,32 @@ var ( ) type shuffleHotRegionSchedulerConfig struct { - Name string `json:"name"` - Limit uint64 `json:"limit"` + syncutil.RWMutex + storage endpoint.ConfigStorage + Name string `json:"name"` + Limit uint64 `json:"limit"` +} + +func (conf *shuffleHotRegionSchedulerConfig) getSchedulerName() string { + return conf.Name +} + +func (conf *shuffleHotRegionSchedulerConfig) Clone() *shuffleHotRegionSchedulerConfig { + conf.RLock() + defer conf.RUnlock() + return &shuffleHotRegionSchedulerConfig{ + Name: conf.Name, + Limit: conf.Limit, + } +} + +func (conf *shuffleHotRegionSchedulerConfig) persistLocked() error { + name := conf.getSchedulerName() + data, err := EncodeConfig(conf) + if err != nil { + return err + } + return conf.storage.SaveSchedulerConfig(name, data) } // ShuffleHotRegionScheduler mainly used to test. @@ -52,19 +83,26 @@ type shuffleHotRegionSchedulerConfig struct { // the hot peer. type shuffleHotRegionScheduler struct { *baseHotScheduler - conf *shuffleHotRegionSchedulerConfig + conf *shuffleHotRegionSchedulerConfig + handler http.Handler } // newShuffleHotRegionScheduler creates an admin scheduler that random balance hot regions func newShuffleHotRegionScheduler(opController *operator.Controller, conf *shuffleHotRegionSchedulerConfig) Scheduler { base := newBaseHotScheduler(opController) + handler := newShuffleHotRegionHandler(conf) ret := &shuffleHotRegionScheduler{ baseHotScheduler: base, conf: conf, + handler: handler, } return ret } +func (s *shuffleHotRegionScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) +} + func (s *shuffleHotRegionScheduler) GetName() string { return s.conf.Name } @@ -77,6 +115,24 @@ func (s *shuffleHotRegionScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *shuffleHotRegionScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := 
s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &shuffleHotRegionSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + s.conf.Limit = newCfg.Limit + return nil +} + func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { hotRegionAllowed := s.OpController.OperatorCount(operator.OpHotRegion) < s.conf.Limit conf := cluster.GetSchedulerConfig() @@ -158,3 +214,47 @@ func (s *shuffleHotRegionScheduler) randomSchedule(cluster sche.SchedulerCluster shuffleHotRegionSkipCounter.Inc() return nil } + +type shuffleHotRegionHandler struct { + rd *render.Render + config *shuffleHotRegionSchedulerConfig +} + +func (handler *shuffleHotRegionHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + limit, ok := input["limit"].(float64) + if !ok { + handler.rd.JSON(w, http.StatusBadRequest, "invalid limit") + return + } + handler.config.Lock() + defer handler.config.Unlock() + previous := handler.config.Limit + handler.config.Limit = uint64(limit) + err := handler.config.persistLocked() + if err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + handler.config.Limit = previous + return + } + handler.rd.JSON(w, http.StatusOK, nil) +} + +func (handler *shuffleHotRegionHandler) ListConfig(w http.ResponseWriter, r *http.Request) { + conf := handler.config.Clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + +func newShuffleHotRegionHandler(config *shuffleHotRegionSchedulerConfig) http.Handler { + h := &shuffleHotRegionHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", 
h.ListConfig).Methods(http.MethodGet) + return router +} diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index 0e33fa802db..a6ff4baf65b 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -43,6 +43,7 @@ var ( type shuffleLeaderSchedulerConfig struct { Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. } type shuffleLeaderScheduler struct { diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 6f8a4833c92..4fc344ee0fd 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -64,15 +64,15 @@ func initSplitBucketConfig() *splitBucketSchedulerConfig { } type splitBucketSchedulerConfig struct { - mu syncutil.RWMutex + syncutil.RWMutex storage endpoint.ConfigStorage Degree int `json:"degree"` SplitLimit uint64 `json:"split-limit"` } func (conf *splitBucketSchedulerConfig) Clone() *splitBucketSchedulerConfig { - conf.mu.RLock() - defer conf.mu.RUnlock() + conf.RLock() + defer conf.RUnlock() return &splitBucketSchedulerConfig{ Degree: conf.Degree, } @@ -103,8 +103,8 @@ func (h *splitBucketHandler) ListConfig(w http.ResponseWriter, _ *http.Request) } func (h *splitBucketHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { - h.conf.mu.Lock() - defer h.conf.mu.Unlock() + h.conf.Lock() + defer h.conf.Unlock() rd := render.New(render.Options{IndentJSON: true}) oldc, _ := json.Marshal(h.conf) data, err := io.ReadAll(r.Body) @@ -160,8 +160,8 @@ func (s *splitBucketScheduler) GetType() string { } func (s *splitBucketScheduler) ReloadConfig() error { - s.conf.mu.Lock() - defer s.conf.mu.Unlock() + s.conf.Lock() + defer s.conf.Unlock() cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) if err != nil { return err @@ -169,7 +169,13 @@ func (s 
*splitBucketScheduler) ReloadConfig() error { if len(cfgData) == 0 { return nil } - return DecodeConfig([]byte(cfgData), s.conf) + newCfg := &splitBucketSchedulerConfig{} + if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + s.conf.SplitLimit = newCfg.SplitLimit + s.conf.Degree = newCfg.Degree + return nil } // ServerHTTP implement Http server. diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go index aaa2848b2da..793b70933cf 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader.go +++ b/pkg/schedule/schedulers/transfer_witness_leader.go @@ -34,6 +34,7 @@ const ( // TransferWitnessLeaderBatchSize is the number of operators to to transfer // leaders by one scheduling transferWitnessLeaderBatchSize = 3 + // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. // TransferWitnessLeaderRecvMaxRegionSize is the max number of region can receive // TODO: make it a reasonable value transferWitnessLeaderRecvMaxRegionSize = 10000 diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go index 998b0307530..e2fcfe999ec 100644 --- a/pkg/schedule/schedulers/utils.go +++ b/pkg/schedule/schedulers/utils.go @@ -376,3 +376,19 @@ func (q *retryQuota) GC(keepStores []*core.StoreInfo) { } } } + +// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer. 
+func pauseAndResumeLeaderTransfer[T any](cluster *core.BasicCluster, old, new map[uint64]T) { + for id := range old { + if _, ok := new[id]; ok { + continue + } + cluster.ResumeLeaderTransfer(id) + } + for id := range new { + if _, ok := old[id]; ok { + continue + } + cluster.PauseLeaderTransfer(id) + } +} diff --git a/pkg/window/policy.go b/pkg/window/policy.go new file mode 100644 index 00000000000..fed4fedc32a --- /dev/null +++ b/pkg/window/policy.go @@ -0,0 +1,110 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "time" + + "github.com/tikv/pd/pkg/utils/syncutil" +) + +// RollingPolicy is a policy for ring window based on time duration. +// RollingPolicy moves bucket offset with time duration. +// e.g. If the last point is appended one bucket duration ago, +// RollingPolicy will increment current offset. +type RollingPolicy struct { + mu syncutil.RWMutex + size int + window *Window + offset int + + bucketDuration time.Duration + lastAppendTime time.Time +} + +// RollingPolicyOpts contains the arguments for creating RollingPolicy. +type RollingPolicyOpts struct { + BucketDuration time.Duration +} + +// NewRollingPolicy creates a new RollingPolicy based on the given window and RollingPolicyOpts. 
+func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy { + return &RollingPolicy{ + window: window, + size: window.Size(), + offset: 0, + + bucketDuration: opts.BucketDuration, + lastAppendTime: time.Now(), + } +} + +// timespan returns the number of whole buckets elapsed since lastAppendTime. +// If the current time is at least one bucket duration earlier than +// lastAppendTime (clock moved backwards), it returns the window size. +func (r *RollingPolicy) timespan() int { + v := int(time.Since(r.lastAppendTime) / r.bucketDuration) + if v > -1 { // v is negative only if time went backwards + return v + } + return r.size +} + +// apply resets the buckets that expired since the last append, advances the +// current offset, and then calls f with that offset and val. +func (r *RollingPolicy) apply(f func(offset int, val float64), val float64) { + r.mu.Lock() + defer r.mu.Unlock() + + // number of buckets elapsed since the last append + timespan := r.timespan() + oriTimespan := timespan + if timespan > 0 { + start := (r.offset + 1) % r.size + end := (r.offset + timespan) % r.size + if timespan > r.size { + timespan = r.size + } + // reset the expired buckets + r.window.ResetBuckets(start, timespan) + r.offset = end + r.lastAppendTime = r.lastAppendTime.Add(time.Duration(oriTimespan) * r.bucketDuration) // multiply as time.Duration (int64) so the product cannot overflow a 32-bit int + } + f(r.offset, val) +} + +// Add adds val to the bucket at the current offset. +func (r *RollingPolicy) Add(val float64) { + r.apply(r.window.Add, val) +} + +// Reduce applies the reduction function to the buckets within the window that have not expired. 
+func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) { + r.mu.RLock() + defer r.mu.RUnlock() + + timespan := r.timespan() + if count := r.size - timespan; count > 0 { + offset := r.offset + timespan + 1 + if offset >= r.size { + offset -= r.size + } + val = f(r.window.Iterator(offset, count)) + } + return val +} diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 04c77498948..5e8cb8462df 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" ) // schedulingController is used to manage all schedulers and checkers. @@ -44,7 +45,7 @@ type schedulingController struct { parentCtx context.Context ctx context.Context cancel context.CancelFunc - mu sync.RWMutex + mu syncutil.RWMutex wg sync.WaitGroup *core.BasicCluster opt sc.ConfProvider diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index cd76d80b3c4..07fed2e28ff 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/pkg/ratelimit" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/tools/pd-simulator/simulator/cases" "github.com/tikv/pd/tools/pd-simulator/simulator/info" "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" @@ -39,7 +40,7 @@ const ( // Node simulates a TiKV. type Node struct { *metapb.Store - sync.RWMutex + syncutil.RWMutex stats *info.StoreStats tick uint64 wg sync.WaitGroup @@ -50,7 +51,7 @@ type Node struct { cancel context.CancelFunc raftEngine *RaftEngine limiter *ratelimit.RateLimiter - sizeMutex sync.Mutex + sizeMutex syncutil.Mutex hasExtraUsedSpace bool }