Skip to content

Commit

Permalink
mcs: support scheduler config forward and enable some tests (#7256)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Nov 1, 2023
1 parent ded917b commit 4e45e95
Show file tree
Hide file tree
Showing 13 changed files with 350 additions and 185 deletions.
57 changes: 57 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand Down Expand Up @@ -130,6 +131,8 @@ func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
router.GET("/diagnostic/:name", getDiagnosticResult)
router.GET("/config", getSchedulerConfig)
router.GET("/config/:name/list", getSchedulerConfigByName)
// TODO: in the future, we should split pauseOrResumeScheduler to two different APIs.
// And we need to do one-to-two forwarding in the API middleware.
router.POST("/:name", pauseOrResumeScheduler)
Expand Down Expand Up @@ -432,6 +435,60 @@ func getSchedulers(c *gin.Context) {
c.IndentedJSON(http.StatusOK, output)
}

// @Tags schedulers
// @Summary List all scheduler configs.
// @Produce json
// @Success 200 {object} map[string]interface{}
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/config/ [get]
func getSchedulerConfig(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
sc, err := handler.GetSchedulersController()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
sches, configs, err := sc.GetAllSchedulerConfigs()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, schedulers.ToPayload(sches, configs))
}

// @Tags schedulers
// @Summary List scheduler config by name.
// @Produce json
// @Success 200 {object} map[string]interface{}
// @Failure 404 {string} string scheduler not found
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/config/{name}/list [get]
func getSchedulerConfigByName(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
sc, err := handler.GetSchedulersController()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
handlers := sc.GetSchedulerHandlers()
name := c.Param("name")
if _, ok := handlers[name]; !ok {
c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.GenWithStackByArgs().Error())
return
}
isDisabled, err := sc.IsSchedulerDisabled(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
if isDisabled {
c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.GenWithStackByArgs().Error())
return
}
c.Request.URL.Path = "/list"
handlers[name].ServeHTTP(c.Writer, c.Request)
}

// @Tags schedulers
// @Summary List schedulers diagnostic result.
// @Produce json
Expand Down
45 changes: 33 additions & 12 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,13 +765,22 @@ func (h *Handler) GetCheckerStatus(name string) (map[string]bool, error) {
}, nil
}

// GetSchedulerNames returns all names of schedulers.
func (h *Handler) GetSchedulerNames() ([]string, error) {
// GetSchedulersController returns controller of schedulers.
func (h *Handler) GetSchedulersController() (*schedulers.Controller, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
return co.GetSchedulersController().GetSchedulerNames(), nil
return co.GetSchedulersController(), nil
}

// GetSchedulerNames returns all names of schedulers.
func (h *Handler) GetSchedulerNames() ([]string, error) {
sc, err := h.GetSchedulersController()
if err != nil {
return nil, err
}
return sc.GetSchedulerNames(), nil
}

type schedulerPausedPeriod struct {
Expand All @@ -782,11 +791,10 @@ type schedulerPausedPeriod struct {

// GetSchedulerByStatus returns all names of schedulers by status.
func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
sc, err := h.GetSchedulersController()
if err != nil {
return nil, err
}
sc := co.GetSchedulersController()
schedulers := sc.GetSchedulerNames()
switch status {
case "paused":
Expand Down Expand Up @@ -837,7 +845,20 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{},
}
return disabledSchedulers, nil
default:
return schedulers, nil
// The default scheduler could not be deleted in scheduling server,
// so schedulers could only be disabled.
// We should not return the disabled schedulers here.
var enabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if !disabled {
enabledSchedulers = append(enabledSchedulers, scheduler)
}
}
return enabledSchedulers, nil
}
}

Expand All @@ -861,11 +882,11 @@ func (h *Handler) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult
// t == 0 : resume scheduler.
// t > 0 : scheduler delays t seconds.
func (h *Handler) PauseOrResumeScheduler(name string, t int64) (err error) {
co := h.GetCoordinator()
if co == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
sc, err := h.GetSchedulersController()
if err != nil {
return err
}
if err = co.GetSchedulersController().PauseOrResumeScheduler(name, t); err != nil {
if err = sc.PauseOrResumeScheduler(name, t); err != nil {
if t == 0 {
log.Error("can not resume scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
} else {
Expand Down
18 changes: 18 additions & 0 deletions pkg/schedule/schedulers/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ func DecodeConfig(data []byte, v interface{}) error {
return nil
}

// ToPayload returns the payload of config.
func ToPayload(sches, configs []string) map[string]interface{} {
payload := make(map[string]interface{})
for i, sche := range sches {
var config interface{}
err := DecodeConfig([]byte(configs[i]), &config)
if err != nil {
log.Error("failed to decode scheduler config",
zap.String("config", configs[i]),
zap.String("scheduler", sche),
errs.ZapError(err))
continue
}
payload[sche] = config
}
return payload
}

// ConfigDecoder used to decode the config.
type ConfigDecoder func(v interface{}) error

Expand Down
5 changes: 5 additions & 0 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ func (c *Controller) CheckTransferWitnessLeader(region *core.RegionInfo) {
}
}

// GetAllSchedulerConfigs returns all scheduler configs.
func (c *Controller) GetAllSchedulerConfigs() ([]string, []string, error) {
return c.storage.LoadAllSchedulerConfigs()
}

// ScheduleController is used to manage a scheduler.
type ScheduleController struct {
Scheduler
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/schedulers/shuffle_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool {

func (conf *shuffleRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
router := mux.NewRouter()
router.HandleFunc("/list", conf.handleGetRoles).Methods(http.MethodGet)
router.HandleFunc("/roles", conf.handleGetRoles).Methods(http.MethodGet)
router.HandleFunc("/roles", conf.handleSetRoles).Methods(http.MethodPost)
router.ServeHTTP(w, r)
Expand Down
16 changes: 8 additions & 8 deletions pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,29 @@ func StatusOK(re *require.Assertions) func([]byte, int, http.Header) {

// StatusNotOK is used to check whether http response code is not equal http.StatusOK.
func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) {
return func(_ []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i)
return func(resp []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i, "resp: "+string(resp))
}
}

// ExtractJSON is used to check whether given data can be extracted successfully.
func ExtractJSON(re *require.Assertions, data interface{}) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(res, data))
return func(resp []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp))
}
}

// StringContain is used to check whether response context contains given string.
func StringContain(re *require.Assertions, sub string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), sub)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), sub, "resp: "+string(resp))
}
}

// StringEqual is used to check whether response context equal given string.
func StringEqual(re *require.Assertions, str string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), str)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), str, "resp: "+string(resp))
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config", http.MethodGet
// "/hotspot/regions/read", http.MethodGet
// "/hotspot/regions/write", http.MethodGet
// "/hotspot/regions/history", http.MethodGet
Expand Down Expand Up @@ -90,6 +91,11 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/schedulers",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/scheduler-config",
scheapi.APIPathPrefix+"/schedulers/config",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers/", // Note: this means "/schedulers/{name}"
scheapi.APIPathPrefix+"/schedulers",
Expand Down
5 changes: 2 additions & 3 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,11 +789,10 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
},
StoreConfig: *o.GetStoreConfig(),
}
err := storage.SaveConfig(cfg)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
failpoint.Return(errors.New("fail to persist"))
})
return err
return storage.SaveConfig(cfg)
}

// Reload reloads the configuration from the storage.
Expand Down
15 changes: 1 addition & 14 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,20 +948,7 @@ func (s *Server) GetConfig() *config.Config {
if err != nil {
return cfg
}
payload := make(map[string]interface{})
for i, sche := range sches {
var config interface{}
err := schedulers.DecodeConfig([]byte(configs[i]), &config)
if err != nil {
log.Error("failed to decode scheduler config",
zap.String("config", configs[i]),
zap.String("scheduler", sche),
errs.ZapError(err))
continue
}
payload[sche] = config
}
cfg.Schedule.SchedulersPayload = payload
cfg.Schedule.SchedulersPayload = schedulers.ToPayload(sches, configs)
return cfg
}

Expand Down
21 changes: 21 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func (suite *apiTestSuite) TestAPIForward() {
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config/", http.MethodGet
// "/scheduler-config/{name}/list", http.MethodGet
// "/scheduler-config/{name}/roles", http.MethodGet
// Should not redirect:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
Expand All @@ -191,6 +194,24 @@ func (suite *apiTestSuite) TestAPIForward() {
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)

err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "scheduler-config"), &resp,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)
re.Contains(resp, "balance-leader-scheduler")
re.Contains(resp, "balance-witness-scheduler")
re.Contains(resp, "balance-hot-region-scheduler")

schedulers := []string{
"balance-leader-scheduler",
"balance-witness-scheduler",
"balance-hot-region-scheduler",
}
for _, schedulerName := range schedulers {
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s/%s/%s", urlPrefix, "scheduler-config", schedulerName, "list"), &resp,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)
}

err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), pauseArgs,
testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader))
re.NoError(err)
Expand Down
Loading

0 comments on commit 4e45e95

Please sign in to comment.