Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 20, 2023
1 parent d8ffc5f commit 2d76829
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 33 deletions.
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ type RaftCluster struct {
core *core.BasicCluster // cached cluster info
opt *config.PersistOptions
limiter *StoreLimiter
*SchedulingController
*schedulingController
ruleManager *placement.RuleManager
regionLabeler *labeler.RegionLabeler
replicationMode *replication.ModeManager
Expand Down
19 changes: 10 additions & 9 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2486,7 +2486,10 @@ func TestCollectMetricsConcurrent(t *testing.T) {
nil)
}, func(co *schedule.Coordinator) { co.Run() }, re)
defer cleanup()

rc := co.GetCluster().(*RaftCluster)
rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
rc.schedulingController.coordinator = co
controller := co.GetSchedulersController()
// Make sure there are no problem when concurrent write and read
var wg sync.WaitGroup
count := 10
Expand All @@ -2499,10 +2502,6 @@ func TestCollectMetricsConcurrent(t *testing.T) {
}
}(i)
}
controller := co.GetSchedulersController()
rc := co.GetCluster().(*RaftCluster)
rc.SchedulingController = NewSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
rc.SchedulingController.coordinator = co
for i := 0; i < 1000; i++ {
co.CollectHotSpotMetrics()
controller.CollectSchedulerMetrics()
Expand All @@ -2524,6 +2523,11 @@ func TestCollectMetrics(t *testing.T) {
nil)
}, func(co *schedule.Coordinator) { co.Run() }, re)
defer cleanup()

rc := co.GetCluster().(*RaftCluster)
rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
rc.schedulingController.coordinator = co
controller := co.GetSchedulersController()
count := 10
for i := 0; i <= count; i++ {
for k := 0; k < 200; k++ {
Expand All @@ -2537,10 +2541,7 @@ func TestCollectMetrics(t *testing.T) {
tc.hotStat.HotCache.Update(item, utils.Write)
}
}
controller := co.GetSchedulersController()
rc := co.GetCluster().(*RaftCluster)
rc.SchedulingController = NewSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
rc.SchedulingController.coordinator = co

for i := 0; i < 1000; i++ {
co.CollectHotSpotMetrics()
controller.CollectSchedulerMetrics()
Expand Down
48 changes: 27 additions & 21 deletions server/cluster/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ func (sc *schedulingController) stopSchedulingJobs() bool {
sc.mu.Lock()
defer sc.mu.Unlock()
if !sc.running {
return
return false
}
sc.coordinator.Stop()
sc.cancel()
sc.wg.Wait()
sc.running = false
log.Info("scheduling service is stopped")
return true
}

func (sc *schedulingController) startSchedulingJobs(cluster sche.ClusterInformer, hbstreams *hbstream.HeartbeatStreams) {
Expand Down Expand Up @@ -113,13 +114,18 @@ func (sc *schedulingController) initCoordinatorLocked(ctx context.Context, clust
}

// runCoordinator runs the main scheduling loop.
func (sc *SchedulingController) runCoordinator() {
func (sc *schedulingController) runCoordinator() {
defer logutil.LogPanic()
defer sc.wg.Done()
select {
case <-sc.ctx.Done():
return
default:
}
sc.coordinator.RunUntilStop()
}

func (sc *SchedulingController) runStatsBackgroundJobs() {
func (sc *schedulingController) runStatsBackgroundJobs() {
defer logutil.LogPanic()
defer sc.wg.Done()

Expand All @@ -141,7 +147,7 @@ func (sc *SchedulingController) runStatsBackgroundJobs() {
}
}

func (sc *SchedulingController) runSchedulingMetricsCollectionJob() {
func (sc *schedulingController) runSchedulingMetricsCollectionJob() {
defer logutil.LogPanic()
defer sc.wg.Done()

Expand All @@ -165,7 +171,7 @@ func (sc *SchedulingController) runSchedulingMetricsCollectionJob() {
}
}

func (sc *SchedulingController) resetSchedulingMetrics() {
func (sc *schedulingController) resetSchedulingMetrics() {
statistics.Reset()
schedulers.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
Expand All @@ -175,7 +181,7 @@ func (sc *SchedulingController) resetSchedulingMetrics() {
statistics.ResetHotCacheStatusMetrics()
}

func (sc *SchedulingController) collectSchedulingMetrics() {
func (sc *schedulingController) collectSchedulingMetrics() {
statsMap := statistics.NewStoreStatisticsMap(sc.opt)
stores := sc.GetStores()
for _, s := range stores {
Expand All @@ -194,47 +200,47 @@ func (sc *SchedulingController) collectSchedulingMetrics() {
sc.hotStat.CollectMetrics()
}

func (sc *SchedulingController) removeStoreStatistics(storeID uint64) {
func (sc *schedulingController) removeStoreStatistics(storeID uint64) {
sc.hotStat.RemoveRollingStoreStats(storeID)
sc.slowStat.RemoveSlowStoreStatus(storeID)
}

func (sc *SchedulingController) updateStoreStatistics(storeID uint64, isSlow bool) {
func (sc *schedulingController) updateStoreStatistics(storeID uint64, isSlow bool) {
sc.hotStat.GetOrCreateRollingStoreStats(storeID)
sc.slowStat.ObserveSlowStoreStatus(storeID, isSlow)
}

// GetHotStat gets hot stat.
func (sc *SchedulingController) GetHotStat() *statistics.HotStat {
func (sc *schedulingController) GetHotStat() *statistics.HotStat {
return sc.hotStat
}

// GetRegionStats gets region statistics.
func (sc *SchedulingController) GetRegionStats() *statistics.RegionStatistics {
func (sc *schedulingController) GetRegionStats() *statistics.RegionStatistics {
return sc.regionStats
}

// GetLabelStats gets label statistics.
func (sc *SchedulingController) GetLabelStats() *statistics.LabelStatistics {
func (sc *schedulingController) GetLabelStats() *statistics.LabelStatistics {
return sc.labelStats
}

// GetRegionStatsByType gets the status of the region by types.
func (sc *SchedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
func (sc *schedulingController) GetRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo {
if sc.regionStats == nil {
return nil
}
return sc.regionStats.GetRegionStatsByType(typ)
}

// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
func (sc *SchedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
for _, region := range regions {
sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels())
}
}

func (sc *SchedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo {
func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo {
stores := make([]*core.StoreInfo, 0, len(region.GetPeers()))
for _, p := range region.GetPeers() {
if store := sc.GetStore(p.StoreId); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) {
Expand All @@ -246,29 +252,29 @@ func (sc *SchedulingController) getStoresWithoutLabelLocked(region *core.RegionI

// GetStoresStats returns stores' statistics from cluster.
// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat
func (sc *SchedulingController) GetStoresStats() *statistics.StoresStats {
func (sc *schedulingController) GetStoresStats() *statistics.StoresStats {
return sc.hotStat.StoresStats
}

// GetStoresLoads returns load stats of all stores.
func (sc *SchedulingController) GetStoresLoads() map[uint64][]float64 {
func (sc *schedulingController) GetStoresLoads() map[uint64][]float64 {
return sc.hotStat.GetStoresLoads()
}

// IsRegionHot checks if a region is in hot state.
func (sc *SchedulingController) IsRegionHot(region *core.RegionInfo) bool {
func (sc *schedulingController) IsRegionHot(region *core.RegionInfo) bool {
return sc.hotStat.IsRegionHot(region, sc.opt.GetHotRegionCacheHitsThreshold())
}

// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
func (sc *SchedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
return sc.hotStat.GetHotPeerStat(rw, regionID, storeID)
}

// RegionReadStats returns hot region's read stats.
// The result only includes peers that are hot enough.
// RegionStats is a thread-safe method
func (sc *SchedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
// As read stats are reported by store heartbeat, the threshold needs to be adjusted.
threshold := sc.opt.GetHotRegionCacheHitsThreshold() *
(utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
Expand All @@ -277,13 +283,13 @@ func (sc *SchedulingController) RegionReadStats() map[uint64][]*statistics.HotPe

// RegionWriteStats returns hot region's write stats.
// The result only includes peers that are hot enough.
func (sc *SchedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
// RegionStats is a thread-safe method
return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold())
}

// BucketsStats returns hot region's buckets stats.
func (sc *SchedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat {
func (sc *schedulingController) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat {
return sc.hotStat.BucketsStats(degree, regionIDs...)
}

Expand Down
1 change: 0 additions & 1 deletion tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,7 +1562,6 @@ func TestTransferLeaderBack(t *testing.T) {
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
rc.SchedulingController = cluster.NewSchedulingController(ctx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager())
storage := rc.GetStorage()
meta := &metapb.Cluster{Id: 123}
re.NoError(storage.SaveMeta(meta))
Expand Down
1 change: 0 additions & 1 deletion tests/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ func (s *SchedulingTestEnvironment) startCluster(m mode) {
// start scheduling cluster
tc, err := NewTestSchedulingCluster(s.ctx, 1, leaderServer.GetAddr())
re.NoError(err)
re.NoError(leaderServer.BootstrapCluster())
tc.WaitForPrimaryServing(re)
s.cluster.SetSchedulingCluster(tc)
time.Sleep(200 * time.Millisecond) // wait for scheduling cluster to update member
Expand Down

0 comments on commit 2d76829

Please sign in to comment.