Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add a new scheduler to balance the regions of the given key range #8988

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core

import (
"bytes"
"encoding/json"

"github.com/tikv/pd/pkg/core/constant"
)
Expand Down Expand Up @@ -156,6 +157,15 @@ type KeyRange struct {
EndKey []byte `json:"end-key"`
}

// MarshalJSON implements json.Marshaler for KeyRange. Both boundary keys are
// converted with HexRegionKeyStr and emitted as a JSON object under the
// "start-key" and "end-key" fields.
//
// NOTE(review): no matching UnmarshalJSON is visible in this chunk, while the
// default decoding of []byte fields expects base64 text — confirm that values
// written by this method can be read back.
func (kr KeyRange) MarshalJSON() ([]byte, error) {
	encoded := make(map[string]string, 2)
	encoded["start-key"] = HexRegionKeyStr(kr.StartKey)
	encoded["end-key"] = HexRegionKeyStr(kr.EndKey)
	return json.Marshal(encoded)
}

// NewKeyRange creates a KeyRange with the given start key and end key.
func NewKeyRange(startKey, endKey string) KeyRange {
return KeyRange{
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,11 @@ func (c *Cluster) updateScheduler() {
)
// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
schedulerType := types.ConvertOldStrToType[scheduler.Type]
schedulerType, ok := types.ConvertOldStrToType[scheduler.Type]
if !ok {
log.Error("scheduler not found", zap.String("type", scheduler.Type))
continue
}
s, err := schedulers.CreateScheduler(
schedulerType,
c.coordinator.GetOperatorController(),
Expand Down
163 changes: 163 additions & 0 deletions pkg/schedule/schedulers/balance_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/unrolled/render"

"github.com/pingcap/log"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// balanceRangeSchedulerHandler backs the HTTP endpoints of the balance range
// scheduler (see newBalanceRangeHandler for the routes).
type balanceRangeSchedulerHandler struct {
	// rd writes JSON responses.
	rd *render.Render
	// config is the scheduler configuration the endpoints report on.
	config *balanceRangeSchedulerConfig
}

// newBalanceRangeHandler builds the HTTP handler for the balance range
// scheduler. It routes POST /config to updateConfig and GET /list to
// listConfig, rendering responses as indented JSON.
func newBalanceRangeHandler(conf *balanceRangeSchedulerConfig) http.Handler {
	h := &balanceRangeSchedulerHandler{
		rd:     render.New(render.Options{IndentJSON: true}),
		config: conf,
	}

	r := mux.NewRouter()
	r.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost)
	r.HandleFunc("/list", h.listConfig).Methods(http.MethodGet)
	return r
}

// updateConfig answers every request with 400 Bad Request: this scheduler's
// configuration cannot be changed through the API.
func (handler *balanceRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) {
	const msg = "update config is not supported"
	handler.rd.JSON(w, http.StatusBadRequest, msg)
}

// listConfig writes a snapshot of the scheduler parameters as JSON with a
// 200 status. A rendering failure is logged, not returned to the client.
func (handler *balanceRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
	snapshot := handler.config.clone()
	err := handler.rd.JSON(w, http.StatusOK, snapshot)
	if err == nil {
		return
	}
	log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err))
}

// balanceRangeSchedulerConfig is the configuration of the balance range
// scheduler.
type balanceRangeSchedulerConfig struct {
	// RWMutex guards the parameters below; clone and EncodeConfig take the
	// read lock, ReloadConfig takes the write lock.
	syncutil.RWMutex
	// schedulerConfig supplies the load/init helpers used by this package.
	schedulerConfig
	// balanceRangeSchedulerParam holds the user-supplied parameters.
	balanceRangeSchedulerParam
}

type balanceRangeSchedulerParam struct {
Role string `json:"role"`
Engine string `json:"engine"`
Timeout time.Duration `json:"timeout"`
Ranges []core.KeyRange `json:"ranges"`
TableName string `json:"table-name"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to name this field `alias`, since PD has no concept of tables.

}

// clone returns a snapshot of the scheduler parameters taken under the read
// lock. The Ranges slice is freshly allocated, so appending to or replacing
// elements of the snapshot does not affect the config; the KeyRange elements
// themselves are copied by value, so their key byte slices remain shared.
func (conf *balanceRangeSchedulerConfig) clone() *balanceRangeSchedulerParam {
	conf.RLock()
	defer conf.RUnlock()

	// Start from a value copy of all parameters, then detach the slice.
	snapshot := conf.balanceRangeSchedulerParam
	snapshot.Ranges = make([]core.KeyRange, len(conf.Ranges))
	copy(snapshot.Ranges, conf.Ranges)
	return &snapshot
}

// EncodeConfig returns the serialized scheduler config. The config's read
// lock is held for the duration of the encoding.
func (s *balanceRangeScheduler) EncodeConfig() ([]byte, error) {
	cfg := s.conf
	cfg.RLock()
	defer cfg.RUnlock()
	return EncodeConfig(cfg)
}

// ReloadConfig reloads the scheduler parameters through the config's load
// helper and replaces the in-memory values under the write lock.
//
// It returns the error from load unchanged; in that case the in-memory
// config is left untouched.
func (s *balanceRangeScheduler) ReloadConfig() error {
	s.conf.Lock()
	defer s.conf.Unlock()

	// Load into a scratch config first so a failed load cannot leave the
	// live config half-updated.
	newCfg := &balanceRangeSchedulerConfig{}
	if err := s.conf.load(newCfg); err != nil {
		return err
	}
	s.conf.Ranges = newCfg.Ranges
	s.conf.Timeout = newCfg.Timeout
	s.conf.Role = newCfg.Role
	s.conf.Engine = newCfg.Engine
	// TableName is part of the parameters too (set by the decoder, returned
	// by clone); reload it together with the rest so it does not go stale.
	s.conf.TableName = newCfg.TableName
	return nil
}

// balanceRangeScheduler is the scheduler registered under
// types.BalanceRangeScheduler. Its Schedule method is still a stub.
type balanceRangeScheduler struct {
	*BaseScheduler
	// conf holds the scheduler parameters and their lock.
	conf *balanceRangeSchedulerConfig
	// handler serves the scheduler's HTTP endpoints (see ServeHTTP).
	handler http.Handler
	// filters is populated by newBalanceRangeScheduler with a store state
	// filter and a special-use filter.
	filters []filter.Filter
	// filterCounter is created by newBalanceRangeScheduler via filter.NewCounter.
	filterCounter *filter.Counter
}

// ServeHTTP implements the http.Handler interface by delegating the request
// to the handler built in newBalanceRangeScheduler.
func (s *balanceRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.handler.ServeHTTP(w, r)
}

// Schedule is meant to produce the balance key range operators. It is not
// implemented yet: it only emits a debug log and returns no operators and
// no plans, ignoring both arguments.
func (*balanceRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) {
	log.Debug("balance key range scheduler is scheduling, need to implement")
	return nil, nil
}

// IsScheduleAllowed reports whether the scheduler may create new operators:
// the number of running OpRange operators must be below the cluster's region
// schedule limit. When the limit is reached, the operator-limit counter for
// this scheduler type is incremented before returning false.
func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
	running := s.OpController.OperatorCount(operator.OpRange)
	limit := cluster.GetSchedulerConfig().GetRegionScheduleLimit()
	if running < limit {
		return true
	}
	operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange)
	return false
}

// BalanceRangeCreateOption customizes a balance range scheduler while it is
// being created; newBalanceRangeScheduler applies each option to the new
// scheduler before installing its filters.
type BalanceRangeCreateOption func(s *balanceRangeScheduler)

// newBalanceRangeScheduler creates the balance range scheduler for the given
// operator controller and config.
//
// The options run first, against a scheduler that already has its base
// scheduler, config and HTTP handler set. The filters and the filter counter
// are installed afterwards, so an option cannot override them.
func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRangeSchedulerConfig, options ...BalanceRangeCreateOption) Scheduler {
	sched := &balanceRangeScheduler{
		BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf),
		conf:          conf,
		handler:       newBalanceRangeHandler(conf),
	}
	for i := range options {
		options[i](sched)
	}

	name := sched.GetName()
	storeState := &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Medium}
	sched.filters = []filter.Filter{storeState, filter.NewSpecialUseFilter(name)}
	sched.filterCounter = filter.NewCounter(name)
	return sched
}
59 changes: 59 additions & 0 deletions pkg/schedule/schedulers/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package schedulers

import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -545,4 +547,61 @@ func schedulersRegister() {
conf.init(sche.GetName(), storage, conf)
return sche, nil
})

// balance key range scheduler
// args: [role, engine, timeout, table-name, range1, range2, ...]
//
// The first four args are mandatory and positional; everything after them is
// handed to getKeyRanges. Each positional arg is query-unescaped before use,
// and the timeout must be a valid time.ParseDuration string.
RegisterSliceDecoderBuilder(types.BalanceRangeScheduler, func(args []string) ConfigDecoder {
	return func(v any) error {
		conf, ok := v.(*balanceRangeSchedulerConfig)
		if !ok {
			return errs.ErrScheduleConfigNotExist.FastGenByArgs()
		}
		// args[0..3] are indexed below, so at least four entries are required.
		if len(args) < 4 {
			return errs.ErrSchedulerConfig.FastGenByArgs("args length must be greater than 3")
		}
		role, err := url.QueryUnescape(args[0])
		if err != nil {
			return errs.ErrQueryUnescape.Wrap(err)
		}
		engine, err := url.QueryUnescape(args[1])
		if err != nil {
			return errs.ErrQueryUnescape.Wrap(err)
		}
		timeout, err := url.QueryUnescape(args[2])
		if err != nil {
			return errs.ErrQueryUnescape.Wrap(err)
		}
		// NOTE(review): a malformed duration is reported as ErrURLParse;
		// confirm whether a more specific error exists in errs.
		duration, err := time.ParseDuration(timeout)
		if err != nil {
			return errs.ErrURLParse.Wrap(err)
		}
		tableName, err := url.QueryUnescape(args[3])
		if err != nil {
			// Same failure mode as role/engine/timeout, so report it the same way.
			return errs.ErrQueryUnescape.Wrap(err)
		}
		ranges, err := getKeyRanges(args[4:])
		if err != nil {
			return err
		}
		// Only touch the config once every argument has been validated.
		conf.Ranges = ranges
		conf.Engine = engine
		conf.Role = role
		conf.Timeout = duration
		conf.TableName = tableName
		return nil
	}
})

// Factory for the balance range scheduler: decode the config, build the
// scheduler, then initialize the config with the scheduler name and storage.
RegisterScheduler(types.BalanceRangeScheduler, func(opController *operator.Controller,
	storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) {
	cfg := &balanceRangeSchedulerConfig{
		schedulerConfig: newBaseDefaultSchedulerConfig(),
	}
	err := decoder(cfg)
	if err != nil {
		return nil, err
	}
	scheduler := newBalanceRangeScheduler(opController, cfg)
	cfg.init(scheduler.GetName(), storage, cfg)
	return scheduler, nil
})
}
5 changes: 5 additions & 0 deletions pkg/schedule/types/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const (
TransferWitnessLeaderScheduler CheckerSchedulerType = "transfer-witness-leader-scheduler"
// LabelScheduler is label scheduler name.
LabelScheduler CheckerSchedulerType = "label-scheduler"
// BalanceRangeScheduler is balance key range scheduler name.
BalanceRangeScheduler CheckerSchedulerType = "balance-range-scheduler"
)

// TODO: SchedulerTypeCompatibleMap and ConvertOldStrToType should be removed after
Expand Down Expand Up @@ -97,6 +99,7 @@ var (
SplitBucketScheduler: "split-bucket",
TransferWitnessLeaderScheduler: "transfer-witness-leader",
LabelScheduler: "label",
BalanceRangeScheduler: "balance-range",
}

// ConvertOldStrToType exists for compatibility.
Expand All @@ -120,6 +123,7 @@ var (
"split-bucket": SplitBucketScheduler,
"transfer-witness-leader": TransferWitnessLeaderScheduler,
"label": LabelScheduler,
"balance-range": BalanceRangeScheduler,
}

// StringToSchedulerType is a map to convert the scheduler string to the CheckerSchedulerType.
Expand All @@ -143,6 +147,7 @@ var (
"split-bucket-scheduler": SplitBucketScheduler,
"transfer-witness-leader-scheduler": TransferWitnessLeaderScheduler,
"label-scheduler": LabelScheduler,
"balance-range-scheduler": BalanceRangeScheduler,
}

// DefaultSchedulers is the default scheduler types.
Expand Down
39 changes: 39 additions & 0 deletions server/api/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,45 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques
}

switch tp {
case types.BalanceRangeScheduler:
	// Refuse to create a second instance; the existing scheduler must be
	// removed first. The error from IsSchedulerExisted is discarded here.
	exist, _ := h.IsSchedulerExisted(name)
	if exist {
		h.r.JSON(w, http.StatusBadRequest, "The scheduler already exists, pls remove the exist scheduler first.")
		return
	}
	// The options are collected in the order role, engine, timeout,
	// table-name, start-key, end-key.
	// NOTE(review): presumably collector appends to the positional args the
	// scheduler's slice decoder consumes, so this order must match the
	// decoder's — confirm against the collector definition.
	if err := apiutil.CollectStringOption("role", input, collector); err != nil {
		h.r.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}
	if err := apiutil.CollectStringOption("engine", input, collector); err != nil {
		h.r.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}

	// timeout is optional: when the option is absent, "1h" is collected in
	// its place; any other error aborts the request.
	defaultTimeout := "1h"
	if err := apiutil.CollectStringOption("timeout", input, collector); err != nil {
		if errors.ErrorEqual(err, errs.ErrOptionNotExist) {
			collector(defaultTimeout)
		} else {
			h.r.JSON(w, http.StatusInternalServerError, err.Error())
			return
		}
	}

	if err := apiutil.CollectStringOption("table-name", input, collector); err != nil {
		h.r.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}

	// The range keys go through the escaping variant of the collector.
	if err := apiutil.CollectEscapeStringOption("start-key", input, collector); err != nil {
		h.r.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}

	if err := apiutil.CollectEscapeStringOption("end-key", input, collector); err != nil {
		h.r.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}
case types.ScatterRangeScheduler:
if err := apiutil.CollectEscapeStringOption("start_key", input, collector); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
Expand Down
14 changes: 14 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3209,6 +3209,20 @@ func TestAddScheduler(t *testing.T) {
re.NoError(err)
re.NoError(controller.AddScheduler(gls))

_, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{}), controller.RemoveScheduler)
re.Error(err)

gls, err = schedulers.CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"learner", "tiflash", "1h", "100", "200"}), controller.RemoveScheduler)
re.NoError(err)
re.NoError(controller.AddScheduler(gls))
conf, err = gls.EncodeConfig()
re.NoError(err)
data = make(map[string]any)
re.NoError(json.Unmarshal(conf, &data))
re.Equal("learner", data["role"])
re.Equal("tiflash", data["engine"])
re.Equal(float64(time.Hour.Nanoseconds()), data["timeout"])

hb, err := schedulers.CreateScheduler(types.BalanceHotRegionScheduler, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}")))
re.NoError(err)
conf, err = hb.EncodeConfig()
Expand Down
Loading
Loading