Skip to content

Commit

Permalink
mcs: add store and region interface in scheduling server (#7754)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Jan 31, 2024
1 parent 357ad3f commit 4e48f5b
Show file tree
Hide file tree
Showing 21 changed files with 912 additions and 583 deletions.
125 changes: 125 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/response"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/operator"
Expand Down Expand Up @@ -124,6 +126,7 @@ func NewService(srv *scheserver.Service) *Service {
s.RegisterCheckersRouter()
s.RegisterHotspotRouter()
s.RegisterRegionsRouter()
s.RegisterStoresRouter()
return s
}

Expand Down Expand Up @@ -174,9 +177,19 @@ func (s *Service) RegisterOperatorsRouter() {
router.GET("/records", getOperatorRecords)
}

// RegisterStoresRouter registers the router of the stores handler.
func (s *Service) RegisterStoresRouter() {
router := s.root.Group("stores")
router.GET("", getAllStores)
router.GET("/:id", getStoreByID)
}

// RegisterRegionsRouter registers the router of the regions handler.
func (s *Service) RegisterRegionsRouter() {
router := s.root.Group("regions")
router.GET("", getAllRegions)
router.GET("/:id", getRegionByID)
router.GET("/count", getRegionCount)
router.POST("/accelerate-schedule", accelerateRegionsScheduleInRange)
router.POST("/accelerate-schedule/batch", accelerateRegionsScheduleInRanges)
router.POST("/scatter", scatterRegions)
Expand Down Expand Up @@ -1343,3 +1356,115 @@ func checkRegionsReplicated(c *gin.Context) {
}
c.IndentedJSON(http.StatusOK, state)
}

// @Tags store
// @Summary Get a store's information.
// @Param id path integer true "Store Id"
// @Produce json
// @Success 200 {object} response.StoreInfo
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The store does not exist."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores/{id} [get]
func getStoreByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
idStr := c.Param("id")
storeID, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
store := svr.GetBasicCluster().GetStore(storeID)
if store == nil {
c.String(http.StatusNotFound, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error())
return
}

storeInfo := response.BuildStoreInfo(&svr.GetConfig().Schedule, store)
c.IndentedJSON(http.StatusOK, storeInfo)
}

// @Tags store
// @Summary Get all stores in the cluster.
// @Produce json
// @Success 200 {object} response.StoresInfo
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores [get]
func getAllStores(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
stores := svr.GetBasicCluster().GetMetaStores()
StoresInfo := &response.StoresInfo{
Stores: make([]*response.StoreInfo, 0, len(stores)),
}

for _, s := range stores {
storeID := s.GetId()
store := svr.GetBasicCluster().GetStore(storeID)
if store == nil {
c.String(http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error())
return
}
if store.GetMeta().State == metapb.StoreState_Tombstone {
continue
}
storeInfo := response.BuildStoreInfo(&svr.GetConfig().Schedule, store)
StoresInfo.Stores = append(StoresInfo.Stores, storeInfo)
}
StoresInfo.Count = len(StoresInfo.Stores)
c.IndentedJSON(http.StatusOK, StoresInfo)
}

// @Tags region
// @Summary List all regions in the cluster.
// @Produce json
// @Success 200 {object} response.RegionsInfo
// @Router /regions [get]
func getAllRegions(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
regions := svr.GetBasicCluster().GetRegions()
b, err := response.MarshalRegionsInfoJSON(c.Request.Context(), regions)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.Data(http.StatusOK, "application/json", b)
}

// @Tags region
// @Summary Get count of regions.
// @Produce json
// @Success 200 {object} response.RegionsInfo
// @Router /regions/count [get]
func getRegionCount(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
count := svr.GetBasicCluster().GetTotalRegionCount()
c.IndentedJSON(http.StatusOK, &response.RegionsInfo{Count: count})
}

// @Tags region
// @Summary Search for a region by region ID.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {object} response.RegionInfo
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/{id} [get]
func getRegionByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
idStr := c.Param("id")
regionID, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
regionInfo := svr.GetBasicCluster().GetRegion(regionID)
if regionInfo == nil {
c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs(regionID).Error())
return
}
b, err := response.MarshalRegionInfoJSON(c.Request.Context(), regionInfo)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.Data(http.StatusOK, "application/json", b)
}
2 changes: 2 additions & 0 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (w *Watcher) initializeStoreWatcher() error {
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
log.Debug("update store meta", zap.Stringer("store", store))
origin := w.basicCluster.GetStore(store.GetId())
if origin == nil {
w.basicCluster.PutStore(core.NewStoreInfo(store))
Expand All @@ -101,6 +102,7 @@ func (w *Watcher) initializeStoreWatcher() error {
origin := w.basicCluster.GetStore(storeID)
if origin != nil {
w.basicCluster.DeleteStore(origin)
log.Info("delete store meta", zap.Uint64("store-id", storeID))
}
return nil
}
Expand Down
Loading

0 comments on commit 4e48f5b

Please sign in to comment.