From 37c3c766b8427f8a1245193c8049e6cc2ca68492 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 1 Nov 2023 21:55:35 +0800 Subject: [PATCH] mcs: support region http interface in scheduling server Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/apis/v1/api.go | 199 ++++++++++++++ pkg/schedule/handler/handler.go | 207 ++++++++++++++ server/api/region.go | 252 +++++------------- server/api/region_test.go | 10 +- server/api/server.go | 27 +- tests/integrations/mcs/scheduling/api_test.go | 29 +- 6 files changed, 527 insertions(+), 197 deletions(-) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index ed998b9b62d..9ec9145ba28 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -15,10 +15,12 @@ package apis import ( + "errors" "fmt" "net/http" "net/url" "strconv" + "strings" "sync" "github.com/gin-contrib/cors" @@ -38,6 +40,7 @@ import ( "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/typeutil" "github.com/unrolled/render" ) @@ -169,6 +172,11 @@ func (s *Service) RegisterRegionsRouter() { router := s.root.Group("regions") router.GET("/:id/label/:key", getRegionLabelByKey) router.GET("/:id/labels", getRegionLabels) + router.POST("/accelerate-schedule", accelerateRegionsScheduleInRange) + router.POST("/accelerate-schedule/batch", accelerateRegionsScheduleInRanges) + router.POST("/scatter", scatterRegions) + router.POST("/split", splitRegions) + router.GET("/replicated", checkRegionsReplicated) } // RegisterRegionLabelRouter registers the router of the region label handler. @@ -708,3 +716,194 @@ func getRegionLabelRuleByID(c *gin.Context) { } c.IndentedJSON(http.StatusOK, rule) } + +// @Tags region +// @Summary Accelerate regions scheduling a in given range, only receive hex format for keys +// @Accept json +// @Param body body object true "json params" +// @Param limit query integer false "Limit count" default(256) +// @Produce json +// @Success 200 {string} string "Accelerate regions scheduling in a given range [startKey, endKey)" +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/accelerate-schedule [post] +func accelerateRegionsScheduleInRange(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + var input map[string]interface{} + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + rawStartKey, ok1 := input["start_key"].(string) + rawEndKey, ok2 := input["end_key"].(string) + if !ok1 || !ok2 { + c.String(http.StatusBadRequest, "start_key or end_key is not string") + return + } + + limitStr, _ := c.GetQuery("limit") + limit, err := handler.AdjustLimit(limitStr, 256 /*default limit*/) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + err = handler.AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey, limit) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.String(http.StatusOK, fmt.Sprintf("Accelerate regions scheduling in a given range [%s,%s)", rawStartKey, rawEndKey)) +} + +// @Tags region +// @Summary Accelerate regions scheduling in given ranges, only receive hex format for keys +// @Accept json +// @Param body body object true "json params" +// @Param limit query integer false "Limit count" default(256) +// @Produce json +// @Success 200 {string} string "Accelerate regions scheduling in given ranges [startKey1, endKey1), [startKey2, endKey2), ..." +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/accelerate-schedule/batch [post] +func accelerateRegionsScheduleInRanges(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + var input []map[string]interface{} + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + limitStr, _ := c.GetQuery("limit") + limit, err := handler.AdjustLimit(limitStr, 256 /*default limit*/) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + var msgBuilder strings.Builder + msgBuilder.Grow(128) + msgBuilder.WriteString("Accelerate regions scheduling in given ranges: ") + var startKeys, endKeys [][]byte + for _, rg := range input { + startKey, rawStartKey, err := apiutil.ParseKey("start_key", rg) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + endKey, rawEndKey, err := apiutil.ParseKey("end_key", rg) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + startKeys = append(startKeys, startKey) + endKeys = append(endKeys, endKey) + msgBuilder.WriteString(fmt.Sprintf("[%s,%s), ", rawStartKey, rawEndKey)) + } + err = handler.AccelerateRegionsScheduleInRanges(startKeys, endKeys, limit) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.String(http.StatusOK, msgBuilder.String()) +} + +// @Tags region +// @Summary Scatter regions by given key ranges or regions id distributed by given group with given retry limit +// @Accept json +// @Param body body object true "json params" +// @Produce json +// @Success 200 {string} string "Scatter regions by given key ranges or regions id distributed by given group with given retry limit" +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/scatter [post] +func scatterRegions(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + var input map[string]interface{} + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + rawStartKey, ok1 := input["start_key"].(string) + rawEndKey, ok2 := input["end_key"].(string) + group, _ := input["group"].(string) + retryLimit := 5 + if rl, ok := input["retry_limit"].(float64); ok { + retryLimit = int(rl) + } + + opsCount, failures, err := func() (int, map[uint64]error, error) { + if ok1 && ok2 { + return handler.ScatterRegionsByRange(rawStartKey, rawEndKey, group, retryLimit) + } + ids, ok := typeutil.JSONToUint64Slice(input["regions_id"]) + if !ok { + return 0, nil, errors.New("regions_id is invalid") + } + return handler.ScatterRegionsByID(ids, group, retryLimit, false) + }() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + s := handler.BuildScatterRegionsResp(opsCount, failures) + c.IndentedJSON(http.StatusOK, &s) +} + +// @Tags region +// @Summary Split regions with given split keys +// @Accept json +// @Param body body object true "json params" +// @Produce json +// @Success 200 {string} string "Split regions with given split keys" +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/split [post] +func splitRegions(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + var input map[string]interface{} + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + s, ok := input["split_keys"] + if !ok { + c.String(http.StatusBadRequest, "split_keys should be provided.") + return + } + rawSplitKeys := s.([]string) + if len(rawSplitKeys) < 1 { + c.String(http.StatusBadRequest, "empty split keys.") + return + } + fmt.Println(rawSplitKeys) + retryLimit := 5 + if rl, ok := input["retry_limit"].(float64); ok { + retryLimit = int(rl) + } + s, err := handler.SplitRegions(c.Request.Context(), rawSplitKeys, retryLimit) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, &s) +} + +// @Tags region +// @Summary Check if regions in the given key ranges are replicated. Returns 'REPLICATED', 'INPROGRESS', or 'PENDING'. 'PENDING' means that there is at least one region pending for scheduling. Similarly, 'INPROGRESS' means there is at least one region in scheduling. +// @Param startKey query string true "Regions start key, hex encoded" +// @Param endKey query string true "Regions end key, hex encoded" +// @Produce plain +// @Success 200 {string} string "INPROGRESS" +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/replicated [get] +func checkRegionsReplicated(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + rawStartKey, _ := c.GetQuery("start_key") + rawEndKey, _ := c.GetQuery("end_key") + state, err := handler.CheckRegionsReplicated(rawStartKey, rawEndKey) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + c.String(http.StatusOK, state) +} diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index c0cee81d27e..c8fb9e38a9f 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -16,12 +16,15 @@ package handler import ( "bytes" + "context" "encoding/hex" "net/http" + "strconv" "strings" "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -42,6 +45,11 @@ import ( "go.uber.org/zap" ) +const ( + defaultRegionLimit = 16 + maxRegionLimit = 10240 +) + // Server is the interface for handler about schedule. // TODO: remove it after GetCluster is unified between PD server and Scheduling server. type Server interface { @@ -1059,3 +1067,202 @@ func (h *Handler) GetRegionLabeler() (*labeler.RegionLabeler, error) { } return c.GetRegionLabeler(), nil } + +// AccelerateRegionsScheduleInRange accelerates regions scheduling in a given range. +func (h *Handler) AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey string, limit int) error { + startKey, err := hex.DecodeString(rawStartKey) + if err != nil { + return err + } + endKey, err := hex.DecodeString(rawEndKey) + if err != nil { + return err + } + c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + co := h.GetCoordinator() + if co == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + regions := c.ScanRegions(startKey, endKey, limit) + if len(regions) > 0 { + regionsIDList := make([]uint64, 0, len(regions)) + for _, region := range regions { + regionsIDList = append(regionsIDList, region.GetID()) + } + co.GetCheckerController().AddSuspectRegions(regionsIDList...) + } + return nil +} + +// AccelerateRegionsScheduleInRanges accelerates regions scheduling in given ranges. +func (h *Handler) AccelerateRegionsScheduleInRanges(startKeys [][]byte, endKeys [][]byte, limit int) error { + c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + co := h.GetCoordinator() + if co == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if len(startKeys) != len(endKeys) { + return errors.New("startKeys and endKeys should have the same length") + } + var regions []*core.RegionInfo + for i := range startKeys { + regions = append(regions, c.ScanRegions(startKeys[i], endKeys[i], limit)...) + } + if len(regions) > 0 { + regionsIDList := make([]uint64, 0, len(regions)) + for _, region := range regions { + regionsIDList = append(regionsIDList, region.GetID()) + } + co.GetCheckerController().AddSuspectRegions(regionsIDList...) + } + return nil +} + +// AdjustLimit adjusts the limit of regions to schedule. +func (h *Handler) AdjustLimit(limitStr string, defaultLimits ...int) (int, error) { + limit := defaultRegionLimit + if len(defaultLimits) > 0 { + limit = defaultLimits[0] + } + if limitStr != "" { + var err error + limit, err = strconv.Atoi(limitStr) + if err != nil { + return 0, err + } + } + if limit > maxRegionLimit { + limit = maxRegionLimit + } + return limit, nil +} + +// ScatterRegionsResponse is the response for scatter regions. +type ScatterRegionsResponse struct { + ProcessedPercentage int `json:"processed-percentage"` +} + +// BuildScatterRegionsResp builds ScatterRegionsResponse. +func (h *Handler) BuildScatterRegionsResp(opsCount int, failures map[uint64]error) *ScatterRegionsResponse { + // If there existed any operator failed to be added into Operator Controller, add its regions into unProcessedRegions + percentage := 100 + if len(failures) > 0 { + percentage = 100 - 100*len(failures)/(opsCount+len(failures)) + log.Debug("scatter regions", zap.Errors("failures", func() []error { + r := make([]error, 0, len(failures)) + for _, err := range failures { + r = append(r, err) + } + return r + }())) + } + return &ScatterRegionsResponse{ + ProcessedPercentage: percentage, + } +} + +// ScatterRegionsByRange scatters regions by range. +func (h *Handler) ScatterRegionsByRange(rawStartKey, rawEndKey string, group string, retryLimit int) (int, map[uint64]error, error) { + startKey, err := hex.DecodeString(rawStartKey) + if err != nil { + return 0, nil, err + } + endKey, err := hex.DecodeString(rawEndKey) + if err != nil { + return 0, nil, err + } + co := h.GetCoordinator() + if co == nil { + return 0, nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetRegionScatterer().ScatterRegionsByRange(startKey, endKey, group, retryLimit) +} + +// ScatterRegionsByID scatters regions by id. +func (h *Handler) ScatterRegionsByID(ids []uint64, group string, retryLimit int, skipStoreLimit bool) (int, map[uint64]error, error) { + co := h.GetCoordinator() + if co == nil { + return 0, nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetRegionScatterer().ScatterRegionsByID(ids, group, retryLimit, false) +} + +// SplitRegionsResponse is the response for split regions. +type SplitRegionsResponse struct { + ProcessedPercentage int `json:"processed-percentage"` + NewRegionsID []uint64 `json:"regions-id"` +} + +// SplitRegions splits regions by split keys. +func (h *Handler) SplitRegions(ctx context.Context, rawSplitKeys []string, retryLimit int) (*SplitRegionsResponse, error) { + co := h.GetCoordinator() + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + splitKeys := make([][]byte, 0, len(rawSplitKeys)) + for _, rawKey := range rawSplitKeys { + key, err := hex.DecodeString(rawKey) + if err != nil { + return nil, err + } + splitKeys = append(splitKeys, key) + } + + percentage, newRegionsID := co.GetRegionSplitter().SplitRegions(ctx, splitKeys, retryLimit) + s := &SplitRegionsResponse{ + ProcessedPercentage: percentage, + NewRegionsID: newRegionsID, + } + failpoint.Inject("splitResponses", func(val failpoint.Value) { + rawID, ok := val.(int) + if ok { + s.ProcessedPercentage = 100 + s.NewRegionsID = []uint64{uint64(rawID)} + } + }) + return s, nil +} + +// CheckRegionsReplicated checks if regions are replicated. +func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string) (string, error) { + startKey, err := hex.DecodeString(rawStartKey) + if err != nil { + return "", err + } + endKey, err := hex.DecodeString(rawEndKey) + if err != nil { + return "", err + } + c := h.GetCluster() + if c == nil { + return "", errs.ErrNotBootstrapped.GenWithStackByArgs() + } + co := h.GetCoordinator() + if co == nil { + return "", errs.ErrNotBootstrapped.GenWithStackByArgs() + } + regions := c.ScanRegions(startKey, endKey, -1) + state := "REPLICATED" + for _, region := range regions { + if !filter.IsRegionReplicated(c, region) { + state = "INPROGRESS" + if co.IsPendingRegion(region.GetID()) { + state = "PENDING" + break + } + } + } + failpoint.Inject("mockPending", func(val failpoint.Value) { + aok, ok := val.(bool) + if ok && aok { + state = "PENDING" + } + }) + return state, nil +} diff --git a/server/api/region.go b/server/api/region.go index 68e280f610c..a80e3d0d48a 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -27,21 +27,18 @@ import ( "github.com/gorilla/mux" jwriter "github.com/mailru/easyjson/jwriter" - "github.com/pingcap/failpoint" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" - "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/keyspace" - "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" "github.com/unrolled/render" - "go.uber.org/zap" ) // MetaPeer is api compatible with *metapb.Peer. @@ -301,51 +298,28 @@ func (h *regionHandler) GetRegion(w http.ResponseWriter, r *http.Request) { // @Failure 400 {string} string "The input is invalid." // @Router /regions/replicated [get] func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) - vars := mux.Vars(r) - startKeyHex := vars["startKey"] - startKey, err := hex.DecodeString(startKeyHex) + rawStartKey := vars["startKey"] + rawEndKey := vars["endKey"] + state, err := h.Handler.CheckRegionsReplicated(rawStartKey, rawEndKey) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - endKeyHex := vars["endKey"] - endKey, err := hex.DecodeString(endKeyHex) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - - regions := rc.ScanRegions(startKey, endKey, -1) - state := "REPLICATED" - for _, region := range regions { - if !filter.IsRegionReplicated(rc, region) { - state = "INPROGRESS" - if rc.GetCoordinator().IsPendingRegion(region.GetID()) { - state = "PENDING" - break - } - } - } - failpoint.Inject("mockPending", func(val failpoint.Value) { - aok, ok := val.(bool) - if ok && aok { - state = "PENDING" - } - }) h.rd.JSON(w, http.StatusOK, state) } type regionsHandler struct { + *server.Handler svr *server.Server rd *render.Render } func newRegionsHandler(svr *server.Server, rd *render.Render) *regionsHandler { return ®ionsHandler{ - svr: svr, - rd: rd, + Handler: svr.GetHandler(), + svr: svr, + rd: rd, } } @@ -422,19 +396,12 @@ func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) startKey := r.URL.Query().Get("key") endKey := r.URL.Query().Get("end_key") - - limit := defaultRegionLimit - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - var err error - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit + limit, err := h.AdjustLimit(r.URL.Query().Get("limit")) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return } + regions := rc.ScanRegions([]byte(startKey), []byte(endKey), limit) b, err := marshalRegionsInfoJSON(r.Context(), regions) if err != nil { @@ -509,16 +476,10 @@ func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Reque return } - limit := defaultRegionLimit - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit + limit, err := h.AdjustLimit(r.URL.Query().Get("limit")) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return } regionBound := keyspace.MakeRegionBound(keyspaceID) regions := rc.ScanRegions(regionBound.RawLeftBound, regionBound.RawRightBound, limit) @@ -789,8 +750,6 @@ func (h *regionsHandler) GetRegionSiblings(w http.ResponseWriter, r *http.Reques } const ( - defaultRegionLimit = 16 - maxRegionLimit = 10240 minRegionHistogramSize = 1 minRegionHistogramKeys = 1000 ) @@ -892,43 +851,27 @@ func (h *regionsHandler) GetTopCPURegions(w http.ResponseWriter, r *http.Request // @Failure 400 {string} string "The input is invalid." // @Router /regions/accelerate-schedule [post] func (h *regionsHandler) AccelerateRegionsScheduleInRange(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) var input map[string]interface{} if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } - startKey, rawStartKey, err := apiutil.ParseKey("start_key", input) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) + rawStartKey, ok1 := input["start_key"].(string) + rawEndKey, ok2 := input["end_key"].(string) + if !ok1 || !ok2 { + h.rd.JSON(w, http.StatusBadRequest, "start_key or end_key is not string") return } - endKey, rawEndKey, err := apiutil.ParseKey("end_key", input) + limit, err := h.AdjustLimit(r.URL.Query().Get("limit"), 256 /*default limit*/) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - limit := 256 - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - var err error - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit - } - - regions := rc.ScanRegions(startKey, endKey, limit) - if len(regions) > 0 { - regionsIDList := make([]uint64, 0, len(regions)) - for _, region := range regions { - regionsIDList = append(regionsIDList, region.GetID()) - } - rc.AddSuspectRegions(regionsIDList...) + err = h.Handler.AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey, limit) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } h.rd.Text(w, http.StatusOK, fmt.Sprintf("Accelerate regions scheduling in a given range [%s,%s)", rawStartKey, rawEndKey)) } @@ -943,27 +886,20 @@ func (h *regionsHandler) AccelerateRegionsScheduleInRange(w http.ResponseWriter, // @Failure 400 {string} string "The input is invalid." // @Router /regions/accelerate-schedule/batch [post] func (h *regionsHandler) AccelerateRegionsScheduleInRanges(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) var input []map[string]interface{} if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } - limit := 256 - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - var err error - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit + limit, err := h.AdjustLimit(r.URL.Query().Get("limit"), 256 /*default limit*/) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return } + var msgBuilder strings.Builder msgBuilder.Grow(128) msgBuilder.WriteString("Accelerate regions scheduling in given ranges: ") - var regions []*core.RegionInfo + var startKeys, endKeys [][]byte for _, rg := range input { startKey, rawStartKey, err := apiutil.ParseKey("start_key", rg) if err != nil { @@ -975,32 +911,24 @@ func (h *regionsHandler) AccelerateRegionsScheduleInRanges(w http.ResponseWriter h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - regions = append(regions, rc.ScanRegions(startKey, endKey, limit)...) + startKeys = append(startKeys, startKey) + endKeys = append(endKeys, endKey) msgBuilder.WriteString(fmt.Sprintf("[%s,%s), ", rawStartKey, rawEndKey)) } - if len(regions) > 0 { - regionsIDList := make([]uint64, 0, len(regions)) - for _, region := range regions { - regionsIDList = append(regionsIDList, region.GetID()) - } - rc.AddSuspectRegions(regionsIDList...) + err = h.Handler.AccelerateRegionsScheduleInRanges(startKeys, endKeys, limit) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } h.rd.Text(w, http.StatusOK, msgBuilder.String()) } func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) { rc := getCluster(r) - limit := defaultRegionLimit - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - var err error - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit + limit, err := h.AdjustLimit(r.URL.Query().Get("limit")) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return } regions := TopNRegions(rc.GetRegions(), less, limit) b, err := marshalRegionsInfoJSON(r.Context(), regions) @@ -1020,69 +948,33 @@ func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, // @Failure 400 {string} string "The input is invalid." // @Router /regions/scatter [post] func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) var input map[string]interface{} if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } - _, ok1 := input["start_key"].(string) - _, ok2 := input["end_key"].(string) - group, ok := input["group"].(string) - if !ok { - group = "" - } + rawStartKey, ok1 := input["start_key"].(string) + rawEndKey, ok2 := input["end_key"].(string) + group, _ := input["group"].(string) retryLimit := 5 if rl, ok := input["retry_limit"].(float64); ok { retryLimit = int(rl) } - opsCount := 0 - var failures map[uint64]error - var err error - if ok1 && ok2 { - startKey, _, err := apiutil.ParseKey("start_key", input) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - endKey, _, err := apiutil.ParseKey("end_key", input) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByRange(startKey, endKey, group, retryLimit) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + + opsCount, failures, err := func() (int, map[uint64]error, error) { + if ok1 && ok2 { + return h.ScatterRegionsByRange(rawStartKey, rawEndKey, group, retryLimit) } - } else { ids, ok := typeutil.JSONToUint64Slice(input["regions_id"]) if !ok { - h.rd.JSON(w, http.StatusBadRequest, "regions_id is invalid") - return + return 0, nil, errors.New("regions_id is invalid") } - opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByID(ids, group, retryLimit, false) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - } - // If there existed any operator failed to be added into Operator Controller, add its regions into unProcessedRegions - percentage := 100 - if len(failures) > 0 { - percentage = 100 - 100*len(failures)/(opsCount+len(failures)) - log.Debug("scatter regions", zap.Errors("failures", func() []error { - r := make([]error, 0, len(failures)) - for _, err := range failures { - r = append(r, err) - } - return r - }())) - } - s := struct { - ProcessedPercentage int `json:"processed-percentage"` - }{ - ProcessedPercentage: percentage, + return h.ScatterRegionsByID(ids, group, retryLimit, false) + }() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } + s := h.BuildScatterRegionsResp(opsCount, failures) h.rd.JSON(w, http.StatusOK, &s) } @@ -1095,16 +987,16 @@ func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) // @Failure 400 {string} string "The input is invalid." // @Router /regions/split [post] func (h *regionsHandler) SplitRegions(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) var input map[string]interface{} if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } - rawSplitKeys, ok := input["split_keys"].([]interface{}) + s, ok := input["split_keys"] if !ok { h.rd.JSON(w, http.StatusBadRequest, "split_keys should be provided.") return } + rawSplitKeys := s.([]string) if len(rawSplitKeys) < 1 { h.rd.JSON(w, http.StatusBadRequest, "empty split keys.") return @@ -1113,29 +1005,11 @@ func (h *regionsHandler) SplitRegions(w http.ResponseWriter, r *http.Request) { if rl, ok := input["retry_limit"].(float64); ok { retryLimit = int(rl) } - splitKeys := make([][]byte, 0, len(rawSplitKeys)) - for _, rawKey := range rawSplitKeys { - key, err := hex.DecodeString(rawKey.(string)) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - splitKeys = append(splitKeys, key) - } - s := struct { - ProcessedPercentage int `json:"processed-percentage"` - NewRegionsID []uint64 `json:"regions-id"` - }{} - percentage, newRegionsID := rc.GetRegionSplitter().SplitRegions(r.Context(), splitKeys, retryLimit) - s.ProcessedPercentage = percentage - s.NewRegionsID = newRegionsID - failpoint.Inject("splitResponses", func(val failpoint.Value) { - rawID, ok := val.(int) - if ok { - s.ProcessedPercentage = 100 - s.NewRegionsID = []uint64{uint64(rawID)} - } - }) + s, err := h.Handler.SplitRegions(r.Context(), rawSplitKeys, retryLimit) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } h.rd.JSON(w, http.StatusOK, &s) } diff --git a/server/api/region_test.go b/server/api/region_test.go index a39a1e5c5fd..3e794b0c412 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -424,9 +424,9 @@ func (suite *regionTestSuite) TestSplitRegions() { suite.Equal(100, s.ProcessedPercentage) suite.Equal([]uint64{newRegionID}, s.NewRegionsID) } - suite.NoError(failpoint.Enable("github.com/tikv/pd/server/api/splitResponses", fmt.Sprintf("return(%v)", newRegionID))) + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/handler/splitResponses", fmt.Sprintf("return(%v)", newRegionID))) err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/split", suite.urlPrefix), []byte(body), checkOpt) - suite.NoError(failpoint.Disable("github.com/tikv/pd/server/api/splitResponses")) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/handler/splitResponses")) suite.NoError(err) } @@ -716,6 +716,8 @@ func (suite *regionsReplicatedTestSuite) TestCheckRegionsReplicated() { // correct test url = fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, suite.urlPrefix, hex.EncodeToString(r1.GetStartKey()), hex.EncodeToString(r1.GetEndKey())) + err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re)) + suite.NoError(err) // test one rule data, err := json.Marshal(bundle) @@ -727,11 +729,11 @@ func (suite *regionsReplicatedTestSuite) TestCheckRegionsReplicated() { suite.NoError(err) suite.Equal("REPLICATED", status) - suite.NoError(failpoint.Enable("github.com/tikv/pd/server/api/mockPending", "return(true)")) + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/handler/mockPending", "return(true)")) err = tu.ReadGetJSON(re, testDialClient, url, &status) suite.NoError(err) suite.Equal("PENDING", status) - suite.NoError(failpoint.Disable("github.com/tikv/pd/server/api/mockPending")) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/handler/mockPending")) // test multiple rules r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}) diff --git a/server/api/server.go b/server/api/server.go index 992cd42d796..83e8908f1f3 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -89,6 +89,31 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP // However, the path "/region/id" is used to get the region by id, which is not what we want. return strings.Contains(r.URL.Path, "label") }), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/accelerate-schedule", + scheapi.APIPathPrefix+"/regions/accelerate-schedule", + mcs.SchedulingServiceName, + []string{http.MethodPost}), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/accelerate-schedule/batch", + scheapi.APIPathPrefix+"/regions/accelerate-schedule/batch", + mcs.SchedulingServiceName, + []string{http.MethodPost}), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/scatter", + scheapi.APIPathPrefix+"/regions/scatter", + mcs.SchedulingServiceName, + []string{http.MethodPost}), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/split", + scheapi.APIPathPrefix+"/regions/split", + mcs.SchedulingServiceName, + []string{http.MethodPost}), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/replicated", + scheapi.APIPathPrefix+"/regions/replicated", + mcs.SchedulingServiceName, + []string{http.MethodGet}), serverapi.MicroserviceRedirectRule( prefix+"/config/region-label", scheapi.APIPathPrefix+"/config/region-label", @@ -111,8 +136,6 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/schedulers", mcs.SchedulingServiceName, []string{http.MethodPost}), - // TODO: we need to consider the case that v1 api not support restful api. - // we might change the previous path parameters to query parameters. ), negroni.Wrap(r)), ) diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 3873ca0f96d..9b04f09dabf 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -2,6 +2,7 @@ package scheduling_test import ( "context" + "encoding/hex" "encoding/json" "fmt" "net/http" @@ -235,9 +236,33 @@ func (suite *apiTestSuite) TestAPIForward() { testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) re.NoError(err) err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/label/key"), nil, - testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader,"true")) + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/labels"), nil, - testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader,"true")) + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + + // Test Region + body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3"))) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "regions/accelerate-schedule"), []byte(body), + testutil.StatusOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + body = fmt.Sprintf(`[{"start_key":"%s", "end_key": "%s"}, {"start_key":"%s", "end_key": "%s"}]`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")), hex.EncodeToString([]byte("a4")), hex.EncodeToString([]byte("a6"))) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "regions/accelerate-schedule/batch"), []byte(body), + testutil.StatusOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) + body = fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("b1")), hex.EncodeToString([]byte("b3"))) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "regions/scatter"), []byte(body), + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + suite.NoError(err) + body = fmt.Sprintf(`{"retry_limit":%v, "split_keys": ["%s","%s","%s"]}`, 3, + hex.EncodeToString([]byte("bbb")), + hex.EncodeToString([]byte("ccc")), + hex.EncodeToString([]byte("ddd"))) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "regions/split"), []byte(body), + testutil.StatusOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + suite.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, urlPrefix, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a2"))), nil, + testutil.StatusOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + suite.NoError(err) }