diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 9caae932037..5e2ed58a009 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -504,12 +504,6 @@ func (s *Server) stopWatcher() { s.metaWatcher.Close() } -// GetPersistConfig returns the persist config. -// It's used to test. -func (s *Server) GetPersistConfig() *config.PersistConfig { - return s.persistConfig -} - // CreateServer creates the Server func CreateServer(ctx context.Context, cfg *config.Config) *Server { svr := &Server{ diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 85cf84361b4..a359e1d023a 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -509,6 +509,6 @@ func checkOperatorFail(re *require.Assertions, oc *operator.Controller, op *oper func waitSyncFinish(re *require.Assertions, tc *tests.TestSchedulingCluster, typ storelimit.Type, expectedLimit float64) { testutil.Eventually(re, func() bool { - return tc.GetPrimaryServer().GetPersistConfig().GetStoreLimitByType(2, typ) == expectedLimit + return tc.GetPrimaryServer().GetCluster().GetSharedConfig().GetStoreLimitByType(2, typ) == expectedLimit }) } diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index 1752c28a3c0..8bb034993fa 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" @@ -221,6 +222,13 @@ func (suite *operatorTestSuite) checkOperator(cluster *tests.TestCluster) { _, err = pdctl.ExecuteCommand(cmd, "config", "set", "enable-placement-rules", "true") re.NoError(err) + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + // wait for the scheduler server to update the config + testutil.Eventually(re, func() bool { + return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() + }) + } + output, err = pdctl.ExecuteCommand(cmd, "operator", "add", "transfer-region", "1", "2", "3") re.NoError(err) re.Contains(string(output), "not supported") diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index b3d9f356ad1..fe58e304791 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -472,10 +472,8 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu result := make(map[string]interface{}) testutil.Eventually(re, func() bool { mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result) - return len(result) != 0 + return len(result) != 0 && expectedStatus == result["status"] && expectedSummary == result["summary"] }, testutil.WithTickInterval(50*time.Millisecond)) - re.Equal(expectedStatus, result["status"]) - re.Equal(expectedSummary, result["summary"]) } stores := []*metapb.Store{ diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 64ed5114646..83ab0f3c7ed 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -15,6 +15,7 @@ package api import ( + "encoding/json" "errors" "fmt" "net/http" @@ -73,6 +74,7 @@ func (suite *operatorTestSuite) TestOperator() { func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) { re := suite.Require() + suite.pauseRuleChecker(cluster) stores := []*metapb.Store{ { Id: 1, @@ -106,6 +108,8 @@ func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) { ConfVer: 1, Version: 1, }, + StartKey: []byte("a"), + EndKey: []byte("b"), } regionInfo := core.NewRegionInfo(region, peer1) tests.MustPutRegionInfo(re, cluster, regionInfo) @@ -176,6 +180,7 @@ func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) { func (suite *operatorTestSuite) checkMergeRegionOperator(cluster *tests.TestCluster) { re := suite.Require() + suite.pauseRuleChecker(cluster) r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) tests.MustPutRegionInfo(re, cluster, r1) r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) @@ -201,6 +206,7 @@ func (suite *operatorTestSuite) checkMergeRegionOperator(cluster *tests.TestClus func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *tests.TestCluster) { re := suite.Require() + suite.pauseRuleChecker(cluster) stores := []*metapb.Store{ { Id: 1, @@ -239,6 +245,8 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te ConfVer: 1, Version: 1, }, + StartKey: []byte("a"), + EndKey: []byte("b"), } tests.MustPutRegionInfo(re, cluster, core.NewRegionInfo(region, peer1)) @@ -408,13 +416,24 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te }, } svr := cluster.GetLeaderServer() + url := fmt.Sprintf("%s/pd/api/v1/config", svr.GetAddr()) for _, testCase := range testCases { suite.T().Log(testCase.name) - // TODO: remove this after we can sync this config to all servers. - if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - sche.GetCluster().GetSchedulerConfig().SetPlacementRuleEnabled(testCase.placementRuleEnable) + data := make(map[string]interface{}) + if testCase.placementRuleEnable { + data["enable-placement-rules"] = "true" } else { - svr.GetRaftCluster().GetOpts().SetPlacementRuleEnabled(testCase.placementRuleEnable) + data["enable-placement-rules"] = "false" + } + reqData, e := json.Marshal(data) + re.NoError(e) + err := tu.CheckPostJSON(testDialClient, url, reqData, tu.StatusOK(re)) + re.NoError(err) + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + // wait for the scheduler server to update the config + tu.Eventually(re, func() bool { + return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() == testCase.placementRuleEnable + }) } manager := svr.GetRaftCluster().GetRuleManager() if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { @@ -436,7 +455,6 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te err = manager.DeleteRule("pd", "default") suite.NoError(err) } - var err error if testCase.expectedError == nil { err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), testCase.input, tu.StatusOK(re)) } else { @@ -457,3 +475,17 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te suite.NoError(err) } } + +// pauseRuleChecker will pause rule checker to avoid unexpected operator. +func (suite *operatorTestSuite) pauseRuleChecker(cluster *tests.TestCluster) { + re := suite.Require() + checkerName := "rule" + addr := cluster.GetLeaderServer().GetAddr() + resp := make(map[string]interface{}) + url := fmt.Sprintf("%s/pd/api/v1/checker/%s", addr, checkerName) + err := tu.CheckPostJSON(testDialClient, url, []byte(`{"delay":1000}`), tu.StatusOK(re)) + re.NoError(err) + err = tu.ReadGetJSON(re, testDialClient, url, &resp) + re.NoError(err) + re.True(resp["paused"].(bool)) +}