Skip to content

Commit

Permalink
extract scheduling control flow from cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 2, 2023
1 parent 0d47169 commit 1684a14
Show file tree
Hide file tree
Showing 8 changed files with 466 additions and 366 deletions.
389 changes: 32 additions & 357 deletions server/cluster/cluster.go

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) {
re.NoError(cluster.putRegion(r))

cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r})
counter := cluster.labelLevelStats.GetLabelCounter()
counter := cluster.labelStats.GetLabelCounter()
re.Equal(0, counter["none"])
re.Equal(1, counter["zone"])
}
Expand Down Expand Up @@ -2130,14 +2130,15 @@ func newTestRaftCluster(
basicCluster *core.BasicCluster,
) *RaftCluster {
rc := &RaftCluster{serverCtx: ctx}
rc.InitCluster(id, opt, s, basicCluster, nil)
rc.InitCluster(id, opt, s, basicCluster, nil, nil)
rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt)
if opt.IsPlacementRulesEnabled() {
err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
if err != nil {
panic(err)
}
}
rc.schedulingController.init(basicCluster, opt, nil, rc.ruleManager)
return rc
}

Expand Down Expand Up @@ -2502,11 +2503,11 @@ func TestCollectMetricsConcurrent(t *testing.T) {
for i := 0; i < 1000; i++ {
co.CollectHotSpotMetrics()
controller.CollectSchedulerMetrics()
co.GetCluster().(*RaftCluster).collectClusterMetrics()
co.GetCluster().(*RaftCluster).collectStatisticsMetrics()
}
co.ResetHotSpotMetrics()
controller.ResetSchedulerMetrics()
co.GetCluster().(*RaftCluster).resetClusterMetrics()
co.GetCluster().(*RaftCluster).resetStatisticsMetrics()
wg.Wait()
}

Expand Down Expand Up @@ -2537,7 +2538,7 @@ func TestCollectMetrics(t *testing.T) {
for i := 0; i < 1000; i++ {
co.CollectHotSpotMetrics()
controller.CollectSchedulerMetrics()
co.GetCluster().(*RaftCluster).collectClusterMetrics()
co.GetCluster().(*RaftCluster).collectStatisticsMetrics()
}
stores := co.GetCluster().GetStores()
regionStats := co.GetCluster().RegionWriteStats()
Expand All @@ -2552,7 +2553,7 @@ func TestCollectMetrics(t *testing.T) {
re.Equal(status1, status2)
co.ResetHotSpotMetrics()
controller.ResetSchedulerMetrics()
co.GetCluster().(*RaftCluster).resetClusterMetrics()
co.GetCluster().(*RaftCluster).resetStatisticsMetrics()
}

func prepare(setCfg func(*sc.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) {
Expand Down
Loading

0 comments on commit 1684a14

Please sign in to comment.