diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 8362ee9f331..c0fb1b15f8f 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -41,22 +41,18 @@ import ( "github.com/tikv/pd/pkg/gctuner" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" "github.com/tikv/pd/pkg/replication" "github.com/tikv/pd/pkg/schedule" - "github.com/tikv/pd/pkg/schedule/checker" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" - "github.com/tikv/pd/pkg/schedule/scatter" - "github.com/tikv/pd/pkg/schedule/schedulers" - "github.com/tikv/pd/pkg/schedule/splitter" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" - "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/endpoint" @@ -97,6 +93,7 @@ const ( clientTimeout = 3 * time.Second defaultChangedRegionsLimit = 10000 gcTombstoneInterval = 30 * 24 * time.Hour + serviceCheckInterval = 10 * time.Second // persistLimitRetryTimes is used to reduce the probability of the persistent error // since the once the store is added or removed, we shouldn't return an error even if the store limit is failed to persist. persistLimitRetryTimes = 5 @@ -155,16 +152,12 @@ type RaftCluster struct { prevStoreLimit map[uint64]map[storelimit.Type]float64 // This below fields are all read-only, we cannot update itself after the raft cluster starts. - clusterID uint64 - id id.Allocator - core *core.BasicCluster // cached cluster info - opt *config.PersistOptions - limiter *StoreLimiter - coordinator *schedule.Coordinator - labelLevelStats *statistics.LabelStatistics - regionStats *statistics.RegionStatistics - hotStat *statistics.HotStat - slowStat *statistics.SlowStat + clusterID uint64 + id id.Allocator + core *core.BasicCluster // cached cluster info + opt *config.PersistOptions + limiter *StoreLimiter + *schedulingController ruleManager *placement.RuleManager regionLabeler *labeler.RegionLabeler replicationMode *replication.ModeManager @@ -173,6 +166,8 @@ type RaftCluster struct { regionSyncer *syncer.RegionSyncer changedRegions chan *core.RegionInfo keyspaceGroupManager *keyspace.GroupManager + independentServices sync.Map + hbstreams *hbstream.HeartbeatStreams } // Status saves some state information. @@ -266,17 +261,17 @@ func (c *RaftCluster) InitCluster( opt sc.ConfProvider, storage storage.Storage, basicCluster *core.BasicCluster, + hbstreams *hbstream.HeartbeatStreams, keyspaceGroupManager *keyspace.GroupManager) { c.core, c.opt, c.storage, c.id = basicCluster, opt.(*config.PersistOptions), storage, id c.ctx, c.cancel = context.WithCancel(c.serverCtx) - c.labelLevelStats = statistics.NewLabelStatistics() - c.hotStat = statistics.NewHotStat(c.ctx) - c.slowStat = statistics.NewSlowStat(c.ctx) c.progressManager = progress.NewManager() c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit) c.prevStoreLimit = make(map[uint64]map[storelimit.Type]float64) c.unsafeRecoveryController = unsaferecovery.NewController(c) c.keyspaceGroupManager = keyspaceGroupManager + c.hbstreams = hbstreams + c.schedulingController = newSchedulingController(c.ctx) } // Start starts a cluster. 
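Editor's note on the structural change above: `RaftCluster` now embeds a `*schedulingController` and records independently deployed microservices in a `sync.Map`. The sketch below is a stripped-down, standalone illustration of that embedding-plus-registry pattern; the type and method names are invented for illustration and are not the PD API.

```go
// Sketch only: a cluster type embeds a controller (so its methods are promoted)
// and tracks which services run as independent microservices in a sync.Map.
package main

import (
	"fmt"
	"sync"
)

type schedulingController struct {
	mu      sync.RWMutex
	running bool
}

func (sc *schedulingController) startJobs() bool {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.running {
		return false
	}
	sc.running = true
	return true
}

type cluster struct {
	*schedulingController // embedded: cluster gains startJobs() directly
	independentServices   sync.Map
}

func (c *cluster) IsServiceIndependent(name string) bool {
	v, ok := c.independentServices.Load(name)
	if !ok {
		return false
	}
	return v.(bool)
}

func main() {
	c := &cluster{schedulingController: &schedulingController{}}
	c.independentServices.Store("scheduling", true)
	fmt.Println(c.startJobs())                        // true: method promoted from the embedded controller
	fmt.Println(c.IsServiceIndependent("scheduling")) // true
	fmt.Println(c.IsServiceIndependent("tso"))        // false: never registered
}
```

Method promotion is why the many `c.hotStat`, `c.regionStats`, and `c.slowStat` accesses that remain elsewhere in this diff still compile: they now resolve through the embedded `schedulingController` rather than fields on `RaftCluster` itself.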
@@ -290,7 +285,7 @@ func (c *RaftCluster) Start(s Server) error { } c.isAPIServiceMode = s.IsAPIServiceMode() - c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetKeyspaceGroupManager()) + c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) cluster, err := c.LoadClusterInfo() if err != nil { return err @@ -316,8 +311,7 @@ func (c *RaftCluster) Start(s Server) error { return err } - c.coordinator = schedule.NewCoordinator(c.ctx, cluster, s.GetHBStreams()) - c.regionStats = statistics.NewRegionStatistics(c.core, c.opt, c.ruleManager) + c.schedulingController.init(c.core, c.opt, schedule.NewCoordinator(c.ctx, c, c.GetHeartbeatStreams()), c.ruleManager) c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.externalTS, err = c.storage.LoadExternalTS() if err != nil { @@ -331,14 +325,9 @@ func (c *RaftCluster) Start(s Server) error { if err != nil { return err } - c.initSchedulers() - } else { - c.wg.Add(2) - go c.runCoordinator() - go c.runStatsBackgroundJobs() } - - c.wg.Add(8) + c.wg.Add(9) + go c.runServiceCheckJob() go c.runMetricsCollectionJob() go c.runNodeStateCheckJob() go c.syncRegions() @@ -352,6 +341,38 @@ func (c *RaftCluster) Start(s Server) error { return nil } +func (c *RaftCluster) runServiceCheckJob() { + defer logutil.LogPanic() + defer c.wg.Done() + + var once sync.Once + + checkFn := func() { + if c.isAPIServiceMode { + once.Do(c.initSchedulers) + c.independentServices.Store(mcsutils.SchedulingServiceName, true) + return + } + if c.startSchedulingJobs() { + c.independentServices.Delete(mcsutils.SchedulingServiceName) + } + } + checkFn() + + ticker := time.NewTicker(serviceCheckInterval) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("service check job is stopped") + return + case <-ticker.C: + checkFn() + } + } +} + // startGCTuner func (c *RaftCluster) startGCTuner() { defer logutil.LogPanic() @@ -600,10 +621,9 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { zap.Int("count", c.core.GetTotalRegionCount()), zap.Duration("cost", time.Since(start)), ) - if !c.isAPIServiceMode { + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { for _, store := range c.GetStores() { storeID := store.GetID() - c.hotStat.GetOrCreateRollingStoreStats(storeID) c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow()) } } @@ -619,7 +639,6 @@ func (c *RaftCluster) runMetricsCollectionJob() { ticker.Stop() ticker = time.NewTicker(time.Microsecond) }) - defer ticker.Stop() for { @@ -657,24 +676,6 @@ func (c *RaftCluster) runNodeStateCheckJob() { } } -func (c *RaftCluster) runStatsBackgroundJobs() { - defer logutil.LogPanic() - defer c.wg.Done() - - ticker := time.NewTicker(statistics.RegionsStatsObserveInterval) - defer ticker.Stop() - - for { - select { - case <-c.ctx.Done(): - log.Info("statistics background jobs has been stopped") - return - case <-ticker.C: - c.hotStat.ObserveRegionsStats(c.core.GetStoresWriteRate()) - } - } -} - func (c *RaftCluster) runUpdateStoreStats() { defer logutil.LogPanic() defer c.wg.Done() @@ -696,13 +697,6 @@ func (c *RaftCluster) runUpdateStoreStats() { } } -// runCoordinator runs the main scheduling loop. 
-func (c *RaftCluster) runCoordinator() { - defer logutil.LogPanic() - defer c.wg.Done() - c.coordinator.RunUntilStop() -} - func (c *RaftCluster) syncRegions() { defer logutil.LogPanic() defer c.wg.Done() @@ -723,8 +717,8 @@ func (c *RaftCluster) Stop() { return } c.running = false - if !c.isAPIServiceMode { - c.coordinator.Stop() + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { + c.stopSchedulingJobs() } c.cancel() c.Unlock() @@ -750,6 +744,11 @@ func (c *RaftCluster) Context() context.Context { return nil } +// GetHeartbeatStreams returns the heartbeat streams. +func (c *RaftCluster) GetHeartbeatStreams() *hbstream.HeartbeatStreams { + return c.hbstreams +} + // GetCoordinator returns the coordinator. func (c *RaftCluster) GetCoordinator() *schedule.Coordinator { return c.coordinator @@ -760,71 +759,6 @@ func (c *RaftCluster) GetOperatorController() *operator.Controller { return c.coordinator.GetOperatorController() } -// SetPrepared set the prepare check to prepared. Only for test purpose. -func (c *RaftCluster) SetPrepared() { - c.coordinator.GetPrepareChecker().SetPrepared() -} - -// GetRegionScatterer returns the region scatter. -func (c *RaftCluster) GetRegionScatterer() *scatter.RegionScatterer { - return c.coordinator.GetRegionScatterer() -} - -// GetRegionSplitter returns the region splitter -func (c *RaftCluster) GetRegionSplitter() *splitter.RegionSplitter { - return c.coordinator.GetRegionSplitter() -} - -// GetMergeChecker returns merge checker. -func (c *RaftCluster) GetMergeChecker() *checker.MergeChecker { - return c.coordinator.GetMergeChecker() -} - -// GetRuleChecker returns rule checker. -func (c *RaftCluster) GetRuleChecker() *checker.RuleChecker { - return c.coordinator.GetRuleChecker() -} - -// GetSchedulers gets all schedulers. -func (c *RaftCluster) GetSchedulers() []string { - return c.coordinator.GetSchedulersController().GetSchedulerNames() -} - -// GetSchedulerHandlers gets all scheduler handlers. -func (c *RaftCluster) GetSchedulerHandlers() map[string]http.Handler { - return c.coordinator.GetSchedulersController().GetSchedulerHandlers() -} - -// AddSchedulerHandler adds a scheduler handler. -func (c *RaftCluster) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error { - return c.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...) -} - -// RemoveSchedulerHandler removes a scheduler handler. -func (c *RaftCluster) RemoveSchedulerHandler(name string) error { - return c.coordinator.GetSchedulersController().RemoveSchedulerHandler(name) -} - -// AddScheduler adds a scheduler. -func (c *RaftCluster) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { - return c.coordinator.GetSchedulersController().AddScheduler(scheduler, args...) -} - -// RemoveScheduler removes a scheduler. -func (c *RaftCluster) RemoveScheduler(name string) error { - return c.coordinator.GetSchedulersController().RemoveScheduler(name) -} - -// PauseOrResumeScheduler pauses or resumes a scheduler. -func (c *RaftCluster) PauseOrResumeScheduler(name string, t int64) error { - return c.coordinator.GetSchedulersController().PauseOrResumeScheduler(name, t) -} - -// PauseOrResumeChecker pauses or resumes checker. -func (c *RaftCluster) PauseOrResumeChecker(name string, t int64) error { - return c.coordinator.PauseOrResumeChecker(name, t) -} - // AllocID returns a global unique ID. 
func (c *RaftCluster) AllocID() (uint64, error) { return c.id.Alloc() @@ -861,10 +795,6 @@ func (c *RaftCluster) GetOpts() sc.ConfProvider { return c.opt } -func (c *RaftCluster) initSchedulers() { - c.coordinator.InitSchedulers(false) -} - // GetScheduleConfig returns scheduling configurations. func (c *RaftCluster) GetScheduleConfig() *sc.ScheduleConfig { return c.opt.GetScheduleConfig() @@ -890,60 +820,11 @@ func (c *RaftCluster) SetPDServerConfig(cfg *config.PDServerConfig) { c.opt.SetPDServerConfig(cfg) } -// AddSuspectRegions adds regions to suspect list. -func (c *RaftCluster) AddSuspectRegions(regionIDs ...uint64) { - c.coordinator.GetCheckerController().AddSuspectRegions(regionIDs...) -} - -// GetSuspectRegions gets all suspect regions. -func (c *RaftCluster) GetSuspectRegions() []uint64 { - return c.coordinator.GetCheckerController().GetSuspectRegions() -} - -// GetHotStat gets hot stat. -func (c *RaftCluster) GetHotStat() *statistics.HotStat { - return c.hotStat -} - -// GetRegionStats gets region statistics. -func (c *RaftCluster) GetRegionStats() *statistics.RegionStatistics { - return c.regionStats -} - -// GetLabelStats gets label statistics. -func (c *RaftCluster) GetLabelStats() *statistics.LabelStatistics { - return c.labelLevelStats -} - -// RemoveSuspectRegion removes region from suspect list. -func (c *RaftCluster) RemoveSuspectRegion(id uint64) { - c.coordinator.GetCheckerController().RemoveSuspectRegion(id) -} - // GetUnsafeRecoveryController returns the unsafe recovery controller. func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller { return c.unsafeRecoveryController } -// AddSuspectKeyRange adds the key range with the its ruleID as the key -// The instance of each keyRange is like following format: -// [2][]byte: start key/end key -func (c *RaftCluster) AddSuspectKeyRange(start, end []byte) { - c.coordinator.GetCheckerController().AddSuspectKeyRange(start, end) -} - -// PopOneSuspectKeyRange gets one suspect keyRange group. -// it would return value and true if pop success, or return empty [][2][]byte and false -// if suspectKeyRanges couldn't pop keyRange group. -func (c *RaftCluster) PopOneSuspectKeyRange() ([2][]byte, bool) { - return c.coordinator.GetCheckerController().PopOneSuspectKeyRange() -} - -// ClearSuspectKeyRanges clears the suspect keyRanges, only for unit test -func (c *RaftCluster) ClearSuspectKeyRanges() { - c.coordinator.GetCheckerController().ClearSuspectKeyRanges() -} - // HandleStoreHeartbeat updates the store status. func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest, resp *pdpb.StoreHeartbeatResponse) error { stats := heartbeat.GetStats() @@ -970,7 +851,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest nowTime := time.Now() var newStore *core.StoreInfo // If this cluster has slow stores, we should awaken hibernated regions in other stores. 
- if !c.isAPIServiceMode { + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken { log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs)) newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt) @@ -1005,7 +886,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest regions map[uint64]*core.RegionInfo interval uint64 ) - if !c.isAPIServiceMode { + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.hotStat.Observe(storeID, newStore.GetStoreStats()) c.hotStat.FilterUnhealthyStore(c) c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow()) @@ -1061,7 +942,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest e := int64(dur)*2 - int64(stat.GetTotalDurationSec()) store.Feedback(float64(e)) } - if !c.isAPIServiceMode { + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) } @@ -1097,11 +978,6 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error { return nil } -// IsPrepared return true if the prepare checker is ready. -func (c *RaftCluster) IsPrepared() bool { - return c.coordinator.GetPrepareChecker().IsPrepared() -} - var regionGuide = core.GenerateRegionGuideFunc(true) // processRegionHeartbeat updates the region information. @@ -1112,7 +988,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - if !c.isAPIServiceMode { + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { cluster.HandleStatsAsync(c, region) } @@ -1121,7 +997,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. isNew, saveKV, saveCache, needSync := regionGuide(region, origin) - if !c.isAPIServiceMode && !saveKV && !saveCache && !isNew { + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) && !saveKV && !saveCache && !isNew { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. 
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -1146,13 +1022,13 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } - if !c.isAPIServiceMode { + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { cluster.HandleOverlaps(c, overlaps) } regionUpdateCacheEventCounter.Inc() } - if !c.isAPIServiceMode { + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) } @@ -1566,24 +1442,6 @@ func (c *RaftCluster) checkReplicaBeforeOfflineStore(storeID uint64) error { return nil } -func (c *RaftCluster) getEvictLeaderStores() (evictStores []uint64) { - if c.coordinator == nil { - return nil - } - handler, ok := c.coordinator.GetSchedulersController().GetSchedulerHandlers()[schedulers.EvictLeaderName] - if !ok { - return - } - type evictLeaderHandler interface { - EvictStoreIDs() []uint64 - } - h, ok := handler.(evictLeaderHandler) - if !ok { - return - } - return h.EvictStoreIDs() -} - func (c *RaftCluster) getUpStores() []uint64 { upStores := make([]uint64, 0) for _, store := range c.GetStores() { @@ -1634,9 +1492,8 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { c.resetProgress(storeID, addr) storeIDStr := strconv.FormatUint(storeID, 10) statistics.ResetStoreStatistics(addr, storeIDStr) - if !c.isAPIServiceMode { - c.hotStat.RemoveRollingStoreStats(storeID) - c.slowStat.RemoveSlowStoreStatus(storeID) + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { + c.removeStoreStatistics(storeID) } } return err @@ -1811,9 +1668,8 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error { } } c.core.PutStore(store) - if !c.isAPIServiceMode { - c.hotStat.GetOrCreateRollingStoreStats(store.GetID()) - c.slowStat.ObserveSlowStoreStatus(store.GetID(), store.IsSlow()) + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { + c.updateStoreStatistics(store.GetID(), store.IsSlow()) } return nil } @@ -2162,53 +2018,14 @@ func (c *RaftCluster) deleteStore(store *core.StoreInfo) error { } func (c *RaftCluster) collectMetrics() { - if !c.isAPIServiceMode { - statsMap := statistics.NewStoreStatisticsMap(c.opt) - stores := c.GetStores() - for _, s := range stores { - statsMap.Observe(s) - statsMap.ObserveHotStat(s, c.hotStat.StoresStats) - } - statsMap.Collect() - c.coordinator.GetSchedulersController().CollectSchedulerMetrics() - c.coordinator.CollectHotSpotMetrics() - c.collectClusterMetrics() - } c.collectHealthStatus() } func (c *RaftCluster) resetMetrics() { - statistics.Reset() - - if !c.isAPIServiceMode { - c.coordinator.GetSchedulersController().ResetSchedulerMetrics() - c.coordinator.ResetHotSpotMetrics() - c.resetClusterMetrics() - } c.resetHealthStatus() c.resetProgressIndicator() } -func (c *RaftCluster) collectClusterMetrics() { - if c.regionStats == nil { - return - } - c.regionStats.Collect() - c.labelLevelStats.Collect() - // collect hot cache metrics - c.hotStat.CollectMetrics() -} - -func (c *RaftCluster) resetClusterMetrics() { - if c.regionStats == nil { - return - } - c.regionStats.Reset() - c.labelLevelStats.Reset() - // reset hot cache metrics - c.hotStat.ResetMetrics() -} - func (c *RaftCluster) collectHealthStatus() { members, err := GetMembers(c.etcdClient) if err != nil { @@ -2235,21 +2052,6 @@ func (c *RaftCluster) resetProgressIndicator() { storesETAGauge.Reset() } -// GetRegionStatsByType gets the 
status of the region by types. -func (c *RaftCluster) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { - if c.regionStats == nil { - return nil - } - return c.regionStats.GetRegionStatsByType(typ) -} - -// UpdateRegionsLabelLevelStats updates the status of the region label level by types. -func (c *RaftCluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { - for _, region := range regions { - c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.opt.GetLocationLabels()) - } -} - func (c *RaftCluster) getRegionStoresLocked(region *core.RegionInfo) []*core.StoreInfo { stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) for _, p := range region.GetPeers() { @@ -2260,16 +2062,6 @@ func (c *RaftCluster) getRegionStoresLocked(region *core.RegionInfo) []*core.Sto return stores } -func (c *RaftCluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { - stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) - for _, p := range region.GetPeers() { - if store := c.core.GetStore(p.StoreId); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { - stores = append(stores, store) - } - } - return stores -} - // OnStoreVersionChange changes the version of the cluster when needed. func (c *RaftCluster) OnStoreVersionChange() { c.RLock() @@ -2345,49 +2137,6 @@ func (c *RaftCluster) GetRegionCount(startKey, endKey []byte) *statistics.Region return stats } -// GetStoresStats returns stores' statistics from cluster. -// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat -func (c *RaftCluster) GetStoresStats() *statistics.StoresStats { - return c.hotStat.StoresStats -} - -// GetStoresLoads returns load stats of all stores. -func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 { - return c.hotStat.GetStoresLoads() -} - -// IsRegionHot checks if a region is in hot state. -func (c *RaftCluster) IsRegionHot(region *core.RegionInfo) bool { - return c.hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold()) -} - -// GetHotPeerStat returns hot peer stat with specified regionID and storeID. -func (c *RaftCluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { - return c.hotStat.GetHotPeerStat(rw, regionID, storeID) -} - -// RegionReadStats returns hot region's read stats. -// The result only includes peers that are hot enough. -// RegionStats is a thread-safe method -func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { - // As read stats are reported by store heartbeat, the threshold needs to be adjusted. - threshold := c.GetOpts().GetHotRegionCacheHitsThreshold() * - (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) - return c.hotStat.RegionStats(utils.Read, threshold) -} - -// RegionWriteStats returns hot region's write stats. -// The result only includes peers that are hot enough. -func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { - // RegionStats is a thread-safe method - return c.hotStat.RegionStats(utils.Write, c.GetOpts().GetHotRegionCacheHitsThreshold()) -} - -// BucketsStats returns hot region's buckets stats. -func (c *RaftCluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { - return c.hotStat.BucketsStats(degree, regionIDs...) -} - // TODO: remove me. // only used in test. 
func (c *RaftCluster) putRegion(region *core.RegionInfo) error { @@ -2775,12 +2524,11 @@ func IsClientURL(addr string, etcdClient *clientv3.Client) bool { return false } -// GetPausedSchedulerDelayAt returns DelayAt of a paused scheduler -func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) { - return c.coordinator.GetSchedulersController().GetPausedSchedulerDelayAt(name) -} - -// GetPausedSchedulerDelayUntil returns DelayUntil of a paused scheduler -func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) { - return c.coordinator.GetSchedulersController().GetPausedSchedulerDelayUntil(name) +// IsServiceIndependent returns whether the service is independent. +func (c *RaftCluster) IsServiceIndependent(name string) bool { + independent, exist := c.independentServices.Load(name) + if !exist { + return false + } + return independent.(bool) } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 70782e27cd3..7ebd012a6a2 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1144,7 +1144,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) { re.NoError(cluster.putRegion(r)) cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r}) - counter := cluster.labelLevelStats.GetLabelCounter() + counter := cluster.labelStats.GetLabelCounter() re.Equal(0, counter["none"]) re.Equal(1, counter["zone"]) } @@ -2130,7 +2130,7 @@ func newTestRaftCluster( basicCluster *core.BasicCluster, ) *RaftCluster { rc := &RaftCluster{serverCtx: ctx} - rc.InitCluster(id, opt, s, basicCluster, nil) + rc.InitCluster(id, opt, s, basicCluster, nil, nil) rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt) if opt.IsPlacementRulesEnabled() { err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) @@ -2138,6 +2138,7 @@ func newTestRaftCluster( panic(err) } } + rc.schedulingController.init(basicCluster, opt, nil, rc.ruleManager) return rc } @@ -2502,11 +2503,11 @@ func TestCollectMetricsConcurrent(t *testing.T) { for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() - co.GetCluster().(*RaftCluster).collectClusterMetrics() + co.GetCluster().(*RaftCluster).collectStatisticsMetrics() } co.ResetHotSpotMetrics() controller.ResetSchedulerMetrics() - co.GetCluster().(*RaftCluster).resetClusterMetrics() + co.GetCluster().(*RaftCluster).resetStatisticsMetrics() wg.Wait() } @@ -2537,7 +2538,7 @@ func TestCollectMetrics(t *testing.T) { for i := 0; i < 1000; i++ { co.CollectHotSpotMetrics() controller.CollectSchedulerMetrics() - co.GetCluster().(*RaftCluster).collectClusterMetrics() + co.GetCluster().(*RaftCluster).collectStatisticsMetrics() } stores := co.GetCluster().GetStores() regionStats := co.GetCluster().RegionWriteStats() @@ -2552,7 +2553,7 @@ func TestCollectMetrics(t *testing.T) { re.Equal(status1, status2) co.ResetHotSpotMetrics() controller.ResetSchedulerMetrics() - co.GetCluster().(*RaftCluster).resetClusterMetrics() + co.GetCluster().(*RaftCluster).resetStatisticsMetrics() } func prepare(setCfg func(*sc.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) { diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index a38ae86123f..3a319c48196 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" 
"github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/utils/logutil" @@ -233,7 +234,7 @@ func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error { if err := c.processReportBuckets(b); err != nil { return err } - if !c.isAPIServiceMode { + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b)) } return nil diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go new file mode 100644 index 00000000000..1c41c830cf6 --- /dev/null +++ b/server/cluster/scheduling_controller.go @@ -0,0 +1,424 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + "net/http" + "sync" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/checker" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/server/config" +) + +type schedulingController struct { + parentCtx context.Context + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + *core.BasicCluster + opt *config.PersistOptions + coordinator *schedule.Coordinator + labelStats *statistics.LabelStatistics + regionStats *statistics.RegionStatistics + hotStat *statistics.HotStat + slowStat *statistics.SlowStat + running bool +} + +func newSchedulingController(parentCtx context.Context) *schedulingController { + ctx, cancel := context.WithCancel(parentCtx) + return &schedulingController{ + parentCtx: parentCtx, + ctx: ctx, + cancel: cancel, + labelStats: statistics.NewLabelStatistics(), + hotStat: statistics.NewHotStat(parentCtx), + slowStat: statistics.NewSlowStat(parentCtx), + } +} + +func (sc *schedulingController) init(basicCluster *core.BasicCluster, opt *config.PersistOptions, coordinator *schedule.Coordinator, ruleManager *placement.RuleManager) { + sc.BasicCluster = basicCluster + sc.opt = opt + sc.coordinator = coordinator + sc.regionStats = statistics.NewRegionStatistics(basicCluster, opt, ruleManager) +} + +func (sc *schedulingController) stopSchedulingJobs() bool { + sc.mu.Lock() + defer sc.mu.Unlock() + if !sc.running { + return false + } + sc.coordinator.Stop() + sc.cancel() + sc.wg.Wait() + sc.running = false + log.Info("scheduling service is stopped") + return true +} + +func (sc *schedulingController) startSchedulingJobs() bool { + sc.mu.Lock() + defer sc.mu.Unlock() + if sc.running { + return false + } + sc.ctx, sc.cancel = 
context.WithCancel(sc.parentCtx) + sc.wg.Add(3) + go sc.runCoordinator() + go sc.runStatsBackgroundJobs() + go sc.runSchedulingMetricsCollectionJob() + sc.running = true + log.Info("scheduling service is started") + return true +} + +// runCoordinator runs the main scheduling loop. +func (sc *schedulingController) runCoordinator() { + defer logutil.LogPanic() + defer sc.wg.Done() + sc.coordinator.RunUntilStop() +} + +func (sc *schedulingController) runStatsBackgroundJobs() { + defer logutil.LogPanic() + defer sc.wg.Done() + + ticker := time.NewTicker(statistics.RegionsStatsObserveInterval) + defer ticker.Stop() + + for _, store := range sc.GetStores() { + storeID := store.GetID() + sc.hotStat.GetOrCreateRollingStoreStats(storeID) + } + for { + select { + case <-sc.ctx.Done(): + log.Info("statistics background jobs has been stopped") + return + case <-ticker.C: + sc.hotStat.ObserveRegionsStats(sc.GetStoresWriteRate()) + } + } +} + +func (sc *schedulingController) runSchedulingMetricsCollectionJob() { + defer logutil.LogPanic() + defer sc.wg.Done() + + ticker := time.NewTicker(metricsCollectionJobInterval) + failpoint.Inject("highFrequencyClusterJobs", func() { + ticker.Stop() + ticker = time.NewTicker(time.Microsecond) + }) + defer ticker.Stop() + + for { + select { + case <-sc.ctx.Done(): + log.Info("scheduling metrics are reset") + sc.resetSchedulingMetrics() + log.Info("scheduling metrics collection job has been stopped") + return + case <-ticker.C: + sc.collectSchedulingMetrics() + } + } +} + +func (sc *schedulingController) resetSchedulingMetrics() { + statistics.Reset() + sc.coordinator.GetSchedulersController().ResetSchedulerMetrics() + sc.coordinator.ResetHotSpotMetrics() + sc.resetStatisticsMetrics() +} + +func (sc *schedulingController) collectSchedulingMetrics() { + statsMap := statistics.NewStoreStatisticsMap(sc.opt) + stores := sc.GetStores() + for _, s := range stores { + statsMap.Observe(s) + statsMap.ObserveHotStat(s, sc.hotStat.StoresStats) + } + statsMap.Collect() + sc.coordinator.GetSchedulersController().CollectSchedulerMetrics() + sc.coordinator.CollectHotSpotMetrics() + sc.collectStatisticsMetrics() +} + +func (sc *schedulingController) resetStatisticsMetrics() { + if sc.regionStats == nil { + return + } + sc.regionStats.Reset() + sc.labelStats.Reset() + // reset hot cache metrics + sc.hotStat.ResetMetrics() +} + +func (sc *schedulingController) collectStatisticsMetrics() { + if sc.regionStats == nil { + return + } + sc.regionStats.Collect() + sc.labelStats.Collect() + // collect hot cache metrics + sc.hotStat.CollectMetrics() +} + +func (sc *schedulingController) removeStoreStatistics(storeID uint64) { + sc.hotStat.RemoveRollingStoreStats(storeID) + sc.slowStat.RemoveSlowStoreStatus(storeID) +} + +func (sc *schedulingController) updateStoreStatistics(storeID uint64, isSlow bool) { + sc.hotStat.GetOrCreateRollingStoreStats(storeID) + sc.slowStat.ObserveSlowStoreStatus(storeID, isSlow) +} + +// GetHotStat gets hot stat. +func (sc *schedulingController) GetHotStat() *statistics.HotStat { + return sc.hotStat +} + +// GetRegionStats gets region statistics. +func (sc *schedulingController) GetRegionStats() *statistics.RegionStatistics { + return sc.regionStats +} + +// GetLabelStats gets label statistics. +func (sc *schedulingController) GetLabelStats() *statistics.LabelStatistics { + return sc.labelStats +} + +// GetRegionStatsByType gets the status of the region by types. 
+func (sc *schedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { + if sc.regionStats == nil { + return nil + } + return sc.regionStats.GetRegionStatsByType(typ) +} + +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. +func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { + for _, region := range regions { + sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels()) + } +} + +func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { + stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) + for _, p := range region.GetPeers() { + if store := sc.GetStore(p.StoreId); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { + stores = append(stores, store) + } + } + return stores +} + +// GetStoresStats returns stores' statistics from cluster. +// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat +func (sc *schedulingController) GetStoresStats() *statistics.StoresStats { + return sc.hotStat.StoresStats +} + +// GetStoresLoads returns load stats of all stores. +func (sc *schedulingController) GetStoresLoads() map[uint64][]float64 { + return sc.hotStat.GetStoresLoads() +} + +// IsRegionHot checks if a region is in hot state. +func (sc *schedulingController) IsRegionHot(region *core.RegionInfo) bool { + return sc.hotStat.IsRegionHot(region, sc.opt.GetHotRegionCacheHitsThreshold()) +} + +// GetHotPeerStat returns hot peer stat with specified regionID and storeID. +func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { + return sc.hotStat.GetHotPeerStat(rw, regionID, storeID) +} + +// RegionReadStats returns hot region's read stats. +// The result only includes peers that are hot enough. +// RegionStats is a thread-safe method +func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold := sc.opt.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + return sc.hotStat.RegionStats(utils.Read, threshold) +} + +// RegionWriteStats returns hot region's write stats. +// The result only includes peers that are hot enough. +func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + // RegionStats is a thread-safe method + return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold()) +} + +// BucketsStats returns hot region's buckets stats. +func (sc *schedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { + return sc.hotStat.BucketsStats(degree, regionIDs...) +} + +// GetPausedSchedulerDelayAt returns DelayAt of a paused scheduler +func (sc *schedulingController) GetPausedSchedulerDelayAt(name string) (int64, error) { + return sc.coordinator.GetSchedulersController().GetPausedSchedulerDelayAt(name) +} + +// GetPausedSchedulerDelayUntil returns DelayUntil of a paused scheduler +func (sc *schedulingController) GetPausedSchedulerDelayUntil(name string) (int64, error) { + return sc.coordinator.GetSchedulersController().GetPausedSchedulerDelayUntil(name) +} + +// GetRegionScatterer returns the region scatter. 
+func (sc *schedulingController) GetRegionScatterer() *scatter.RegionScatterer { + return sc.coordinator.GetRegionScatterer() +} + +// GetRegionSplitter returns the region splitter +func (sc *schedulingController) GetRegionSplitter() *splitter.RegionSplitter { + return sc.coordinator.GetRegionSplitter() +} + +// GetMergeChecker returns merge checker. +func (sc *schedulingController) GetMergeChecker() *checker.MergeChecker { + return sc.coordinator.GetMergeChecker() +} + +// GetRuleChecker returns rule checker. +func (sc *schedulingController) GetRuleChecker() *checker.RuleChecker { + return sc.coordinator.GetRuleChecker() +} + +// GetSchedulers gets all schedulers. +func (sc *schedulingController) GetSchedulers() []string { + return sc.coordinator.GetSchedulersController().GetSchedulerNames() +} + +// GetSchedulerHandlers gets all scheduler handlers. +func (sc *schedulingController) GetSchedulerHandlers() map[string]http.Handler { + return sc.coordinator.GetSchedulersController().GetSchedulerHandlers() +} + +// AddSchedulerHandler adds a scheduler handler. +func (sc *schedulingController) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error { + return sc.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...) +} + +// RemoveSchedulerHandler removes a scheduler handler. +func (sc *schedulingController) RemoveSchedulerHandler(name string) error { + return sc.coordinator.GetSchedulersController().RemoveSchedulerHandler(name) +} + +// AddScheduler adds a scheduler. +func (sc *schedulingController) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { + return sc.coordinator.GetSchedulersController().AddScheduler(scheduler, args...) +} + +// RemoveScheduler removes a scheduler. +func (sc *schedulingController) RemoveScheduler(name string) error { + return sc.coordinator.GetSchedulersController().RemoveScheduler(name) +} + +// PauseOrResumeScheduler pauses or resumes a scheduler. +func (sc *schedulingController) PauseOrResumeScheduler(name string, t int64) error { + return sc.coordinator.GetSchedulersController().PauseOrResumeScheduler(name, t) +} + +// PauseOrResumeChecker pauses or resumes checker. +func (sc *schedulingController) PauseOrResumeChecker(name string, t int64) error { + return sc.coordinator.PauseOrResumeChecker(name, t) +} + +// AddSuspectRegions adds regions to suspect list. +func (sc *schedulingController) AddSuspectRegions(regionIDs ...uint64) { + sc.coordinator.GetCheckerController().AddSuspectRegions(regionIDs...) +} + +// GetSuspectRegions gets all suspect regions. +func (sc *schedulingController) GetSuspectRegions() []uint64 { + return sc.coordinator.GetCheckerController().GetSuspectRegions() +} + +// RemoveSuspectRegion removes region from suspect list. +func (sc *schedulingController) RemoveSuspectRegion(id uint64) { + sc.coordinator.GetCheckerController().RemoveSuspectRegion(id) +} + +// PopOneSuspectKeyRange gets one suspect keyRange group. +// it would return value and true if pop success, or return empty [][2][]byte and false +// if suspectKeyRanges couldn't pop keyRange group. 
+func (sc *schedulingController) PopOneSuspectKeyRange() ([2][]byte, bool) { + return sc.coordinator.GetCheckerController().PopOneSuspectKeyRange() +} + +// ClearSuspectKeyRanges clears the suspect keyRanges, only for unit test +func (sc *schedulingController) ClearSuspectKeyRanges() { + sc.coordinator.GetCheckerController().ClearSuspectKeyRanges() +} + +// AddSuspectKeyRange adds the key range with the its ruleID as the key +// The instance of each keyRange is like following format: +// [2][]byte: start key/end key +func (sc *schedulingController) AddSuspectKeyRange(start, end []byte) { + sc.coordinator.GetCheckerController().AddSuspectKeyRange(start, end) +} + +func (sc *schedulingController) initSchedulers() { + sc.coordinator.InitSchedulers(false) +} + +func (sc *schedulingController) getEvictLeaderStores() (evictStores []uint64) { + if sc.coordinator == nil { + return nil + } + handler, ok := sc.coordinator.GetSchedulersController().GetSchedulerHandlers()[schedulers.EvictLeaderName] + if !ok { + return + } + type evictLeaderHandler interface { + EvictStoreIDs() []uint64 + } + h, ok := handler.(evictLeaderHandler) + if !ok { + return + } + return h.EvictStoreIDs() +} + +// IsPrepared return true if the prepare checker is ready. +func (sc *schedulingController) IsPrepared() bool { + return sc.coordinator.GetPrepareChecker().IsPrepared() +} + +// SetPrepared set the prepare check to prepared. Only for test purpose. +func (sc *schedulingController) SetPrepared() { + sc.coordinator.GetPrepareChecker().SetPrepared() +} diff --git a/server/grpc_service.go b/server/grpc_service.go index 05ec38919cb..34741d4da5b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1002,7 +1002,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear s.handleDamagedStore(request.GetStats()) storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) - if s.IsAPIServiceMode() { + if s.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, _ := s.updateSchedulingClient(ctx) if forwardCli != nil { req := &schedulingpb.StoreHeartbeatRequest{ @@ -1360,7 +1360,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error continue } - if s.IsAPIServiceMode() { + if s.IsServiceIndependent(utils.SchedulingServiceName) { ctx := stream.Context() primaryAddr, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) if schedulingStream == nil || lastPrimaryAddr != primaryAddr { @@ -1632,7 +1632,7 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { - if s.IsAPIServiceMode() { + if s.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.AskBatchSplitResponse{ @@ -1805,7 +1805,7 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus // ScatterRegion implements gRPC PDServer. 
func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionRequest) (*pdpb.ScatterRegionResponse, error) { - if s.IsAPIServiceMode() { + if s.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.ScatterRegionResponse{ @@ -2028,7 +2028,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb // GetOperator gets information about the operator belonging to the specify region. func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { - if s.IsAPIServiceMode() { + if s.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.GetOperatorResponse{ @@ -2300,7 +2300,7 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest // SplitRegions split regions by the given split keys func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsRequest) (*pdpb.SplitRegionsResponse, error) { - if s.IsAPIServiceMode() { + if s.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.SplitRegionsResponse{ diff --git a/server/handler.go b/server/handler.go index dc4b43238d0..6c0679bd9f9 100644 --- a/server/handler.go +++ b/server/handler.go @@ -30,6 +30,7 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" sche "github.com/tikv/pd/pkg/schedule/core" @@ -192,7 +193,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error { } var removeSchedulerCb func(string) error - if h.s.IsAPIServiceMode() { + if c.IsServiceIndependent(mcsutils.SchedulingServiceName) { removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveSchedulerHandler } else { removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveScheduler @@ -202,7 +203,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error { return err } log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args)) - if h.s.IsAPIServiceMode() { + if c.IsServiceIndependent(mcsutils.SchedulingServiceName) { if err = c.AddSchedulerHandler(s, args...); err != nil { log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) return err @@ -229,7 +230,7 @@ func (h *Handler) RemoveScheduler(name string) error { if err != nil { return err } - if h.s.IsAPIServiceMode() { + if c.IsServiceIndependent(mcsutils.SchedulingServiceName) { if err = c.RemoveSchedulerHandler(name); err != nil { log.Error("can not remove scheduler handler", zap.String("scheduler-name", name), errs.ZapError(err)) } else { diff --git a/server/server.go b/server/server.go index 9cd7f18578e..a2c99d0cbec 100644 --- a/server/server.go +++ b/server/server.go @@ -489,7 +489,7 @@ func (s *Server) startServer(ctx context.Context) error { s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, "", s.cluster) // initial hot_region_storage in here. 
- if !s.IsAPIServiceMode() { + if !s.IsServiceIndependent(mcs.SchedulingServiceName) { s.hotRegionStorage, err = storage.NewHotRegionsStorage( ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler) if err != nil { @@ -1394,6 +1394,15 @@ func (s *Server) GetRegions() []*core.RegionInfo { return nil } +// IsServiceIndependent returns if the service is enabled +func (s *Server) IsServiceIndependent(name string) bool { + rc := s.GetRaftCluster() + if rc != nil { + return rc.IsServiceIndependent(name) + } + return false +} + // GetServiceLabels returns ApiAccessPaths by given service label // TODO: this function will be used for updating api rate limit config func (s *Server) GetServiceLabels(serviceLabel string) []apiutil.AccessPath { diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index 8b8e284f765..42ba051eb84 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -133,7 +133,6 @@ func persistConfig(re *require.Assertions, pdLeaderServer *tests.TestServer) { func (suite *configTestSuite) TestSchedulerConfigWatch() { re := suite.Require() - // Make sure the config is persisted before the watcher is created. persistConfig(re, suite.pdLeaderServer) // Create a config watcher. diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 701eb9b5d69..ebf0a4e574d 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -815,7 +815,7 @@ func TestLoadClusterInfo(t *testing.T) { rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) // Cluster is not bootstrapped. - rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetKeyspaceGroupManager()) + rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) raftCluster, err := rc.LoadClusterInfo() re.NoError(err) re.Nil(raftCluster) @@ -853,7 +853,7 @@ func TestLoadClusterInfo(t *testing.T) { re.NoError(testStorage.Flush()) raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) - raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), testStorage, basicCluster, svr.GetKeyspaceGroupManager()) + raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), testStorage, basicCluster, svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) raftCluster, err = raftCluster.LoadClusterInfo() re.NoError(err) re.NotNil(raftCluster) @@ -1561,7 +1561,7 @@ func TestTransferLeaderBack(t *testing.T) { leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) - rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetKeyspaceGroupManager()) + rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) storage := rc.GetStorage() meta := &metapb.Cluster{Id: 123} re.NoError(storage.SaveMeta(meta))
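The `runServiceCheckJob` loop added to `cluster.go` follows a common Go shape: run the check once immediately, then repeat on a ticker until the context is cancelled, with `sync.Once` guarding one-time initialization. The following self-contained sketch shows that shape with placeholder logic standing in for the real PD calls (it is not the PD implementation itself).

```go
// Sketch only: the ticker-plus-once loop shape used by runServiceCheckJob,
// with placeholder prints in place of the real scheduling-service checks.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func runServiceCheck(ctx context.Context, interval time.Duration, apiMode bool) {
	var once sync.Once

	checkFn := func() {
		if apiMode {
			// One-time setup, e.g. initializing scheduler handlers.
			once.Do(func() { fmt.Println("init schedulers once") })
			fmt.Println("scheduling service treated as independent")
			return
		}
		fmt.Println("running scheduling jobs locally")
	}
	checkFn() // run immediately, then periodically

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("service check job is stopped")
			return
		case <-ticker.C:
			checkFn()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	runServiceCheck(ctx, 100*time.Millisecond, true)
}
```

In the actual patch, the check function stores or deletes `mcsutils.SchedulingServiceName` in `independentServices`, so the rest of the server branches on `IsServiceIndependent(...)` instead of reading `isAPIServiceMode` directly, which is what the `grpc_service.go`, `handler.go`, and `server.go` hunks above switch over to.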