From 2d76829754f1f53221168bbc22255ac310cc5dba Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Mon, 20 Nov 2023 14:19:52 +0800
Subject: [PATCH] resolve conflicts

Signed-off-by: Ryan Leung
---
 server/cluster/cluster.go               |  2 +-
 server/cluster/cluster_test.go          | 19 +++++-----
 server/cluster/scheduling_controller.go | 48 ++++++++++++++-----------
 tests/server/cluster/cluster_test.go    |  1 -
 tests/testutil.go                       |  1 -
 5 files changed, 38 insertions(+), 33 deletions(-)

diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 6ba926ec1ae1..0df543c96c28 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -156,7 +156,7 @@ type RaftCluster struct {
 	core            *core.BasicCluster // cached cluster info
 	opt             *config.PersistOptions
 	limiter         *StoreLimiter
-	*SchedulingController
+	*schedulingController
 	ruleManager     *placement.RuleManager
 	regionLabeler   *labeler.RegionLabeler
 	replicationMode *replication.ModeManager
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index b9310ccdab52..5d6ed1eca338 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -2486,7 +2486,10 @@ func TestCollectMetricsConcurrent(t *testing.T) {
 			nil)
 	}, func(co *schedule.Coordinator) { co.Run() }, re)
 	defer cleanup()
-
+	rc := co.GetCluster().(*RaftCluster)
+	rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
+	rc.schedulingController.coordinator = co
+	controller := co.GetSchedulersController()
 	// Make sure there are no problem when concurrent write and read
 	var wg sync.WaitGroup
 	count := 10
@@ -2499,10 +2502,6 @@ func TestCollectMetricsConcurrent(t *testing.T) {
 			}
 		}(i)
 	}
-	controller := co.GetSchedulersController()
-	rc := co.GetCluster().(*RaftCluster)
-	rc.SchedulingController = NewSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
-	rc.SchedulingController.coordinator = co
 	for i := 0; i < 1000; i++ {
 		co.CollectHotSpotMetrics()
 		controller.CollectSchedulerMetrics()
@@ -2524,6 +2523,11 @@ func TestCollectMetrics(t *testing.T) {
 			nil)
 	}, func(co *schedule.Coordinator) { co.Run() }, re)
 	defer cleanup()
+
+	rc := co.GetCluster().(*RaftCluster)
+	rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
+	rc.schedulingController.coordinator = co
+	controller := co.GetSchedulersController()
 	count := 10
 	for i := 0; i <= count; i++ {
 		for k := 0; k < 200; k++ {
@@ -2537,10 +2541,7 @@ func TestCollectMetrics(t *testing.T) {
 			tc.hotStat.HotCache.Update(item, utils.Write)
 		}
 	}
-	controller := co.GetSchedulersController()
-	rc := co.GetCluster().(*RaftCluster)
-	rc.SchedulingController = NewSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
-	rc.SchedulingController.coordinator = co
+
 	for i := 0; i < 1000; i++ {
 		co.CollectHotSpotMetrics()
 		controller.CollectSchedulerMetrics()
diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go
index 7c780662d4f0..9cb6515e8c6a 100644
--- a/server/cluster/scheduling_controller.go
+++ b/server/cluster/scheduling_controller.go
@@ -76,13 +76,14 @@ func (sc *schedulingController) stopSchedulingJobs() bool {
 	sc.mu.Lock()
 	defer sc.mu.Unlock()
 	if !sc.running {
-		return
+		return false
 	}
 	sc.coordinator.Stop()
 	sc.cancel()
 	sc.wg.Wait()
 	sc.running = false
 	log.Info("scheduling service is stopped")
+	return true
 }
 
 func (sc *schedulingController) startSchedulingJobs(cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) {
@@ -113,13 +114,18 @@ func (sc *schedulingController) initCoordinatorLocked(ctx context.Context, clust
 }
 
 // runCoordinator runs the main scheduling loop.
-func (sc *SchedulingController) runCoordinator() {
+func (sc *schedulingController) runCoordinator() {
 	defer logutil.LogPanic()
 	defer sc.wg.Done()
+	select {
+	case <-sc.ctx.Done():
+		return
+	default:
+	}
 	sc.coordinator.RunUntilStop()
 }
 
-func (sc *SchedulingController) runStatsBackgroundJobs() {
+func (sc *schedulingController) runStatsBackgroundJobs() {
 	defer logutil.LogPanic()
 	defer sc.wg.Done()
 
@@ -141,7 +147,7 @@ func (sc *SchedulingController) runStatsBackgroundJobs() {
 	}
 }
 
-func (sc *SchedulingController) runSchedulingMetricsCollectionJob() {
+func (sc *schedulingController) runSchedulingMetricsCollectionJob() {
 	defer logutil.LogPanic()
 	defer sc.wg.Done()
 
@@ -165,7 +171,7 @@ func (sc *SchedulingController) runSchedulingMetricsCollectionJob() {
 	}
 }
 
-func (sc *SchedulingController) resetSchedulingMetrics() {
+func (sc *schedulingController) resetSchedulingMetrics() {
 	statistics.Reset()
 	schedulers.ResetSchedulerMetrics()
 	schedule.ResetHotSpotMetrics()
@@ -175,7 +181,7 @@ func (sc *SchedulingController) resetSchedulingMetrics() {
 	statistics.ResetHotCacheStatusMetrics()
 }
 
-func (sc *SchedulingController) collectSchedulingMetrics() {
+func (sc *schedulingController) collectSchedulingMetrics() {
 	statsMap := statistics.NewStoreStatisticsMap(sc.opt)
 	stores := sc.GetStores()
 	for _, s := range stores {
@@ -194,33 +200,33 @@ func (sc *SchedulingController) collectSchedulingMetrics() {
 	sc.hotStat.CollectMetrics()
 }
 
-func (sc *SchedulingController) removeStoreStatistics(storeID uint64) {
+func (sc *schedulingController) removeStoreStatistics(storeID uint64) {
 	sc.hotStat.RemoveRollingStoreStats(storeID)
 	sc.slowStat.RemoveSlowStoreStatus(storeID)
 }
 
-func (sc *SchedulingController) updateStoreStatistics(storeID uint64, isSlow bool) {
+func (sc *schedulingController) updateStoreStatistics(storeID uint64, isSlow bool) {
 	sc.hotStat.GetOrCreateRollingStoreStats(storeID)
 	sc.slowStat.ObserveSlowStoreStatus(storeID, isSlow)
 }
 
 // GetHotStat gets hot stat.
-func (sc *SchedulingController) GetHotStat() *statistics.HotStat {
+func (sc *schedulingController) GetHotStat() *statistics.HotStat {
 	return sc.hotStat
 }
 
 // GetRegionStats gets region statistics.
-func (sc *SchedulingController) GetRegionStats() *statistics.RegionStatistics {
+func (sc *schedulingController) GetRegionStats() *statistics.RegionStatistics {
 	return sc.regionStats
 }
 
 // GetLabelStats gets label statistics.
-func (sc *SchedulingController) GetLabelStats() *statistics.LabelStatistics {
+func (sc *schedulingController) GetLabelStats() *statistics.LabelStatistics {
 	return sc.labelStats
 }
 
 // GetRegionStatsByType gets the status of the region by types.
-func (sc *SchedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
+func (sc *schedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
 	if sc.regionStats == nil {
 		return nil
 	}
@@ -228,13 +234,13 @@ func (sc *SchedulingController) GetRegionStatsByType(typ statistics.RegionStatis
 }
 
 // UpdateRegionsLabelLevelStats updates the status of the region label level by types.
-func (sc *SchedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
+func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
 	for _, region := range regions {
 		sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels())
 	}
 }
 
-func (sc *SchedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo {
+func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo {
 	stores := make([]*core.StoreInfo, 0, len(region.GetPeers()))
 	for _, p := range region.GetPeers() {
 		if store := sc.GetStore(p.StoreId); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) {
@@ -246,29 +252,29 @@ func (sc *SchedulingController) getStoresWithoutLabelLocked(region *core.RegionI
 
 // GetStoresStats returns stores' statistics from cluster.
 // And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat
-func (sc *SchedulingController) GetStoresStats() *statistics.StoresStats {
+func (sc *schedulingController) GetStoresStats() *statistics.StoresStats {
 	return sc.hotStat.StoresStats
 }
 
 // GetStoresLoads returns load stats of all stores.
-func (sc *SchedulingController) GetStoresLoads() map[uint64][]float64 {
+func (sc *schedulingController) GetStoresLoads() map[uint64][]float64 {
 	return sc.hotStat.GetStoresLoads()
 }
 
 // IsRegionHot checks if a region is in hot state.
-func (sc *SchedulingController) IsRegionHot(region *core.RegionInfo) bool {
+func (sc *schedulingController) IsRegionHot(region *core.RegionInfo) bool {
 	return sc.hotStat.IsRegionHot(region, sc.opt.GetHotRegionCacheHitsThreshold())
 }
 
 // GetHotPeerStat returns hot peer stat with specified regionID and storeID.
-func (sc *SchedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
+func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
 	return sc.hotStat.GetHotPeerStat(rw, regionID, storeID)
 }
 
 // RegionReadStats returns hot region's read stats.
 // The result only includes peers that are hot enough.
 // RegionStats is a thread-safe method
-func (sc *SchedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
+func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
 	// As read stats are reported by store heartbeat, the threshold needs to be adjusted.
 	threshold := sc.opt.GetHotRegionCacheHitsThreshold() *
 		(utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
@@ -277,13 +283,13 @@ func (sc *SchedulingController) RegionReadStats() map[uint64][]*statistics.HotPe
 
 // RegionWriteStats returns hot region's write stats.
 // The result only includes peers that are hot enough.
-func (sc *SchedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
+func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
 	// RegionStats is a thread-safe method
 	return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold())
 }
 
 // BucketsStats returns hot region's buckets stats.
-func (sc *SchedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat {
+func (sc *schedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat {
 	return sc.hotStat.BucketsStats(degree, regionIDs...)
 }
 
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index ab38e7e428ff..b7a428e36831 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -1562,7 +1562,6 @@ func TestTransferLeaderBack(t *testing.T) {
 	svr := leaderServer.GetServer()
 	rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
 	rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
-	rc.SchedulingController = cluster.NewSchedulingController(ctx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
 	storage := rc.GetStorage()
 	meta := &metapb.Cluster{Id: 123}
 	re.NoError(storage.SaveMeta(meta))
diff --git a/tests/testutil.go b/tests/testutil.go
index d4de26914fd2..2ccf6fb76be9 100644
--- a/tests/testutil.go
+++ b/tests/testutil.go
@@ -311,7 +311,6 @@ func (s *SchedulingTestEnvironment) startCluster(m mode) {
 	// start scheduling cluster
 	tc, err := NewTestSchedulingCluster(s.ctx, 1, leaderServer.GetAddr())
 	re.NoError(err)
-	re.NoError(leaderServer.BootstrapCluster())
 	tc.WaitForPrimaryServing(re)
 	s.cluster.SetSchedulingCluster(tc)
 	time.Sleep(200 * time.Millisecond) // wait for scheduling cluster to update member
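Note (not part of the commit): the scheduling_controller.go hunk above changes stopSchedulingJobs to report whether any background jobs were actually stopped. A minimal, self-contained Go sketch of that start/stop pattern follows; jobController and its fields are hypothetical stand-ins for the unexported schedulingController, not pd code.

package main

import (
	"context"
	"fmt"
	"sync"
)

// jobController is a hypothetical stand-in for an unexported controller that
// owns background jobs, guarded by a mutex and a running flag.
type jobController struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	cancel  context.CancelFunc
	running bool
}

// start launches the background job once; it is a no-op while already running.
func (c *jobController) start(parent context.Context, job func(ctx context.Context)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.running {
		return
	}
	var ctx context.Context
	ctx, c.cancel = context.WithCancel(parent)
	c.running = true
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		job(ctx)
	}()
}

// stop mirrors the patched stopSchedulingJobs: it returns false when nothing
// was running, and true after it has cancelled the context and waited for the
// job to exit, so callers can tell whether a real shutdown happened.
func (c *jobController) stop() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.running {
		return false
	}
	c.cancel()
	c.wg.Wait()
	c.running = false
	return true
}

func main() {
	var c jobController
	c.start(context.Background(), func(ctx context.Context) {
		<-ctx.Done() // block until stop() cancels the context
	})
	fmt.Println(c.stop()) // true: jobs were running and are now stopped
	fmt.Println(c.stop()) // false: already stopped, nothing to do
}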
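Note (not part of the commit): the select block added to runCoordinator is a non-blocking check that makes the goroutine exit immediately if the controller's context was already cancelled between launch and execution. A generic sketch of that guard, with hypothetical names and a caller-supplied loop function:

package main

import (
	"context"
	"fmt"
	"sync"
)

// runWorker is a hypothetical analogue of runCoordinator: before entering its
// long-running loop it performs a non-blocking check of ctx, so a goroutine
// scheduled after shutdown began returns instead of starting work against a
// stopped controller.
func runWorker(ctx context.Context, wg *sync.WaitGroup, loop func(ctx context.Context)) {
	defer wg.Done()
	select {
	case <-ctx.Done():
		return // context already cancelled: do not start the loop at all
	default:
	}
	loop(ctx)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel before the worker ever runs

	var wg sync.WaitGroup
	wg.Add(1)
	go runWorker(ctx, &wg, func(ctx context.Context) {
		fmt.Println("loop started") // never printed: the guard returns first
		<-ctx.Done()
	})
	wg.Wait()
	fmt.Println("worker exited without running the loop")
}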