Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: support scheduler config forward and enable some tests #7256

Merged
merged 8 commits into from
Nov 1, 2023
34 changes: 34 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
sche "github.com/tikv/pd/pkg/schedule/core"
Expand Down Expand Up @@ -128,6 +129,7 @@ func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
router.GET("/diagnostic/:name", getDiagnosticResult)
router.GET("/config/:name/:suffix", getSchedulerConfigByName)
// TODO: in the future, we should split pauseOrResumeScheduler to two different APIs.
// And we need to do one-to-two forwarding in the API middleware.
router.POST("/:name", pauseOrResumeScheduler)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about unifying these formats?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

router.GET("", getSchedulers)
router.GET("/:name", getDiagnosticResult)
router.POST("/:name", pauseOrResumeScheduler)
router.GET("/config/:name/:suffix", getSchedulerConfigByName)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using
router.GET("", getSchedulers)
router.GET("/:name/diagnose", getDiagnosticResult)
router.POST("/:name", pauseOrResumeScheduler)
router.GET("/:name/config", getSchedulerConfigByName)

Expand Down Expand Up @@ -385,6 +387,38 @@ func getSchedulers(c *gin.Context) {
c.IndentedJSON(http.StatusOK, output)
}

func getSchedulerConfigByName(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
handlers := svr.GetCoordinator().GetSchedulersController().GetSchedulerHandlers()
name := c.Param("name")
if _, ok := handlers[name]; !ok {
c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.GenWithStackByArgs().Error())
return
}
co := svr.GetCoordinator()
if co == nil {
c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error())
return
}
sc := co.GetSchedulersController()
if sc == nil {
c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error())
return
}
isDisabled, err := sc.IsSchedulerDisabled(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
if isDisabled {
c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.GenWithStackByArgs().Error())
return
}
suffix := c.Param("suffix")
c.Request.URL.Path = "/" + suffix
handlers[name].ServeHTTP(c.Writer, c.Request)
}

// @Tags schedulers
// @Summary List schedulers diagnostic result.
// @Produce json
Expand Down
15 changes: 14 additions & 1 deletion pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,20 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{},
}
return disabledSchedulers, nil
default:
return schedulers, nil
// The default scheduler could not be deleted in scheduling server,
// so schedulers could only be disabled.
// We should not return the disabled schedulers here.
var enabledSchedulers []string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we distinguish between disabled and removed schedulers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion, we will not distinguish them.

for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if !disabled {
enabledSchedulers = append(enabledSchedulers, scheduler)
}
}
return enabledSchedulers, nil
}
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,29 @@ func StatusOK(re *require.Assertions) func([]byte, int, http.Header) {

// StatusNotOK is used to check whether http response code is not equal http.StatusOK.
func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) {
return func(_ []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i)
return func(resp []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i, "resp: "+string(resp))
}
}

// ExtractJSON is used to check whether given data can be extracted successfully.
func ExtractJSON(re *require.Assertions, data interface{}) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(res, data))
return func(resp []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp))
}
}

// StringContain is used to check whether response context contains given string.
func StringContain(re *require.Assertions, sub string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), sub)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), sub, "resp: "+string(resp))
}
}

// StringEqual is used to check whether response context equal given string.
func StringEqual(re *require.Assertions, str string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), str)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), str, "resp: "+string(resp))
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config", http.MethodGet
// "/hotspot/regions/read", http.MethodGet
// "/hotspot/regions/write", http.MethodGet
// "/hotspot/regions/history", http.MethodGet
Expand Down Expand Up @@ -90,6 +91,11 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/schedulers",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/scheduler-config",
scheapi.APIPathPrefix+"/schedulers/config",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers/", // Note: this means "/schedulers/{name}"
scheapi.APIPathPrefix+"/schedulers",
Expand Down
5 changes: 2 additions & 3 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,11 +789,10 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
},
StoreConfig: *o.GetStoreConfig(),
}
err := storage.SaveConfig(cfg)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
failpoint.Return(errors.New("fail to persist"))
})
return err
return storage.SaveConfig(cfg)
}

// Reload reloads the configuration from the storage.
Expand Down
12 changes: 12 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (suite *apiTestSuite) TestAPIForward() {
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config/", http.MethodGet
// Should not redirect:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
Expand All @@ -189,6 +190,17 @@ func (suite *apiTestSuite) TestAPIForward() {
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)

schedulers := []string{
"balance-leader-scheduler",
"balance-witness-scheduler",
"balance-hot-region-scheduler",
}
for _, schedulerName := range schedulers {
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s/%s/%s", urlPrefix, "scheduler-config", schedulerName, "list"), &resp,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)
}

err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), pauseArgs,
testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader))
re.NoError(err)
Expand Down
Loading