config, cluster: add an option to halt the cluster scheduling #6498

Merged · 4 commits · May 25, 2023
5 changes: 5 additions & 0 deletions errors.toml
@@ -96,6 +96,11 @@ error = '''
TiKV cluster not bootstrapped, please start TiKV first
'''

["PD:cluster:ErrSchedulingIsHalted"]
error = '''
scheduling is halted
'''

["PD:cluster:ErrStoreIsUp"]
error = '''
store is still up, please remove store gracefully
109 changes: 108 additions & 1 deletion metrics/grafana/pd.json
@@ -2340,6 +2340,113 @@
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": true,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The allowance status of the scheduling.",
Member:
How about putting it near "Scheduler is running"? But it makes sense where it is now.

Member Author:

I prefer leaving it here since it's more of a cluster-level status than a scheduler-level one. Another reason is that placing it in the Scheduler panel would require many changes to the Grafana JSON file; simply appending it here keeps the diff small.

"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 0,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 41
},
"hiddenSeries": false,
"id": 1464,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": false,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pluginVersion": "7.5.10",
"pointradius": 1,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": true,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "pd_scheduling_allowance_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{kind}}",
"metric": "pd_scheduling_allowance_status",
"refId": "A",
"step": 2
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Scheduling Allowance Status",
"tooltip": {
"shared": true,
"sort": 1,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:533",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"$$hashKey": "object:534",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"cacheTimeout": null,
"colorBackground": false,
@@ -2967,7 +3074,7 @@
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{event}}",
"metric": "pd_scheduler_status",
"metric": "pd_schedule_operators_count",
"refId": "A",
"step": 4
}
7 changes: 4 additions & 3 deletions pkg/errs/errno.go
@@ -140,9 +140,10 @@ var (

// cluster errors
var (
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID"))
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID"))
ErrSchedulingIsHalted = errors.Normalize("scheduling is halted", errors.RFCCodeText("PD:cluster:ErrSchedulingIsHalted"))
)

// versioninfo errors
8 changes: 3 additions & 5 deletions pkg/mock/mockcluster/mockcluster.go
@@ -104,11 +104,6 @@ func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
}

// IsUnsafeRecovering returns if the cluster is in unsafe recovering.
func (mc *Cluster) IsUnsafeRecovering() bool {
return false
}

// GetPersistOptions returns the persist options.
func (mc *Cluster) GetPersistOptions() *config.PersistOptions {
return mc.PersistOptions
@@ -123,6 +118,9 @@ func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) { return false,
// IsSchedulerDisabled checks if the scheduler with name is disabled or not.
func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil }

// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
func (mc *Cluster) CheckSchedulingAllowance() (bool, error) { return true, nil }

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
2 changes: 1 addition & 1 deletion pkg/schedule/config/config.go
@@ -87,7 +87,7 @@ type Config interface {
SetSplitMergeInterval(time.Duration)
SetMaxReplicas(int)
SetPlacementRulesCacheEnabled(bool)
SetWitnessEnabled(bool)
SetEnableWitness(bool)
// only for store configuration
UseRaftV2()
}
14 changes: 10 additions & 4 deletions pkg/schedule/coordinator.go
@@ -142,8 +142,7 @@ func (c *Coordinator) PatrolRegions() {
log.Info("patrol regions has been stopped")
return
}
if c.cluster.IsUnsafeRecovering() {
// Skip patrolling regions during unsafe recovery.
if allowed, _ := c.cluster.CheckSchedulingAllowance(); !allowed {
continue
}

@@ -559,7 +558,7 @@ func (c *Coordinator) CollectSchedulerMetrics() {
var allowScheduler float64
// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
// See issue #1341.
if !s.IsPaused() && !s.cluster.IsUnsafeRecovering() {
if allowed, _ := s.cluster.CheckSchedulingAllowance(); !s.IsPaused() && allowed {
allowScheduler = 1
}
schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler)
@@ -1029,7 +1028,14 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
}
return false
}
if s.IsPaused() || s.cluster.IsUnsafeRecovering() {
allowed, _ := s.cluster.CheckSchedulingAllowance()
if !allowed {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(halted)
}
return false
}
if s.IsPaused() {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(paused)
}
2 changes: 1 addition & 1 deletion pkg/schedule/core/cluster_informer.go
@@ -44,8 +44,8 @@ type ClusterInformer interface {
UpdateRegionsLabelLevelStats(regions []*core.RegionInfo)
IsSchedulerExisted(name string) (bool, error)
IsSchedulerDisabled(name string) (bool, error)
CheckSchedulingAllowance() (bool, error)
GetPersistOptions() *config.PersistOptions
IsUnsafeRecovering() bool
}

// RegionHealthCluster is an aggregate interface that wraps multiple interfaces
2 changes: 2 additions & 0 deletions pkg/schedule/diagnostic_manager.go
@@ -34,6 +34,8 @@ const (
disabled = "disabled"
// paused means the current scheduler is paused
paused = "paused"
// halted means the current scheduler is halted
halted = "halted"
// scheduling means the current scheduler is generating.
scheduling = "scheduling"
// pending means the current scheduler cannot generate scheduling operator
2 changes: 1 addition & 1 deletion pkg/schedule/placement/rule_manager_test.go
@@ -33,7 +33,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
var err error
manager := NewRuleManager(store, nil, mockconfig.NewTestOptions())
manager.conf.SetWitnessEnabled(enableWitness)
manager.conf.SetEnableWitness(enableWitness)
err = manager.Initialize(3, []string{"zone", "rack", "host"})
re.NoError(err)
return store, manager
27 changes: 22 additions & 5 deletions server/cluster/cluster.go
@@ -826,11 +826,6 @@ func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller {
return c.unsafeRecoveryController
}

// IsUnsafeRecovering returns if the cluster is in unsafe recovering.
func (c *RaftCluster) IsUnsafeRecovering() bool {
return c.unsafeRecoveryController.IsRunning()
}

// AddSuspectKeyRange adds the key range with the its ruleID as the key
// The instance of each keyRange is like following format:
// [2][]byte: start key/end key
@@ -2711,3 +2706,25 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) {
func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) {
return c.coordinator.GetPausedSchedulerDelayUntil(name)
}

var (
onlineUnsafeRecoveryStatus = schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery")
haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")
)

// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
func (c *RaftCluster) CheckSchedulingAllowance() (bool, error) {
// If the cluster is in the process of online unsafe recovery, it should not allow scheduling.
if c.GetUnsafeRecoveryController().IsRunning() {
onlineUnsafeRecoveryStatus.Set(1)
return false, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}
onlineUnsafeRecoveryStatus.Set(0)
// If the halt-scheduling is set, it should not allow scheduling.
if c.opt.IsSchedulingHalted() {
haltSchedulingStatus.Set(1)
return false, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
haltSchedulingStatus.Set(0)
return true, nil
}
8 changes: 4 additions & 4 deletions server/cluster/cluster_worker.go
@@ -43,8 +43,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// HandleAskSplit handles the split request.
func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
if c.GetUnsafeRecoveryController().IsRunning() {
return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
if allowed, err := c.CheckSchedulingAllowance(); !allowed {
return nil, err
}
if !c.opt.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
@@ -105,8 +105,8 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {

// HandleAskBatchSplit handles the batch split request.
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if c.GetUnsafeRecoveryController().IsRunning() {
return nil, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
if allowed, err := c.CheckSchedulingAllowance(); !allowed {
return nil, err
}
if !c.opt.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
9 changes: 9 additions & 0 deletions server/cluster/metrics.go
@@ -95,11 +95,20 @@ var (
Name: "store_sync",
Help: "The state of store sync config",
}, []string{"address", "state"})

schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduling",
Name: "allowance_status",
Help: "Status of the scheduling allowance.",
}, []string{"kind"})
)

func init() {
prometheus.MustRegister(regionEventCounter)
prometheus.MustRegister(healthStatusGauge)
prometheus.MustRegister(schedulingAllowanceStatusGauge)
prometheus.MustRegister(clusterStateCPUGauge)
prometheus.MustRegister(clusterStateCurrent)
prometheus.MustRegister(bucketEventCounter)
9 changes: 9 additions & 0 deletions server/config/config.go
@@ -213,6 +213,7 @@ const (
defaultEnableGRPCGateway = true
defaultDisableErrorVerbose = true
defaultEnableWitness = false
defaultHaltScheduling = false

defaultDashboardAddress = "auto"

@@ -684,6 +685,10 @@ type ScheduleConfig struct {
// v1: which is based on the region count by rate limit.
// v2: which is based on region size by window size.
StoreLimitVersion string `toml:"store-limit-version" json:"store-limit-version,omitempty"`

// HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling,
// and any other scheduling configs will be ignored.
HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"`
Member:
Previously, I was trying to introduce a scheduling mode to cover this case. For me, it's OK to use an individual config to control it. Maybe we can name it enable-scheduling or something else.

Member Author (@JmPotato, May 22, 2023):

I think it's best to use a configuration name whose default value is false to control the global scheduling switch, in order to avoid unexpected behavior in scenarios that require compatibility considerations, such as upgrades. From that perspective, I think descriptions like "disable" or "halt" are more appropriate. At the same time, this global scheduling shutdown should not be a long-term state. In addition, we already have the concept and operation of "pause" for schedulers, so I ultimately chose the word "halt". WDYT?

Member:

While working on #6553, I found that it may be better to use one config for both unsafe recovery and halt, so that we can decouple the dependencies between the cluster and the coordinator.
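
For readers skimming the thread, a minimal sketch of how the new switch would appear in a PD configuration file, assuming ScheduleConfig is still mapped to the usual `[schedule]` section (the section name is an assumption here); only the `halt-scheduling` key and its `false` default come from this PR.

```toml
# Hypothetical pd.toml fragment (sketch only).
# Setting this to true makes CheckSchedulingAllowance reject scheduling
# with PD:cluster:ErrSchedulingIsHalted until it is set back to false.
[schedule]
halt-scheduling = true
```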

}

// Clone returns a cloned scheduling configuration.
@@ -820,6 +825,10 @@ func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool)
configutil.AdjustString(&c.RegionScoreFormulaVersion, defaultRegionScoreFormulaVersion)
}

if !meta.IsDefined("halt-scheduling") {
c.HaltScheduling = defaultHaltScheduling
}

adjustSchedulers(&c.Schedulers, DefaultSchedulers)

for k, b := range c.migrateConfigurationMap() {
19 changes: 12 additions & 7 deletions server/config/persist_options.go
@@ -184,13 +184,6 @@ func (o *PersistOptions) SetPlacementRulesCacheEnabled(enabled bool) {
o.SetReplicationConfig(v)
}

// SetWitnessEnabled set EanbleWitness
func (o *PersistOptions) SetWitnessEnabled(enabled bool) {
v := o.GetScheduleConfig().Clone()
v.EnableWitness = enabled
o.SetScheduleConfig(v)
}

// GetStrictlyMatchLabel returns whether check label strict.
func (o *PersistOptions) GetStrictlyMatchLabel() bool {
return o.GetReplicationConfig().StrictlyMatchLabel
@@ -926,3 +919,15 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien
}
return err
}

// SetHaltScheduling sets HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
}

// IsSchedulingHalted returns if PD scheduling is halted.
func (o *PersistOptions) IsSchedulingHalted() bool {
return o.GetScheduleConfig().HaltScheduling
}