From e693b3e755ba7441f068cd7f09fe22c18eee0e46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Tue, 7 Jan 2025 18:24:57 +0800 Subject: [PATCH 01/10] add scheduler config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/schedule/operator/kind.go | 2 + pkg/schedule/schedulers/balance_key_range.go | 168 ++++++++++++++++++ .../schedulers/balance_key_range_test.go | 1 + pkg/schedule/schedulers/init.go | 25 +++ pkg/schedule/types/type.go | 2 + 5 files changed, 198 insertions(+) create mode 100644 pkg/schedule/schedulers/balance_key_range.go create mode 100644 pkg/schedule/schedulers/balance_key_range_test.go diff --git a/pkg/schedule/operator/kind.go b/pkg/schedule/operator/kind.go index 0187a64c568..0c99a6b7a17 100644 --- a/pkg/schedule/operator/kind.go +++ b/pkg/schedule/operator/kind.go @@ -35,6 +35,8 @@ const ( OpMerge // Initiated by range scheduler. OpRange + // Initiated by key range scheduler. + OpKeyRange // Initiated by replica checker. OpReplica // Include region split. Initiated by rule checker if `kind & OpAdmin == 0`. 
diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go new file mode 100644 index 00000000000..454eb88089a --- /dev/null +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -0,0 +1,168 @@ +package schedulers + +import ( + "net/http" + "net/url" + "time" + + "github.com/gorilla/mux" + "github.com/pingcap/log" + _ "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" + "github.com/tikv/pd/pkg/errs" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/filter" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/schedule/types" + "github.com/unrolled/render" +) + +const ( + DefaultTimeout = 1 * time.Hour +) + +type balanceKeyRangeSchedulerHandler struct { + rd *render.Render + config *balanceKeyRangeSchedulerConfig +} + +func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handler { + handler := &balanceKeyRangeSchedulerHandler{ + config: conf, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", handler.updateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", handler.listConfig).Methods(http.MethodGet) + return router +} + +func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWriter, r *http.Request) { + handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported") +} + +func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) { + conf := handler.config.clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + +type balanceKeyRangeSchedulerConfig struct { + baseDefaultSchedulerConfig + balanceKeyRangeSchedulerParam +} + +type balanceKeyRangeSchedulerParam struct { + Role string `json:"role"` + Engine string `json:"engine"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + Timeout time.Duration `json:"timeout"` +} + +func (conf 
*balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) { + conf.RLock() + defer conf.RUnlock() + return EncodeConfig(conf) +} + +func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerParam { + conf.RLock() + defer conf.RUnlock() + return &balanceKeyRangeSchedulerParam{ + Role: conf.Role, + Engine: conf.Engine, + StartKey: conf.StartKey, + EndKey: conf.EndKey, + } +} + +func (conf *balanceKeyRangeSchedulerConfig) parseFromArgs(args []string) error { + if len(args) < 4 { + return errs.ErrSchedulerConfig.FastGenByArgs("args length should be greater than 4") + } + newConf := &balanceKeyRangeSchedulerConfig{} + var err error + newConf.StartKey, err = url.QueryUnescape(args[0]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + newConf.EndKey, err = url.QueryUnescape(args[1]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + + newConf.Role, err = url.QueryUnescape(args[2]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + + newConf.Engine, err = url.QueryUnescape(args[3]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + if len(args) >= 5 { + timeout, err := url.QueryUnescape(args[4]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + conf.Timeout, err = time.ParseDuration(timeout) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + } else { + conf.Timeout = DefaultTimeout + } + *newConf = *newConf + return nil +} + +func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) { + return s.conf.encodeConfig() +} + +func (s *balanceKeyRangeScheduler) ReloadConfig() error { + return nil +} + +type balanceKeyRangeScheduler struct { + *BaseScheduler + conf *balanceKeyRangeSchedulerConfig + handler http.Handler + filters []filter.Filter + filterCounter *filter.Counter +} + +func (s *balanceKeyRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { + log.Info("balance key range scheduler is scheduling, 
need to implement") + return nil, nil +} + +func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpKeyRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() + if !allowed { + operator.IncOperatorLimitCounter(s.GetType(), operator.OpKeyRange) + } + return allowed +} + +type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler) + +// newBalanceKeyRangeScheduler creates a scheduler that tends to keep given peer role on +// special store balanced. +func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler { + s := &balanceKeyRangeScheduler{ + BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler, conf), + conf: conf, + handler: newBalanceKeyRangeHandler(conf), + } + for _, option := range options { + option(s) + } + s.filters = []filter.Filter{ + &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium}, + filter.NewSpecialUseFilter(s.GetName()), + } + s.filterCounter = filter.NewCounter(s.GetName()) + return s +} diff --git a/pkg/schedule/schedulers/balance_key_range_test.go b/pkg/schedule/schedulers/balance_key_range_test.go new file mode 100644 index 00000000000..9185832f5db --- /dev/null +++ b/pkg/schedule/schedulers/balance_key_range_test.go @@ -0,0 +1 @@ +package schedulers diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 51d857ae445..3fc4c0659c4 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -545,4 +545,29 @@ func schedulersRegister() { conf.init(sche.GetName(), storage, conf) return sche, nil }) + + // balance key range scheduler + RegisterSliceDecoderBuilder(types.BalanceKeyRangeScheduler, func(args []string) ConfigDecoder { + return func(v any) error { + conf, ok := v.(*balanceKeyRangeSchedulerConfig) + if !ok { + 
return errs.ErrScheduleConfigNotExist.FastGenByArgs() + } + return parseBalanceKeyRangeParamArgs(args, conf) + } + }) + + RegisterScheduler(types.BalanceKeyRangeScheduler, func(opController *operator.Controller, + storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { + conf := &balanceKeyRangeSchedulerConfig{ + baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(), + } + if err := decoder(conf); err != nil { + return nil, err + } + sche := newBalanceKeyRangeScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil + }) + } diff --git a/pkg/schedule/types/type.go b/pkg/schedule/types/type.go index 7bc27892010..24983a98520 100644 --- a/pkg/schedule/types/type.go +++ b/pkg/schedule/types/type.go @@ -70,6 +70,8 @@ const ( TransferWitnessLeaderScheduler CheckerSchedulerType = "transfer-witness-leader-scheduler" // LabelScheduler is label scheduler name. LabelScheduler CheckerSchedulerType = "label-scheduler" + // BalanceKeyRangeScheduler is balance key range scheduler name. 
+ BalanceKeyRangeScheduler CheckerSchedulerType = "balance-key-range-scheduler" ) // TODO: SchedulerTypeCompatibleMap and ConvertOldStrToType should be removed after From 23ff7d068653200ecb27f97a483d651721124a47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Fri, 10 Jan 2025 09:50:26 +0800 Subject: [PATCH 02/10] add new scheduler for key range MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/schedule/operator/kind.go | 1 + pkg/schedule/operator/operator_test.go | 3 + pkg/schedule/schedulers/balance_key_range.go | 80 ++++++------------- .../schedulers/balance_key_range_test.go | 6 ++ pkg/schedule/schedulers/init.go | 23 +++++- pkg/schedule/schedulers/scheduler.go | 2 + pkg/schedule/types/type.go | 37 +++++---- server/api/scheduler.go | 23 ++++++ server/cluster/cluster_test.go | 7 ++ tools/pd-ctl/pdctl/command/scheduler.go | 58 ++++++++++++++ .../pd-ctl/tests/scheduler/scheduler_test.go | 22 ++++- 11 files changed, 187 insertions(+), 75 deletions(-) diff --git a/pkg/schedule/operator/kind.go b/pkg/schedule/operator/kind.go index 0c99a6b7a17..c6e4614f525 100644 --- a/pkg/schedule/operator/kind.go +++ b/pkg/schedule/operator/kind.go @@ -76,6 +76,7 @@ var nameToFlag = map[string]OpKind{ "replica": OpReplica, "merge": OpMerge, "range": OpRange, + "key-range": OpKeyRange, "witness-leader": OpWitnessLeader, } diff --git a/pkg/schedule/operator/operator_test.go b/pkg/schedule/operator/operator_test.go index 6976b5ca12e..422091dea19 100644 --- a/pkg/schedule/operator/operator_test.go +++ b/pkg/schedule/operator/operator_test.go @@ -476,6 +476,9 @@ func (suite *operatorTestSuite) TestSchedulerKind() { }, { op: NewTestOperator(1, &metapb.RegionEpoch{}, OpLeader), expect: OpLeader, + }, { + op: NewTestOperator(1, &metapb.RegionEpoch{}, OpKeyRange|OpLeader), + expect: OpKeyRange, }, } for _, v := range testData { diff --git 
a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go index 454eb88089a..dea00f45e9e 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -2,12 +2,13 @@ package schedulers import ( "net/http" - "net/url" "time" "github.com/gorilla/mux" "github.com/pingcap/log" - _ "github.com/tikv/pd/pkg/core" + "github.com/unrolled/render" + + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" sche "github.com/tikv/pd/pkg/schedule/core" @@ -15,7 +16,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/schedule/types" - "github.com/unrolled/render" ) const ( @@ -44,7 +44,9 @@ func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWrit func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.clone() - handler.rd.JSON(w, http.StatusOK, conf) + if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil { + log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err)) + } } type balanceKeyRangeSchedulerConfig struct { @@ -53,11 +55,10 @@ type balanceKeyRangeSchedulerConfig struct { } type balanceKeyRangeSchedulerParam struct { - Role string `json:"role"` - Engine string `json:"engine"` - StartKey string `json:"start_key"` - EndKey string `json:"end_key"` - Timeout time.Duration `json:"timeout"` + Role string `json:"role"` + Engine string `json:"engine"` + Timeout time.Duration `json:"timeout"` + Ranges []core.KeyRange `json:"ranges"` } func (conf *balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) { @@ -69,54 +70,16 @@ func (conf *balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) { func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerParam { conf.RLock() defer conf.RUnlock() + ranges := make([]core.KeyRange, 
len(conf.Ranges)) + copy(ranges, conf.Ranges) return &balanceKeyRangeSchedulerParam{ - Role: conf.Role, - Engine: conf.Engine, - StartKey: conf.StartKey, - EndKey: conf.EndKey, + Ranges: ranges, + Role: conf.Role, + Engine: conf.Engine, + Timeout: conf.Timeout, } } -func (conf *balanceKeyRangeSchedulerConfig) parseFromArgs(args []string) error { - if len(args) < 4 { - return errs.ErrSchedulerConfig.FastGenByArgs("args length should be greater than 4") - } - newConf := &balanceKeyRangeSchedulerConfig{} - var err error - newConf.StartKey, err = url.QueryUnescape(args[0]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - newConf.EndKey, err = url.QueryUnescape(args[1]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - - newConf.Role, err = url.QueryUnescape(args[2]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - - newConf.Engine, err = url.QueryUnescape(args[3]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - if len(args) >= 5 { - timeout, err := url.QueryUnescape(args[4]) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - conf.Timeout, err = time.ParseDuration(timeout) - if err != nil { - return errs.ErrQueryUnescape.Wrap(err) - } - } else { - conf.Timeout = DefaultTimeout - } - *newConf = *newConf - return nil -} - func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) { return s.conf.encodeConfig() } @@ -133,8 +96,13 @@ type balanceKeyRangeScheduler struct { filterCounter *filter.Counter } -func (s *balanceKeyRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - log.Info("balance key range scheduler is scheduling, need to implement") +// ServeHTTP implements the http.Handler interface. 
+func (s *balanceKeyRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) +} + +func (s *balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) { + log.Debug("balance key range scheduler is scheduling, need to implement") return nil, nil } @@ -152,7 +120,7 @@ type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler) // special store balanced. func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler { s := &balanceKeyRangeScheduler{ - BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler, conf), + BaseScheduler: NewBaseScheduler(opController, types.BalanceKeyRangeScheduler, conf), conf: conf, handler: newBalanceKeyRangeHandler(conf), } diff --git a/pkg/schedule/schedulers/balance_key_range_test.go b/pkg/schedule/schedulers/balance_key_range_test.go index 9185832f5db..f0a402d108a 100644 --- a/pkg/schedule/schedulers/balance_key_range_test.go +++ b/pkg/schedule/schedulers/balance_key_range_test.go @@ -1 +1,7 @@ package schedulers + +import "testing" + +func TestHttpApi(t *testing.T) { + +} diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 3fc4c0659c4..f9b296ee6d8 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -15,6 +15,7 @@ package schedulers import ( + "net/url" "strconv" "strings" "sync" @@ -547,13 +548,33 @@ func schedulersRegister() { }) // balance key range scheduler + // args: [role, engine, range1, range2, ...] 
RegisterSliceDecoderBuilder(types.BalanceKeyRangeScheduler, func(args []string) ConfigDecoder { return func(v any) error { conf, ok := v.(*balanceKeyRangeSchedulerConfig) if !ok { return errs.ErrScheduleConfigNotExist.FastGenByArgs() } - return parseBalanceKeyRangeParamArgs(args, conf) + if len(args) < 4 { + return errs.ErrSchedulerConfig.FastGenByArgs("args length must be greater than 3") + } + role, err := url.QueryUnescape(args[0]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + engine, err := url.QueryUnescape(args[1]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + ranges, err := getKeyRanges(args[2:]) + if err != nil { + return err + } + conf.Ranges = ranges + conf.Engine = engine + conf.Role = role + conf.Timeout = DefaultTimeout + return nil } }) diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index 8976c3a1928..fd6d6710350 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -157,7 +157,9 @@ func CreateScheduler( removeSchedulerCb ...func(string) error, ) (Scheduler, error) { fn, ok := schedulerMap[typ] + log.Info("create scheduler", zap.Any("typ", typ)) if !ok { + log.Warn("create scheduler not found", zap.Any("typ", typ)) return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) } diff --git a/pkg/schedule/types/type.go b/pkg/schedule/types/type.go index 24983a98520..baeb536c987 100644 --- a/pkg/schedule/types/type.go +++ b/pkg/schedule/types/type.go @@ -99,29 +99,31 @@ var ( SplitBucketScheduler: "split-bucket", TransferWitnessLeaderScheduler: "transfer-witness-leader", LabelScheduler: "label", + BalanceKeyRangeScheduler: "balance-key-range", } // ConvertOldStrToType exists for compatibility. // // It is used to convert the old scheduler type to `CheckerSchedulerType`. 
ConvertOldStrToType = map[string]CheckerSchedulerType{ - "balance-leader": BalanceLeaderScheduler, - "balance-region": BalanceRegionScheduler, - "balance-witness": BalanceWitnessScheduler, - "evict-leader": EvictLeaderScheduler, - "evict-slow-store": EvictSlowStoreScheduler, - "evict-slow-trend": EvictSlowTrendScheduler, - "grant-leader": GrantLeaderScheduler, - "grant-hot-region": GrantHotRegionScheduler, - "hot-region": BalanceHotRegionScheduler, - "random-merge": RandomMergeScheduler, - "scatter-range": ScatterRangeScheduler, - "shuffle-hot-region": ShuffleHotRegionScheduler, - "shuffle-leader": ShuffleLeaderScheduler, - "shuffle-region": ShuffleRegionScheduler, - "split-bucket": SplitBucketScheduler, - "transfer-witness-leader": TransferWitnessLeaderScheduler, - "label": LabelScheduler, + "balance-leader": BalanceLeaderScheduler, + "balance-region": BalanceRegionScheduler, + "balance-witness": BalanceWitnessScheduler, + "evict-leader": EvictLeaderScheduler, + "evict-slow-store": EvictSlowStoreScheduler, + "evict-slow-trend": EvictSlowTrendScheduler, + "grant-leader": GrantLeaderScheduler, + "grant-hot-region": GrantHotRegionScheduler, + "hot-region": BalanceHotRegionScheduler, + "random-merge": RandomMergeScheduler, + "scatter-range": ScatterRangeScheduler, + "shuffle-hot-region": ShuffleHotRegionScheduler, + "shuffle-leader": ShuffleLeaderScheduler, + "shuffle-region": ShuffleRegionScheduler, + "split-bucket": SplitBucketScheduler, + "transfer-witness-leader": TransferWitnessLeaderScheduler, + "label": LabelScheduler, + "balance-key-range-scheduler": BalanceKeyRangeScheduler, } // StringToSchedulerType is a map to convert the scheduler string to the CheckerSchedulerType. @@ -145,6 +147,7 @@ var ( "split-bucket-scheduler": SplitBucketScheduler, "transfer-witness-leader-scheduler": TransferWitnessLeaderScheduler, "label-scheduler": LabelScheduler, + "balance-key-range-scheduler": BalanceKeyRangeScheduler, } // DefaultSchedulers is the default scheduler types. 
diff --git a/server/api/scheduler.go b/server/api/scheduler.go index b2d18012c89..f8b62864c0c 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -99,6 +99,29 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques } switch tp { + case types.BalanceKeyRangeScheduler: + exist, _ := h.IsSchedulerExisted(name) + if exist { + h.r.JSON(w, http.StatusBadRequest, "The scheduler already exists, pls remove the exist scheduler first.") + return + } + if err := apiutil.CollectStringOption("role", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if err := apiutil.CollectStringOption("engine", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if err := apiutil.CollectEscapeStringOption("start_key", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + + if err := apiutil.CollectEscapeStringOption("end_key", input, collector); err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } case types.ScatterRangeScheduler: if err := apiutil.CollectEscapeStringOption("start_key", input, collector); err != nil { h.r.JSON(w, http.StatusInternalServerError, err.Error()) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 2f6d04bbf52..c62fb64fc80 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3209,6 +3209,13 @@ func TestAddScheduler(t *testing.T) { re.NoError(err) re.NoError(controller.AddScheduler(gls)) + gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) + re.Error(err) + + gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), 
schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"leaner", "tiflash", "100", "200"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls)) + hb, err := schedulers.CreateScheduler(types.BalanceHotRegionScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) conf, err = hb.EncodeConfig() diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 5dc05aff62f..48e8d9ecf2d 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -162,6 +162,7 @@ func NewAddSchedulerCommand() *cobra.Command { c.AddCommand(NewSlowTrendEvictLeaderSchedulerCommand()) c.AddCommand(NewBalanceWitnessSchedulerCommand()) c.AddCommand(NewTransferWitnessLeaderSchedulerCommand()) + c.AddCommand(NewBalanceKeyRangeSchedulerCommand()) return c } @@ -374,6 +375,16 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { return c } +func NewBalanceKeyRangeSchedulerCommand() *cobra.Command { + c := &cobra.Command{ + Use: "balance-key-range-scheduler [--format=raw|encode|hex] ", + Short: "add a scheduler to balance region for given key range", + Run: addSchedulerForBalanceKeyRangeCommandFunc, + } + c.Flags().String("format", "hex", "the key format") + return c +} + // NewTransferWitnessLeaderSchedulerCommand returns a command to add a transfer-witness-leader-shceudler. 
func NewTransferWitnessLeaderSchedulerCommand() *cobra.Command { c := &cobra.Command{ @@ -412,6 +423,32 @@ func addSchedulerForGrantHotRegionCommandFunc(cmd *cobra.Command, args []string) postJSON(cmd, schedulersPrefix, input) } +func addSchedulerForBalanceKeyRangeCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 4 { + cmd.Println(cmd.UsageString()) + return + } + startKey, err := parseKey(cmd.Flags(), args[2]) + if err != nil { + cmd.Println("Error: ", err) + return + } + endKey, err := parseKey(cmd.Flags(), args[3]) + if err != nil { + cmd.Println("Error: ", err) + return + } + + input := make(map[string]any) + input["name"] = cmd.Name() + input["engine"] = args[0] + input["role"] = args[1] + input["start_key"] = url.QueryEscape(startKey) + input["end_key"] = url.QueryEscape(endKey) + + postJSON(cmd, schedulersPrefix, input) +} + func addSchedulerCommandFunc(cmd *cobra.Command, args []string) { if len(args) != 0 { cmd.Println(cmd.UsageString()) @@ -523,6 +560,7 @@ func NewConfigSchedulerCommand() *cobra.Command { newConfigEvictSlowStoreCommand(), newConfigShuffleHotRegionSchedulerCommand(), newConfigEvictSlowTrendCommand(), + newConfigBalanceKeyRangeCommand(), ) return c } @@ -547,6 +585,26 @@ func newConfigBalanceLeaderCommand() *cobra.Command { return c } +func newConfigBalanceKeyRangeCommand() *cobra.Command { + c := &cobra.Command{ + Use: "balance-key-range-scheduler", + Short: "balance-key-range-scheduler config", + Run: listSchedulerConfigCommandFunc, + } + + c.AddCommand(&cobra.Command{ + Use: "show", + Short: "show the config item", + Run: listSchedulerConfigCommandFunc, + }, &cobra.Command{ + Use: "set ", + Short: "set the config item", + Run: func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) }, + }) + + return c +} + func newSplitBucketCommand() *cobra.Command { c := &cobra.Command{ Use: "split-bucket-scheduler", diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go 
b/tools/pd-ctl/tests/scheduler/scheduler_test.go index f3a81845921..50237d8303c 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -15,6 +15,7 @@ package scheduler_test import ( + "encoding/base64" "encoding/json" "fmt" "reflect" @@ -84,7 +85,7 @@ func (suite *schedulerTestSuite) TearDownTest() { return currentSchedulers[i] == scheduler }) { echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", scheduler}, nil) - re.Contains(echo, "Success!") + re.Contains(echo, "Success!", scheduler) } } for _, scheduler := range currentSchedulers { @@ -541,6 +542,25 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust return !strings.Contains(echo, "evict-leader-scheduler") }) + // test balance key range scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler"}, nil) + re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]any) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler", "show"}, &conf) + re.Equal("learner", conf["role"]) + re.Equal("tiflash", conf["engine"]) + ranges := conf["ranges"].([]interface{})[0].(map[string]interface{}) + re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) + re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + re.Contains(echo, "400") + re.Contains(echo, "scheduler already exists") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-key-range-scheduler"}, nil) + re.Contains(echo, "Success!") + // test balance leader 
config conf = make(map[string]any) conf1 := make(map[string]any) From 1e6d628b5afd10bbb2c481016a584a87ea69c6b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Fri, 10 Jan 2025 14:32:53 +0800 Subject: [PATCH 03/10] pass ut MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/mcs/scheduling/server/cluster.go | 5 +++- pkg/schedule/types/type.go | 36 ++++++++++++++-------------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 6f80572673c..e45611b6feb 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -314,7 +314,10 @@ func (c *Cluster) updateScheduler() { ) // Create the newly added schedulers. for _, scheduler := range latestSchedulersConfig { - schedulerType := types.ConvertOldStrToType[scheduler.Type] + schedulerType, ok := types.ConvertOldStrToType[scheduler.Type] + if !ok { + log.Warn("scheduler not found ", zap.String("type", scheduler.Type)) + } s, err := schedulers.CreateScheduler( schedulerType, c.coordinator.GetOperatorController(), diff --git a/pkg/schedule/types/type.go b/pkg/schedule/types/type.go index baeb536c987..c9d06f31e9f 100644 --- a/pkg/schedule/types/type.go +++ b/pkg/schedule/types/type.go @@ -106,24 +106,24 @@ var ( // // It is used to convert the old scheduler type to `CheckerSchedulerType`. 
ConvertOldStrToType = map[string]CheckerSchedulerType{ - "balance-leader": BalanceLeaderScheduler, - "balance-region": BalanceRegionScheduler, - "balance-witness": BalanceWitnessScheduler, - "evict-leader": EvictLeaderScheduler, - "evict-slow-store": EvictSlowStoreScheduler, - "evict-slow-trend": EvictSlowTrendScheduler, - "grant-leader": GrantLeaderScheduler, - "grant-hot-region": GrantHotRegionScheduler, - "hot-region": BalanceHotRegionScheduler, - "random-merge": RandomMergeScheduler, - "scatter-range": ScatterRangeScheduler, - "shuffle-hot-region": ShuffleHotRegionScheduler, - "shuffle-leader": ShuffleLeaderScheduler, - "shuffle-region": ShuffleRegionScheduler, - "split-bucket": SplitBucketScheduler, - "transfer-witness-leader": TransferWitnessLeaderScheduler, - "label": LabelScheduler, - "balance-key-range-scheduler": BalanceKeyRangeScheduler, + "balance-leader": BalanceLeaderScheduler, + "balance-region": BalanceRegionScheduler, + "balance-witness": BalanceWitnessScheduler, + "evict-leader": EvictLeaderScheduler, + "evict-slow-store": EvictSlowStoreScheduler, + "evict-slow-trend": EvictSlowTrendScheduler, + "grant-leader": GrantLeaderScheduler, + "grant-hot-region": GrantHotRegionScheduler, + "hot-region": BalanceHotRegionScheduler, + "random-merge": RandomMergeScheduler, + "scatter-range": ScatterRangeScheduler, + "shuffle-hot-region": ShuffleHotRegionScheduler, + "shuffle-leader": ShuffleLeaderScheduler, + "shuffle-region": ShuffleRegionScheduler, + "split-bucket": SplitBucketScheduler, + "transfer-witness-leader": TransferWitnessLeaderScheduler, + "label": LabelScheduler, + "balance-key-range": BalanceKeyRangeScheduler, } // StringToSchedulerType is a map to convert the scheduler string to the CheckerSchedulerType. 
From d1da5b577e21852949548c112e3b62e1bf61fad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Mon, 13 Jan 2025 15:30:14 +0800 Subject: [PATCH 04/10] lint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/schedule/schedulers/balance_key_range.go | 24 ++++++++++++++++--- .../schedulers/balance_key_range_test.go | 7 ------ pkg/schedule/schedulers/init.go | 1 - server/cluster/cluster_test.go | 2 +- tools/pd-ctl/pdctl/command/scheduler.go | 1 + .../pd-ctl/tests/scheduler/scheduler_test.go | 2 +- 6 files changed, 24 insertions(+), 13 deletions(-) delete mode 100644 pkg/schedule/schedulers/balance_key_range_test.go diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go index dea00f45e9e..aace3cc057b 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -5,9 +5,10 @@ import ( "time" "github.com/gorilla/mux" - "github.com/pingcap/log" "github.com/unrolled/render" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" @@ -19,6 +20,7 @@ import ( ) const ( + // DefaultTimeout is the default balance key range scheduler timeout. DefaultTimeout = 1 * time.Hour ) @@ -38,7 +40,7 @@ func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handle return router } -func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWriter, r *http.Request) { +func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) { handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported") } @@ -80,11 +82,24 @@ func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerPar } } +// EncodeConfig serializes the config. 
func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) { return s.conf.encodeConfig() } +// ReloadConfig reloads the config. func (s *balanceKeyRangeScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + + newCfg := &balanceKeyRangeSchedulerConfig{} + if err := s.conf.load(newCfg); err != nil { + return err + } + s.conf.Ranges = newCfg.Ranges + s.conf.Timeout = newCfg.Timeout + s.conf.Role = newCfg.Role + s.conf.Engine = newCfg.Engine return nil } @@ -101,11 +116,13 @@ func (s *balanceKeyRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Requ s.handler.ServeHTTP(w, r) } -func (s *balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) { +// Schedule schedules the balance key range operator. +func (*balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) { log.Debug("balance key range scheduler is scheduling, need to implement") return nil, nil } +// IsScheduleAllowed checks if the scheduler is allowed to schedule new operators. func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpKeyRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { @@ -114,6 +131,7 @@ func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerClust return allowed } +// BalanceKeyRangeCreateOption is used to create a scheduler with an option. 
type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler) // newBalanceKeyRangeScheduler creates a scheduler that tends to keep given peer role on diff --git a/pkg/schedule/schedulers/balance_key_range_test.go b/pkg/schedule/schedulers/balance_key_range_test.go deleted file mode 100644 index f0a402d108a..00000000000 --- a/pkg/schedule/schedulers/balance_key_range_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package schedulers - -import "testing" - -func TestHttpApi(t *testing.T) { - -} diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index f9b296ee6d8..4734b162203 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -590,5 +590,4 @@ func schedulersRegister() { conf.init(sche.GetName(), storage, conf) return sche, nil }) - } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index c62fb64fc80..1fdac79f539 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3209,7 +3209,7 @@ func TestAddScheduler(t *testing.T) { re.NoError(err) re.NoError(controller.AddScheduler(gls)) - gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) + _, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) re.Error(err) gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"leaner", "tiflash", "100", "200"}), controller.RemoveScheduler) diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 48e8d9ecf2d..9a5993dd8cf 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ 
b/tools/pd-ctl/pdctl/command/scheduler.go @@ -375,6 +375,7 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { return c } +// NewBalanceKeyRangeSchedulerCommand returns a command to add a balance-key-range-scheduler. func NewBalanceKeyRangeSchedulerCommand() *cobra.Command { c := &cobra.Command{ Use: "balance-key-range-scheduler [--format=raw|encode|hex] ", diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index 50237d8303c..e5575677cb6 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -551,7 +551,7 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler", "show"}, &conf) re.Equal("learner", conf["role"]) re.Equal("tiflash", conf["engine"]) - ranges := conf["ranges"].([]interface{})[0].(map[string]interface{}) + ranges := conf["ranges"].([]any)[0].(map[string]any) re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) From d0cfc2d352dac6f58c03417cb19a1c0cbf5945db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Mon, 13 Jan 2025 17:17:36 +0800 Subject: [PATCH 05/10] pass ut MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/mcs/scheduling/server/cluster.go | 3 ++- pkg/schedule/operator/kind.go | 1 + pkg/schedule/schedulers/balance_key_range.go | 9 +++------ pkg/schedule/schedulers/init.go | 17 +++++++++++++---- pkg/schedule/schedulers/scheduler.go | 2 -- server/api/scheduler.go | 10 ++++++++++ server/cluster/cluster_test.go | 9 ++++++++- tools/pd-ctl/tests/scheduler/scheduler_test.go | 8 +++++--- 8 files changed, 42 insertions(+), 17 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go 
b/pkg/mcs/scheduling/server/cluster.go index e45611b6feb..f0b87e82c06 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -316,7 +316,8 @@ func (c *Cluster) updateScheduler() { for _, scheduler := range latestSchedulersConfig { schedulerType, ok := types.ConvertOldStrToType[scheduler.Type] if !ok { - log.Warn("scheduler not found ", zap.String("type", scheduler.Type)) + log.Error("scheduler not found ", zap.String("type", scheduler.Type)) + continue } s, err := schedulers.CreateScheduler( schedulerType, diff --git a/pkg/schedule/operator/kind.go b/pkg/schedule/operator/kind.go index c6e4614f525..0a7ccb34245 100644 --- a/pkg/schedule/operator/kind.go +++ b/pkg/schedule/operator/kind.go @@ -62,6 +62,7 @@ var flagToName = map[OpKind]string{ OpHotRegion: "hot-region", OpReplica: "replica", OpMerge: "merge", + OpKeyRange: "key-range", OpRange: "range", OpWitness: "witness", OpWitnessLeader: "witness-leader", diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go index aace3cc057b..c21c62af71f 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -17,11 +17,7 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/schedule/types" -) - -const ( - // DefaultTimeout is the default balance key range scheduler timeout. 
- DefaultTimeout = 1 * time.Hour + "github.com/tikv/pd/pkg/utils/syncutil" ) type balanceKeyRangeSchedulerHandler struct { @@ -52,7 +48,8 @@ func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter } type balanceKeyRangeSchedulerConfig struct { - baseDefaultSchedulerConfig + syncutil.RWMutex + schedulerConfig balanceKeyRangeSchedulerParam } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 4734b162203..37d17ddd9ae 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" @@ -548,7 +549,7 @@ func schedulersRegister() { }) // balance key range scheduler - // args: [role, engine, range1, range2, ...] + // args: [role, engine, timeout, range1, range2, ...] RegisterSliceDecoderBuilder(types.BalanceKeyRangeScheduler, func(args []string) ConfigDecoder { return func(v any) error { conf, ok := v.(*balanceKeyRangeSchedulerConfig) @@ -566,14 +567,22 @@ func schedulersRegister() { if err != nil { return errs.ErrQueryUnescape.Wrap(err) } - ranges, err := getKeyRanges(args[2:]) + timeout, err := url.QueryUnescape(args[2]) + if err != nil { + return errs.ErrQueryUnescape.Wrap(err) + } + duration, err := time.ParseDuration(timeout) + if err != nil { + return errs.ErrURLParse.Wrap(err) + } + ranges, err := getKeyRanges(args[3:]) if err != nil { return err } conf.Ranges = ranges conf.Engine = engine conf.Role = role - conf.Timeout = DefaultTimeout + conf.Timeout = duration return nil } }) @@ -581,7 +590,7 @@ func schedulersRegister() { RegisterScheduler(types.BalanceKeyRangeScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { conf := &balanceKeyRangeSchedulerConfig{ - baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(), + schedulerConfig: newBaseDefaultSchedulerConfig(), } 
if err := decoder(conf); err != nil { return nil, err diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index fd6d6710350..8976c3a1928 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -157,9 +157,7 @@ func CreateScheduler( removeSchedulerCb ...func(string) error, ) (Scheduler, error) { fn, ok := schedulerMap[typ] - log.Info("create scheduler", zap.Any("typ", typ)) if !ok { - log.Warn("create scheduler not found", zap.Any("typ", typ)) return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) } diff --git a/server/api/scheduler.go b/server/api/scheduler.go index f8b62864c0c..e50e563e5b8 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -113,6 +113,16 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques h.r.JSON(w, http.StatusInternalServerError, err.Error()) return } + defaultTimeout := "1h" + if err := apiutil.CollectStringOption("timeout", input, collector); err != nil { + if errors.ErrorEqual(err, errs.ErrOptionNotExist) { + collector(defaultTimeout) + } else { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + } + if err := apiutil.CollectEscapeStringOption("start_key", input, collector); err != nil { h.r.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 1fdac79f539..ca78b4cfdd7 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3212,9 +3212,16 @@ func TestAddScheduler(t *testing.T) { _, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) re.Error(err) - gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), 
schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"leaner", "tiflash", "100", "200"}), controller.RemoveScheduler) + gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"learner", "tiflash", "1h", "100", "200"}), controller.RemoveScheduler) re.NoError(err) re.NoError(controller.AddScheduler(gls)) + conf, err = gls.EncodeConfig() + re.NoError(err) + data = make(map[string]any) + re.NoError(json.Unmarshal(conf, &data)) + re.Equal("learner", data["role"]) + re.Equal("tiflash", data["engine"]) + re.Equal(float64(time.Hour.Nanoseconds()), data["timeout"]) hb, err := schedulers.CreateScheduler(types.BalanceHotRegionScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index e5575677cb6..38338bc2494 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -548,9 +548,11 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) re.Contains(echo, "Success!") conf = make(map[string]any) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler", "show"}, &conf) - re.Equal("learner", conf["role"]) - re.Equal("tiflash", conf["engine"]) + testutil.Eventually(re, func() bool { + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler"}, &conf) + return conf["role"] == "learner" && conf["engine"] == "tiflash" + }) + re.Equal(float64(time.Hour.Nanoseconds()), conf["timeout"]) ranges := conf["ranges"].([]any)[0].(map[string]any) 
re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) From fb723a0c5d44b35a300f13e60541596497239fcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Wed, 15 Jan 2025 17:26:16 +0800 Subject: [PATCH 06/10] draft MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/core/basic_cluster.go | 11 ++ pkg/core/constant/kind.go | 5 +- pkg/schedule/filter/filters.go | 2 + pkg/schedule/schedulers/balance_key_range.go | 119 +++++++++++++++++-- pkg/schedule/schedulers/metrics.go | 10 +- pkg/schedule/schedulers/split_bucket.go | 2 +- 6 files changed, 138 insertions(+), 11 deletions(-) diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index f7c3c5e93b1..1f4b2c5f4e1 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -169,6 +169,17 @@ type KeyRanges struct { krs []*KeyRange } +// NewKeyRanges creates a KeyRanges. +func NewKeyRanges(ranges []KeyRange) *KeyRanges { + krs := make([]*KeyRange, 0, len(ranges)) + for _, kr := range ranges { + krs = append(krs, &kr) + } + return &KeyRanges{ + krs, + } +} + // NewKeyRangesWithSize creates a KeyRanges with the hint size. 
func NewKeyRangesWithSize(size int) *KeyRanges { return &KeyRanges{ diff --git a/pkg/core/constant/kind.go b/pkg/core/constant/kind.go index 39c256c4f5d..933d3463401 100644 --- a/pkg/core/constant/kind.go +++ b/pkg/core/constant/kind.go @@ -66,7 +66,6 @@ const ( RegionKind // WitnessKind indicates the witness kind resource WitnessKind - // ResourceKindLen represents the ResourceKind count ResourceKindLen ) @@ -79,6 +78,10 @@ func (k ResourceKind) String() string { return "region" case WitnessKind: return "witness" + case LearnerKind: + return `learner` + case unKnownKind: + return "unknown" default: return "unknown" } diff --git a/pkg/schedule/filter/filters.go b/pkg/schedule/filter/filters.go index efb27c3ec6d..6363c903b35 100644 --- a/pkg/schedule/filter/filters.go +++ b/pkg/schedule/filter/filters.go @@ -932,6 +932,8 @@ var ( allSpecialEngines = []string{core.EngineTiFlash} // NotSpecialEngines is used to filter the special engine. NotSpecialEngines = placement.LabelConstraint{Key: core.EngineKey, Op: placement.NotIn, Values: allSpecialEngines} + // TiFlashEngineConstraint is used to filter the TiFlash engine. 
+ TiFlashEngineConstraint = placement.LabelConstraint{Key: core.EngineKey, Op: placement.In, Values: allSpecialEngines} ) type isolationFilter struct { diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go index c21c62af71f..db82bca37ef 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -1,6 +1,7 @@ package schedulers import ( + "github.com/pingcap/kvproto/pkg/metapb" "net/http" "time" @@ -20,6 +21,8 @@ import ( "github.com/tikv/pd/pkg/utils/syncutil" ) +const balanceKeyRangeName = "balance-key-ranges" + type balanceKeyRangeSchedulerHandler struct { rd *render.Render config *balanceKeyRangeSchedulerConfig @@ -104,6 +107,8 @@ type balanceKeyRangeScheduler struct { *BaseScheduler conf *balanceKeyRangeSchedulerConfig handler http.Handler + start time.Time + role Role filters []filter.Filter filterCounter *filter.Counter } @@ -113,18 +118,16 @@ func (s *balanceKeyRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Requ s.handler.ServeHTTP(w, r) } -// Schedule schedules the balance key range operator. -func (*balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) { - log.Debug("balance key range scheduler is scheduling, need to implement") - return nil, nil -} - // IsScheduleAllowed checks if the scheduler is allowed to schedule new operators. 
func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpKeyRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { operator.IncOperatorLimitCounter(s.GetType(), operator.OpKeyRange) } + if time.Now().Sub(s.start) > s.conf.Timeout { + allowed = false + balanceExpiredCounter.Inc() + } return allowed } @@ -138,14 +141,114 @@ func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanc BaseScheduler: NewBaseScheduler(opController, types.BalanceKeyRangeScheduler, conf), conf: conf, handler: newBalanceKeyRangeHandler(conf), + start: time.Now(), + role: NewRole(conf.Role), } for _, option := range options { option(s) } + f := filter.NotSpecialEngines + if conf.Engine == core.EngineTiFlash { + f = filter.TiFlashEngineConstraint + } s.filters = []filter.Filter{ - &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium}, - filter.NewSpecialUseFilter(s.GetName()), + filter.NewEngineFilter(balanceKeyRangeName, f), } + s.filterCounter = filter.NewCounter(s.GetName()) return s } + +// Schedule schedules the balance key range operator. +func (s *balanceKeyRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { + balanceKeyRangeCounter.Inc() + plan,err:=s.prepare(cluster) + if err != nil { + log.Error("failed to prepare balance key range scheduler", errs.ZapError(err)) + return nil,nil + + } +} + +// BalanceKeyRangeSchedulerPlan is used to record the plan of balance key range scheduler. 
+type BalanceKeyRangeSchedulerPlan struct { + source []*core.StoreInfo + // store_id -> score + scores map[uint64]uint64 + // store_id -> peer + regions map[uint64]*metapb.Peer +} + +func (s *balanceKeyRangeScheduler) prepare(cluster sche.SchedulerCluster)(*BalanceKeyRangeSchedulerPlan,error) { + krs := core.NewKeyRanges(s.conf.Ranges) + scanRegions, err := cluster.BatchScanRegions(krs) + if err != nil { + return nil,err + } + stores := cluster.GetStores() + sources := filter.SelectSourceStores(stores, s.filters, cluster.GetSchedulerConfig(), nil, nil) + scores := make(map[uint64]uint64, len(sources)) + regions:=make(map[uint64]*metapb.Peer,len(scanRegions)) + for _, region := range scanRegions { + for _, peer := range s.role.getPeers(region) { + scores[peer.GetStoreId()] += 1 + regions[peer.GetStoreId()] = peer + } + } + return &BalanceKeyRangeSchedulerPlan{ + source: sources, + scores: scores, + regions: regions, + },nil +} + + + +type Role int + +const ( + Leader Role = iota + Voter + Learner + Unknown + RoleLen +) + +func (r Role) String() string { + switch r { + case Leader: + return "leader" + case Voter: + return "voter" + case Learner: + return "learner" + default: + return "unknown" + } +} + +func NewRole(role string) Role { + switch role { + case "leader": + return Leader + case "voter": + return Voter + case "learner": + return Learner + default: + return Unknown + } +} + +func (r Role) getPeers(region *core.RegionInfo) []*metapb.Peer {{ + switch r { + case Leader: + return []*metapb.Peer{region.GetLeader()} + case Voter: + return region.GetVoters() + case Learner: + return region.GetLearners() + default: + return nil + } +} diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go index bd8a2b4f6ea..4a170d9e146 100644 --- a/pkg/schedule/schedulers/metrics.go +++ b/pkg/schedule/schedulers/metrics.go @@ -222,6 +222,10 @@ func transferWitnessLeaderCounterWithEvent(event string) prometheus.Counter { return 
schedulerCounter.WithLabelValues(types.TransferWitnessLeaderScheduler.String(), event) } +func balanceKeyRangeCounterWithEvent(event string) prometheus.Counter { + return schedulerCounter.WithLabelValues(types.BalanceKeyRangeScheduler.String(), event) +} + // WithLabelValues is a heavy operation, define variable to avoid call it every time. var ( balanceLeaderScheduleCounter = balanceLeaderCounterWithEvent("schedule") @@ -329,7 +333,7 @@ var ( shuffleRegionNoSourceStoreCounter = shuffleRegionCounterWithEvent("no-source-store") splitBucketDisableCounter = splitBucketCounterWithEvent("bucket-disable") - splitBuckerSplitLimitCounter = splitBucketCounterWithEvent("split-limit") + splitBucketSplitLimitCounter = splitBucketCounterWithEvent("split-limit") splitBucketScheduleCounter = splitBucketCounterWithEvent("schedule") splitBucketNoRegionCounter = splitBucketCounterWithEvent("no-region") splitBucketRegionTooSmallCounter = splitBucketCounterWithEvent("region-too-small") @@ -342,4 +346,8 @@ var ( transferWitnessLeaderCounter = transferWitnessLeaderCounterWithEvent("schedule") transferWitnessLeaderNewOperatorCounter = transferWitnessLeaderCounterWithEvent("new-operator") transferWitnessLeaderNoTargetStoreCounter = transferWitnessLeaderCounterWithEvent("no-target-store") + + balanceKeyRangeCounter = balanceKeyRangeCounterWithEvent("schedule") + balanceKeyRangeNewOperatorCounter = balanceKeyRangeCounterWithEvent("new-operator") + balanceExpiredCounter = balanceKeyRangeCounterWithEvent("expired") ) diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index d6aee65b181..feecad2fb27 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -181,7 +181,7 @@ func (s *splitBucketScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) } allowed := s.BaseScheduler.OpController.OperatorCount(operator.OpSplit) < s.conf.getSplitLimit() if !allowed { - splitBuckerSplitLimitCounter.Inc() + 
splitBucketSplitLimitCounter.Inc() operator.IncOperatorLimitCounter(s.GetType(), operator.OpSplit) } return allowed From d86148f6160442b7e3bab0e1b3c5f11ee383ff0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Wed, 15 Jan 2025 18:17:13 +0800 Subject: [PATCH 07/10] rename balance-key-range to balance-range MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/schedule/operator/kind.go | 4 ---- pkg/schedule/operator/operator_test.go | 3 --- pkg/schedule/schedulers/balance_key_range.go | 6 +++--- pkg/schedule/schedulers/init.go | 4 ++-- pkg/schedule/types/type.go | 10 +++++----- server/api/scheduler.go | 2 +- server/cluster/cluster_test.go | 4 ++-- tools/pd-ctl/pdctl/command/scheduler.go | 18 +++++++++--------- tools/pd-ctl/tests/scheduler/scheduler_test.go | 10 +++++----- 10 files changed, 28 insertions(+), 35 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index f0b87e82c06..9ab5d329398 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -316,7 +316,7 @@ func (c *Cluster) updateScheduler() { for _, scheduler := range latestSchedulersConfig { schedulerType, ok := types.ConvertOldStrToType[scheduler.Type] if !ok { - log.Error("scheduler not found ", zap.String("type", scheduler.Type)) + log.Error("scheduler not found", zap.String("type", scheduler.Type)) continue } s, err := schedulers.CreateScheduler( diff --git a/pkg/schedule/operator/kind.go b/pkg/schedule/operator/kind.go index 0a7ccb34245..0187a64c568 100644 --- a/pkg/schedule/operator/kind.go +++ b/pkg/schedule/operator/kind.go @@ -35,8 +35,6 @@ const ( OpMerge // Initiated by range scheduler. OpRange - // Initiated by key range scheduler. - OpKeyRange // Initiated by replica checker. OpReplica // Include region split. 
Initiated by rule checker if `kind & OpAdmin == 0`. @@ -62,7 +60,6 @@ var flagToName = map[OpKind]string{ OpHotRegion: "hot-region", OpReplica: "replica", OpMerge: "merge", - OpKeyRange: "key-range", OpRange: "range", OpWitness: "witness", OpWitnessLeader: "witness-leader", @@ -77,7 +74,6 @@ var nameToFlag = map[string]OpKind{ "replica": OpReplica, "merge": OpMerge, "range": OpRange, - "key-range": OpKeyRange, "witness-leader": OpWitnessLeader, } diff --git a/pkg/schedule/operator/operator_test.go b/pkg/schedule/operator/operator_test.go index 422091dea19..6976b5ca12e 100644 --- a/pkg/schedule/operator/operator_test.go +++ b/pkg/schedule/operator/operator_test.go @@ -476,9 +476,6 @@ func (suite *operatorTestSuite) TestSchedulerKind() { }, { op: NewTestOperator(1, &metapb.RegionEpoch{}, OpLeader), expect: OpLeader, - }, { - op: NewTestOperator(1, &metapb.RegionEpoch{}, OpKeyRange|OpLeader), - expect: OpKeyRange, }, } for _, v := range testData { diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_key_range.go index c21c62af71f..71e0fab29d9 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_key_range.go @@ -121,9 +121,9 @@ func (*balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRu // IsScheduleAllowed checks if the scheduler is allowed to schedule new operators. func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - allowed := s.OpController.OperatorCount(operator.OpKeyRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() + allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(s.GetType(), operator.OpKeyRange) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange) } return allowed } @@ -135,7 +135,7 @@ type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler) // special store balanced. 
func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler { s := &balanceKeyRangeScheduler{ - BaseScheduler: NewBaseScheduler(opController, types.BalanceKeyRangeScheduler, conf), + BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf), conf: conf, handler: newBalanceKeyRangeHandler(conf), } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 37d17ddd9ae..f86e1596f27 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -550,7 +550,7 @@ func schedulersRegister() { // balance key range scheduler // args: [role, engine, timeout, range1, range2, ...] - RegisterSliceDecoderBuilder(types.BalanceKeyRangeScheduler, func(args []string) ConfigDecoder { + RegisterSliceDecoderBuilder(types.BalanceRangeScheduler, func(args []string) ConfigDecoder { return func(v any) error { conf, ok := v.(*balanceKeyRangeSchedulerConfig) if !ok { @@ -587,7 +587,7 @@ func schedulersRegister() { } }) - RegisterScheduler(types.BalanceKeyRangeScheduler, func(opController *operator.Controller, + RegisterScheduler(types.BalanceRangeScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { conf := &balanceKeyRangeSchedulerConfig{ schedulerConfig: newBaseDefaultSchedulerConfig(), diff --git a/pkg/schedule/types/type.go b/pkg/schedule/types/type.go index c9d06f31e9f..87e89c18948 100644 --- a/pkg/schedule/types/type.go +++ b/pkg/schedule/types/type.go @@ -70,8 +70,8 @@ const ( TransferWitnessLeaderScheduler CheckerSchedulerType = "transfer-witness-leader-scheduler" // LabelScheduler is label scheduler name. LabelScheduler CheckerSchedulerType = "label-scheduler" - // BalanceKeyRangeScheduler is balance key range scheduler name. 
- BalanceKeyRangeScheduler CheckerSchedulerType = "balance-key-range-scheduler" + // BalanceRangeScheduler is balance key range scheduler name. + BalanceRangeScheduler CheckerSchedulerType = "balance-range-scheduler" ) // TODO: SchedulerTypeCompatibleMap and ConvertOldStrToType should be removed after @@ -99,7 +99,7 @@ var ( SplitBucketScheduler: "split-bucket", TransferWitnessLeaderScheduler: "transfer-witness-leader", LabelScheduler: "label", - BalanceKeyRangeScheduler: "balance-key-range", + BalanceRangeScheduler: "balance-range", } // ConvertOldStrToType exists for compatibility. @@ -123,7 +123,7 @@ var ( "split-bucket": SplitBucketScheduler, "transfer-witness-leader": TransferWitnessLeaderScheduler, "label": LabelScheduler, - "balance-key-range": BalanceKeyRangeScheduler, + "balance-range": BalanceRangeScheduler, } // StringToSchedulerType is a map to convert the scheduler string to the CheckerSchedulerType. @@ -147,7 +147,7 @@ var ( "split-bucket-scheduler": SplitBucketScheduler, "transfer-witness-leader-scheduler": TransferWitnessLeaderScheduler, "label-scheduler": LabelScheduler, - "balance-key-range-scheduler": BalanceKeyRangeScheduler, + "balance-range-scheduler": BalanceRangeScheduler, } // DefaultSchedulers is the default scheduler types. 
diff --git a/server/api/scheduler.go b/server/api/scheduler.go index e50e563e5b8..d9f8aa6518d 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -99,7 +99,7 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques } switch tp { - case types.BalanceKeyRangeScheduler: + case types.BalanceRangeScheduler: exist, _ := h.IsSchedulerExisted(name) if exist { h.r.JSON(w, http.StatusBadRequest, "The scheduler already exists, pls remove the exist scheduler first.") diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index ca78b4cfdd7..d2382ded70c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3209,10 +3209,10 @@ func TestAddScheduler(t *testing.T) { re.NoError(err) re.NoError(controller.AddScheduler(gls)) - _, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{}), controller.RemoveScheduler) + _, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{}), controller.RemoveScheduler) re.Error(err) - gls, err = schedulers.CreateScheduler(types.BalanceKeyRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceKeyRangeScheduler, []string{"learner", "tiflash", "1h", "100", "200"}), controller.RemoveScheduler) + gls, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"learner", "tiflash", "1h", "100", "200"}), controller.RemoveScheduler) re.NoError(err) re.NoError(controller.AddScheduler(gls)) conf, err = gls.EncodeConfig() diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 9a5993dd8cf..50525d885fd 100644 --- 
a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -162,7 +162,7 @@ func NewAddSchedulerCommand() *cobra.Command { c.AddCommand(NewSlowTrendEvictLeaderSchedulerCommand()) c.AddCommand(NewBalanceWitnessSchedulerCommand()) c.AddCommand(NewTransferWitnessLeaderSchedulerCommand()) - c.AddCommand(NewBalanceKeyRangeSchedulerCommand()) + c.AddCommand(NewBalanceRangeSchedulerCommand()) return c } @@ -375,12 +375,12 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { return c } -// NewBalanceKeyRangeSchedulerCommand returns a command to add a balance-key-range-scheduler. -func NewBalanceKeyRangeSchedulerCommand() *cobra.Command { +// NewBalanceRangeSchedulerCommand returns a command to add a balance-key-range-scheduler. +func NewBalanceRangeSchedulerCommand() *cobra.Command { c := &cobra.Command{ - Use: "balance-key-range-scheduler [--format=raw|encode|hex] ", - Short: "add a scheduler to balance region for given key range", - Run: addSchedulerForBalanceKeyRangeCommandFunc, + Use: "balance-range-scheduler [--format=raw|encode|hex] ", + Short: "add a scheduler to balance region for given range", + Run: addSchedulerForBalanceRangeCommandFunc, } c.Flags().String("format", "hex", "the key format") return c @@ -424,7 +424,7 @@ func addSchedulerForGrantHotRegionCommandFunc(cmd *cobra.Command, args []string) postJSON(cmd, schedulersPrefix, input) } -func addSchedulerForBalanceKeyRangeCommandFunc(cmd *cobra.Command, args []string) { +func addSchedulerForBalanceRangeCommandFunc(cmd *cobra.Command, args []string) { if len(args) != 4 { cmd.Println(cmd.UsageString()) return @@ -588,8 +588,8 @@ func newConfigBalanceLeaderCommand() *cobra.Command { func newConfigBalanceKeyRangeCommand() *cobra.Command { c := &cobra.Command{ - Use: "balance-key-range-scheduler", - Short: "balance-key-range-scheduler config", + Use: "balance-range-scheduler", + Short: "balance-range-scheduler config", Run: listSchedulerConfigCommandFunc, } diff --git 
a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index 38338bc2494..f95cf033239 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -543,13 +543,13 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust }) // test balance key range scheduler - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler"}, nil) re.NotContains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) re.Contains(echo, "Success!") conf = make(map[string]any) testutil.Eventually(re, func() bool { - mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-key-range-scheduler"}, &conf) + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-range-scheduler"}, &conf) return conf["role"] == "learner" && conf["engine"] == "tiflash" }) re.Equal(float64(time.Hour.Nanoseconds()), conf["timeout"]) @@ -557,10 +557,10 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-key-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) re.Contains(echo, "400") re.Contains(echo, "scheduler already exists") - 
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-key-range-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-range-scheduler"}, nil) re.Contains(echo, "Success!") // test balance leader config From 8bdb7bc5c5b1add45ff0b27d5e81c111d5bca296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Thu, 16 Jan 2025 15:21:44 +0800 Subject: [PATCH 08/10] use hex encode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/core/basic_cluster.go | 10 ++++++++++ tools/pd-ctl/pdctl/command/scheduler.go | 9 +++++---- tools/pd-ctl/tests/scheduler/scheduler_test.go | 5 ++--- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index f7c3c5e93b1..45e06648c35 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -16,6 +16,7 @@ package core import ( "bytes" + "encoding/json" "github.com/tikv/pd/pkg/core/constant" ) @@ -156,6 +157,15 @@ type KeyRange struct { EndKey []byte `json:"end-key"` } +// MarshalJSON marshals to json. +func (kr KeyRange) MarshalJSON() ([]byte, error) { + m := map[string]string{ + "start-key": HexRegionKeyStr(kr.StartKey), + "end-key": HexRegionKeyStr(kr.EndKey), + } + return json.Marshal(m) +} + // NewKeyRange create a KeyRange with the given start key and end key. func NewKeyRange(startKey, endKey string) KeyRange { return KeyRange{ diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 50525d885fd..1492709fc79 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -375,12 +375,13 @@ func NewBalanceWitnessSchedulerCommand() *cobra.Command { return c } -// NewBalanceRangeSchedulerCommand returns a command to add a balance-key-range-scheduler. 
+// NewBalanceRangeSchedulerCommand returns a command to add a balance-range-scheduler. func NewBalanceRangeSchedulerCommand() *cobra.Command { c := &cobra.Command{ - Use: "balance-range-scheduler [--format=raw|encode|hex] ", - Short: "add a scheduler to balance region for given range", - Run: addSchedulerForBalanceRangeCommandFunc, + Use: "balance-range-scheduler [--format=raw|encode|hex] ", + Short: "add a scheduler to balance region for given range", + Run: addSchedulerForBalanceRangeCommandFunc, + Deprecated: "balance-range will be deprecated in the future, please use sql instead", } c.Flags().String("format", "hex", "the key format") return c diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index f95cf033239..1d011329c42 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -15,7 +15,6 @@ package scheduler_test import ( - "encoding/base64" "encoding/json" "fmt" "reflect" @@ -554,8 +553,8 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust }) re.Equal(float64(time.Hour.Nanoseconds()), conf["timeout"]) ranges := conf["ranges"].([]any)[0].(map[string]any) - re.Equal(base64.StdEncoding.EncodeToString([]byte("a")), ranges["start-key"]) - re.Equal(base64.StdEncoding.EncodeToString([]byte("b")), ranges["end-key"]) + re.Equal(core.HexRegionKeyStr([]byte("a")), ranges["start-key"]) + re.Equal(core.HexRegionKeyStr([]byte("b")), ranges["end-key"]) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-range-scheduler", "--format=raw", "tiflash", "learner", "a", "b"}, nil) re.Contains(echo, "400") From 0696ba64e4dad421aab0547371315737f63c9f7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Tue, 21 Jan 2025 15:01:40 +0800 Subject: [PATCH 09/10] rename MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 
<1045931706@qq.com> --- ...{balance_key_range.go => balance_range.go} | 74 +++++++++++-------- pkg/schedule/schedulers/init.go | 6 +- tools/pd-ctl/pdctl/command/scheduler.go | 4 +- 3 files changed, 47 insertions(+), 37 deletions(-) rename pkg/schedule/schedulers/{balance_key_range.go => balance_range.go} (55%) diff --git a/pkg/schedule/schedulers/balance_key_range.go b/pkg/schedule/schedulers/balance_range.go similarity index 55% rename from pkg/schedule/schedulers/balance_key_range.go rename to pkg/schedule/schedulers/balance_range.go index 71e0fab29d9..96e015e91c4 100644 --- a/pkg/schedule/schedulers/balance_key_range.go +++ b/pkg/schedule/schedulers/balance_range.go @@ -1,3 +1,17 @@ +// Copyright 2025 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ package schedulers import ( @@ -20,13 +34,13 @@ import ( "github.com/tikv/pd/pkg/utils/syncutil" ) -type balanceKeyRangeSchedulerHandler struct { +type balanceRangeSchedulerHandler struct { rd *render.Render - config *balanceKeyRangeSchedulerConfig + config *balanceRangeSchedulerConfig } -func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handler { - handler := &balanceKeyRangeSchedulerHandler{ +func newBalanceRangeHandler(conf *balanceRangeSchedulerConfig) http.Handler { + handler := &balanceRangeSchedulerHandler{ config: conf, rd: render.New(render.Options{IndentJSON: true}), } @@ -36,42 +50,36 @@ func newBalanceKeyRangeHandler(conf *balanceKeyRangeSchedulerConfig) http.Handle return router } -func (handler *balanceKeyRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) { +func (handler *balanceRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) { handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported") } -func (handler *balanceKeyRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) { +func (handler *balanceRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.clone() if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil { log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err)) } } -type balanceKeyRangeSchedulerConfig struct { +type balanceRangeSchedulerConfig struct { syncutil.RWMutex schedulerConfig - balanceKeyRangeSchedulerParam + balanceRangeSchedulerParam } -type balanceKeyRangeSchedulerParam struct { +type balanceRangeSchedulerParam struct { Role string `json:"role"` Engine string `json:"engine"` Timeout time.Duration `json:"timeout"` Ranges []core.KeyRange `json:"ranges"` } -func (conf *balanceKeyRangeSchedulerConfig) encodeConfig() ([]byte, error) { - conf.RLock() - defer conf.RUnlock() - return EncodeConfig(conf) -} - -func (conf *balanceKeyRangeSchedulerConfig) 
clone() *balanceKeyRangeSchedulerParam { +func (conf *balanceRangeSchedulerConfig) clone() *balanceRangeSchedulerParam { conf.RLock() defer conf.RUnlock() ranges := make([]core.KeyRange, len(conf.Ranges)) copy(ranges, conf.Ranges) - return &balanceKeyRangeSchedulerParam{ + return &balanceRangeSchedulerParam{ Ranges: ranges, Role: conf.Role, Engine: conf.Engine, @@ -80,16 +88,18 @@ func (conf *balanceKeyRangeSchedulerConfig) clone() *balanceKeyRangeSchedulerPar } // EncodeConfig serializes the config. -func (s *balanceKeyRangeScheduler) EncodeConfig() ([]byte, error) { - return s.conf.encodeConfig() +func (s *balanceRangeScheduler) EncodeConfig() ([]byte, error) { + s.conf.RLock() + defer s.conf.RUnlock() + return EncodeConfig(s.conf) } // ReloadConfig reloads the config. -func (s *balanceKeyRangeScheduler) ReloadConfig() error { +func (s *balanceRangeScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - newCfg := &balanceKeyRangeSchedulerConfig{} + newCfg := &balanceRangeSchedulerConfig{} if err := s.conf.load(newCfg); err != nil { return err } @@ -100,27 +110,27 @@ func (s *balanceKeyRangeScheduler) ReloadConfig() error { return nil } -type balanceKeyRangeScheduler struct { +type balanceRangeScheduler struct { *BaseScheduler - conf *balanceKeyRangeSchedulerConfig + conf *balanceRangeSchedulerConfig handler http.Handler filters []filter.Filter filterCounter *filter.Counter } // ServeHTTP implements the http.Handler interface. -func (s *balanceKeyRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (s *balanceRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handler.ServeHTTP(w, r) } // Schedule schedules the balance key range operator. 
-func (*balanceKeyRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (*balanceRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) { log.Debug("balance key range scheduler is scheduling, need to implement") return nil, nil } // IsScheduleAllowed checks if the scheduler is allowed to schedule new operators. -func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { +func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange) @@ -128,16 +138,16 @@ func (s *balanceKeyRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerClust return allowed } -// BalanceKeyRangeCreateOption is used to create a scheduler with an option. -type BalanceKeyRangeCreateOption func(s *balanceKeyRangeScheduler) +// BalanceRangeCreateOption is used to create a scheduler with an option. +type BalanceRangeCreateOption func(s *balanceRangeScheduler) -// newBalanceKeyRangeScheduler creates a scheduler that tends to keep given peer role on +// newBalanceRangeScheduler creates a scheduler that tends to keep given peer role on // special store balanced. 
-func newBalanceKeyRangeScheduler(opController *operator.Controller, conf *balanceKeyRangeSchedulerConfig, options ...BalanceKeyRangeCreateOption) Scheduler { - s := &balanceKeyRangeScheduler{ +func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRangeSchedulerConfig, options ...BalanceRangeCreateOption) Scheduler { + s := &balanceRangeScheduler{ BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf), conf: conf, - handler: newBalanceKeyRangeHandler(conf), + handler: newBalanceRangeHandler(conf), } for _, option := range options { option(s) diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index f86e1596f27..45e456efeb3 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -552,7 +552,7 @@ func schedulersRegister() { // args: [role, engine, timeout, range1, range2, ...] RegisterSliceDecoderBuilder(types.BalanceRangeScheduler, func(args []string) ConfigDecoder { return func(v any) error { - conf, ok := v.(*balanceKeyRangeSchedulerConfig) + conf, ok := v.(*balanceRangeSchedulerConfig) if !ok { return errs.ErrScheduleConfigNotExist.FastGenByArgs() } @@ -589,13 +589,13 @@ func schedulersRegister() { RegisterScheduler(types.BalanceRangeScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &balanceKeyRangeSchedulerConfig{ + conf := &balanceRangeSchedulerConfig{ schedulerConfig: newBaseDefaultSchedulerConfig(), } if err := decoder(conf); err != nil { return nil, err } - sche := newBalanceKeyRangeScheduler(opController, conf) + sche := newBalanceRangeScheduler(opController, conf) conf.init(sche.GetName(), storage, conf) return sche, nil }) diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 1492709fc79..e2bbb09120c 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ 
b/tools/pd-ctl/pdctl/command/scheduler.go @@ -562,7 +562,7 @@ func NewConfigSchedulerCommand() *cobra.Command { newConfigEvictSlowStoreCommand(), newConfigShuffleHotRegionSchedulerCommand(), newConfigEvictSlowTrendCommand(), - newConfigBalanceKeyRangeCommand(), + newConfigBalanceRangeCommand(), ) return c } @@ -587,7 +587,7 @@ func newConfigBalanceLeaderCommand() *cobra.Command { return c } -func newConfigBalanceKeyRangeCommand() *cobra.Command { +func newConfigBalanceRangeCommand() *cobra.Command { c := &cobra.Command{ Use: "balance-range-scheduler", Short: "balance-range-scheduler config", From 66f70c2434d394bb71d7cea7aaa09fb613103640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Tue, 21 Jan 2025 17:27:04 +0800 Subject: [PATCH 10/10] impl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- pkg/core/constant/kind.go | 4 - pkg/schedule/operator/operator_controller.go | 27 ++- pkg/schedule/schedulers/balance_range.go | 211 +++++++++++++------ pkg/schedule/schedulers/metrics.go | 14 +- 4 files changed, 186 insertions(+), 70 deletions(-) diff --git a/pkg/core/constant/kind.go b/pkg/core/constant/kind.go index 933d3463401..7e9d173c689 100644 --- a/pkg/core/constant/kind.go +++ b/pkg/core/constant/kind.go @@ -78,10 +78,6 @@ func (k ResourceKind) String() string { return "region" case WitnessKind: return "witness" - case LearnerKind: - return `learner` - case unKnownKind: - return "unknown" default: return "unknown" } diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index cd2470376e1..61a800ebb9b 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -15,6 +15,7 @@ package operator import ( + "bytes" "context" "fmt" "strconv" @@ -828,6 +829,25 @@ func (oc *Controller) GetHistory(start time.Time) []OpHistory { return history } +// 
OpInfluenceOption is used to filter the region. +// It returns true if the region meets the condition, and the influence of this region will be calculated. +// It returns false if the region does not meet the condition, and this region will be ignored in the influence calculation. +type OpInfluenceOption func(region *core.RegionInfo) bool + +// WithRangeOption returns an OpInfluenceOption that filters the region by the given key ranges. +func WithRangeOption(ranges []core.KeyRange) OpInfluenceOption { + return func(region *core.RegionInfo) bool { + for _, r := range ranges { + // the start key of the region must be no less than the given range start key. + // the end key of the region must be no greater than the given range end key. + if bytes.Compare(region.GetStartKey(), r.StartKey) < 0 || bytes.Compare(r.EndKey, region.GetEndKey()) < 0 { + return false + } + } + return true + } +} + // OperatorCount gets the count of operators filtered by kind. // kind only has one OpKind. func (oc *Controller) OperatorCount(kind OpKind) uint64 { @@ -835,7 +855,7 @@ } // GetOpInfluence gets OpInfluence.
-func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster) OpInfluence { +func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster, ops ...OpInfluenceOption) OpInfluence { influence := OpInfluence{ StoresInfluence: make(map[uint64]*StoreInfluence), } @@ -844,6 +864,11 @@ func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster) OpInfluence { op := value.(*Operator) if !op.CheckTimeout() && !op.CheckSuccess() { region := cluster.GetRegion(op.RegionID()) + for _, opt := range ops { + if !opt(region) { + return true + } + } if region != nil { op.UnfinishedInfluence(influence, region) } diff --git a/pkg/schedule/schedulers/balance_range.go b/pkg/schedule/schedulers/balance_range.go index 087c2743d5a..7dd8a539f48 100644 --- a/pkg/schedule/schedulers/balance_range.go +++ b/pkg/schedule/schedulers/balance_range.go @@ -18,27 +18,28 @@ import ( "go.uber.org/zap" "net/http" "sort" + "strconv" "time" "github.com/gorilla/mux" "github.com/unrolled/render" - "github.com/pingcap/log" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" - "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/schedule/types" "github.com/tikv/pd/pkg/utils/syncutil" ) -const balanceRangeName ="balance-range-scheduler" +const balanceRangeName = "balance-range-scheduler" type balanceRangeSchedulerHandler struct { rd *render.Render @@ -144,7 +145,6 @@ func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) return allowed } - // BalanceRangeCreateOption is used to create a scheduler with an option. 
type BalanceRangeCreateOption func(s *balanceRangeScheduler) @@ -174,21 +174,32 @@ func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRa // Schedule schedules the balance key range operator. func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { balanceRangeCounter.Inc() - plan,err:=s.prepare(cluster) + opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster(), operator.WithRangeOption(s.conf.Ranges)) + plan, err := s.prepare(cluster, opInfluence) + if err != nil { + log.Error("failed to prepare balance key range scheduler", errs.ZapError(err)) + return nil, nil + } + downFilter := filter.NewRegionDownFilter() replicaFilter := filter.NewRegionReplicatedFilter(cluster) snapshotFilter := filter.NewSnapshotSendFilter(plan.stores, constant.Medium) - baseRegionFilters := []filter.RegionFilter{downFilter, replicaFilter, snapshotFilter} - - for sourceIndex,sourceStore:=range plan.stores{ - plan.source=sourceStore - switch s.role{ + pendingFilter := filter.NewRegionPendingFilter() + baseRegionFilters := []filter.RegionFilter{downFilter, replicaFilter, snapshotFilter, pendingFilter} + + for sourceIndex, sourceStore := range plan.stores { + plan.source = sourceStore + plan.sourceScore = plan.score(plan.source.GetID()) + if plan.sourceScore < plan.averageScore { + break + } + switch s.role { case Leader: - plan.region=filter.SelectOneRegion(cluster.RandLeaderRegions(plan.sourceStoreID(), s.conf.Ranges), nil,baseRegionFilters...) + plan.region = filter.SelectOneRegion(cluster.RandLeaderRegions(plan.sourceStoreID(), s.conf.Ranges), nil, baseRegionFilters...) case Learner: - plan.region=filter.SelectOneRegion(cluster.RandLearnerRegions(plan.sourceStoreID(), s.conf.Ranges), nil,baseRegionFilters...) + plan.region = filter.SelectOneRegion(cluster.RandLearnerRegions(plan.sourceStoreID(), s.conf.Ranges), nil, baseRegionFilters...) 
case Follower: - plan.region=filter.SelectOneRegion(cluster.RandFollowerRegions(plan.sourceStoreID(), s.conf.Ranges), nil,baseRegionFilters...) + plan.region = filter.SelectOneRegion(cluster.RandFollowerRegions(plan.sourceStoreID(), s.conf.Ranges), nil, baseRegionFilters...) } if plan.region == nil { balanceRangeNoRegionCounter.Inc() @@ -203,89 +214,171 @@ func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun b } // Check region leader if plan.region.GetLeader() == nil { - log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID())) balanceRangeNoLeaderCounter.Inc() continue } plan.fit = replicaFilter.(*filter.RegionReplicatedFilter).GetFit() if op := s.transferPeer(plan, plan.stores[sourceIndex+1:]); op != nil { - op.Counters = append(op.Counters, balanceRegionNewOpCounter) + op.Counters = append(op.Counters, balanceRangeNewOperatorCounter) return []*operator.Operator{op}, nil } } if err != nil { log.Error("failed to prepare balance key range scheduler", errs.ZapError(err)) - return nil,nil + return nil, nil } + return nil, nil } // transferPeer selects the best store to create a new peer to replace the old peer. 
-func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, dstStores []*storeInfo) *operator.Operator { +func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, dstStores []*core.StoreInfo) *operator.Operator { excludeTargets := plan.region.GetStoreIDs() - if s.role!=Leader{ - excludeTargets = append(excludeTargets, plan.sourceStoreID()) + if s.role != Leader { + excludeTargets = make(map[uint64]struct{}) + } + conf := plan.GetSchedulerConfig() + filters := []filter.Filter{ + filter.NewExcludedFilter(s.GetName(), nil, excludeTargets), + filter.NewPlacementSafeguard(s.GetName(), conf, plan.GetBasicCluster(), plan.GetRuleManager(), plan.region, plan.source, plan.fit), } + candidates := filter.NewCandidates(s.R, dstStores).FilterTarget(conf, nil, s.filterCounter, filters...) + for i := range candidates.Stores { + plan.target = candidates.Stores[len(candidates.Stores)-i-1] + plan.targetScore = plan.score(plan.target.GetID()) + if plan.targetScore > plan.averageScore { + break + } + regionID := plan.region.GetID() + sourceID := plan.source.GetID() + targetID := plan.target.GetID() + if !plan.shouldBalance(s.GetName()) { + continue + } + log.Debug("candidate store", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID)) + + oldPeer := plan.region.GetStorePeer(sourceID) + newPeer := &metapb.Peer{StoreId: plan.target.GetID(), Role: oldPeer.Role} + op, err := operator.CreateMovePeerOperator(s.GetName(), plan, plan.region, operator.OpRange, oldPeer.GetStoreId(), newPeer) + if err != nil { + balanceRangeCreateOpFailCounter.Inc() + return nil + } + sourceLabel := strconv.FormatUint(sourceID, 10) + targetLabel := strconv.FormatUint(targetID, 10) + op.FinishedCounters = append(op.FinishedCounters, + balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel), + ) + op.SetAdditionalInfo("sourceScore", strconv.FormatInt(plan.sourceScore, 10)) + 
op.SetAdditionalInfo("targetScore", strconv.FormatInt(plan.targetScore, 10)) + return op + } + balanceRangeNoReplacementCounter.Inc() return nil } // balanceRangeSchedulerPlan is used to record the plan of balance key range scheduler. type balanceRangeSchedulerPlan struct { + sche.SchedulerCluster // stores is sorted by score desc - stores []*storeInfo - source *storeInfo - target *storeInfo - region *core.RegionInfo - fit *placement.RegionFit + stores []*core.StoreInfo + // sourceMap records the storeID -> score + sourceMap map[uint64]int64 + source *core.StoreInfo + sourceScore int64 + target *core.StoreInfo + targetScore int64 + region *core.RegionInfo + fit *placement.RegionFit + averageScore int64 } type storeInfo struct { store *core.StoreInfo - score uint64 + score int64 } -func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster)(*balanceRangeSchedulerPlan,error) { +func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluence operator.OpInfluence) (*balanceRangeSchedulerPlan, error) { krs := core.NewKeyRanges(s.conf.Ranges) scanRegions, err := cluster.BatchScanRegions(krs) if err != nil { - return nil,err + return nil, err } sources := filter.SelectSourceStores(cluster.GetStores(), s.filters, cluster.GetSchedulerConfig(), nil, nil) - storeInfos:=make(map[uint64]*storeInfo,len(sources)) + storeInfos := make(map[uint64]*storeInfo, len(sources)) for _, source := range sources { storeInfos[source.GetID()] = &storeInfo{store: source} } + totalScore := int64(0) for _, region := range scanRegions { for _, peer := range s.role.getPeers(region) { storeInfos[peer.GetStoreId()].score += 1 + totalScore += 1 } } - stores:=make([]*storeInfo,0,len(storeInfos)) - for _, store := range storeInfos { - stores = append(stores, store) + storeList := make([]*storeInfo, 0, len(storeInfos)) + for storeID, store := range storeInfos { + if influence := opInfluence.GetStoreInfluence(storeID); influence != nil { + store.score += 
s.role.getStoreInfluence(influence) + } + storeList = append(storeList, store) } - sort.Slice(stores, func(i, j int) bool { - return stores[i].score > stores[j].score + sort.Slice(storeList, func(i, j int) bool { + return storeList[i].score > storeList[j].score }) + sourceMap := make(map[uint64]int64) + for _, store := range storeList { + sourceMap[store.store.GetID()] = store.score + } + + stores := make([]*core.StoreInfo, 0, len(storeList)) + for _, store := range storeList { + stores = append(stores, store.store) + } + averageScore := totalScore / int64(len(storeList)) return &balanceRangeSchedulerPlan{ - stores:stores, - source: nil, - target: nil, - region: nil, - },nil + SchedulerCluster: cluster, + stores: stores, + sourceMap: sourceMap, + source: nil, + target: nil, + region: nil, + averageScore: averageScore, + }, nil } func (p *balanceRangeSchedulerPlan) sourceStoreID() uint64 { - return p.source.store.GetID() + return p.source.GetID() } func (p *balanceRangeSchedulerPlan) targetStoreID() uint64 { - return p.target.store.GetID() + return p.target.GetID() } +func (p *balanceRangeSchedulerPlan) score(storeID uint64) int64 { + return p.sourceMap[storeID] +} +func (p *balanceRangeSchedulerPlan) shouldBalance(scheduler string) bool { + sourceScore := p.score(p.sourceStoreID()) + targetScore := p.score(p.targetStoreID()) + shouldBalance := sourceScore > targetScore + if !shouldBalance && log.GetLevel() <= zap.DebugLevel { + log.Debug("skip balance ", + zap.String("scheduler", scheduler), + zap.Uint64("region-id", p.region.GetID()), + zap.Uint64("source-store", p.sourceStoreID()), + zap.Uint64("target-store", p.targetStoreID()), + zap.Int64("source-score", p.sourceScore), + zap.Int64("target-score", p.targetScore), + zap.Int64("average-region-size", p.averageScore), + ) + } + return shouldBalance +} type Role int @@ -310,33 +403,33 @@ func (r Role) String() string { } } -func NewRole(role string) Role { - switch role { - case "leader": - return Leader - case 
"follower": - return Follower - case "learner": - return Learner +func (r Role) getPeers(region *core.RegionInfo) []*metapb.Peer { + switch r { + case Leader: + return []*metapb.Peer{region.GetLeader()} + case Follower: + followers := region.GetFollowers() + ret := make([]*metapb.Peer, len(followers)) + for _, peer := range followers { + ret = append(ret, peer) + } + return ret + case Learner: + return region.GetLearners() default: - return Unknown + return nil } } -func (r Role) getPeers(region *core.RegionInfo) []*metapb.Peer {{ +func (r Role) getStoreInfluence(influence *operator.StoreInfluence) int64 { switch r { case Leader: - return []*metapb.Peer{region.GetLeader()} + return influence.LeaderCount case Follower: - followers:=region.GetFollowers() - ret:=make([]*metapb.Peer,len(followers)) - for _,peer:=range followers{ - ret=append(ret,peer) - } - return ret + return influence.RegionCount case Learner: - return region.GetLearners() + return influence.RegionCount default: - return nil + return 0 } } diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go index 38c61443179..8297f3e30c0 100644 --- a/pkg/schedule/schedulers/metrics.go +++ b/pkg/schedule/schedulers/metrics.go @@ -347,10 +347,12 @@ var ( transferWitnessLeaderNewOperatorCounter = transferWitnessLeaderCounterWithEvent("new-operator") transferWitnessLeaderNoTargetStoreCounter = transferWitnessLeaderCounterWithEvent("no-target-store") - balanceRangeCounter = balanceRangeCounterWithEvent("schedule") - balanceKeyRangeNewOperatorCounter = balanceRangeCounterWithEvent("new-operator") - balanceRangeExpiredCounter = balanceRangeCounterWithEvent("expired") - balanceRangeNoRegionCounter = balanceRangeCounterWithEvent("no-region") - balanceRangeHotCounter = balanceRangeCounterWithEvent("region-hot") - balanceRangeNoLeaderCounter = balanceRangeCounterWithEvent("no-leader") + balanceRangeCounter = balanceRangeCounterWithEvent("schedule") + balanceRangeNewOperatorCounter = 
balanceRangeCounterWithEvent("new-operator") + balanceRangeExpiredCounter = balanceRangeCounterWithEvent("expired") + balanceRangeNoRegionCounter = balanceRangeCounterWithEvent("no-region") + balanceRangeHotCounter = balanceRangeCounterWithEvent("region-hot") + balanceRangeNoLeaderCounter = balanceRangeCounterWithEvent("no-leader") + balanceRangeCreateOpFailCounter = balanceRangeCounterWithEvent("create-operator-fail") + balanceRangeNoReplacementCounter = balanceRangeCounterWithEvent("no-replacement") )