dynamically enable scheduling service
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Nov 2, 2023
1 parent 4e45e95 commit 0d47169
Showing 7 changed files with 138 additions and 50 deletions.
6 changes: 3 additions & 3 deletions pkg/schedule/coordinator.go
@@ -372,9 +372,6 @@ func (c *Coordinator) RunUntilStop() {
c.Run()
<-c.ctx.Done()
log.Info("coordinator is stopping")
c.GetSchedulersController().Wait()
c.wg.Wait()
log.Info("coordinator has been stopped")
}

// Run starts coordinator.
@@ -579,6 +576,9 @@ func (c *Coordinator) waitPluginUnload(pluginPath, schedulerName string, ch chan
// Stop stops the coordinator.
func (c *Coordinator) Stop() {
c.cancel()
c.GetSchedulersController().Wait()
c.wg.Wait()
log.Info("coordinator has been stopped")
}

// GetHotRegionsByType gets hot regions' statistics by RWType.
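Taken together, the two coordinator.go hunks are a single move: the waits leave RunUntilStop and land in Stop, so a Stop call is a complete, synchronous shutdown. That matters for this commit, because the scheduling jobs can now be stopped and started again while the process keeps running, and a restart must not race a half-finished shutdown. A minimal sketch of the resulting lifecycle, with hypothetical names (these are not PD types):

    // Sketch: a stoppable worker in the style the patch enables.
    package main

    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )

    type worker struct {
    	ctx    context.Context
    	cancel context.CancelFunc
    	wg     sync.WaitGroup
    }

    func newWorker() *worker {
    	ctx, cancel := context.WithCancel(context.Background())
    	return &worker{ctx: ctx, cancel: cancel}
    }

    func (w *worker) Run() {
    	w.wg.Add(1)
    	go func() {
    		defer w.wg.Done()
    		<-w.ctx.Done() // stand-in for the coordinator's main loop
    	}()
    }

    // Stop now owns the waiting, mirroring the patch: cancel first,
    // then wait for the goroutines to drain before returning.
    func (w *worker) Stop() {
    	w.cancel()
    	w.wg.Wait()
    	fmt.Println("worker has been stopped")
    }

    func main() {
    	w := newWorker()
    	w.Run()
    	time.Sleep(10 * time.Millisecond)
    	w.Stop() // returns only after the goroutine has exited
    }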
3 changes: 3 additions & 0 deletions pkg/utils/apiutil/serverapi/middleware.go
@@ -117,6 +117,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
for _, rule := range h.microserviceRedirectRules {
if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) {
if !h.s.IsServiceEnabled(rule.targetServiceName) {
continue
}
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
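With this guard, a redirect rule only fires when its target microservice is currently enabled; otherwise the rule is skipped and the API server serves the request itself. A compressed, self-contained restatement of the matching loop, using simplified stand-in types rather than PD's real ones:

    package main

    import (
    	"fmt"
    	"strings"
    )

    type redirectRule struct {
    	matchPath         string
    	matchMethods      []string
    	targetServiceName string
    }

    type services map[string]bool

    func (s services) IsServiceEnabled(name string) bool { return s[name] }

    // matchRule returns the first rule whose path, method, and — new in this
    // commit — enabled target service all match; otherwise the request stays local.
    func matchRule(s services, rules []redirectRule, path, method string) (*redirectRule, bool) {
    	for i, rule := range rules {
    		if !strings.HasPrefix(path, rule.matchPath) {
    			continue
    		}
    		methodOK := false
    		for _, m := range rule.matchMethods {
    			if m == method {
    				methodOK = true
    				break
    			}
    		}
    		if !methodOK {
    			continue
    		}
    		if !s.IsServiceEnabled(rule.targetServiceName) {
    			continue // service disabled: skip the redirect, handle locally
    		}
    		return &rules[i], true
    	}
    	return nil, false
    }

    func main() {
    	rules := []redirectRule{{"/scheduling", []string{"GET"}, "scheduling"}}
    	fmt.Println(matchRule(services{}, rules, "/scheduling/api", "GET"))                   // <nil> false
    	fmt.Println(matchRule(services{"scheduling": true}, rules, "/scheduling/api", "GET")) // rule, true
    }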
123 changes: 98 additions & 25 deletions server/cluster/cluster.go
@@ -41,6 +41,8 @@ import (
"github.com/tikv/pd/pkg/gctuner"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/replication"
@@ -97,6 +99,7 @@ const (
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
gcTombstoneInterval = 30 * 24 * time.Hour
serviceCheckInterval = 10 * time.Second
// persistLimitRetryTimes is used to reduce the probability of a persistence error,
// since once a store is added or removed, we shouldn't return an error even if persisting the store limit fails.
persistLimitRetryTimes = 5
@@ -173,6 +176,8 @@ type RaftCluster struct {
regionSyncer *syncer.RegionSyncer
changedRegions chan *core.RegionInfo
keyspaceGroupManager *keyspace.GroupManager
enabledServices sync.Map
isCoordinatorRunning bool
}

// Status saves some state information.
@@ -332,13 +337,9 @@ func (c *RaftCluster) Start(s Server) error {
return err
}
c.initSchedulers()
} else {
c.wg.Add(2)
go c.runCoordinator()
go c.runStatsBackgroundJobs()
}

c.wg.Add(8)
c.wg.Add(9)
go c.runServiceCheckJob()
go c.runMetricsCollectionJob()
go c.runNodeStateCheckJob()
go c.syncRegions()
@@ -352,6 +353,66 @@
return nil
}

func (c *RaftCluster) runServiceCheckJob() {
defer logutil.LogPanic()
defer c.wg.Done()

tick := time.NewTicker(serviceCheckInterval)
defer tick.Stop()

var cancel context.CancelFunc
for {
select {
case <-c.ctx.Done():
log.Info("service check job is stopped")
if cancel != nil {
cancel()
}
return
case <-tick.C:
addr, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
if err != nil {
continue
}
// a standalone scheduling service has registered itself
if len(addr) != 0 {
if c.isCoordinatorRunning { // hand scheduling over: stop the API server's own jobs
cancel()
c.stopSchedulingJobs()
}
c.enabledServices.Store(mcsutils.SchedulingServiceName, true)
} else { // no standalone scheduling service is registered
c.enabledServices.Delete(mcsutils.SchedulingServiceName)
if !c.isCoordinatorRunning { // take scheduling back: start the API server's own jobs
cancel = c.startSchedulingJobs()
}
}
}
}
}

func (c *RaftCluster) stopSchedulingJobs() {
c.coordinator.Stop()
statistics.Reset()
c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
c.coordinator.ResetHotSpotMetrics()
c.resetClusterMetrics()
c.isCoordinatorRunning = false
log.Info("scheduling service is stopped")
}

func (c *RaftCluster) startSchedulingJobs() context.CancelFunc {
ctx, cancel := context.WithCancel(c.ctx)
c.wg.Add(2)
go c.runCoordinator()
go c.runStatsBackgroundJobs(ctx)
c.isCoordinatorRunning = true
log.Info("scheduling service is started")
return cancel
}
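These three functions implement the dynamic toggle that gives the commit its name. Every serviceCheckInterval (10 seconds), runServiceCheckJob asks etcd-backed discovery for the endpoints registered under the scheduling microservice name, so a non-empty result means a standalone scheduling service is alive. The tick handler then reduces to a small state machine; a hedged restatement as a pure function, with hypothetical names for illustration only:

    package main

    import "fmt"

    type action int

    const (
    	doNothing  action = iota
    	stopLocal         // a remote scheduling service appeared while local jobs run
    	startLocal        // the remote service went away and nothing runs locally
    )

    // decide mirrors the tick handler above as a pure function: given whether a
    // remote scheduling service was discovered and whether the local coordinator
    // is running, it returns what the loop should do. The real code mutates
    // RaftCluster state directly instead of returning an action.
    func decide(remoteUp, localRunning bool) action {
    	switch {
    	case remoteUp && localRunning:
    		return stopLocal
    	case !remoteUp && !localRunning:
    		return startLocal
    	default:
    		return doNothing
    	}
    }

    func main() {
    	fmt.Println(decide(true, true) == stopLocal)    // remote takes over
    	fmt.Println(decide(false, false) == startLocal) // fall back to local
    }

Note that startSchedulingJobs derives a child context from c.ctx and returns its CancelFunc; the loop holds on to it so the next stop can also cancel the stats goroutine, which now selects on that child context rather than on the cluster-wide one.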

// startGCTuner
func (c *RaftCluster) startGCTuner() {
defer logutil.LogPanic()
@@ -600,10 +661,9 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
zap.Int("count", c.core.GetTotalRegionCount()),
zap.Duration("cost", time.Since(start)),
)
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
for _, store := range c.GetStores() {
storeID := store.GetID()
c.hotStat.GetOrCreateRollingStoreStats(storeID)
c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow())
}
}
@@ -619,7 +679,6 @@ func (c *RaftCluster) runMetricsCollectionJob() {
ticker.Stop()
ticker = time.NewTicker(time.Microsecond)
})

defer ticker.Stop()

for {
@@ -657,16 +716,22 @@ func (c *RaftCluster) runNodeStateCheckJob() {
}
}

func (c *RaftCluster) runStatsBackgroundJobs() {
func (c *RaftCluster) runStatsBackgroundJobs(ctx context.Context) {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(statistics.RegionsStatsObserveInterval)
defer ticker.Stop()

if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
for _, store := range c.GetStores() {
storeID := store.GetID()
c.hotStat.GetOrCreateRollingStoreStats(storeID)
}
}
for {
select {
case <-c.ctx.Done():
case <-ctx.Done():
log.Info("statistics background jobs has been stopped")
return
case <-ticker.C:
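Two things change in this job. It now selects on the context handed in by startSchedulingJobs, so it can be cancelled on toggle without touching the cluster-wide c.ctx, and the per-store rolling stats are created when the job starts rather than in LoadClusterInfo, since the job may now begin long after the cluster was loaded. A minimal sketch of the cancellable-job half of that contract, with illustrative names:

    package main

    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )

    // runJob mirrors the new runStatsBackgroundJobs contract: it exits when the
    // caller's child context is cancelled, not only at process shutdown.
    func runJob(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, work func()) {
    	defer wg.Done()
    	ticker := time.NewTicker(interval)
    	defer ticker.Stop()
    	for {
    		select {
    		case <-ctx.Done():
    			fmt.Println("stats job stopped")
    			return
    		case <-ticker.C:
    			work()
    		}
    	}
    }

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	var wg sync.WaitGroup
    	wg.Add(1)
    	go runJob(ctx, &wg, time.Millisecond, func() {})
    	time.Sleep(5 * time.Millisecond)
    	cancel() // simulate the toggle stopping local scheduling
    	wg.Wait()
    }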
@@ -723,7 +788,7 @@ func (c *RaftCluster) Stop() {
return
}
c.running = false
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
c.coordinator.Stop()
}
c.cancel()
@@ -970,7 +1035,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
nowTime := time.Now()
var newStore *core.StoreInfo
// If this cluster has slow stores, we should awaken hibernated regions in other stores.
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken {
log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs))
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt)
@@ -1005,7 +1070,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
regions map[uint64]*core.RegionInfo
interval uint64
)
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
c.hotStat.Observe(storeID, newStore.GetStoreStats())
c.hotStat.FilterUnhealthyStore(c)
c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow())
@@ -1061,7 +1126,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
e := int64(dur)*2 - int64(stat.GetTotalDurationSec())
store.Feedback(float64(e))
}
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
}
@@ -1112,7 +1177,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
cluster.HandleStatsAsync(c, region)
}

@@ -1121,7 +1186,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !c.isAPIServiceMode && !saveKV && !saveCache && !isNew {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) && !saveKV && !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1146,13 +1211,13 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
cluster.HandleOverlaps(c, overlaps)
}
regionUpdateCacheEventCounter.Inc()
}

if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
}

@@ -1634,7 +1699,7 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error {
c.resetProgress(storeID, addr)
storeIDStr := strconv.FormatUint(storeID, 10)
statistics.ResetStoreStatistics(addr, storeIDStr)
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
c.hotStat.RemoveRollingStoreStats(storeID)
c.slowStat.RemoveSlowStoreStatus(storeID)
}
@@ -1811,7 +1876,7 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
}
}
c.core.PutStore(store)
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
c.hotStat.GetOrCreateRollingStoreStats(store.GetID())
c.slowStat.ObserveSlowStoreStatus(store.GetID(), store.IsSlow())
}
@@ -2161,7 +2226,7 @@ func (c *RaftCluster) deleteStore(store *core.StoreInfo) error {
}

func (c *RaftCluster) collectMetrics() {
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
statsMap := statistics.NewStoreStatisticsMap(c.opt)
stores := c.GetStores()
for _, s := range stores {
@@ -2177,9 +2242,8 @@
}

func (c *RaftCluster) resetMetrics() {
statistics.Reset()

if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
statistics.Reset()
c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
c.coordinator.ResetHotSpotMetrics()
c.resetClusterMetrics()
@@ -2783,3 +2847,12 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) {
func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) {
return c.coordinator.GetSchedulersController().GetPausedSchedulerDelayUntil(name)
}

// IsServiceEnabled returns whether the service is enabled.
func (c *RaftCluster) IsServiceEnabled(name string) bool {
enabled, exist := c.enabledServices.Load(name)
if !exist {
return false
}
return enabled.(bool)
}
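IsServiceEnabled is the read side of the flag that runServiceCheckJob writes: the key is stored as true when the microservice is discovered and deleted when it disappears, so a missing key simply means disabled. This is the check that replaces the static !c.isAPIServiceMode tests throughout this file. The same sync.Map pattern in isolation, with illustrative names only:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type registry struct{ enabled sync.Map }

    func (r *registry) enable(name string)  { r.enabled.Store(name, true) }
    func (r *registry) disable(name string) { r.enabled.Delete(name) }

    // isEnabled mirrors RaftCluster.IsServiceEnabled: a missing key means
    // disabled, so callers never observe a half-initialized state.
    func (r *registry) isEnabled(name string) bool {
    	v, ok := r.enabled.Load(name)
    	if !ok {
    		return false
    	}
    	return v.(bool)
    }

    func main() {
    	var r registry
    	fmt.Println(r.isEnabled("scheduling")) // false: never enabled
    	r.enable("scheduling")
    	fmt.Println(r.isEnabled("scheduling")) // true
    	r.disable("scheduling")
    	fmt.Println(r.isEnabled("scheduling")) // false again
    }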
3 changes: 2 additions & 1 deletion server/cluster/cluster_worker.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/utils/logutil"
@@ -233,7 +234,7 @@ func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error {
if err := c.processReportBuckets(b); err != nil {
return err
}
if !c.isAPIServiceMode {
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b))
}
return nil
(The remaining three of the seven changed files were not loaded in this view.)
