diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 20b4fd95306..2a1c8ac637d 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -66,6 +66,8 @@ var ( DefaultMinResolvedTSPersistenceInterval = config.DefaultMinResolvedTSPersistenceInterval regionUpdateCacheEventCounter = regionEventCounter.WithLabelValues("update_cache") regionUpdateKVEventCounter = regionEventCounter.WithLabelValues("update_kv") + + denySchedulersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("schedulers", "deny") ) // regionLabelGCInterval is the interval to run region-label's GC work. diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index c1770f861eb..e117df615b6 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -893,9 +893,27 @@ func (s *scheduleController) Schedule(diagnosable bool) []*operator.Operator { if diagnosable { s.diagnosticRecorder.setResultFromPlans(ops, plans) } + foundDisabled := false + for _, op := range ops { + if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil { + region := s.cluster.GetRegion(op.RegionID()) + if region == nil { + continue + } + if labelMgr.ScheduleDisabled(region) { + denySchedulersByLabelerCounter.Inc() + foundDisabled = true + break + } + } + } if len(ops) > 0 { // If we have schedule, reset interval to the minimal interval. s.nextInterval = s.Scheduler.GetMinInterval() + // try regenerating operators + if foundDisabled { + continue + } return ops } } diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index 7df642f333e..b21a7910a98 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -411,11 +411,23 @@ func TestCheckRegionWithScheduleDeny(t *testing.T) { Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}}, }) + // should allow to do rule checker re.True(labelerManager.ScheduleDisabled(region)) - checkRegionAndOperator(re, tc, co, 1, 0) + checkRegionAndOperator(re, tc, co, 1, 1) + + // should not allow to merge + tc.opt.SetSplitMergeInterval(time.Duration(0)) + + re.NoError(tc.addLeaderRegion(2, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(3, 2, 3, 4)) + region = tc.GetRegion(2) + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 0) + + // delete label rule, should allow to do merge labelerManager.DeleteLabelRule("schedulelabel") re.False(labelerManager.ScheduleDisabled(region)) - checkRegionAndOperator(re, tc, co, 1, 1) + checkRegionAndOperator(re, tc, co, 2, 2) } func TestCheckerIsBusy(t *testing.T) { @@ -876,6 +888,45 @@ func TestPersistScheduler(t *testing.T) { re.Len(co.schedulers, 3) } +func TestDenyScheduler(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *coordinator) { + labelerManager := co.cluster.GetRegionLabeler() + labelerManager.SetLabelRule(&labeler.LabelRule{ + ID: "schedulelabel", + Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: labeler.KeyRange, + Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}}, + }) + co.run() + }, re) + defer cleanup() + + re.Len(co.schedulers, len(config.DefaultSchedulers)) + + // Transfer peer from store 4 to store 1 if not set deny. + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addRegionStore(3, 30)) + re.NoError(tc.addRegionStore(2, 20)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + // Transfer leader from store 4 to store 2 if not set deny. + re.NoError(tc.updateLeaderCount(4, 1000)) + re.NoError(tc.updateLeaderCount(3, 50)) + re.NoError(tc.updateLeaderCount(2, 20)) + re.NoError(tc.updateLeaderCount(1, 10)) + re.NoError(tc.addLeaderRegion(2, 4, 3, 2)) + + // there should no balance leader/region operator + for i := 0; i < 10; i++ { + re.Nil(co.opController.GetOperator(1)) + re.Nil(co.opController.GetOperator(2)) + time.Sleep(10 * time.Millisecond) + } +} + func TestRemoveScheduler(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/server/schedule/checker/checker_controller.go b/server/schedule/checker/checker_controller.go index 4106ed1c780..d3e082ff60a 100644 --- a/server/schedule/checker/checker_controller.go +++ b/server/schedule/checker/checker_controller.go @@ -32,6 +32,8 @@ import ( // DefaultCacheSize is the default length of waiting list. const DefaultCacheSize = 1000 +var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("checkers", "deny") + // Controller is used to manage all checkers. type Controller struct { cluster schedule.Cluster @@ -80,13 +82,6 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator { return []*operator.Operator{op} } - if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok { - l := cl.GetRegionLabeler() - if l.ScheduleDisabled(region) { - return nil - } - } - if op := c.splitChecker.Check(region); op != nil { return []*operator.Operator{op} } @@ -112,6 +107,15 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator { c.regionWaitingList.Put(region.GetID(), nil) } } + // skip the joint checker, split checker and rule checker when region label is set to "schedule=deny". + // those checkers is help to make region health, it's necessary to skip them when region is set to deny. + if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok { + l := cl.GetRegionLabeler() + if l.ScheduleDisabled(region) { + denyCheckersByLabelerCounter.Inc() + return nil + } + } if c.mergeChecker != nil { allowed := opController.OperatorCount(operator.OpMerge) < c.opts.GetMergeScheduleLimit() diff --git a/server/schedule/metrics.go b/server/schedule/metrics.go index 89dabf8e74e..79e789075fe 100644 --- a/server/schedule/metrics.go +++ b/server/schedule/metrics.go @@ -83,6 +83,15 @@ var ( Name: "scatter_distribution", Help: "Counter of the distribution in scatter.", }, []string{"store", "is_leader", "engine"}) + + // LabelerEventCounter is a counter of the scheduler labeler system. + LabelerEventCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "labeler_event_counter", + Help: "Counter of the scheduler label.", + }, []string{"type", "event"}) ) func init() { @@ -94,4 +103,5 @@ func init() { prometheus.MustRegister(scatterCounter) prometheus.MustRegister(scatterDistributionCounter) prometheus.MustRegister(operatorSizeHist) + prometheus.MustRegister(LabelerEventCounter) } diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index c7b2cc99d34..4beb10b665c 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -30,7 +30,6 @@ import ( "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/core/storelimit" "github.com/tikv/pd/server/schedule/hbstream" - "github.com/tikv/pd/server/schedule/labeler" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/versioninfo" "go.uber.org/zap" @@ -423,14 +422,6 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato if op.SchedulerKind() == operator.OpAdmin || op.IsLeaveJointStateOperator() { continue } - if cl, ok := oc.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok { - l := cl.GetRegionLabeler() - if l.ScheduleDisabled(region) { - log.Debug("schedule disabled", zap.Uint64("region-id", op.RegionID())) - operatorWaitCounter.WithLabelValues(op.Desc(), "schedule-disabled").Inc() - return false - } - } } expired := false for _, op := range ops { diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index afbfdc22e1b..3e1121289d0 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -762,23 +762,8 @@ func (suite *operatorControllerTestSuite) TestAddWaitingOperator() { }) suite.True(labelerManager.ScheduleDisabled(source)) - // add operator should be failed since it is labeled with `schedule=deny`. - suite.Equal(0, controller.AddWaitingOperator(ops...)) - - // add operator should be success without `schedule=deny` - labelerManager.DeleteLabelRule("schedulelabel") - labelerManager.ScheduleDisabled(source) - suite.False(labelerManager.ScheduleDisabled(source)) - // now there is one operator being allowed to add, if it is a merge operator - // both of the pair are allowed - ops, err = operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge) - suite.NoError(err) - suite.Len(ops, 2) + // add operator should be success since it is not check in addWaitingOperator suite.Equal(2, controller.AddWaitingOperator(ops...)) - suite.Equal(0, controller.AddWaitingOperator(ops...)) - - // no space left, new operator can not be added. - suite.Equal(0, controller.AddWaitingOperator(addPeerOp(0))) } // issue #5279