Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 9, 2023
1 parent 2d6d331 commit 2ce4ff0
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 42 deletions.
42 changes: 23 additions & 19 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ type RaftCluster struct {
regionSyncer *syncer.RegionSyncer
changedRegions chan *core.RegionInfo
keyspaceGroupManager *keyspace.GroupManager
enabledServices sync.Map
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams
}

Expand Down Expand Up @@ -350,11 +350,15 @@ func (c *RaftCluster) runServiceCheckJob() {
checkFn := func() {
if c.isAPIServiceMode {
once.Do(c.initSchedulers)
c.enabledServices.Store(mcsutils.SchedulingServiceName, true)
} else if !c.schedulingController.running.Load() {
c.independentServices.Store(mcsutils.SchedulingServiceName, true)
return
}
c.RLock()
if !c.schedulingController.running.Load() {
c.startSchedulingJobs()
c.enabledServices.Delete(mcsutils.SchedulingServiceName)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
}
c.RUnlock()
}
checkFn()

Expand Down Expand Up @@ -620,7 +624,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
zap.Int("count", c.core.GetTotalRegionCount()),
zap.Duration("cost", time.Since(start)),
)
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
for _, store := range c.GetStores() {
storeID := store.GetID()
c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow())
Expand Down Expand Up @@ -716,7 +720,7 @@ func (c *RaftCluster) Stop() {
return
}
c.running = false
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) && c.schedulingController.running.Load() {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) && c.schedulingController.running.Load() {
c.stopSchedulingJobs()
}
c.cancel()
Expand Down Expand Up @@ -850,7 +854,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
nowTime := time.Now()
var newStore *core.StoreInfo
// If this cluster has slow stores, we should awaken hibernated regions in other stores.
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken {
log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs))
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt)
Expand Down Expand Up @@ -885,7 +889,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
regions map[uint64]*core.RegionInfo
interval uint64
)
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.hotStat.Observe(storeID, newStore.GetStoreStats())
c.hotStat.FilterUnhealthyStore(c)
c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow())
Expand Down Expand Up @@ -941,7 +945,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
e := int64(dur)*2 - int64(stat.GetTotalDurationSec())
store.Feedback(float64(e))
}
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
}
Expand Down Expand Up @@ -987,7 +991,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
cluster.HandleStatsAsync(c, region)
}

Expand All @@ -996,7 +1000,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) && !saveKV && !saveCache && !isNew {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) && !saveKV && !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand All @@ -1021,13 +1025,13 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
cluster.HandleOverlaps(c, overlaps)
}
regionUpdateCacheEventCounter.Inc()
}

if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
}

Expand Down Expand Up @@ -1491,7 +1495,7 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error {
c.resetProgress(storeID, addr)
storeIDStr := strconv.FormatUint(storeID, 10)
statistics.ResetStoreStatistics(addr, storeIDStr)
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.removeStoreStatistics(storeID)
}
}
Expand Down Expand Up @@ -1667,7 +1671,7 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
}
}
c.core.PutStore(store)
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.updateStoreStatistics(store.GetID(), store.IsSlow())
}
return nil
Expand Down Expand Up @@ -2522,11 +2526,11 @@ func IsClientURL(addr string, etcdClient *clientv3.Client) bool {
return false
}

// IsServiceEnabled returns whether the service is enabled.
func (c *RaftCluster) IsServiceEnabled(name string) bool {
enabled, exist := c.enabledServices.Load(name)
// IsServiceIndependent returns whether the service is independent.
func (c *RaftCluster) IsServiceIndependent(name string) bool {
independent, exist := c.independentServices.Load(name)
if !exist {
return false
}
return enabled.(bool)
return independent.(bool)
}
2 changes: 1 addition & 1 deletion server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error {
if err := c.processReportBuckets(b); err != nil {
return err
}
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b))
}
return nil
Expand Down
21 changes: 12 additions & 9 deletions server/cluster/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import (
)

type schedulingController struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
parentCtx context.Context
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
*core.BasicCluster
opt *config.PersistOptions
coordinator *schedule.Coordinator
Expand All @@ -51,14 +52,15 @@ type schedulingController struct {
running atomic.Bool
}

func newSchedulingController(ctx context.Context) *schedulingController {
ctx, cancel := context.WithCancel(ctx)
func newSchedulingController(parentCtx context.Context) *schedulingController {
ctx, cancel := context.WithCancel(parentCtx)
return &schedulingController{
parentCtx: parentCtx,
ctx: ctx,
cancel: cancel,
labelStats: statistics.NewLabelStatistics(),
hotStat: statistics.NewHotStat(ctx),
slowStat: statistics.NewSlowStat(ctx),
hotStat: statistics.NewHotStat(parentCtx),
slowStat: statistics.NewSlowStat(parentCtx),
}
}

Expand All @@ -73,16 +75,17 @@ func (sc *schedulingController) stopSchedulingJobs() {
sc.coordinator.Stop()
sc.cancel()
sc.wg.Wait()
sc.running.Store(false)
sc.running.CompareAndSwap(true, false)
log.Info("scheduling service is stopped")
}

func (sc *schedulingController) startSchedulingJobs() {
sc.ctx, sc.cancel = context.WithCancel(sc.parentCtx)
sc.wg.Add(3)
go sc.runCoordinator()
go sc.runStatsBackgroundJobs()
go sc.runSchedulingMetricsCollectionJob()
sc.running.Store(true)
sc.running.CompareAndSwap(false, true)
log.Info("scheduling service is started")
}

Expand Down
12 changes: 6 additions & 6 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear

s.handleDamagedStore(request.GetStats())
storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
if s.IsServiceEnabled(utils.SchedulingServiceName) {
if s.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, _ := s.updateSchedulingClient(ctx)
if forwardCli != nil {
req := &schedulingpb.StoreHeartbeatRequest{
Expand Down Expand Up @@ -1360,7 +1360,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
continue
}

if s.IsServiceEnabled(utils.SchedulingServiceName) {
if s.IsServiceIndependent(utils.SchedulingServiceName) {
ctx := stream.Context()
primaryAddr, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName)
if schedulingStream == nil || lastPrimaryAddr != primaryAddr {
Expand Down Expand Up @@ -1632,7 +1632,7 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest

// AskBatchSplit implements gRPC PDServer.
func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if s.IsServiceEnabled(utils.SchedulingServiceName) {
if s.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.AskBatchSplitResponse{
Expand Down Expand Up @@ -1805,7 +1805,7 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus

// ScatterRegion implements gRPC PDServer.
func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionRequest) (*pdpb.ScatterRegionResponse, error) {
if s.IsServiceEnabled(utils.SchedulingServiceName) {
if s.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.ScatterRegionResponse{
Expand Down Expand Up @@ -2036,7 +2036,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb

// GetOperator gets information about the operator belonging to the specify region.
func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) {
if s.IsServiceEnabled(utils.SchedulingServiceName) {
if s.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.GetOperatorResponse{
Expand Down Expand Up @@ -2308,7 +2308,7 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest

// SplitRegions split regions by the given split keys
func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsRequest) (*pdpb.SplitRegionsResponse, error) {
if s.IsServiceEnabled(utils.SchedulingServiceName) {
if s.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.SplitRegionsResponse{
Expand Down
6 changes: 3 additions & 3 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
}

var removeSchedulerCb func(string) error
if c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveSchedulerHandler
} else {
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveScheduler
Expand All @@ -203,7 +203,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
return err
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
if c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
if err = c.AddSchedulerHandler(s, args...); err != nil {
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
return err
Expand All @@ -230,7 +230,7 @@ func (h *Handler) RemoveScheduler(name string) error {
if err != nil {
return err
}
if c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
if err = c.RemoveSchedulerHandler(name); err != nil {
log.Error("can not remove scheduler handler", zap.String("scheduler-name", name), errs.ZapError(err))
} else {
Expand Down
8 changes: 4 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage)
s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, "", s.cluster)
// initial hot_region_storage in here.
if !s.IsServiceEnabled(mcs.SchedulingServiceName) {
if !s.IsServiceIndependent(mcs.SchedulingServiceName) {
s.hotRegionStorage, err = storage.NewHotRegionsStorage(
ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler)
if err != nil {
Expand Down Expand Up @@ -1394,11 +1394,11 @@ func (s *Server) GetRegions() []*core.RegionInfo {
return nil
}

// IsServiceEnabled returns if the service is enabled
func (s *Server) IsServiceEnabled(name string) bool {
// IsServiceIndependent returns if the service is enabled
func (s *Server) IsServiceIndependent(name string) bool {
rc := s.GetRaftCluster()
if rc != nil {
return rc.IsServiceEnabled(name)
return rc.IsServiceIndependent(name)
}
return false
}
Expand Down

0 comments on commit 2ce4ff0

Please sign in to comment.