From 5bf0c14e7225c6439cf3337f52defa7bca2da469 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 18 Jan 2024 14:14:44 +0800 Subject: [PATCH] mcs: pick some updates and bug fixes (#256) * etcdutil, mcs: fix the issue loading label rules is too slow (#7718) Signed-off-by: lhy1024 * ci: run `make check` with longer timeout (#7271) ref tikv/pd#4399 Signed-off-by: lhy1024 * mcs: add a switch to dynamically enable scheduling service (#7595) ref tikv/pd#5839 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> * schedule: prevent suddenly scheduling (#7714) ref tikv/pd#7671 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> * makefile: update golangci (#7556) close tikv/pd#7551 Signed-off-by: husharp * fix conflict Signed-off-by: lhy1024 --------- Signed-off-by: lhy1024 Signed-off-by: husharp Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Co-authored-by: Hu# --- .github/workflows/check.yaml | 2 +- Makefile | 2 +- pkg/keyspace/util.go | 6 +- pkg/mcs/scheduling/server/rule/watcher.go | 25 ++-- .../scheduling/server/rule/watcher_test.go | 113 ++++++++++++++++++ pkg/schedule/labeler/labeler.go | 33 +++-- pkg/schedule/placement/rule_manager.go | 22 ++-- pkg/schedule/prepare_checker.go | 4 + pkg/tso/keyspace_group_manager.go | 5 - pkg/tso/keyspace_group_manager_test.go | 24 ---- pkg/utils/etcdutil/etcdutil.go | 102 +++++++++------- pkg/utils/etcdutil/etcdutil_test.go | 41 ++++++- pkg/utils/etcdutil/testutil.go | 11 +- server/api/config.go | 18 +++ server/cluster/cluster.go | 2 +- server/config/config.go | 28 +++++ server/config/persist_options.go | 14 +++ server/server.go | 22 ++++ .../mcs/scheduling/server_test.go | 53 +++++++- tests/pdctl/config/config_test.go | 31 +++++ tests/pdctl/operator/operator_test.go | 2 +- tests/server/api/operator_test.go | 2 +- tests/server/api/rule_test.go | 2 +- 23 files changed, 443 insertions(+), 121 deletions(-) create mode 100644 pkg/mcs/scheduling/server/rule/watcher_test.go diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index fb22dadccf7..df5ccfd9771 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -9,7 +9,7 @@ concurrency: jobs: statics: runs-on: ubuntu-latest - timeout-minutes: 10 + timeout-minutes: 20 steps: - name: Checkout code uses: actions/checkout@v3 diff --git a/Makefile b/Makefile index addfa628c42..81573e9da5b 100644 --- a/Makefile +++ b/Makefile @@ -160,7 +160,7 @@ SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) install-tools: @mkdir -p $(GO_TOOLS_BIN_PATH) - @which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.51.2 + @which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.55.2 @grep '_' tools.go | sed 's/"//g' | awk '{print $$2}' | xargs go install .PHONY: install-tools diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index e580412cb16..e68331e109c 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -175,8 +175,8 @@ func MakeRegionBound(id uint32) *RegionBound { } } -// makeKeyRanges encodes keyspace ID to correct LabelRule data. -func makeKeyRanges(id uint32, skipRaw bool) []interface{} { +// MakeKeyRanges encodes keyspace ID to correct LabelRule data. +func MakeKeyRanges(id uint32, skipRaw bool) []interface{} { regionBound := MakeRegionBound(id) if skipRaw { return []interface{}{ @@ -215,7 +215,7 @@ func makeLabelRule(id uint32, skipRaw bool) *labeler.LabelRule { }, }, RuleType: labeler.KeyRange, - Data: makeKeyRanges(id, skipRaw), + Data: MakeKeyRanges(id, skipRaw), } } diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index c345ef037df..d8a8dd3e609 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -110,7 +110,7 @@ func (rw *Watcher) initializeRuleWatcher() error { var suspectKeyRanges *core.KeyRanges preEventsFn := func(events []*clientv3.Event) error { - // It will be locked until the postFn is finished. + // It will be locked until the postEventsFn is finished. rw.ruleManager.Lock() rw.patch = rw.ruleManager.BeginPatch() suspectKeyRanges = &core.KeyRanges{} @@ -188,7 +188,7 @@ func (rw *Watcher) initializeRuleWatcher() error { } postEventsFn := func(events []*clientv3.Event) error { defer rw.ruleManager.Unlock() - if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil { + if err := rw.ruleManager.TryCommitPatchLocked(rw.patch); err != nil { log.Error("failed to commit patch", zap.Error(err)) return err } @@ -212,26 +212,37 @@ func (rw *Watcher) initializeRuleWatcher() error { func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" + // TODO: use txn in region labeler. + preEventsFn := func(events []*clientv3.Event) error { + // It will be locked until the postEventsFn is finished. + rw.regionLabeler.Lock() + return nil + } putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) + log.Debug("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) rule, err := labeler.NewLabelRuleFromJSON(kv.Value) if err != nil { return err } - return rw.regionLabeler.SetLabelRule(rule) + return rw.regionLabeler.SetLabelRuleLocked(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) log.Info("delete region label rule", zap.String("key", key)) - return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim)) + return rw.regionLabeler.DeleteLabelRuleLocked(strings.TrimPrefix(key, prefixToTrim)) + } + postEventsFn := func(events []*clientv3.Event) error { + defer rw.regionLabeler.Unlock() + rw.regionLabeler.BuildRangeListLocked() + return nil } rw.labelWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, "scheduling-region-label-watcher", rw.regionLabelPathPrefix, - func([]*clientv3.Event) error { return nil }, + preEventsFn, putFn, deleteFn, - func([]*clientv3.Event) error { return nil }, + postEventsFn, true, /* withPrefix */ ) rw.labelWatcher.StartWatchLoop() diff --git a/pkg/mcs/scheduling/server/rule/watcher_test.go b/pkg/mcs/scheduling/server/rule/watcher_test.go new file mode 100644 index 00000000000..4b4a6f53e14 --- /dev/null +++ b/pkg/mcs/scheduling/server/rule/watcher_test.go @@ -0,0 +1,113 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rule + +import ( + "context" + "encoding/json" + "os" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/etcdutil" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/embed" +) + +const ( + clusterID = uint64(20240117) + rulesNum = 16384 +) + +func TestLoadLargeRules(t *testing.T) { + re := require.New(t) + ctx, client, clean := prepare(t) + defer clean() + runWatcherLoadLabelRule(ctx, re, client) +} + +func BenchmarkLoadLargeRules(b *testing.B) { + re := require.New(b) + ctx, client, clean := prepare(b) + defer clean() + + b.ResetTimer() // Resets the timer to ignore initialization time in the benchmark + + for n := 0; n < b.N; n++ { + runWatcherLoadLabelRule(ctx, re, client) + } +} + +func runWatcherLoadLabelRule(ctx context.Context, re *require.Assertions, client *clientv3.Client) { + storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) + labelerManager, err := labeler.NewRegionLabeler(ctx, storage, time.Hour) + re.NoError(err) + ctx, cancel := context.WithCancel(ctx) + rw := &Watcher{ + ctx: ctx, + cancel: cancel, + rulesPathPrefix: endpoint.RulesPathPrefix(clusterID), + ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID), + ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID), + regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), + etcdClient: client, + ruleStorage: storage, + regionLabeler: labelerManager, + } + err = rw.initializeRegionLabelWatcher() + re.NoError(err) + re.Len(labelerManager.GetAllLabelRules(), rulesNum) + cancel() +} + +func prepare(t require.TestingT) (context.Context, *clientv3.Client, func()) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + cfg := etcdutil.NewTestSingleConfig() + cfg.Dir = os.TempDir() + "/test_etcd" + os.RemoveAll(cfg.Dir) + etcd, err := embed.StartEtcd(cfg) + re.NoError(err) + client, err := etcdutil.CreateEtcdClient(nil, cfg.LCUrls) + re.NoError(err) + <-etcd.Server.ReadyNotify() + + for i := 1; i < rulesNum+1; i++ { + rule := &labeler.LabelRule{ + ID: "test_" + strconv.Itoa(i), + Labels: []labeler.RegionLabel{{Key: "test", Value: "test"}}, + RuleType: labeler.KeyRange, + Data: keyspace.MakeKeyRanges(uint32(i), true), + } + value, err := json.Marshal(rule) + re.NoError(err) + key := endpoint.RegionLabelPathPrefix(clusterID) + "/" + rule.ID + _, err = clientv3.NewKV(client).Put(ctx, key, string(value)) + re.NoError(err) + } + + return ctx, client, func() { + cancel() + client.Close() + etcd.Close() + os.RemoveAll(cfg.Dir) + } +} diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go index 39722b1a038..7254460f577 100644 --- a/pkg/schedule/labeler/labeler.go +++ b/pkg/schedule/labeler/labeler.go @@ -99,7 +99,7 @@ func (l *RegionLabeler) checkAndClearExpiredLabels() { } } if deleted { - l.buildRangeList() + l.BuildRangeListLocked() } } @@ -128,11 +128,12 @@ func (l *RegionLabeler) loadRules() error { return err } } - l.buildRangeList() + l.BuildRangeListLocked() return nil } -func (l *RegionLabeler) buildRangeList() { +// BuildRangeListLocked builds the range list. +func (l *RegionLabeler) BuildRangeListLocked() { builder := rangelist.NewBuilder() l.minExpire = nil for _, rule := range l.labelRules { @@ -206,16 +207,24 @@ func (l *RegionLabeler) getAndCheckRule(id string, now time.Time) *LabelRule { // SetLabelRule inserts or updates a LabelRule. func (l *RegionLabeler) SetLabelRule(rule *LabelRule) error { + l.Lock() + defer l.Unlock() + if err := l.SetLabelRuleLocked(rule); err != nil { + return err + } + l.BuildRangeListLocked() + return nil +} + +// SetLabelRuleLocked inserts or updates a LabelRule but not buildRangeList. +func (l *RegionLabeler) SetLabelRuleLocked(rule *LabelRule) error { if err := rule.checkAndAdjust(); err != nil { return err } - l.Lock() - defer l.Unlock() if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil { return err } l.labelRules[rule.ID] = rule - l.buildRangeList() return nil } @@ -223,6 +232,15 @@ func (l *RegionLabeler) SetLabelRule(rule *LabelRule) error { func (l *RegionLabeler) DeleteLabelRule(id string) error { l.Lock() defer l.Unlock() + if err := l.DeleteLabelRuleLocked(id); err != nil { + return err + } + l.BuildRangeListLocked() + return nil +} + +// DeleteLabelRuleLocked removes a LabelRule but not buildRangeList. +func (l *RegionLabeler) DeleteLabelRuleLocked(id string) error { if _, ok := l.labelRules[id]; !ok { return errs.ErrRegionRuleNotFound.FastGenByArgs(id) } @@ -230,7 +248,6 @@ func (l *RegionLabeler) DeleteLabelRule(id string) error { return err } delete(l.labelRules, id) - l.buildRangeList() return nil } @@ -264,7 +281,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error { for _, rule := range patch.SetRules { l.labelRules[rule.ID] = rule } - l.buildRangeList() + l.BuildRangeListLocked() return nil } diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index 63537f4d7b0..3be1373e488 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -309,7 +309,7 @@ func (m *RuleManager) SetRule(rule *Rule) error { defer m.Unlock() p := m.BeginPatch() p.SetRule(rule) - if err := m.TryCommitPatch(p); err != nil { + if err := m.TryCommitPatchLocked(p); err != nil { return err } log.Info("placement rule updated", zap.String("rule", fmt.Sprint(rule))) @@ -322,7 +322,7 @@ func (m *RuleManager) DeleteRule(group, id string) error { defer m.Unlock() p := m.BeginPatch() p.DeleteRule(group, id) - if err := m.TryCommitPatch(p); err != nil { + if err := m.TryCommitPatchLocked(p); err != nil { return err } log.Info("placement rule is removed", zap.String("group", group), zap.String("id", id)) @@ -467,8 +467,8 @@ func (m *RuleManager) BeginPatch() *RuleConfigPatch { return m.ruleConfig.beginPatch() } -// TryCommitPatch tries to commit a patch. -func (m *RuleManager) TryCommitPatch(patch *RuleConfigPatch) error { +// TryCommitPatchLocked tries to commit a patch. +func (m *RuleManager) TryCommitPatchLocked(patch *RuleConfigPatch) error { patch.adjust() ruleList, err := buildRuleList(patch) @@ -533,7 +533,7 @@ func (m *RuleManager) SetRules(rules []*Rule) error { } p.SetRule(r) } - if err := m.TryCommitPatch(p); err != nil { + if err := m.TryCommitPatchLocked(p); err != nil { return err } @@ -596,7 +596,7 @@ func (m *RuleManager) Batch(todo []RuleOp) error { } } - if err := m.TryCommitPatch(patch); err != nil { + if err := m.TryCommitPatchLocked(patch); err != nil { return err } @@ -632,7 +632,7 @@ func (m *RuleManager) SetRuleGroup(group *RuleGroup) error { defer m.Unlock() p := m.BeginPatch() p.SetGroup(group) - if err := m.TryCommitPatch(p); err != nil { + if err := m.TryCommitPatchLocked(p); err != nil { return err } log.Info("group config updated", zap.String("group", fmt.Sprint(group))) @@ -645,7 +645,7 @@ func (m *RuleManager) DeleteRuleGroup(id string) error { defer m.Unlock() p := m.BeginPatch() p.DeleteGroup(id) - if err := m.TryCommitPatch(p); err != nil { + if err := m.TryCommitPatchLocked(p); err != nil { return err } log.Info("group config reset", zap.String("group", id)) @@ -735,7 +735,7 @@ func (m *RuleManager) SetAllGroupBundles(groups []GroupBundle, override bool) er p.SetRule(r) } } - if err := m.TryCommitPatch(p); err != nil { + if err := m.TryCommitPatchLocked(p); err != nil { return err } log.Info("full config reset", zap.String("config", fmt.Sprint(groups))) @@ -766,7 +766,7 @@ func (m *RuleManager) SetGroupBundle(group GroupBundle) error { } p.SetRule(r) } - if err := m.TryCommitPatch(p); err != nil { + if err := m.TryCommitPatchLocked(p); err != nil { return err } log.Info("group is reset", zap.String("group", fmt.Sprint(group))) @@ -798,7 +798,7 @@ func (m *RuleManager) DeleteGroupBundle(id string, regex bool) error { p.DeleteGroup(g.ID) } } - if err := m.TryCommitPatch(p); err != nil { + if err := m.TryCommitPatchLocked(p); err != nil { return err } log.Info("groups are removed", zap.String("id", id), zap.Bool("regexp", regex)) diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go index 7843249229b..126e3bba41d 100644 --- a/pkg/schedule/prepare_checker.go +++ b/pkg/schedule/prepare_checker.go @@ -60,6 +60,10 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti continue } storeID := store.GetID() + // It is used to avoid sudden scheduling when scheduling service is just started. + if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) { + return false + } // For each store, the number of active regions should be more than total region of the store * collectFactor if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) { return false diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index fc60a26d5af..fb3cbd477b7 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -364,8 +364,6 @@ type KeyspaceGroupManager struct { // cfg is the TSO config cfg ServiceConfig - // loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment. - loadKeyspaceGroupsTimeout time.Duration loadKeyspaceGroupsBatchSize int64 loadFromEtcdMaxRetryTimes int @@ -574,9 +572,6 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { postEventsFn, true, /* withPrefix */ ) - if kgm.loadKeyspaceGroupsTimeout > 0 { - kgm.groupWatcher.SetLoadTimeout(kgm.loadKeyspaceGroupsTimeout) - } if kgm.loadFromEtcdMaxRetryTimes > 0 { kgm.groupWatcher.SetLoadRetryTimes(kgm.loadFromEtcdMaxRetryTimes) } diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 0c1b017d7aa..99e2e058591 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" @@ -228,29 +227,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadWithDifferentBatchSize() { } } -// TestLoadKeyspaceGroupsTimeout tests there is timeout when loading the initial keyspace group assignment -// from etcd. The initialization of the keyspace group manager should fail. -func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { - re := suite.Require() - - mgr := suite.newUniqueKeyspaceGroupManager(1) - re.NotNil(mgr) - defer mgr.Close() - - addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, - []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) - - // Set the timeout to 1 second and inject the delayLoad to return 3 seconds to let - // the loading sleep 3 seconds. - mgr.loadKeyspaceGroupsTimeout = time.Second - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad", "return(3)")) - err := mgr.Initialize() - // If loading keyspace groups timeout, the initialization should fail with ErrLoadKeyspaceGroupsTerminated. - re.Contains(err.Error(), errs.ErrLoadKeyspaceGroupsTerminated.Error()) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad")) -} - // TestLoadKeyspaceGroupsSucceedWithTempFailures tests the initialization should succeed when there are temporary // failures during loading the initial keyspace group assignment from etcd. func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTempFailures() { diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 983b55861be..a8638f024d0 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -21,6 +21,7 @@ import ( "math/rand" "net/http" "net/url" + "strings" "sync" "time" @@ -39,6 +40,7 @@ import ( "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" + "google.golang.org/grpc/codes" ) const ( @@ -550,10 +552,10 @@ func InitOrGetClusterID(c *clientv3.Client, key string) (uint64, error) { } const ( - defaultLoadDataFromEtcdTimeout = 5 * time.Minute - defaultEtcdRetryInterval = time.Second - defaultLoadFromEtcdRetryTimes = 3 - defaultLoadBatchSize = 400 + defaultEtcdRetryInterval = time.Second + defaultLoadFromEtcdRetryTimes = 3 + maxLoadBatchSize = int64(10000) + minLoadBatchSize = int64(100) // RequestProgressInterval is the interval to call RequestProgress for watcher. RequestProgressInterval = 1 * time.Second @@ -592,8 +594,6 @@ type LoopWatcher struct { // lastTimeForceLoad is used to record the last time force loading data from etcd. lastTimeForceLoad time.Time - // loadTimeout is used to set the timeout for loading data from etcd. - loadTimeout time.Duration // loadRetryTimes is used to set the retry times for loading data from etcd. loadRetryTimes int // loadBatchSize is used to set the batch size for loading data from etcd. @@ -630,9 +630,8 @@ func NewLoopWatcher( preEventsFn: preEventsFn, isWithPrefix: isWithPrefix, lastTimeForceLoad: time.Now(), - loadTimeout: defaultLoadDataFromEtcdTimeout, loadRetryTimes: defaultLoadFromEtcdRetryTimes, - loadBatchSize: defaultLoadBatchSize, + loadBatchSize: maxLoadBatchSize, watchChangeRetryInterval: defaultEtcdRetryInterval, } } @@ -849,21 +848,10 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision } func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) { - ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout) - defer cancel() - failpoint.Inject("delayLoad", func(val failpoint.Value) { - if sleepIntervalSeconds, ok := val.(int); ok && sleepIntervalSeconds > 0 { - time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second) - } - }) - startKey := lw.key - // If limit is 0, it means no limit. - // If limit is not 0, we need to add 1 to limit to get the next key. limit := lw.loadBatchSize - if limit != 0 { - limit++ - } + opts := lw.buildLoadingOpts(limit) + if err := lw.preEventsFn([]*clientv3.Event{}); err != nil { log.Error("run pre event failed in watch loop", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err)) @@ -875,32 +863,42 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) } }() - // In most cases, 'Get(foo, WithPrefix())' is equivalent to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'. - // However, when the startKey changes, the two are no longer equivalent. - // For example, the end key for 'WithRange(GetPrefixRangeEnd(foo))' is consistently 'fop'. - // But when using 'Get(foo1, WithPrefix())', the end key becomes 'foo2', not 'fop'. - // So, we use 'WithRange()' to avoid this problem. - opts := []clientv3.OpOption{ - clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), - clientv3.WithLimit(limit)} - if lw.isWithPrefix { - opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey))) - } - for { - // Sort by key to get the next key and we don't need to worry about the performance, - // Because the default sort is just SortByKey and SortAscend - resp, err := clientv3.NewKV(lw.client).Get(ctx, startKey, opts...) + select { + case <-ctx.Done(): + return 0, nil + default: + } + resp, err := EtcdKVGet(lw.client, startKey, opts...) + failpoint.Inject("meetEtcdError", func() { + if limit > minLoadBatchSize { + err = errors.New(codes.ResourceExhausted.String()) + } + }) if err != nil { log.Error("load failed in watch loop", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err)) + if strings.Contains(err.Error(), codes.ResourceExhausted.String()) || + strings.Contains(err.Error(), codes.DeadlineExceeded.String()) { + if limit == 0 { + limit = maxLoadBatchSize + } else if limit > minLoadBatchSize { + limit /= 2 + } else { + return 0, err + } + opts = lw.buildLoadingOpts(limit) + continue + } return 0, err } for i, item := range resp.Kvs { - if resp.More && i == len(resp.Kvs)-1 { - // The last key is the start key of the next batch. - // To avoid to get the same key in the next load, we need to skip the last key. + if i == len(resp.Kvs)-1 && resp.More { + // If there are more keys, we need to load the next batch. + // The last key in current batch is the start key of the next batch. startKey = string(item.Key) + // To avoid to get the same key in the next batch, + // we need to skip the last key for the current batch. continue } err = lw.putFn(item) @@ -919,6 +917,27 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) } } +func (lw *LoopWatcher) buildLoadingOpts(limit int64) []clientv3.OpOption { + // Sort by key to get the next key and we don't need to worry about the performance, + // Because the default sort is just SortByKey and SortAscend + opts := []clientv3.OpOption{ + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)} + // In most cases, 'Get(foo, WithPrefix())' is equivalent to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'. + // However, when the startKey changes, the two are no longer equivalent. + // For example, the end key for 'WithRange(GetPrefixRangeEnd(foo))' is consistently 'fop'. + // But when using 'Get(foo1, WithPrefix())', the end key becomes 'foo2', not 'fop'. + // So, we use 'WithRange()' to avoid this problem. + if lw.isWithPrefix { + opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(lw.key))) + } + // If limit is 0, it means no limit. + // If limit is not 0, we need to add 1 to limit to get the next key. + if limit == 0 { + return opts + } + return append(opts, clientv3.WithLimit(limit+1)) +} + // ForceLoad forces to load the key. func (lw *LoopWatcher) ForceLoad() { // When NotLeader error happens, a large volume of force load requests will be received here, @@ -956,11 +975,6 @@ func (lw *LoopWatcher) SetLoadRetryTimes(times int) { lw.loadRetryTimes = times } -// SetLoadTimeout sets the timeout when loading data from etcd. -func (lw *LoopWatcher) SetLoadTimeout(timeout time.Duration) { - lw.loadTimeout = timeout -} - // SetLoadBatchSize sets the batch size when loading data from etcd. func (lw *LoopWatcher) SetLoadBatchSize(size int64) { lw.loadBatchSize = size diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 2d0a4e79cee..0e9e6b5f3aa 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -376,11 +376,11 @@ func TestLoopWatcherTestSuite(t *testing.T) { func (suite *loopWatcherTestSuite) SetupSuite() { var err error - t := suite.T() suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cleans = make([]func(), 0) // Start a etcd server and create a client with etcd1 as endpoint. - suite.config = newTestSingleConfig(t) + suite.config = NewTestSingleConfig() + suite.config.Dir = suite.T().TempDir() suite.startEtcd() suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls) suite.NoError(err) @@ -397,7 +397,7 @@ func (suite *loopWatcherTestSuite) TearDownSuite() { } } -func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { +func (suite *loopWatcherTestSuite) TestLoadNoExistedKey() { re := suite.Require() cache := make(map[string]struct{}) watcher := NewLoopWatcher( @@ -405,7 +405,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { &suite.wg, suite.client, "test", - "TestLoadWithoutKey", + "TestLoadNoExistedKey", func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { cache[string(kv.Key)] = struct{}{} @@ -421,6 +421,35 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { re.Empty(cache) } +func (suite *loopWatcherTestSuite) TestLoadWithLimitChange() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/meetEtcdError", `return()`)) + cache := make(map[string]struct{}) + for i := 0; i < int(maxLoadBatchSize)*2; i++ { + suite.put(fmt.Sprintf("TestLoadWithLimitChange%d", i), "") + } + watcher := NewLoopWatcher( + suite.ctx, + &suite.wg, + suite.client, + "test", + "TestLoadWithLimitChange", + func([]*clientv3.Event) error { return nil }, + func(kv *mvccpb.KeyValue) error { + cache[string(kv.Key)] = struct{}{} + return nil + }, + func(kv *mvccpb.KeyValue) error { return nil }, + func([]*clientv3.Event) error { return nil }, + true, /* withPrefix */ + ) + watcher.StartWatchLoop() + err := watcher.WaitLoad() + re.NoError(err) + re.Len(cache, int(maxLoadBatchSize)*2) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/meetEtcdError")) +} + func (suite *loopWatcherTestSuite) TestCallBack() { cache := struct { syncutil.RWMutex @@ -521,8 +550,8 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { func (suite *loopWatcherTestSuite) TestWatcherLoadLargeKey() { re := suite.Require() - // use default limit to test 16384 key in etcd - count := 16384 + // use default limit to test 65536 key in etcd + count := 65536 ctx, cancel := context.WithCancel(suite.ctx) defer cancel() for i := 0; i < count; i++ { diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index 54ba38b93b6..2096886eab0 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -29,11 +29,10 @@ import ( "go.etcd.io/etcd/etcdserver/etcdserverpb" ) -// newTestSingleConfig is used to create a etcd config for the unit test purpose. -func newTestSingleConfig(t *testing.T) *embed.Config { +// NewTestSingleConfig is used to create a etcd config for the unit test purpose. +func NewTestSingleConfig() *embed.Config { cfg := embed.NewConfig() cfg.Name = genRandName() - cfg.Dir = t.TempDir() cfg.WalDir = "" cfg.Logger = "zap" cfg.LogOutputs = []string{"stdout"} @@ -60,7 +59,8 @@ func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdCli re := require.New(t) servers = make([]*embed.Etcd, 0, count) - cfg := newTestSingleConfig(t) + cfg := NewTestSingleConfig() + cfg.Dir = t.TempDir() etcd, err := embed.StartEtcd(cfg) re.NoError(err) etcdClient, err = CreateEtcdClient(nil, cfg.LCUrls) @@ -98,7 +98,8 @@ func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdCli // MustAddEtcdMember is used to add a new etcd member to the cluster for test. func MustAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { re := require.New(t) - cfg2 := newTestSingleConfig(t) + cfg2 := NewTestSingleConfig() + cfg2.Dir = t.TempDir() cfg2.Name = genRandName() cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) cfg2.ClusterState = embed.ClusterStateFlagExisting diff --git a/server/api/config.go b/server/api/config.go index f87331d5e09..6037de650a0 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -181,6 +181,8 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa case "label-property": // TODO: support changing label-property case "keyspace": return h.updateKeyspaceConfig(cfg, kp[len(kp)-1], value) + case "micro-service": + return h.updateMicroServiceConfig(cfg, kp[len(kp)-1], value) } return errors.Errorf("config prefix %s not found", kp[0]) } @@ -201,6 +203,22 @@ func (h *confHandler) updateKeyspaceConfig(config *config.Config, key string, va return err } +func (h *confHandler) updateMicroServiceConfig(config *config.Config, key string, value interface{}) error { + updated, found, err := jsonutil.AddKeyValue(&config.MicroService, key, value) + if err != nil { + return err + } + + if !found { + return errors.Errorf("config item %s not found", key) + } + + if updated { + err = h.svr.SetMicroServiceConfig(config.MicroService) + } + return err +} + func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error { updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value) if err != nil { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 877a77e549d..9d66f8f4fa7 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -353,7 +353,7 @@ var once sync.Once func (c *RaftCluster) checkServices() { if c.isAPIServiceMode { servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName) - if err != nil || len(servers) == 0 { + if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { c.startSchedulingJobs(c, c.hbstreams) c.independentServices.Delete(mcsutils.SchedulingServiceName) } else { diff --git a/server/config/config.go b/server/config/config.go index e65648820e5..0fb56eeabd7 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -165,6 +165,8 @@ type Config struct { Keyspace KeyspaceConfig `toml:"keyspace" json:"keyspace"` + MicroService MicroServiceConfig `toml:"micro-service" json:"micro-service"` + Controller rm.ControllerConfig `toml:"controller" json:"controller"` } @@ -252,6 +254,8 @@ const ( defaultCheckRegionSplitInterval = 50 * time.Millisecond minCheckRegionSplitInterval = 1 * time.Millisecond maxCheckRegionSplitInterval = 100 * time.Millisecond + + defaultEnableSchedulingFallback = true ) // Special keys for Labels @@ -464,6 +468,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { c.Keyspace.adjust(configMetaData.Child("keyspace")) + c.MicroService.adjust(configMetaData.Child("micro-service")) + c.Security.Encryption.Adjust() if len(c.Log.Format) == 0 { @@ -853,6 +859,28 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { } } +// MicroServiceConfig is the configuration for micro service. +type MicroServiceConfig struct { + EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` +} + +func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) { + if !meta.IsDefined("enable-scheduling-fallback") { + c.EnableSchedulingFallback = defaultEnableSchedulingFallback + } +} + +// Clone returns a copy of micro service config. +func (c *MicroServiceConfig) Clone() *MicroServiceConfig { + cfg := *c + return &cfg +} + +// IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to api service. +func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool { + return c.EnableSchedulingFallback +} + // KeyspaceConfig is the configuration for keyspace management. type KeyspaceConfig struct { // PreAlloc contains the keyspace to be allocated during keyspace manager initialization. diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 71d8b67fca8..5942a17022b 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -52,6 +52,7 @@ type PersistOptions struct { replicationMode atomic.Value labelProperty atomic.Value keyspace atomic.Value + microService atomic.Value storeConfig atomic.Value clusterVersion unsafe.Pointer } @@ -65,6 +66,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions { o.replicationMode.Store(&cfg.ReplicationMode) o.labelProperty.Store(cfg.LabelProperty) o.keyspace.Store(&cfg.Keyspace) + o.microService.Store(&cfg.MicroService) // storeConfig will be fetched from TiKV later, // set it to an empty config here first. o.storeConfig.Store(&sc.StoreConfig{}) @@ -133,6 +135,16 @@ func (o *PersistOptions) SetKeyspaceConfig(cfg *KeyspaceConfig) { o.keyspace.Store(cfg) } +// GetMicroServiceConfig returns the micro service configuration. +func (o *PersistOptions) GetMicroServiceConfig() *MicroServiceConfig { + return o.microService.Load().(*MicroServiceConfig) +} + +// SetMicroServiceConfig sets the micro service configuration. +func (o *PersistOptions) SetMicroServiceConfig(cfg *MicroServiceConfig) { + o.microService.Store(cfg) +} + // GetStoreConfig returns the store config. func (o *PersistOptions) GetStoreConfig() *sc.StoreConfig { return o.storeConfig.Load().(*sc.StoreConfig) @@ -774,6 +786,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error { ReplicationMode: *o.GetReplicationModeConfig(), LabelProperty: o.GetLabelPropertyConfig(), Keyspace: *o.GetKeyspaceConfig(), + MicroService: *o.GetMicroServiceConfig(), ClusterVersion: *o.GetClusterVersion(), }, StoreConfig: *o.GetStoreConfig(), @@ -805,6 +818,7 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error { o.replicationMode.Store(&cfg.ReplicationMode) o.labelProperty.Store(cfg.LabelProperty) o.keyspace.Store(&cfg.Keyspace) + o.microService.Store(&cfg.MicroService) o.storeConfig.Store(&cfg.StoreConfig) o.SetClusterVersion(&cfg.ClusterVersion) } diff --git a/server/server.go b/server/server.go index 29f952bb77b..5d52b8dd811 100644 --- a/server/server.go +++ b/server/server.go @@ -922,6 +922,7 @@ func (s *Server) GetConfig() *config.Config { cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig().Clone() cfg.ReplicationMode = *s.persistOptions.GetReplicationModeConfig() cfg.Keyspace = *s.persistOptions.GetKeyspaceConfig().Clone() + cfg.MicroService = *s.persistOptions.GetMicroServiceConfig().Clone() cfg.LabelProperty = s.persistOptions.GetLabelPropertyConfig().Clone() cfg.ClusterVersion = *s.persistOptions.GetClusterVersion() if s.storage == nil { @@ -960,6 +961,27 @@ func (s *Server) SetKeyspaceConfig(cfg config.KeyspaceConfig) error { return nil } +// GetMicroServiceConfig gets the micro service config information. +func (s *Server) GetMicroServiceConfig() *config.MicroServiceConfig { + return s.persistOptions.GetMicroServiceConfig().Clone() +} + +// SetMicroServiceConfig sets the micro service config information. +func (s *Server) SetMicroServiceConfig(cfg config.MicroServiceConfig) error { + old := s.persistOptions.GetMicroServiceConfig() + s.persistOptions.SetMicroServiceConfig(&cfg) + if err := s.persistOptions.Persist(s.storage); err != nil { + s.persistOptions.SetMicroServiceConfig(old) + log.Error("failed to update micro service config", + zap.Reflect("new", cfg), + zap.Reflect("old", old), + errs.ZapError(err)) + return err + } + log.Info("micro service config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) + return nil +} + // GetScheduleConfig gets the balance config information. func (s *Server) GetScheduleConfig() *sc.ScheduleConfig { return s.persistOptions.GetScheduleConfig().Clone() diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 8e3821831b4..1c97e9da8ed 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -207,9 +207,14 @@ func (suite *serverTestSuite) TestForwardStoreHeartbeat() { }) } -func (suite *serverTestSuite) TestDynamicSwitch() { +func (suite *serverTestSuite) TestSchedulingServiceFallback() { re := suite.Require() - // API server will execute scheduling jobs since there is no scheduler server. + leaderServer := suite.pdLeader.GetServer() + conf := leaderServer.GetMicroServiceConfig().Clone() + // Change back to the default value. + conf.EnableSchedulingFallback = true + leaderServer.SetMicroServiceConfig(*conf) + // API server will execute scheduling jobs since there is no scheduling server. testutil.Eventually(re, func() bool { return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) @@ -245,6 +250,50 @@ func (suite *serverTestSuite) TestDynamicSwitch() { }) } +func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() { + re := suite.Require() + + // API server will execute scheduling jobs since there is no scheduling server. + testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + leaderServer := suite.pdLeader.GetServer() + // After Disabling scheduling service fallback, the API server will stop scheduling. + conf := leaderServer.GetMicroServiceConfig().Clone() + conf.EnableSchedulingFallback = false + leaderServer.SetMicroServiceConfig(*conf) + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Enable scheduling service fallback again, the API server will restart scheduling. + conf.EnableSchedulingFallback = true + leaderServer.SetMicroServiceConfig(*conf) + testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + // After scheduling server is started, API server will not execute scheduling jobs. + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Scheduling server is responsible for executing scheduling jobs. + testutil.Eventually(re, func() bool { + return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning() + }) + // Disable scheduling service fallback and stop scheduling server. API server won't execute scheduling jobs again. + conf.EnableSchedulingFallback = false + leaderServer.SetMicroServiceConfig(*conf) + tc.GetPrimaryServer().Close() + time.Sleep(time.Second) + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) +} + func (suite *serverTestSuite) TestSchedulerSync() { re := suite.Require() tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 9c4be639734..167e9f9691d 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -1054,6 +1054,37 @@ func (suite *configTestSuite) checkPDServerConfig(cluster *tests.TestCluster) { re.Equal(int(3), conf.FlowRoundByDigit) } +func (suite *configTestSuite) TestMicroServiceConfig() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkMicroServiceConfig) +} + +func (suite *configTestSuite) checkMicroServiceConfig(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + cmd := pdctlCmd.GetRootCmd() + + store := &metapb.Store{ + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + } + tests.MustPutStore(re, cluster, store) + svr := leaderServer.GetServer() + output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "show", "all") + re.NoError(err) + cfg := config.Config{} + re.NoError(json.Unmarshal(output, &cfg)) + re.True(svr.GetMicroServiceConfig().EnableSchedulingFallback) + re.True(cfg.MicroService.EnableSchedulingFallback) + // config set enable-scheduling-fallback + args := []string{"-u", pdAddr, "config", "set", "enable-scheduling-fallback", "false"} + _, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.False(svr.GetMicroServiceConfig().EnableSchedulingFallback) +} + func assertBundles(re *require.Assertions, a, b []placement.GroupBundle) { re.Len(b, len(a)) for i := 0; i < len(a); i++ { diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index ec373c663ed..56379148f33 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -214,7 +214,7 @@ func (suite *operatorTestSuite) checkOperator(cluster *tests.TestCluster) { _, err = pdctl.ExecuteCommand(cmd, "config", "set", "enable-placement-rules", "true") re.NoError(err) if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - // wait for the scheduler server to update the config + // wait for the scheduling server to update the config testutil.Eventually(re, func() bool { return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() }) diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 2d5d28d5644..88eaee4a548 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -440,7 +440,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te err := tu.CheckPostJSON(testDialClient, url, reqData, tu.StatusOK(re)) re.NoError(err) if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - // wait for the scheduler server to update the config + // wait for the scheduling server to update the config tu.Eventually(re, func() bool { return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() == testCase.placementRuleEnable }) diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 031077fed78..82eae84978a 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -1500,7 +1500,7 @@ func (suite *regionRuleTestSuite) checkRegionPlacementRule(cluster *tests.TestCl err = tu.CheckPostJSON(testDialClient, u, reqData, tu.StatusOK(re)) re.NoError(err) if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - // wait for the scheduler server to update the config + // wait for the scheduling server to update the config tu.Eventually(re, func() bool { return !sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() })