From 0d4716926e23c9bf497f114234e842ac5d120bfa Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 26 Oct 2023 18:19:45 +0800 Subject: [PATCH] dynamically enable scheduling service Signed-off-by: Ryan Leung --- pkg/schedule/coordinator.go | 6 +- pkg/utils/apiutil/serverapi/middleware.go | 3 + server/cluster/cluster.go | 123 +++++++++++++++++----- server/cluster/cluster_worker.go | 3 +- server/grpc_service.go | 35 +++--- server/handler.go | 7 +- server/server.go | 11 +- 7 files changed, 138 insertions(+), 50 deletions(-) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 8fb9ec8b2860..5f6717e0d33c 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -372,9 +372,6 @@ func (c *Coordinator) RunUntilStop() { c.Run() <-c.ctx.Done() log.Info("coordinator is stopping") - c.GetSchedulersController().Wait() - c.wg.Wait() - log.Info("coordinator has been stopped") } // Run starts coordinator. @@ -579,6 +576,9 @@ func (c *Coordinator) waitPluginUnload(pluginPath, schedulerName string, ch chan // Stop stops the coordinator. func (c *Coordinator) Stop() { c.cancel() + c.GetSchedulersController().Wait() + c.wg.Wait() + log.Info("coordinator has been stopped") } // GetHotRegionsByType gets hot regions' statistics by RWType. diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 19438ad0f913..42b27a2a93e6 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -117,6 +117,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri r.URL.Path = strings.TrimRight(r.URL.Path, "/") for _, rule := range h.microserviceRedirectRules { if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) { + if !h.s.IsServiceEnabled(rule.targetServiceName) { + continue + } addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName) if !ok || addr == "" { log.Warn("failed to get the service primary addr when trying to match redirect rules", diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 25a47a7fca9b..90e65890b74d 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -41,6 +41,8 @@ import ( "github.com/tikv/pd/pkg/gctuner" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/mcs/discovery" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" "github.com/tikv/pd/pkg/replication" @@ -97,6 +99,7 @@ const ( clientTimeout = 3 * time.Second defaultChangedRegionsLimit = 10000 gcTombstoneInterval = 30 * 24 * time.Hour + serviceCheckInterval = 10 * time.Second // persistLimitRetryTimes is used to reduce the probability of the persistent error // since the once the store is added or removed, we shouldn't return an error even if the store limit is failed to persist. persistLimitRetryTimes = 5 @@ -173,6 +176,8 @@ type RaftCluster struct { regionSyncer *syncer.RegionSyncer changedRegions chan *core.RegionInfo keyspaceGroupManager *keyspace.GroupManager + enabledServices sync.Map + isCoordinatorRunning bool } // Status saves some state information. @@ -332,13 +337,9 @@ func (c *RaftCluster) Start(s Server) error { return err } c.initSchedulers() - } else { - c.wg.Add(2) - go c.runCoordinator() - go c.runStatsBackgroundJobs() } - - c.wg.Add(8) + c.wg.Add(9) + go c.runServiceCheckJob() go c.runMetricsCollectionJob() go c.runNodeStateCheckJob() go c.syncRegions() @@ -352,6 +353,66 @@ func (c *RaftCluster) Start(s Server) error { return nil } +func (c *RaftCluster) runServiceCheckJob() { + defer logutil.LogPanic() + defer c.wg.Done() + + tick := time.NewTicker(serviceCheckInterval) + defer tick.Stop() + + var ( + cancel context.CancelFunc + ) + for { + select { + case <-c.ctx.Done(): + log.Info("service check job is stopped") + if cancel != nil { + cancel() + } + return + case <-tick.C: + addr, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName) + if err != nil { + continue + } + // scheduling service is enabled + if len(addr) != 0 { + if c.isCoordinatorRunning { // API server's scheduling service is disabled + cancel() + c.stopSchedulingJobs() + } + c.enabledServices.Store(mcsutils.SchedulingServiceName, true) + } else { // scheduling service is disabled + c.enabledServices.Delete(mcsutils.SchedulingServiceName) + if !c.isCoordinatorRunning { // API server's scheduling service is disabled + cancel = c.startSchedulingJobs() + } + } + } + } +} + +func (c *RaftCluster) stopSchedulingJobs() { + c.coordinator.Stop() + statistics.Reset() + c.coordinator.GetSchedulersController().ResetSchedulerMetrics() + c.coordinator.ResetHotSpotMetrics() + c.resetClusterMetrics() + c.isCoordinatorRunning = false + log.Info("scheduling service is stopped") +} + +func (c *RaftCluster) startSchedulingJobs() context.CancelFunc { + ctx, cancel := context.WithCancel(c.ctx) + c.wg.Add(2) + go c.runCoordinator() + go c.runStatsBackgroundJobs(ctx) + c.isCoordinatorRunning = true + log.Info("scheduling service is started") + return cancel +} + // startGCTuner func (c *RaftCluster) startGCTuner() { defer logutil.LogPanic() @@ -600,10 +661,9 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { zap.Int("count", c.core.GetTotalRegionCount()), zap.Duration("cost", time.Since(start)), ) - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { for _, store := range c.GetStores() { storeID := store.GetID() - c.hotStat.GetOrCreateRollingStoreStats(storeID) c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow()) } } @@ -619,7 +679,6 @@ func (c *RaftCluster) runMetricsCollectionJob() { ticker.Stop() ticker = time.NewTicker(time.Microsecond) }) - defer ticker.Stop() for { @@ -657,16 +716,22 @@ func (c *RaftCluster) runNodeStateCheckJob() { } } -func (c *RaftCluster) runStatsBackgroundJobs() { +func (c *RaftCluster) runStatsBackgroundJobs(ctx context.Context) { defer logutil.LogPanic() defer c.wg.Done() ticker := time.NewTicker(statistics.RegionsStatsObserveInterval) defer ticker.Stop() + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { + for _, store := range c.GetStores() { + storeID := store.GetID() + c.hotStat.GetOrCreateRollingStoreStats(storeID) + } + } for { select { - case <-c.ctx.Done(): + case <-ctx.Done(): log.Info("statistics background jobs has been stopped") return case <-ticker.C: @@ -723,7 +788,7 @@ func (c *RaftCluster) Stop() { return } c.running = false - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { c.coordinator.Stop() } c.cancel() @@ -970,7 +1035,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest nowTime := time.Now() var newStore *core.StoreInfo // If this cluster has slow stores, we should awaken hibernated regions in other stores. - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken { log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs)) newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt) @@ -1005,7 +1070,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest regions map[uint64]*core.RegionInfo interval uint64 ) - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { c.hotStat.Observe(storeID, newStore.GetStoreStats()) c.hotStat.FilterUnhealthyStore(c) c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow()) @@ -1061,7 +1126,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest e := int64(dur)*2 - int64(stat.GetTotalDurationSec()) store.Feedback(float64(e)) } - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) } @@ -1112,7 +1177,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { cluster.HandleStatsAsync(c, region) } @@ -1121,7 +1186,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. isNew, saveKV, saveCache, needSync := regionGuide(region, origin) - if !c.isAPIServiceMode && !saveKV && !saveCache && !isNew { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) && !saveKV && !saveCache && !isNew { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -1146,13 +1211,13 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { cluster.HandleOverlaps(c, overlaps) } regionUpdateCacheEventCounter.Inc() } - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) } @@ -1634,7 +1699,7 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { c.resetProgress(storeID, addr) storeIDStr := strconv.FormatUint(storeID, 10) statistics.ResetStoreStatistics(addr, storeIDStr) - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { c.hotStat.RemoveRollingStoreStats(storeID) c.slowStat.RemoveSlowStoreStatus(storeID) } @@ -1811,7 +1876,7 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error { } } c.core.PutStore(store) - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { c.hotStat.GetOrCreateRollingStoreStats(store.GetID()) c.slowStat.ObserveSlowStoreStatus(store.GetID(), store.IsSlow()) } @@ -2161,7 +2226,7 @@ func (c *RaftCluster) deleteStore(store *core.StoreInfo) error { } func (c *RaftCluster) collectMetrics() { - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { statsMap := statistics.NewStoreStatisticsMap(c.opt) stores := c.GetStores() for _, s := range stores { @@ -2177,9 +2242,8 @@ func (c *RaftCluster) collectMetrics() { } func (c *RaftCluster) resetMetrics() { - statistics.Reset() - - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { + statistics.Reset() c.coordinator.GetSchedulersController().ResetSchedulerMetrics() c.coordinator.ResetHotSpotMetrics() c.resetClusterMetrics() @@ -2783,3 +2847,12 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) { func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) { return c.coordinator.GetSchedulersController().GetPausedSchedulerDelayUntil(name) } + +// IsServiceEnabled returns whether the service is enabled. +func (c *RaftCluster) IsServiceEnabled(name string) bool { + enabled, exist := c.enabledServices.Load(name) + if !exist { + return false + } + return enabled.(bool) +} diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index a38ae86123fa..7d3069c2c14c 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/utils/logutil" @@ -233,7 +234,7 @@ func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error { if err := c.processReportBuckets(b); err != nil { return err } - if !c.isAPIServiceMode { + if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b)) } return nil diff --git a/server/grpc_service.go b/server/grpc_service.go index 2e59bdaf742a..4c691d64ec6d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1002,7 +1002,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear s.handleDamagedStore(request.GetStats()) storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) - if s.IsAPIServiceMode() { + if s.IsServiceEnabled(utils.SchedulingServiceName) { forwardCli, _ := s.updateSchedulingClient(ctx) if forwardCli != nil { req := &schedulingpb.StoreHeartbeatRequest{ @@ -1048,6 +1048,11 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.S lastPrimary: forwardedHost, } s.schedulingClient.Store(forwardCli) + old := "" + if pre != nil { + old = pre.(*schedulingClient).getPrimaryAddr() + } + log.Info("update scheduling client", zap.String("old-forwarded-host", old), zap.String("new-forwarded-host", forwardedHost)) return forwardCli.getClient(), nil } else if forwardedHost != "" && (pre != nil && forwardedHost == pre.(*schedulingClient).getPrimaryAddr()) { return pre.(*schedulingClient).getClient(), nil @@ -1245,7 +1250,6 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error errCh chan error schedulingStream schedulingpb.Scheduling_RegionHeartbeatClient cancel1 context.CancelFunc - lastPrimaryAddr string ) defer func() { // cancel the forward stream @@ -1360,24 +1364,20 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error continue } - if s.IsAPIServiceMode() { + if s.IsServiceEnabled(utils.SchedulingServiceName) { ctx := stream.Context() - primaryAddr, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) - if schedulingStream == nil || lastPrimaryAddr != primaryAddr { + if schedulingStream == nil { if cancel1 != nil { cancel1() } - client, err := s.getDelegateClient(ctx, primaryAddr) + forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { - log.Error("get delegate client failed", zap.Error(err)) + log.Error("update scheduling client failed", zap.Error(err)) } - - log.Info("create region heartbeat forward stream", zap.String("forwarded-host", primaryAddr)) - schedulingStream, cancel1, err = s.createSchedulingStream(client) + schedulingStream, cancel1, err = s.createSchedulingStream(forwardCli) if err != nil { log.Error("create region heartbeat forward stream failed", zap.Error(err)) } else { - lastPrimaryAddr = primaryAddr errCh = make(chan error, 1) go forwardSchedulingToServer(schedulingStream, server, errCh) } @@ -1404,6 +1404,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error } if err := schedulingStream.Send(req); err != nil { log.Error("forward region heartbeat failed", zap.Error(err)) + s.schedulingClient.Store(&schedulingClient{}) } } } @@ -1632,7 +1633,7 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { - if s.IsAPIServiceMode() { + if s.IsServiceEnabled(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.AskBatchSplitResponse{ @@ -1805,7 +1806,7 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus // ScatterRegion implements gRPC PDServer. func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionRequest) (*pdpb.ScatterRegionResponse, error) { - if s.IsAPIServiceMode() { + if s.IsServiceEnabled(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.ScatterRegionResponse{ @@ -2036,7 +2037,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb // GetOperator gets information about the operator belonging to the specify region. func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { - if s.IsAPIServiceMode() { + if s.IsServiceEnabled(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.GetOperatorResponse{ @@ -2308,7 +2309,7 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest // SplitRegions split regions by the given split keys func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsRequest) (*pdpb.SplitRegionsResponse, error) { - if s.IsAPIServiceMode() { + if s.IsServiceEnabled(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.SplitRegionsResponse{ @@ -2528,14 +2529,14 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC } } -func (s *GrpcServer) createSchedulingStream(client *grpc.ClientConn) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) { +func (s *GrpcServer) createSchedulingStream(client schedulingpb.SchedulingClient) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) { if client == nil { return nil, nil, errors.New("connection is not set") } done := make(chan struct{}) ctx, cancel := context.WithCancel(s.ctx) go grpcutil.CheckStream(ctx, cancel, done) - forwardStream, err := schedulingpb.NewSchedulingClient(client).RegionHeartbeat(ctx) + forwardStream, err := client.RegionHeartbeat(ctx) done <- struct{}{} return forwardStream, cancel, err } diff --git a/server/handler.go b/server/handler.go index dc4b43238d07..8ad42e31680a 100644 --- a/server/handler.go +++ b/server/handler.go @@ -30,6 +30,7 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" sche "github.com/tikv/pd/pkg/schedule/core" @@ -192,7 +193,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error { } var removeSchedulerCb func(string) error - if h.s.IsAPIServiceMode() { + if c.IsServiceEnabled(mcsutils.SchedulingServiceName) { removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveSchedulerHandler } else { removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveScheduler @@ -202,7 +203,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error { return err } log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args)) - if h.s.IsAPIServiceMode() { + if c.IsServiceEnabled(mcsutils.SchedulingServiceName) { if err = c.AddSchedulerHandler(s, args...); err != nil { log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) return err @@ -229,7 +230,7 @@ func (h *Handler) RemoveScheduler(name string) error { if err != nil { return err } - if h.s.IsAPIServiceMode() { + if c.IsServiceEnabled(mcsutils.SchedulingServiceName) { if err = c.RemoveSchedulerHandler(name); err != nil { log.Error("can not remove scheduler handler", zap.String("scheduler-name", name), errs.ZapError(err)) } else { diff --git a/server/server.go b/server/server.go index 9cd7f18578ee..a282096bac5e 100644 --- a/server/server.go +++ b/server/server.go @@ -489,7 +489,7 @@ func (s *Server) startServer(ctx context.Context) error { s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, "", s.cluster) // initial hot_region_storage in here. - if !s.IsAPIServiceMode() { + if !s.IsServiceEnabled(mcs.SchedulingServiceName) { s.hotRegionStorage, err = storage.NewHotRegionsStorage( ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler) if err != nil { @@ -1394,6 +1394,15 @@ func (s *Server) GetRegions() []*core.RegionInfo { return nil } +// IsServiceEnabled returns if the service is enabled +func (s *Server) IsServiceEnabled(name string) bool { + rc := s.GetRaftCluster() + if rc != nil { + return rc.IsServiceEnabled(name) + } + return false +} + // GetServiceLabels returns ApiAccessPaths by given service label // TODO: this function will be used for updating api rate limit config func (s *Server) GetServiceLabels(serviceLabel string) []apiutil.AccessPath {