Skip to content

Commit

Permalink
checker: fix unhealth region skip the rule check (#6427) (#6430)
Browse files Browse the repository at this point in the history
close #6426, ref #6427

allow the `schedule=deny` label can do rule constraints check

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: nolouch <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ShuNing <[email protected]>
Co-authored-by: nolouch <[email protected]>
Co-authored-by: Ryan Leung <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored May 25, 2023
1 parent cb52d28 commit 08b8fc8
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 34 deletions.
2 changes: 2 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ var (
DefaultMinResolvedTSPersistenceInterval = config.DefaultMinResolvedTSPersistenceInterval
regionUpdateCacheEventCounter = regionEventCounter.WithLabelValues("update_cache")
regionUpdateKVEventCounter = regionEventCounter.WithLabelValues("update_kv")

denySchedulersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("schedulers", "deny")
)

// regionLabelGCInterval is the interval to run region-label's GC work.
Expand Down
18 changes: 18 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,9 +893,27 @@ func (s *scheduleController) Schedule(diagnosable bool) []*operator.Operator {
if diagnosable {
s.diagnosticRecorder.setResultFromPlans(ops, plans)
}
foundDisabled := false
for _, op := range ops {
if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil {
region := s.cluster.GetRegion(op.RegionID())
if region == nil {
continue
}
if labelMgr.ScheduleDisabled(region) {
denySchedulersByLabelerCounter.Inc()
foundDisabled = true
break
}
}
}
if len(ops) > 0 {
// If we have schedule, reset interval to the minimal interval.
s.nextInterval = s.Scheduler.GetMinInterval()
// try regenerating operators
if foundDisabled {
continue
}
return ops
}
}
Expand Down
55 changes: 53 additions & 2 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,23 @@ func TestCheckRegionWithScheduleDeny(t *testing.T) {
Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}},
})

// should allow to do rule checker
re.True(labelerManager.ScheduleDisabled(region))
checkRegionAndOperator(re, tc, co, 1, 0)
checkRegionAndOperator(re, tc, co, 1, 1)

// should not allow to merge
tc.opt.SetSplitMergeInterval(time.Duration(0))

re.NoError(tc.addLeaderRegion(2, 2, 3, 4))
re.NoError(tc.addLeaderRegion(3, 2, 3, 4))
region = tc.GetRegion(2)
re.True(labelerManager.ScheduleDisabled(region))
checkRegionAndOperator(re, tc, co, 2, 0)

// delete label rule, should allow to do merge
labelerManager.DeleteLabelRule("schedulelabel")
re.False(labelerManager.ScheduleDisabled(region))
checkRegionAndOperator(re, tc, co, 1, 1)
checkRegionAndOperator(re, tc, co, 2, 2)
}

func TestCheckerIsBusy(t *testing.T) {
Expand Down Expand Up @@ -876,6 +888,45 @@ func TestPersistScheduler(t *testing.T) {
re.Len(co.schedulers, 3)
}

func TestDenyScheduler(t *testing.T) {
re := require.New(t)

tc, co, cleanup := prepare(nil, nil, func(co *coordinator) {
labelerManager := co.cluster.GetRegionLabeler()
labelerManager.SetLabelRule(&labeler.LabelRule{
ID: "schedulelabel",
Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}},
RuleType: labeler.KeyRange,
Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}},
})
co.run()
}, re)
defer cleanup()

re.Len(co.schedulers, len(config.DefaultSchedulers))

// Transfer peer from store 4 to store 1 if not set deny.
re.NoError(tc.addRegionStore(4, 40))
re.NoError(tc.addRegionStore(3, 30))
re.NoError(tc.addRegionStore(2, 20))
re.NoError(tc.addRegionStore(1, 10))
re.NoError(tc.addLeaderRegion(1, 2, 3, 4))

// Transfer leader from store 4 to store 2 if not set deny.
re.NoError(tc.updateLeaderCount(4, 1000))
re.NoError(tc.updateLeaderCount(3, 50))
re.NoError(tc.updateLeaderCount(2, 20))
re.NoError(tc.updateLeaderCount(1, 10))
re.NoError(tc.addLeaderRegion(2, 4, 3, 2))

// there should no balance leader/region operator
for i := 0; i < 10; i++ {
re.Nil(co.opController.GetOperator(1))
re.Nil(co.opController.GetOperator(2))
time.Sleep(10 * time.Millisecond)
}
}

func TestRemoveScheduler(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
18 changes: 11 additions & 7 deletions server/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
// DefaultCacheSize is the default length of waiting list.
const DefaultCacheSize = 1000

var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("checkers", "deny")

// Controller is used to manage all checkers.
type Controller struct {
cluster schedule.Cluster
Expand Down Expand Up @@ -80,13 +82,6 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
return []*operator.Operator{op}
}

if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
return nil
}
}

if op := c.splitChecker.Check(region); op != nil {
return []*operator.Operator{op}
}
Expand All @@ -112,6 +107,15 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
c.regionWaitingList.Put(region.GetID(), nil)
}
}
// skip the joint checker, split checker and rule checker when region label is set to "schedule=deny".
// those checkers is help to make region health, it's necessary to skip them when region is set to deny.
if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
denyCheckersByLabelerCounter.Inc()
return nil
}
}

if c.mergeChecker != nil {
allowed := opController.OperatorCount(operator.OpMerge) < c.opts.GetMergeScheduleLimit()
Expand Down
10 changes: 10 additions & 0 deletions server/schedule/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ var (
Name: "scatter_distribution",
Help: "Counter of the distribution in scatter.",
}, []string{"store", "is_leader", "engine"})

// LabelerEventCounter is a counter of the scheduler labeler system.
LabelerEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "labeler_event_counter",
Help: "Counter of the scheduler label.",
}, []string{"type", "event"})
)

func init() {
Expand All @@ -94,4 +103,5 @@ func init() {
prometheus.MustRegister(scatterCounter)
prometheus.MustRegister(scatterDistributionCounter)
prometheus.MustRegister(operatorSizeHist)
prometheus.MustRegister(LabelerEventCounter)
}
9 changes: 0 additions & 9 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/versioninfo"
"go.uber.org/zap"
Expand Down Expand Up @@ -423,14 +422,6 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato
if op.SchedulerKind() == operator.OpAdmin || op.IsLeaveJointStateOperator() {
continue
}
if cl, ok := oc.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
log.Debug("schedule disabled", zap.Uint64("region-id", op.RegionID()))
operatorWaitCounter.WithLabelValues(op.Desc(), "schedule-disabled").Inc()
return false
}
}
}
expired := false
for _, op := range ops {
Expand Down
17 changes: 1 addition & 16 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,23 +762,8 @@ func (suite *operatorControllerTestSuite) TestAddWaitingOperator() {
})

suite.True(labelerManager.ScheduleDisabled(source))
// add operator should be failed since it is labeled with `schedule=deny`.
suite.Equal(0, controller.AddWaitingOperator(ops...))

// add operator should be success without `schedule=deny`
labelerManager.DeleteLabelRule("schedulelabel")
labelerManager.ScheduleDisabled(source)
suite.False(labelerManager.ScheduleDisabled(source))
// now there is one operator being allowed to add, if it is a merge operator
// both of the pair are allowed
ops, err = operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge)
suite.NoError(err)
suite.Len(ops, 2)
// add operator should be success since it is not check in addWaitingOperator
suite.Equal(2, controller.AddWaitingOperator(ops...))
suite.Equal(0, controller.AddWaitingOperator(ops...))

// no space left, new operator can not be added.
suite.Equal(0, controller.AddWaitingOperator(addPeerOp(0)))
}

// issue #5279
Expand Down

0 comments on commit 08b8fc8

Please sign in to comment.