diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index f7c3c5e93b1..e80c228a88f 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -16,6 +16,7 @@ package core import ( "bytes" + "encoding/json" "github.com/tikv/pd/pkg/core/constant" ) @@ -156,6 +157,15 @@ type KeyRange struct { EndKey []byte `json:"end-key"` } +// MarshalJSON marshals to json. +func (kr KeyRange) MarshalJSON() ([]byte, error) { + m := map[string]string{ + "start-key": HexRegionKeyStr(kr.StartKey), + "end-key": HexRegionKeyStr(kr.EndKey), + } + return json.Marshal(m) +} + // NewKeyRange create a KeyRange with the given start key and end key. func NewKeyRange(startKey, endKey string) KeyRange { return KeyRange{ @@ -169,6 +179,17 @@ type KeyRanges struct { krs []*KeyRange } +// NewKeyRanges creates a KeyRanges. +func NewKeyRanges(ranges []KeyRange) *KeyRanges { + krs := make([]*KeyRange, 0, len(ranges)) + for _, kr := range ranges { + krs = append(krs, &kr) + } + return &KeyRanges{ + krs, + } +} + // NewKeyRangesWithSize creates a KeyRanges with the hint size. func NewKeyRangesWithSize(size int) *KeyRanges { return &KeyRanges{ diff --git a/pkg/core/constant/kind.go b/pkg/core/constant/kind.go index 39c256c4f5d..7e9d173c689 100644 --- a/pkg/core/constant/kind.go +++ b/pkg/core/constant/kind.go @@ -66,7 +66,6 @@ const ( RegionKind // WitnessKind indicates the witness kind resource WitnessKind - // ResourceKindLen represents the ResourceKind count ResourceKindLen ) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 6f80572673c..9ab5d329398 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -314,7 +314,11 @@ func (c *Cluster) updateScheduler() { ) // Create the newly added schedulers. for _, scheduler := range latestSchedulersConfig { - schedulerType := types.ConvertOldStrToType[scheduler.Type] + schedulerType, ok := types.ConvertOldStrToType[scheduler.Type] + if !ok { + log.Error("scheduler not found", zap.String("type", scheduler.Type)) + continue + } s, err := schedulers.CreateScheduler( schedulerType, c.coordinator.GetOperatorController(), diff --git a/pkg/schedule/filter/filters.go b/pkg/schedule/filter/filters.go index efb27c3ec6d..6363c903b35 100644 --- a/pkg/schedule/filter/filters.go +++ b/pkg/schedule/filter/filters.go @@ -932,6 +932,8 @@ var ( allSpecialEngines = []string{core.EngineTiFlash} // NotSpecialEngines is used to filter the special engine. NotSpecialEngines = placement.LabelConstraint{Key: core.EngineKey, Op: placement.NotIn, Values: allSpecialEngines} + // TiFlashEngineConstraint is used to filter the TiFlash engine. + TiFlashEngineConstraint = placement.LabelConstraint{Key: core.EngineKey, Op: placement.In, Values: allSpecialEngines} ) type isolationFilter struct { diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index cd2470376e1..61a800ebb9b 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -15,6 +15,7 @@ package operator import ( + "bytes" "context" "fmt" "strconv" @@ -828,6 +829,25 @@ func (oc *Controller) GetHistory(start time.Time) []OpHistory { return history } +// OpInfluenceOption is used to filter the region. +// returns true if the region meets the condition, it will ignore this region in the influence calculation. +// returns false if the region does not meet the condition, it will calculate the influence of this region. 
+type OpInfluenceOption func(region *core.RegionInfo) bool + +// WithRangeOption returns an OpInfluenceOption that filters the region by the label. +func WithRangeOption(ranges []core.KeyRange) OpInfluenceOption { + return func(region *core.RegionInfo) bool { + for _, r := range ranges { + // the start key of the region must greater than the given range start key. + // the end key of the region must less than the given range end key. + if bytes.Compare(region.GetStartKey(), r.StartKey) < 0 || bytes.Compare(r.EndKey, region.GetEndKey()) < 0 { + return false + } + } + return true + } +} + // OperatorCount gets the count of operators filtered by kind. // kind only has one OpKind. func (oc *Controller) OperatorCount(kind OpKind) uint64 { @@ -835,7 +855,7 @@ func (oc *Controller) OperatorCount(kind OpKind) uint64 { } // GetOpInfluence gets OpInfluence. -func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster) OpInfluence { +func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster, ops ...OpInfluenceOption) OpInfluence { influence := OpInfluence{ StoresInfluence: make(map[uint64]*StoreInfluence), } @@ -844,6 +864,11 @@ func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster) OpInfluence { op := value.(*Operator) if !op.CheckTimeout() && !op.CheckSuccess() { region := cluster.GetRegion(op.RegionID()) + for _, opt := range ops { + if !opt(region) { + return true + } + } if region != nil { op.UnfinishedInfluence(influence, region) } diff --git a/pkg/schedule/schedulers/balance_range.go b/pkg/schedule/schedulers/balance_range.go new file mode 100644 index 00000000000..7dd8a539f48 --- /dev/null +++ b/pkg/schedule/schedulers/balance_range.go @@ -0,0 +1,435 @@ +// Copyright 2025 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
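`GetOpInfluence` above skips an operator's region whenever one of the supplied `OpInfluenceOption` callbacks returns false, and `WithRangeOption` only returns true when the region is fully covered by every given key range. A minimal, self-contained sketch of that containment predicate; `keyRange` and `within` are illustrative stand-ins, not PD types:

```go
package main

import (
	"bytes"
	"fmt"
)

// keyRange is a sketch-only stand-in for core.KeyRange.
type keyRange struct{ start, end []byte }

// within reports whether the region [regionStart, regionEnd) is fully
// contained in every given range, mirroring the predicate used by
// WithRangeOption: a region outside any range is skipped when operator
// influence is aggregated.
func within(regionStart, regionEnd []byte, ranges []keyRange) bool {
	for _, r := range ranges {
		if bytes.Compare(regionStart, r.start) < 0 || bytes.Compare(r.end, regionEnd) < 0 {
			return false
		}
	}
	return true
}

func main() {
	ranges := []keyRange{{start: []byte("a"), end: []byte("z")}}
	fmt.Println(within([]byte("b"), []byte("c"), ranges)) // true: inside the range
	fmt.Println(within([]byte("0"), []byte("c"), ranges)) // false: starts before "a"
}
```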
+ +package schedulers + +import ( + "go.uber.org/zap" + "net/http" + "sort" + "strconv" + "time" + + "github.com/gorilla/mux" + "github.com/unrolled/render" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" + "github.com/tikv/pd/pkg/errs" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/filter" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/schedule/types" + "github.com/tikv/pd/pkg/utils/syncutil" +) + +const balanceRangeName = "balance-range-scheduler" + +type balanceRangeSchedulerHandler struct { + rd *render.Render + config *balanceRangeSchedulerConfig +} + +func newBalanceRangeHandler(conf *balanceRangeSchedulerConfig) http.Handler { + handler := &balanceRangeSchedulerHandler{ + config: conf, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", handler.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", handler.listConfig).Methods(http.MethodGet) + return router +} + +func (handler *balanceRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) { + handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported") +} + +func (handler *balanceRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) { + conf := handler.config.clone() + if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil { + log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err)) + } +} + +type balanceRangeSchedulerConfig struct { + syncutil.RWMutex + schedulerConfig + balanceRangeSchedulerParam +} + +type balanceRangeSchedulerParam struct { + Role string `json:"role"` + Engine string `json:"engine"` + Timeout time.Duration `json:"timeout"` + Ranges []core.KeyRange `json:"ranges"` +} + +func (conf *balanceRangeSchedulerConfig) clone() *balanceRangeSchedulerParam { + conf.RLock() + defer conf.RUnlock() + ranges := make([]core.KeyRange, len(conf.Ranges)) + copy(ranges, conf.Ranges) + return &balanceRangeSchedulerParam{ + Ranges: ranges, + Role: conf.Role, + Engine: conf.Engine, + Timeout: conf.Timeout, + } +} + +// EncodeConfig serializes the config. +func (s *balanceRangeScheduler) EncodeConfig() ([]byte, error) { + s.conf.RLock() + defer s.conf.RUnlock() + return EncodeConfig(s.conf) +} + +// ReloadConfig reloads the config. +func (s *balanceRangeScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + + newCfg := &balanceRangeSchedulerConfig{} + if err := s.conf.load(newCfg); err != nil { + return err + } + s.conf.Ranges = newCfg.Ranges + s.conf.Timeout = newCfg.Timeout + s.conf.Role = newCfg.Role + s.conf.Engine = newCfg.Engine + return nil +} + +type balanceRangeScheduler struct { + *BaseScheduler + conf *balanceRangeSchedulerConfig + handler http.Handler + start time.Time + role Role + filters []filter.Filter + filterCounter *filter.Counter +} + +// ServeHTTP implements the http.Handler interface. +func (s *balanceRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) +} + +// IsScheduleAllowed checks if the scheduler is allowed to schedule new operators. 
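For reference, `balanceRangeSchedulerParam` above serializes to a flat JSON object: `timeout` is a `time.Duration`, so it round-trips as nanoseconds (which is why the tests later compare against `float64(time.Hour.Nanoseconds())`), and each range carries hex-encoded keys via the `KeyRange.MarshalJSON` added in this patch. A rough standalone sketch of that shape; `param` and the literal values are illustrative only:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// param mirrors the exported fields of balanceRangeSchedulerParam for
// illustration; ranges are shown already hex-encoded, roughly as
// KeyRange.MarshalJSON would emit them.
type param struct {
	Role    string              `json:"role"`
	Engine  string              `json:"engine"`
	Timeout time.Duration       `json:"timeout"`
	Ranges  []map[string]string `json:"ranges"`
}

func main() {
	p := param{
		Role:    "learner",
		Engine:  "tiflash",
		Timeout: time.Hour, // marshals as nanoseconds
		Ranges:  []map[string]string{{"start-key": "61", "end-key": "62"}}, // hex of "a" and "b" (illustrative)
	}
	out, _ := json.MarshalIndent(p, "", "  ")
	fmt.Println(string(out))
}
```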
+func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() + if !allowed { + operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange) + } + if time.Now().Sub(s.start) > s.conf.Timeout { + allowed = false + balanceRangeExpiredCounter.Inc() + } + return allowed +} + +// BalanceRangeCreateOption is used to create a scheduler with an option. +type BalanceRangeCreateOption func(s *balanceRangeScheduler) + +// newBalanceRangeScheduler creates a scheduler that tends to keep given peer role on +// special store balanced. +func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRangeSchedulerConfig, options ...BalanceRangeCreateOption) Scheduler { + s := &balanceRangeScheduler{ + BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf), + conf: conf, + handler: newBalanceRangeHandler(conf), + } + for _, option := range options { + option(s) + } + f := filter.NotSpecialEngines + if conf.Engine == core.EngineTiFlash { + f = filter.TiFlashEngineConstraint + } + s.filters = []filter.Filter{ + filter.NewEngineFilter(balanceRangeName, f), + } + + s.filterCounter = filter.NewCounter(s.GetName()) + return s +} + +// Schedule schedules the balance key range operator. +func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { + balanceRangeCounter.Inc() + opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster(), operator.WithRangeOption(s.conf.Ranges)) + plan, err := s.prepare(cluster, opInfluence) + if err != nil { + log.Error("failed to prepare balance key range scheduler", errs.ZapError(err)) + return nil, nil + } + + downFilter := filter.NewRegionDownFilter() + replicaFilter := filter.NewRegionReplicatedFilter(cluster) + snapshotFilter := filter.NewSnapshotSendFilter(plan.stores, constant.Medium) + pendingFilter := filter.NewRegionPendingFilter() + baseRegionFilters := []filter.RegionFilter{downFilter, replicaFilter, snapshotFilter, pendingFilter} + + for sourceIndex, sourceStore := range plan.stores { + plan.source = sourceStore + plan.sourceScore = plan.score(plan.source.GetID()) + if plan.sourceScore < plan.averageScore { + break + } + switch s.role { + case Leader: + plan.region = filter.SelectOneRegion(cluster.RandLeaderRegions(plan.sourceStoreID(), s.conf.Ranges), nil, baseRegionFilters...) + case Learner: + plan.region = filter.SelectOneRegion(cluster.RandLearnerRegions(plan.sourceStoreID(), s.conf.Ranges), nil, baseRegionFilters...) + case Follower: + plan.region = filter.SelectOneRegion(cluster.RandFollowerRegions(plan.sourceStoreID(), s.conf.Ranges), nil, baseRegionFilters...) + } + if plan.region == nil { + balanceRangeNoRegionCounter.Inc() + continue + } + log.Debug("select region", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID())) + // Skip hot regions. 
+ if cluster.IsRegionHot(plan.region) { + log.Debug("region is hot", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID())) + balanceRangeHotCounter.Inc() + continue + } + // Check region leader + if plan.region.GetLeader() == nil { + log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID())) + balanceRangeNoLeaderCounter.Inc() + continue + } + plan.fit = replicaFilter.(*filter.RegionReplicatedFilter).GetFit() + if op := s.transferPeer(plan, plan.stores[sourceIndex+1:]); op != nil { + op.Counters = append(op.Counters, balanceRangeNewOperatorCounter) + return []*operator.Operator{op}, nil + } + } + + if err != nil { + log.Error("failed to prepare balance key range scheduler", errs.ZapError(err)) + return nil, nil + + } + return nil, nil +} + +// transferPeer selects the best store to create a new peer to replace the old peer. +func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, dstStores []*core.StoreInfo) *operator.Operator { + excludeTargets := plan.region.GetStoreIDs() + if s.role != Leader { + excludeTargets = make(map[uint64]struct{}) + } + conf := plan.GetSchedulerConfig() + filters := []filter.Filter{ + filter.NewExcludedFilter(s.GetName(), nil, excludeTargets), + filter.NewPlacementSafeguard(s.GetName(), conf, plan.GetBasicCluster(), plan.GetRuleManager(), plan.region, plan.source, plan.fit), + } + candidates := filter.NewCandidates(s.R, dstStores).FilterTarget(conf, nil, s.filterCounter, filters...) + for i := range candidates.Stores { + plan.target = candidates.Stores[len(candidates.Stores)-i-1] + plan.targetScore = plan.score(plan.target.GetID()) + if plan.targetScore > plan.averageScore { + break + } + regionID := plan.region.GetID() + sourceID := plan.source.GetID() + targetID := plan.target.GetID() + if !plan.shouldBalance(s.GetName()) { + continue + } + log.Debug("candidate store", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID)) + + oldPeer := plan.region.GetStorePeer(sourceID) + newPeer := &metapb.Peer{StoreId: plan.target.GetID(), Role: oldPeer.Role} + op, err := operator.CreateMovePeerOperator(s.GetName(), plan, plan.region, operator.OpRange, oldPeer.GetStoreId(), newPeer) + if err != nil { + balanceRangeCreateOpFailCounter.Inc() + return nil + } + sourceLabel := strconv.FormatUint(sourceID, 10) + targetLabel := strconv.FormatUint(targetID, 10) + op.FinishedCounters = append(op.FinishedCounters, + balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel), + ) + op.SetAdditionalInfo("sourceScore", strconv.FormatInt(plan.sourceScore, 10)) + op.SetAdditionalInfo("targetScore", strconv.FormatInt(plan.targetScore, 10)) + return op + } + balanceRangeNoReplacementCounter.Inc() + return nil +} + +// balanceRangeSchedulerPlan is used to record the plan of balance key range scheduler. 
+type balanceRangeSchedulerPlan struct { + sche.SchedulerCluster + // stores is sorted by score desc + stores []*core.StoreInfo + // sourceMap records the storeID -> score + sourceMap map[uint64]int64 + source *core.StoreInfo + sourceScore int64 + target *core.StoreInfo + targetScore int64 + region *core.RegionInfo + fit *placement.RegionFit + averageScore int64 +} + +type storeInfo struct { + store *core.StoreInfo + score int64 +} + +func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluence operator.OpInfluence) (*balanceRangeSchedulerPlan, error) { + krs := core.NewKeyRanges(s.conf.Ranges) + scanRegions, err := cluster.BatchScanRegions(krs) + if err != nil { + return nil, err + } + sources := filter.SelectSourceStores(cluster.GetStores(), s.filters, cluster.GetSchedulerConfig(), nil, nil) + storeInfos := make(map[uint64]*storeInfo, len(sources)) + for _, source := range sources { + storeInfos[source.GetID()] = &storeInfo{store: source} + } + totalScore := int64(0) + for _, region := range scanRegions { + for _, peer := range s.role.getPeers(region) { + storeInfos[peer.GetStoreId()].score += 1 + totalScore += 1 + } + } + + storeList := make([]*storeInfo, 0, len(storeInfos)) + for storeID, store := range storeInfos { + if influence := opInfluence.GetStoreInfluence(storeID); influence != nil { + store.score += s.role.getStoreInfluence(influence) + } + storeList = append(storeList, store) + } + sort.Slice(storeList, func(i, j int) bool { + return storeList[i].score > storeList[j].score + }) + sourceMap := make(map[uint64]int64) + for _, store := range storeList { + sourceMap[store.store.GetID()] = store.score + } + + stores := make([]*core.StoreInfo, 0, len(storeList)) + for _, store := range storeList { + stores = append(stores, store.store) + } + averageScore := totalScore / int64(len(storeList)) + return &balanceRangeSchedulerPlan{ + SchedulerCluster: cluster, + stores: stores, + sourceMap: sourceMap, + source: nil, + target: nil, + region: nil, + averageScore: averageScore, + }, nil +} + +func (p *balanceRangeSchedulerPlan) sourceStoreID() uint64 { + return p.source.GetID() +} + +func (p *balanceRangeSchedulerPlan) targetStoreID() uint64 { + return p.target.GetID() +} + +func (p *balanceRangeSchedulerPlan) score(storeID uint64) int64 { + return p.sourceMap[storeID] +} + +func (p *balanceRangeSchedulerPlan) shouldBalance(scheduler string) bool { + sourceScore := p.score(p.sourceStoreID()) + targetScore := p.score(p.targetStoreID()) + shouldBalance := sourceScore > targetScore + if !shouldBalance && log.GetLevel() <= zap.DebugLevel { + log.Debug("skip balance ", + zap.String("scheduler", scheduler), + zap.Uint64("region-id", p.region.GetID()), + zap.Uint64("source-store", p.sourceStoreID()), + zap.Uint64("target-store", p.targetStoreID()), + zap.Int64("source-score", p.sourceScore), + zap.Int64("target-score", p.targetScore), + zap.Int64("average-region-size", p.averageScore), + ) + } + return shouldBalance +} + +type Role int + +const ( + Leader Role = iota + Follower + Learner + Unknown + RoleLen +) + +func (r Role) String() string { + switch r { + case Leader: + return "leader" + case Follower: + return "voter" + case Learner: + return "learner" + default: + return "unknown" + } +} + +func (r Role) getPeers(region *core.RegionInfo) []*metapb.Peer { + switch r { + case Leader: + return []*metapb.Peer{region.GetLeader()} + case Follower: + followers := region.GetFollowers() + ret := make([]*metapb.Peer, len(followers)) + for _, peer := range followers { + ret = 
append(ret, peer) + } + return ret + case Learner: + return region.GetLearners() + default: + return nil + } +} + +func (r Role) getStoreInfluence(influence *operator.StoreInfluence) int64 { + switch r { + case Leader: + return influence.LeaderCount + case Follower: + return influence.RegionCount + case Learner: + return influence.RegionCount + default: + return 0 + } +} diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 51d857ae445..45e456efeb3 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -15,9 +15,11 @@ package schedulers import ( + "net/url" "strconv" "strings" "sync" + "time" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" @@ -545,4 +547,56 @@ func schedulersRegister() { conf.init(sche.GetName(), storage, conf) return sche, nil }) + + // balance key range scheduler + // args: [role, engine, timeout, range1, range2, ...] + RegisterSliceDecoderBuilder(types.BalanceRangeScheduler, func(args []string) ConfigDecoder { + return func(v any) error { + conf, ok := v.(*balanceRangeSchedulerConfig) + if !ok { + return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + if len(args) < 4 { + return errs.ErrSchedulerConfig.FastGenByArgs("args length must be greater than 3") + } + role, err := url.QueryUnescape(args[0]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + engine, err := url.QueryUnescape(args[1]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + timeout, err := url.QueryUnescape(args[2]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + duration, err := time.ParseDuration(timeout) + if err != nil { + return errs.ErrURLParse.Wrap(err) + } + ranges, err := getKeyRanges(args[3:]) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Engine = engine + conf.Role = role + conf.Timeout = duration + return nil + } + }) + + RegisterScheduler(types.BalanceRangeScheduler, func(opController *operator.Controller, + storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { + conf := &balanceRangeSchedulerConfig{ + schedulerConfig: newBaseDefaultSchedulerConfig(), + } + if err := decoder(conf); err != nil { + return nil, err + } + sche := newBalanceRangeScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil + }) } diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go index bd8a2b4f6ea..8297f3e30c0 100644 --- a/pkg/schedule/schedulers/metrics.go +++ b/pkg/schedule/schedulers/metrics.go @@ -222,6 +222,10 @@ func transferWitnessLeaderCounterWithEvent(event string) prometheus.Counter { return schedulerCounter.WithLabelValues(types.TransferWitnessLeaderScheduler.String(), event) } +func balanceRangeCounterWithEvent(event string) prometheus.Counter { + return schedulerCounter.WithLabelValues(types.BalanceRangeScheduler.String(), event) +} + // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
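As the comment above says, `WithLabelValues` is relatively expensive, so each scheduler event gets a counter bound once at package init and the hot path only calls `Inc()`. A small sketch of the same pattern with a throwaway `CounterVec`; the metric names and labels here are illustrative, not the real `schedulerCounter`:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

// exampleCounter is a sketch-only CounterVec; the real schedulerCounter is
// registered elsewhere in the schedulers package.
var exampleCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "scheduler",
		Name:      "event_count",
		Help:      "Counter of scheduler events.",
	}, []string{"type", "name"})

// Pre-bound counters: the label resolution happens once at package init,
// not on every Inc() in the scheduling hot path.
var (
	exampleScheduleCounter    = exampleCounter.WithLabelValues("balance-range-scheduler", "schedule")
	exampleNewOperatorCounter = exampleCounter.WithLabelValues("balance-range-scheduler", "new-operator")
)

func main() {
	prometheus.MustRegister(exampleCounter)
	exampleScheduleCounter.Inc()
	exampleNewOperatorCounter.Inc()
}
```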
var ( balanceLeaderScheduleCounter = balanceLeaderCounterWithEvent("schedule") @@ -329,7 +333,7 @@ var ( shuffleRegionNoSourceStoreCounter = shuffleRegionCounterWithEvent("no-source-store") splitBucketDisableCounter = splitBucketCounterWithEvent("bucket-disable") - splitBuckerSplitLimitCounter = splitBucketCounterWithEvent("split-limit") + splitBucketSplitLimitCounter = splitBucketCounterWithEvent("split-limit") splitBucketScheduleCounter = splitBucketCounterWithEvent("schedule") splitBucketNoRegionCounter = splitBucketCounterWithEvent("no-region") splitBucketRegionTooSmallCounter = splitBucketCounterWithEvent("region-too-small") @@ -342,4 +346,13 @@ var ( transferWitnessLeaderCounter = transferWitnessLeaderCounterWithEvent("schedule") transferWitnessLeaderNewOperatorCounter = transferWitnessLeaderCounterWithEvent("new-operator") transferWitnessLeaderNoTargetStoreCounter = transferWitnessLeaderCounterWithEvent("no-target-store") + + balanceRangeCounter = balanceRangeCounterWithEvent("schedule") + balanceRangeNewOperatorCounter = balanceRangeCounterWithEvent("new-operator") + balanceRangeExpiredCounter = balanceRangeCounterWithEvent("expired") + balanceRangeNoRegionCounter = balanceRangeCounterWithEvent("no-region") + balanceRangeHotCounter = balanceRangeCounterWithEvent("region-hot") + balanceRangeNoLeaderCounter = balanceRangeCounterWithEvent("no-leader") + balanceRangeCreateOpFailCounter = balanceRangeCounterWithEvent("create-operator-fail") + balanceRangeNoReplacementCounter = balanceRangeCounterWithEvent("no-replacement") ) diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index d6aee65b181..feecad2fb27 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -181,7 +181,7 @@ func (s *splitBucketScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) } allowed := s.BaseScheduler.OpController.OperatorCount(operator.OpSplit) < s.conf.getSplitLimit() if !allowed { - splitBuckerSplitLimitCounter.Inc() + splitBucketSplitLimitCounter.Inc() operator.IncOperatorLimitCounter(s.GetType(), operator.OpSplit) } return allowed diff --git a/pkg/schedule/types/type.go b/pkg/schedule/types/type.go index 7bc27892010..87e89c18948 100644 --- a/pkg/schedule/types/type.go +++ b/pkg/schedule/types/type.go @@ -70,6 +70,8 @@ const ( TransferWitnessLeaderScheduler CheckerSchedulerType = "transfer-witness-leader-scheduler" // LabelScheduler is label scheduler name. LabelScheduler CheckerSchedulerType = "label-scheduler" + // BalanceRangeScheduler is balance key range scheduler name. + BalanceRangeScheduler CheckerSchedulerType = "balance-range-scheduler" ) // TODO: SchedulerTypeCompatibleMap and ConvertOldStrToType should be removed after @@ -97,6 +99,7 @@ var ( SplitBucketScheduler: "split-bucket", TransferWitnessLeaderScheduler: "transfer-witness-leader", LabelScheduler: "label", + BalanceRangeScheduler: "balance-range", } // ConvertOldStrToType exists for compatibility. @@ -120,6 +123,7 @@ var ( "split-bucket": SplitBucketScheduler, "transfer-witness-leader": TransferWitnessLeaderScheduler, "label": LabelScheduler, + "balance-range": BalanceRangeScheduler, } // StringToSchedulerType is a map to convert the scheduler string to the CheckerSchedulerType. 
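These compatibility maps are what the scheduling service consults in `updateScheduler` (see the cluster.go hunk at the top of this patch); the comma-ok lookup added there prevents an unknown old-style name from silently mapping to the zero value. A tiny sketch of that defensive lookup against a stand-in map:

```go
package main

import "fmt"

// convertOldStrToType is a sketch-only stand-in for types.ConvertOldStrToType.
var convertOldStrToType = map[string]string{
	"balance-range":  "balance-range-scheduler",
	"balance-leader": "balance-leader-scheduler",
}

func main() {
	for _, old := range []string{"balance-range", "no-such-scheduler"} {
		tp, ok := convertOldStrToType[old]
		if !ok {
			fmt.Printf("scheduler not found: %s\n", old) // mirrors the log.Error + continue in cluster.go
			continue
		}
		fmt.Printf("%s -> %s\n", old, tp)
	}
}
```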
@@ -143,6 +147,7 @@ var ( "split-bucket-scheduler": SplitBucketScheduler, "transfer-witness-leader-scheduler": TransferWitnessLeaderScheduler, "label-scheduler": LabelScheduler, + "balance-range-scheduler": BalanceRangeScheduler, } // DefaultSchedulers is the default scheduler types. diff --git a/server/api/scheduler.go b/server/api/scheduler.go index b2d18012c89..d9f8aa6518d 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -99,6 +99,39 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques } switch tp { + case types.BalanceRangeScheduler: + exist, _ := h.IsSchedulerExisted(name) + if exist { + h.r.JSON(w, http.StatusBadRequest, "The scheduler already exists, pls remove the exist scheduler first.") + return + } + if err := apiutil.CollectStringOption("role", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if err := apiutil.CollectStringOption("engine", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + defaultTimeout := "1h" + if err := apiutil.CollectStringOption("timeout", input, collector); err != nil { + if errors.ErrorEqual(err, errs.ErrOptionNotExist) { + collector(defaultTimeout) + } else { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + } + + if err := apiutil.CollectEscapeStringOption("start_key", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + + if err := apiutil.CollectEscapeStringOption("end_key", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } case types.ScatterRangeScheduler: if err := apiutil.CollectEscapeStringOption("start_key", input, collector); err != nil { h.r.JSON(w, http.StatusInternalServerError, err.Error()) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 2f6d04bbf52..d2382ded70c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3209,6 +3209,20 @@ func TestAddScheduler(t *testing.T) { re.NoError(err) re.NoError(controller.AddScheduler(gls)) + _, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{}), controller.RemoveScheduler) + re.Error(err) + + gls, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"learner", "tiflash", "1h", "100", "200"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls)) + conf, err = gls.EncodeConfig() + re.NoError(err) + data = make(map[string]any) + re.NoError(json.Unmarshal(conf, &data)) + re.Equal("learner", data["role"]) + re.Equal("tiflash", data["engine"]) + re.Equal(float64(time.Hour.Nanoseconds()), data["timeout"]) + hb, err := schedulers.CreateScheduler(types.BalanceHotRegionScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) conf, err = hb.EncodeConfig() diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 5dc05aff62f..e2bbb09120c 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -162,6 +162,7 @@ func NewAddSchedulerCommand() *cobra.Command { c.AddCommand(NewSlowTrendEvictLeaderSchedulerCommand()) 
c.AddCommand(NewBalanceWitnessSchedulerCommand()) c.AddCommand(NewTransferWitnessLeaderSchedulerCommand()) + c.AddCommand(NewBalanceRangeSchedulerCommand()) return c } @@ -374,6 +375,18 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { return c } +// NewBalanceRangeSchedulerCommand returns a command to add a balance-range-scheduler. +func NewBalanceRangeSchedulerCommand() *cobra.Command { + c := &cobra.Command{ + Use: "balance-range-scheduler [--format=raw|encode|hex] ", + Short: "add a scheduler to balance region for given range", + Run: addSchedulerForBalanceRangeCommandFunc, + Deprecated: "balance-range will be deprecated in the future, please use sql instead", + } + c.Flags().String("format", "hex", "the key format") + return c +} + // NewTransferWitnessLeaderSchedulerCommand returns a command to add a transfer-witness-leader-shceudler. func NewTransferWitnessLeaderSchedulerCommand() *cobra.Command { c := &cobra.Command{ @@ -412,6 +425,32 @@ func addSchedulerForGrantHotRegionCommandFunc(cmd *cobra.Command, args []string) postJSON(cmd, schedulersPrefix, input) } +func addSchedulerForBalanceRangeCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 4 { + cmd.Println(cmd.UsageString()) + return + } + startKey, err := parseKey(cmd.Flags(), args[2]) + if err != nil { + cmd.Println("Error: ", err) + return + } + endKey, err := parseKey(cmd.Flags(), args[3]) + if err != nil { + cmd.Println("Error: ", err) + return + } + + input := make(map[string]any) + input["name"] = cmd.Name() + input["engine"] = args[0] + input["role"] = args[1] + input["start_key"] = url.QueryEscape(startKey) + input["end_key"] = url.QueryEscape(endKey) + + postJSON(cmd, schedulersPrefix, input) +} + func addSchedulerCommandFunc(cmd *cobra.Command, args []string) { if len(args) != 0 { cmd.Println(cmd.UsageString()) @@ -523,6 +562,7 @@ func NewConfigSchedulerCommand() *cobra.Command { newConfigEvictSlowStoreCommand(), newConfigShuffleHotRegionSchedulerCommand(), newConfigEvictSlowTrendCommand(), + newConfigBalanceRangeCommand(), ) return c } @@ -547,6 +587,26 @@ func newConfigBalanceLeaderCommand() *cobra.Command { return c } +func newConfigBalanceRangeCommand() *cobra.Command { + c := &cobra.Command{ + Use: "balance-range-scheduler", + Short: "balance-range-scheduler config", + Run: listSchedulerConfigCommandFunc, + } + + c.AddCommand(&cobra.Command{ + Use: "show", + Short: "show the config item", + Run: listSchedulerConfigCommandFunc, + }, &cobra.Command{ + Use: "set ", + Short: "set the config item", + Run: func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) }, + }) + + return c +} + func newSplitBucketCommand() *cobra.Command { c := &cobra.Command{ Use: "split-bucket-scheduler", diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index f3a81845921..1d011329c42 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -84,7 +84,7 @@ func (suite *schedulerTestSuite) TearDownTest() { return currentSchedulers[i] == scheduler }) { echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", scheduler}, nil) - re.Contains(echo, "Success!") + re.Contains(echo, "Success!", scheduler) } } for _, scheduler := range currentSchedulers { @@ -541,6 +541,27 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust return !strings.Contains(echo, "evict-leader-scheduler") }) + // test balance key range scheduler + echo = 
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler"}, nil) + re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]any) + testutil.Eventually(re, func() bool { + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-range-scheduler"}, &conf) + return conf["role"] == "learner" && conf["engine"] == "tiflash" + }) + re.Equal(float64(time.Hour.Nanoseconds()), conf["timeout"]) + ranges := conf["ranges"].([]any)[0].(map[string]any) + re.Equal(core.HexRegionKeyStr([]byte("a")), ranges["start-key"]) + re.Equal(core.HexRegionKeyStr([]byte("b")), ranges["end-key"]) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + re.Contains(echo, "400") + re.Contains(echo, "scheduler already exists") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-range-scheduler"}, nil) + re.Contains(echo, "Success!") + // test balance leader config conf = make(map[string]any) conf1 := make(map[string]any)
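Putting the pd-ctl and API pieces together: `scheduler add balance-range-scheduler` posts a JSON body whose `role`, `engine`, optional `timeout` (defaulting to 1h) and escaped `start_key`/`end_key` the handler above collects into the scheduler arguments. A sketch of such a request; the PD address is a placeholder and the endpoint is assumed to be the schedulers prefix pd-ctl posts to:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	body := map[string]any{
		"name":      "balance-range-scheduler",
		"engine":    "tiflash",
		"role":      "learner",
		"timeout":   "1h", // optional; the handler falls back to "1h" when absent
		"start_key": url.QueryEscape("a"),
		"end_key":   url.QueryEscape("b"),
	}
	data, _ := json.Marshal(body)

	// Placeholder PD address; the path mirrors pd-ctl's schedulersPrefix.
	resp, err := http.Post("http://127.0.0.1:2379/pd/api/v1/schedulers", "application/json", bytes.NewReader(data))
	if err != nil {
		fmt.Println("post failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```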
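On the server side those collected options reach the slice decoder registered in init.go in the order `[role, engine, timeout, start_key, end_key, ...]`, each URL-escaped. A simplified standalone sketch of that parsing order (single range only, no PD error wrappers; `rangeConf` and `parseArgs` are illustrative names):

```go
package main

import (
	"fmt"
	"net/url"
	"time"
)

// rangeConf is a simplified stand-in for balanceRangeSchedulerConfig that
// supports a single key range only.
type rangeConf struct {
	role, engine, startKey, endKey string
	timeout                        time.Duration
}

// parseArgs mirrors the decoder's argument order: role, engine, timeout,
// then the key-range boundaries, all URL-escaped.
func parseArgs(args []string) (*rangeConf, error) {
	if len(args) < 5 {
		return nil, fmt.Errorf("expected at least 5 args, got %d", len(args))
	}
	unescaped := make([]string, 5)
	for i := range unescaped {
		s, err := url.QueryUnescape(args[i])
		if err != nil {
			return nil, err
		}
		unescaped[i] = s
	}
	d, err := time.ParseDuration(unescaped[2])
	if err != nil {
		return nil, err
	}
	return &rangeConf{
		role:     unescaped[0],
		engine:   unescaped[1],
		timeout:  d,
		startKey: unescaped[3],
		endKey:   unescaped[4],
	}, nil
}

func main() {
	// Same argument order as the cluster_test.go case above.
	conf, err := parseArgs([]string{"learner", "tiflash", "1h", "100", "200"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *conf)
}
```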
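One last note on the `Role` helpers in balance_range.go: `getPeers` pre-sizes the follower slice with a length and then appends, which leaves `len(followers)` leading nil entries ahead of the real peers. The sketch below shows the same role-to-peers mapping with a capacity-only allocation; `peer`, `region` and `role` are stand-ins for the metapb/core types:

```go
package main

import "fmt"

// peer and region are sketch-only stand-ins for the metapb/core types.
type peer struct{ storeID uint64 }

type region struct {
	leader    *peer
	followers []*peer
	learners  []*peer
}

type role int

const (
	leader role = iota
	follower
	learner
)

// peersOf mirrors Role.getPeers: which peers of a region a given role
// contributes to the per-store score. Note the capacity-only make: using
// make([]*peer, len(...)) followed by append would leave leading nil entries.
func peersOf(r role, reg *region) []*peer {
	switch r {
	case leader:
		return []*peer{reg.leader}
	case follower:
		out := make([]*peer, 0, len(reg.followers))
		out = append(out, reg.followers...)
		return out
	case learner:
		return reg.learners
	default:
		return nil
	}
}

func main() {
	reg := &region{
		leader:    &peer{1},
		followers: []*peer{{2}, {3}},
		learners:  []*peer{{4}},
	}
	fmt.Println(len(peersOf(follower, reg))) // 2, with no nil padding
}
```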