Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: support scheduler config forward and enable some tests #7256

Merged
merged 8 commits into from
Nov 1, 2023
Merged
57 changes: 57 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand Down Expand Up @@ -130,6 +131,8 @@ func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
router.GET("/diagnostic/:name", getDiagnosticResult)
router.GET("/config", getSchedulerConfig)
router.GET("/config/:name/list", getSchedulerConfigByName)
// TODO: in the future, we should split pauseOrResumeScheduler into two different APIs,
// which will require one-to-two forwarding in the API middleware.
router.POST("/:name", pauseOrResumeScheduler)
Expand Down Expand Up @@ -432,6 +435,60 @@ func getSchedulers(c *gin.Context) {
c.IndentedJSON(http.StatusOK, output)
}

// @Tags schedulers
// @Summary List all scheduler configs.
// @Produce json
// @Success 200 {object} map[string]interface{}
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/config/ [get]
func getSchedulerConfig(c *gin.Context) {
	// Both failure paths below answer the same way: HTTP 500 carrying the error text.
	fail := func(err error) {
		c.String(http.StatusInternalServerError, err.Error())
	}

	h := c.MustGet(handlerKey).(*handler.Handler)
	controller, err := h.GetSchedulersController()
	if err != nil {
		fail(err)
		return
	}

	// names and rawConfigs are handed to ToPayload as a pair of parallel slices.
	names, rawConfigs, err := controller.GetAllSchedulerConfigs()
	if err != nil {
		fail(err)
		return
	}

	payload := schedulers.ToPayload(names, rawConfigs)
	c.IndentedJSON(http.StatusOK, payload)
}

// @Tags schedulers
// @Summary List scheduler config by name.
// @Produce json
// @Success 200 {object} map[string]interface{}
// @Failure 404 {string} string scheduler not found
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/config/{name}/list [get]
func getSchedulerConfigByName(c *gin.Context) {
	h := c.MustGet(handlerKey).(*handler.Handler)
	controller, err := h.GetSchedulersController()
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}

	// Look the scheduler's own HTTP handler up once and keep it for the final dispatch.
	schedulerHandlers := controller.GetSchedulerHandlers()
	schedulerName := c.Param("name")
	target, found := schedulerHandlers[schedulerName]
	if !found {
		c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.GenWithStackByArgs().Error())
		return
	}

	disabled, err := controller.IsSchedulerDisabled(schedulerName)
	switch {
	case err != nil:
		c.String(http.StatusInternalServerError, err.Error())
	case disabled:
		// A disabled scheduler is answered exactly like an unknown one.
		c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.GenWithStackByArgs().Error())
	default:
		// The scheduler handler routes on its own paths, so the request path is
		// rewritten to "/list" before the request is delegated to it.
		c.Request.URL.Path = "/list"
		target.ServeHTTP(c.Writer, c.Request)
	}
}

// @Tags schedulers
// @Summary List schedulers diagnostic result.
// @Produce json
Expand Down
45 changes: 33 additions & 12 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,13 +765,22 @@ func (h *Handler) GetCheckerStatus(name string) (map[string]bool, error) {
}, nil
}

// GetSchedulerNames returns all names of schedulers.
func (h *Handler) GetSchedulerNames() ([]string, error) {
// GetSchedulersController returns controller of schedulers.
func (h *Handler) GetSchedulersController() (*schedulers.Controller, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
return co.GetSchedulersController().GetSchedulerNames(), nil
return co.GetSchedulersController(), nil
}

// GetSchedulerNames returns all names of schedulers, as reported by the
// schedulers controller. An error from GetSchedulersController is passed
// through unchanged.
func (h *Handler) GetSchedulerNames() ([]string, error) {
	controller, err := h.GetSchedulersController()
	if err != nil {
		return nil, err
	}
	names := controller.GetSchedulerNames()
	return names, nil
}

type schedulerPausedPeriod struct {
Expand All @@ -782,11 +791,10 @@ type schedulerPausedPeriod struct {

// GetSchedulerByStatus returns all names of schedulers by status.
func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
sc, err := h.GetSchedulersController()
if err != nil {
return nil, err
}
sc := co.GetSchedulersController()
schedulers := sc.GetSchedulerNames()
switch status {
case "paused":
Expand Down Expand Up @@ -837,7 +845,20 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{},
}
return disabledSchedulers, nil
default:
return schedulers, nil
// The default scheduler could not be deleted in scheduling server,
// so schedulers could only be disabled.
// We should not return the disabled schedulers here.
var enabledSchedulers []string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we distinguish between disabled and removed schedulers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion, we will not distinguish them.

for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if !disabled {
enabledSchedulers = append(enabledSchedulers, scheduler)
}
}
return enabledSchedulers, nil
}
}

Expand All @@ -861,11 +882,11 @@ func (h *Handler) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult
// t == 0 : resume scheduler.
// t > 0 : scheduler delays t seconds.
func (h *Handler) PauseOrResumeScheduler(name string, t int64) (err error) {
co := h.GetCoordinator()
if co == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
sc, err := h.GetSchedulersController()
if err != nil {
return err
}
if err = co.GetSchedulersController().PauseOrResumeScheduler(name, t); err != nil {
if err = sc.PauseOrResumeScheduler(name, t); err != nil {
if t == 0 {
log.Error("can not resume scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
} else {
Expand Down
18 changes: 18 additions & 0 deletions pkg/schedule/schedulers/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ func DecodeConfig(data []byte, v interface{}) error {
return nil
}

// ToPayload returns the payload of config.
func ToPayload(sches, configs []string) map[string]interface{} {
payload := make(map[string]interface{})
for i, sche := range sches {
var config interface{}
err := DecodeConfig([]byte(configs[i]), &config)
if err != nil {
log.Error("failed to decode scheduler config",
zap.String("config", configs[i]),
zap.String("scheduler", sche),
errs.ZapError(err))
continue
}
payload[sche] = config
}
return payload
}

// ConfigDecoder is the function type used to decode a config.
// NOTE(review): v is presumably the destination the config is decoded into
// (cf. DecodeConfig) — confirm against the implementations.
type ConfigDecoder func(v interface{}) error

Expand Down
5 changes: 5 additions & 0 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ func (c *Controller) CheckTransferWitnessLeader(region *core.RegionInfo) {
}
}

// GetAllSchedulerConfigs returns all scheduler configs by forwarding the
// three results of the storage's LoadAllSchedulerConfigs unchanged.
func (c *Controller) GetAllSchedulerConfigs() ([]string, []string, error) {
	names, configs, err := c.storage.LoadAllSchedulerConfigs()
	return names, configs, err
}

// ScheduleController is used to manage a scheduler.
type ScheduleController struct {
Scheduler
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/schedulers/shuffle_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool {

func (conf *shuffleRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
router := mux.NewRouter()
router.HandleFunc("/list", conf.handleGetRoles).Methods(http.MethodGet)
router.HandleFunc("/roles", conf.handleGetRoles).Methods(http.MethodGet)
router.HandleFunc("/roles", conf.handleSetRoles).Methods(http.MethodPost)
router.ServeHTTP(w, r)
Expand Down
16 changes: 8 additions & 8 deletions pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,29 @@ func StatusOK(re *require.Assertions) func([]byte, int, http.Header) {

// StatusNotOK is used to check whether http response code is not equal http.StatusOK.
func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) {
return func(_ []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i)
return func(resp []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i, "resp: "+string(resp))
}
}

// ExtractJSON is used to check whether given data can be extracted successfully.
func ExtractJSON(re *require.Assertions, data interface{}) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(res, data))
return func(resp []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp))
}
}

// StringContain is used to check whether response context contains given string.
func StringContain(re *require.Assertions, sub string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), sub)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), sub, "resp: "+string(resp))
}
}

// StringEqual is used to check whether response context equal given string.
func StringEqual(re *require.Assertions, str string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), str)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), str, "resp: "+string(resp))
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config", http.MethodGet
// "/hotspot/regions/read", http.MethodGet
// "/hotspot/regions/write", http.MethodGet
// "/hotspot/regions/history", http.MethodGet
Expand Down Expand Up @@ -90,6 +91,11 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/schedulers",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/scheduler-config",
scheapi.APIPathPrefix+"/schedulers/config",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers/", // Note: this means "/schedulers/{name}"
scheapi.APIPathPrefix+"/schedulers",
Expand Down
5 changes: 2 additions & 3 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,11 +789,10 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
},
StoreConfig: *o.GetStoreConfig(),
}
err := storage.SaveConfig(cfg)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
failpoint.Return(errors.New("fail to persist"))
})
return err
return storage.SaveConfig(cfg)
}

// Reload reloads the configuration from the storage.
Expand Down
15 changes: 1 addition & 14 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,20 +948,7 @@ func (s *Server) GetConfig() *config.Config {
if err != nil {
return cfg
}
payload := make(map[string]interface{})
for i, sche := range sches {
var config interface{}
err := schedulers.DecodeConfig([]byte(configs[i]), &config)
if err != nil {
log.Error("failed to decode scheduler config",
zap.String("config", configs[i]),
zap.String("scheduler", sche),
errs.ZapError(err))
continue
}
payload[sche] = config
}
cfg.Schedule.SchedulersPayload = payload
cfg.Schedule.SchedulersPayload = schedulers.ToPayload(sches, configs)
return cfg
}

Expand Down
21 changes: 21 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func (suite *apiTestSuite) TestAPIForward() {
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config/", http.MethodGet
// "/scheduler-config/{name}/list", http.MethodGet
// "/scheduler-config/{name}/roles", http.MethodGet
// Should not redirect:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
Expand All @@ -191,6 +194,24 @@ func (suite *apiTestSuite) TestAPIForward() {
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)

err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "scheduler-config"), &resp,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)
re.Contains(resp, "balance-leader-scheduler")
re.Contains(resp, "balance-witness-scheduler")
re.Contains(resp, "balance-hot-region-scheduler")

schedulers := []string{
"balance-leader-scheduler",
"balance-witness-scheduler",
"balance-hot-region-scheduler",
}
for _, schedulerName := range schedulers {
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s/%s/%s", urlPrefix, "scheduler-config", schedulerName, "list"), &resp,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)
}

err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), pauseArgs,
testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader))
re.NoError(err)
Expand Down
Loading