From b36b725c4b977a4345bd1c167e7ea6deaa730daf Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Fri, 22 Dec 2023 17:16:56 +0800
Subject: [PATCH] rule: split txn to multi batch (#7600)

close tikv/pd#7599

Signed-off-by: lhy1024
Co-authored-by: Ryan Leung
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 pkg/keyspace/keyspace.go                      |  17 +-
 pkg/keyspace/keyspace_test.go                 |  19 +-
 pkg/keyspace/tso_keyspace_group.go            |   4 +-
 pkg/keyspace/tso_keyspace_group_test.go       |   3 +-
 pkg/schedule/placement/rule.go                |   9 +
 pkg/schedule/placement/rule_manager.go        | 166 ++++++++++--------
 pkg/storage/endpoint/rule.go                  |  19 +-
 pkg/storage/endpoint/util.go                  |   6 +-
 pkg/utils/etcdutil/etcdutil.go                |  10 +-
 .../mcs/tso/keyspace_group_manager_test.go    |   6 +-
 tests/server/api/rule_test.go                 |  82 ++++++---
 11 files changed, 208 insertions(+), 133 deletions(-)

diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go
index 1607676a37b..d84b3698f69 100644
--- a/pkg/keyspace/keyspace.go
+++ b/pkg/keyspace/keyspace.go
@@ -31,6 +31,7 @@ import (
 	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/storage/kv"
+	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"github.com/tikv/pd/pkg/utils/syncutil"
 	"go.uber.org/zap"
 )
@@ -51,10 +52,6 @@
 	// Note: Config[TSOKeyspaceGroupIDKey] is only used to judge whether there is keyspace group id.
 	// It will not update the keyspace group id when merging or splitting.
 	TSOKeyspaceGroupIDKey = "tso_keyspace_group_id"
-	// MaxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128.
-	// We use 120 here to leave some space for other operations.
-	// See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61
-	MaxEtcdTxnOps = 120
 )

 // Config is the interface for keyspace config.
@@ -710,7 +707,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
 			zap.Duration("cost", time.Since(start)),
 			zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
 			zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
-			zap.Int("batch-size", MaxEtcdTxnOps),
+			zap.Int("batch-size", etcdutil.MaxEtcdTxnOps),
 			zap.Uint32("start-keyspace-id", startKeyspaceID),
 			zap.Uint32("end-keyspace-id", endKeyspaceID),
 			zap.Uint32("current-start-id", currentStartID),
@@ -734,7 +731,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
 		if defaultKeyspaceGroup.IsMerging() {
 			return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID)
 		}
-		keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, MaxEtcdTxnOps)
+		keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, etcdutil.MaxEtcdTxnOps)
 		if err != nil {
 			return err
 		}
@@ -744,9 +741,9 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
 			currentStartID = keyspaces[0].GetId()
 			nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
 		}
-		// If there are less than `MaxEtcdTxnOps` keyspaces or the next start ID reaches the end,
+		// If there are less than `etcdutil.MaxEtcdTxnOps` keyspaces or the next start ID reaches the end,
 		// there is no need to patrol again.
-		moreToPatrol = keyspaceNum == MaxEtcdTxnOps
+		moreToPatrol = keyspaceNum == etcdutil.MaxEtcdTxnOps
 		var (
 			assigned            = false
 			keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum)
@@ -785,7 +782,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
 			err = manager.store.SaveKeyspaceMeta(txn, ks)
 			if err != nil {
 				log.Error("[keyspace] failed to save keyspace meta during patrol",
-					zap.Int("batch-size", MaxEtcdTxnOps),
+					zap.Int("batch-size", etcdutil.MaxEtcdTxnOps),
 					zap.Uint32("start-keyspace-id", startKeyspaceID),
 					zap.Uint32("end-keyspace-id", endKeyspaceID),
 					zap.Uint32("current-start-id", currentStartID),
@@ -799,7 +796,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
 			err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
 			if err != nil {
 				log.Error("[keyspace] failed to save default keyspace group meta during patrol",
-					zap.Int("batch-size", MaxEtcdTxnOps),
+					zap.Int("batch-size", etcdutil.MaxEtcdTxnOps),
 					zap.Uint32("start-keyspace-id", startKeyspaceID),
 					zap.Uint32("end-keyspace-id", endKeyspaceID),
 					zap.Uint32("current-start-id", currentStartID),
diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go
index 552adc8d83e..4bd8dfd5474 100644
--- a/pkg/keyspace/keyspace_test.go
+++ b/pkg/keyspace/keyspace_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/tikv/pd/pkg/mock/mockid"
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/storage/kv"
+	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"github.com/tikv/pd/pkg/utils/typeutil"
 )
@@ -408,7 +409,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
 func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
 	re := suite.Require()
 	// Create some keyspaces without any keyspace group.
-	for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
+	for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
 		now := time.Now().Unix()
 		err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
 			Id: uint32(i),
@@ -423,7 +424,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
 	defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
 	re.NoError(err)
 	re.NotNil(defaultKeyspaceGroup)
-	for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
+	for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
 		re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
 	}
 	// Patrol the keyspace assignment.
@@ -433,7 +434,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
 	defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
 	re.NoError(err)
 	re.NotNil(defaultKeyspaceGroup)
-	for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
+	for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
 		re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
 	}
 }
@@ -441,7 +442,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
 func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
 	re := suite.Require()
 	// Create some keyspaces without any keyspace group.
-	for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
+	for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
 		now := time.Now().Unix()
 		err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
 			Id: uint32(i),
@@ -456,14 +457,14 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
 	defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
 	re.NoError(err)
 	re.NotNil(defaultKeyspaceGroup)
-	for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
+	for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
 		re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
 	}
-	// Patrol the keyspace assignment with range [MaxEtcdTxnOps/2, MaxEtcdTxnOps/2+MaxEtcdTxnOps+1]
+	// Patrol the keyspace assignment with range [etcdutil.MaxEtcdTxnOps/2, etcdutil.MaxEtcdTxnOps/2+etcdutil.MaxEtcdTxnOps+1]
 	// to make sure the range crossing the boundary of etcd transaction operation limit.
 	var (
-		startKeyspaceID = uint32(MaxEtcdTxnOps / 2)
-		endKeyspaceID   = startKeyspaceID + MaxEtcdTxnOps + 1
+		startKeyspaceID = uint32(etcdutil.MaxEtcdTxnOps / 2)
+		endKeyspaceID   = startKeyspaceID + etcdutil.MaxEtcdTxnOps + 1
 	)
 	err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID)
 	re.NoError(err)
@@ -471,7 +472,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
 	defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
 	re.NoError(err)
 	re.NotNil(defaultKeyspaceGroup)
-	for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
+	for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
 		keyspaceID := uint32(i)
 		if keyspaceID >= startKeyspaceID && keyspaceID <= endKeyspaceID {
 			re.Contains(defaultKeyspaceGroup.Keyspaces, keyspaceID)
diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go
index 51a53f75cc2..55c9adf66d9 100644
--- a/pkg/keyspace/tso_keyspace_group.go
+++ b/pkg/keyspace/tso_keyspace_group.go
@@ -915,7 +915,7 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin
 	// - Load and delete the keyspace groups in the merge list.
 	// - Load and update the target keyspace group.
 	// So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction.
-	if (mergeListNum+1)*2 > MaxEtcdTxnOps {
+	if (mergeListNum+1)*2 > etcdutil.MaxEtcdTxnOps {
 		return ErrExceedMaxEtcdTxnOps
 	}
 	if slice.Contains(mergeList, utils.DefaultKeyspaceGroupID) {
@@ -1062,7 +1062,7 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error {
 			continue
 		}
 		var (
-			maxBatchSize  = MaxEtcdTxnOps/2 - 1
+			maxBatchSize  = etcdutil.MaxEtcdTxnOps/2 - 1
 			groupsToMerge = make([]uint32, 0, maxBatchSize)
 		)
 		for idx, group := range groups.GetAll() {
diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go
index 2dec780c3c8..b5df85c56a8 100644
--- a/pkg/keyspace/tso_keyspace_group_test.go
+++ b/pkg/keyspace/tso_keyspace_group_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/tikv/pd/pkg/mock/mockid"
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/storage/kv"
+	"github.com/tikv/pd/pkg/utils/etcdutil"
 )

 type keyspaceGroupTestSuite struct {
@@ -449,7 +450,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
 	err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5})
 	re.ErrorContains(err, ErrKeyspaceGroupNotExists(5).Error())
 	// merge with the number of keyspace groups exceeds the limit
-	err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, MaxEtcdTxnOps/2))
+	err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, etcdutil.MaxEtcdTxnOps/2))
 	re.ErrorIs(err, ErrExceedMaxEtcdTxnOps)
 	// merge the default keyspace group
 	err = suite.kgm.MergeKeyspaceGroups(1, []uint32{utils.DefaultKeyspaceGroupID})
diff --git a/pkg/schedule/placement/rule.go b/pkg/schedule/placement/rule.go
index 7e57866512d..75ccd509ee8 100644
--- a/pkg/schedule/placement/rule.go
+++ b/pkg/schedule/placement/rule.go
@@ -139,6 +139,15 @@ func (g *RuleGroup) String() string {
 	return string(b)
 }

+// Clone returns a copy of RuleGroup.
+func (g *RuleGroup) Clone() *RuleGroup {
+	return &RuleGroup{
+		ID:       g.ID,
+		Index:    g.Index,
+		Override: g.Override,
+	}
+}
+
 // Rules are ordered by (GroupID, Index, ID).
 func compareRule(a, b *Rule) int {
 	switch {
diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go
index ea85911462b..0a66d82e865 100644
--- a/pkg/schedule/placement/rule_manager.go
+++ b/pkg/schedule/placement/rule_manager.go
@@ -34,6 +34,7 @@ import (
 	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/storage/kv"
+	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"github.com/tikv/pd/pkg/utils/syncutil"
 	"go.uber.org/zap"
 	"golang.org/x/exp/slices"
@@ -157,61 +158,61 @@ func (m *RuleManager) loadRules() error {
 		toSave   []*Rule
 		toDelete []string
 	)
-	return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
-		err = m.storage.LoadRules(txn, func(k, v string) {
-			r, err := NewRuleFromJSON([]byte(v))
-			if err != nil {
-				log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
-				toDelete = append(toDelete, k)
-				return
-			}
-			err = m.AdjustRule(r, "")
-			if err != nil {
-				log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err))
-				toDelete = append(toDelete, k)
-				return
-			}
-			_, ok := m.ruleConfig.rules[r.Key()]
-			if ok {
-				log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
-				toDelete = append(toDelete, k)
-				return
-			}
-			if k != r.StoreKey() {
-				log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
-				toDelete = append(toDelete, k)
-				toSave = append(toSave, r)
-			}
-			m.ruleConfig.rules[r.Key()] = r
-		})
+	// load rules from storage
+	err := m.storage.LoadRules(func(k, v string) {
+		r, err := NewRuleFromJSON([]byte(v))
 		if err != nil {
-			return err
+			log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
+			toDelete = append(toDelete, k)
+			return
 		}
-
-		for _, s := range toSave {
-			if err = m.storage.SaveRule(txn, s.StoreKey(), s); err != nil {
-				return err
-			}
+		err = m.AdjustRule(r, "")
+		if err != nil {
+			log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err))
+			toDelete = append(toDelete, k)
+			return
 		}
-		for _, d := range toDelete {
-			if err = m.storage.DeleteRule(txn, d); err != nil {
-				return err
-			}
+		_, ok := m.ruleConfig.rules[r.Key()]
+		if ok {
+			log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
+			toDelete = append(toDelete, k)
+			return
 		}
-		return nil
+		if k != r.StoreKey() {
+			log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
+			toDelete = append(toDelete, k)
+			toSave = append(toSave, r)
+		}
+		m.ruleConfig.rules[r.Key()] = r
 	})
+	if err != nil {
+		return err
+	}
+	// save the rules with mismatch data key or bad format
+	var batch []func(kv.Txn) error
+	for _, s := range toSave {
+		localRule := s
+		batch = append(batch, func(txn kv.Txn) error {
+			return m.storage.SaveRule(txn, localRule.StoreKey(), localRule)
+		})
+	}
+	for _, d := range toDelete {
+		localKey := d
+		batch = append(batch, func(txn kv.Txn) error {
+			return m.storage.DeleteRule(txn, localKey)
+		})
+	}
+	return m.runBatchOpInTxn(batch)
 }

 func (m *RuleManager) loadGroups() error {
-	return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
-		return m.storage.LoadRuleGroups(txn, func(k, v string) {
-			g, err := NewRuleGroupFromJSON([]byte(v))
-			if err != nil {
-				log.Error("failed to unmarshal rule group", zap.String("group-id", k), errs.ZapError(errs.ErrLoadRuleGroup, err))
-				return
-			}
-			m.ruleConfig.groups[g.ID] = g
-		})
+	return m.storage.LoadRuleGroups(func(k, v string) {
+		g, err := NewRuleGroupFromJSON([]byte(v))
+		if err != nil {
+			log.Error("failed to unmarshal rule group", zap.String("group-id", k), errs.ZapError(errs.ErrLoadRuleGroup, err))
+			return
+		}
+		m.ruleConfig.groups[g.ID] = g
 	})
 }

@@ -492,30 +493,35 @@ func (m *RuleManager) TryCommitPatch(patch *RuleConfigPatch) error {
 }

 func (m *RuleManager) savePatch(p *ruleConfig) error {
-	return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
-		for key, r := range p.rules {
-			if r == nil {
-				r = &Rule{GroupID: key[0], ID: key[1]}
-				err = m.storage.DeleteRule(txn, r.StoreKey())
-			} else {
-				err = m.storage.SaveRule(txn, r.StoreKey(), r)
-			}
-			if err != nil {
-				return err
-			}
+	var batch []func(kv.Txn) error
+	// add rules to batch
+	for key, r := range p.rules {
+		localKey, localRule := key, r
+		if r == nil {
+			rule := &Rule{GroupID: localKey[0], ID: localKey[1]}
+			batch = append(batch, func(txn kv.Txn) error {
+				return m.storage.DeleteRule(txn, rule.StoreKey())
+			})
+		} else {
+			batch = append(batch, func(txn kv.Txn) error {
+				return m.storage.SaveRule(txn, localRule.StoreKey(), localRule)
+			})
 		}
-		for id, g := range p.groups {
-			if g.isDefault() {
-				err = m.storage.DeleteRuleGroup(txn, id)
-			} else {
-				err = m.storage.SaveRuleGroup(txn, id, g)
-			}
-			if err != nil {
-				return err
-			}
+	}
+	// add groups to batch
+	for id, g := range p.groups {
+		localID, localGroup := id, g
+		if g.isDefault() {
+			batch = append(batch, func(txn kv.Txn) error {
+				return m.storage.DeleteRuleGroup(txn, localID)
+			})
+		} else {
+			batch = append(batch, func(txn kv.Txn) error {
+				return m.storage.SaveRuleGroup(txn, localID, localGroup)
+			})
 		}
-		return nil
-	})
+	}
+	return m.runBatchOpInTxn(batch)
 }

 // SetRules inserts or updates lots of Rules at once.
@@ -808,6 +814,28 @@ func (m *RuleManager) IsInitialized() bool {
 	return m.initialized
 }

+func (m *RuleManager) runBatchOpInTxn(batch []func(kv.Txn) error) error {
+	// execute batch in transaction with limited operations per transaction
+	for start := 0; start < len(batch); start += etcdutil.MaxEtcdTxnOps {
+		end := start + etcdutil.MaxEtcdTxnOps
+		if end > len(batch) {
+			end = len(batch)
+		}
+		err := m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
+			for _, op := range batch[start:end] {
+				if err = op(txn); err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // checkRule check the rule whether will have RuleFit after FitRegion
 // in order to reduce the calculation.
 func checkRule(rule *Rule, stores []*core.StoreInfo) bool {
diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go
index b18360040ea..ad245f527bb 100644
--- a/pkg/storage/endpoint/rule.go
+++ b/pkg/storage/endpoint/rule.go
@@ -22,14 +22,17 @@ import (

 // RuleStorage defines the storage operations on the rule.
 type RuleStorage interface {
-	LoadRules(txn kv.Txn, f func(k, v string)) error
+	// Load in txn is unnecessary and may cause txn too large.
+	// because scheduling server will load rules from etcd rather than watching.
+	LoadRule(ruleKey string) (string, error)
+	LoadRules(f func(k, v string)) error
+	LoadRuleGroups(f func(k, v string)) error
+	// We need to use txn to avoid concurrent modification.
+	// And it is helpful for the scheduling server to watch the rule.
 	SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error
 	DeleteRule(txn kv.Txn, ruleKey string) error
-	LoadRuleGroups(txn kv.Txn, f func(k, v string)) error
 	SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error
 	DeleteRuleGroup(txn kv.Txn, groupID string) error
-	// LoadRule is used only in rule watcher.
-	LoadRule(ruleKey string) (string, error)

 	LoadRegionRules(f func(k, v string)) error
 	SaveRegionRule(ruleKey string, rule interface{}) error
@@ -50,8 +53,8 @@ func (se *StorageEndpoint) DeleteRule(txn kv.Txn, ruleKey string) error {
 }

 // LoadRuleGroups loads all rule groups from storage.
-func (se *StorageEndpoint) LoadRuleGroups(txn kv.Txn, f func(k, v string)) error {
-	return loadRangeByPrefixInTxn(txn, ruleGroupPath+"/", f)
+func (se *StorageEndpoint) LoadRuleGroups(f func(k, v string)) error {
+	return se.loadRangeByPrefix(ruleGroupPath+"/", f)
 }

 // SaveRuleGroup stores a rule group config to storage.
@@ -85,6 +88,6 @@ func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) {
 }

 // LoadRules loads placement rules from storage.
-func (se *StorageEndpoint) LoadRules(txn kv.Txn, f func(k, v string)) error {
-	return loadRangeByPrefixInTxn(txn, rulesPath+"/", f)
+func (se *StorageEndpoint) LoadRules(f func(k, v string)) error {
+	return se.loadRangeByPrefix(rulesPath+"/", f)
 }
diff --git a/pkg/storage/endpoint/util.go b/pkg/storage/endpoint/util.go
index 3058c059628..62b170a1a8e 100644
--- a/pkg/storage/endpoint/util.go
+++ b/pkg/storage/endpoint/util.go
@@ -58,14 +58,10 @@ func saveJSONInTxn(txn kv.Txn, key string, data interface{}) error {
 // loadRangeByPrefix iterates all key-value pairs in the storage that has the prefix.
 func (se *StorageEndpoint) loadRangeByPrefix(prefix string, f func(k, v string)) error {
-	return loadRangeByPrefixInTxn(se /* use the same interface */, prefix, f)
-}
-
-func loadRangeByPrefixInTxn(txn kv.Txn, prefix string, f func(k, v string)) error {
 	nextKey := prefix
 	endKey := clientv3.GetPrefixRangeEnd(prefix)
 	for {
-		keys, values, err := txn.LoadRange(nextKey, endKey, MinKVRangeLimit)
+		keys, values, err := se.LoadRange(nextKey, endKey, MinKVRangeLimit)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index f6beafee511..11c640fe4ef 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -64,6 +64,11 @@ const (
 	DefaultSlowRequestTime = time.Second

 	healthyPath = "health"
+
+	// MaxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128.
+	// We use 120 here to leave some space for other operations.
+	// See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61
+	MaxEtcdTxnOps = 120
 )

 // CheckClusterID checks etcd cluster ID, returns an error if mismatch.
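The chunking pattern that the relocated MaxEtcdTxnOps constant supports, and that RuleManager.runBatchOpInTxn above implements, can also be sketched on its own. The standalone Go program below is only an editorial illustration of the idea and is not part of this patch; the Txn interface, memTxn type, and runBatched helper are simplified stand-ins for PD's kv.Txn and storage layer.

package main

import "fmt"

// Txn is a simplified stand-in for PD's kv.Txn interface (assumed shape, for illustration only).
type Txn interface {
	Save(key, value string) error
}

// memTxn counts writes instead of talking to etcd.
type memTxn struct{ puts int }

func (t *memTxn) Save(_, _ string) error { t.puts++; return nil }

// maxTxnOps mirrors etcdutil.MaxEtcdTxnOps: 120 ops per txn, leaving headroom below etcd's 128-op limit.
const maxTxnOps = 120

// runBatched flushes the queued operations in chunks of at most maxTxnOps per
// transaction, the same idea as RuleManager.runBatchOpInTxn in this patch.
func runBatched(ops []func(Txn) error, runInTxn func(func(Txn) error) error) error {
	for start := 0; start < len(ops); start += maxTxnOps {
		end := start + maxTxnOps
		if end > len(ops) {
			end = len(ops)
		}
		chunk := ops[start:end]
		if err := runInTxn(func(txn Txn) error {
			for _, op := range chunk {
				if err := op(txn); err != nil {
					return err
				}
			}
			return nil
		}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var txnCount int
	runInTxn := func(body func(Txn) error) error {
		txnCount++
		return body(&memTxn{})
	}
	// Queue 250 rule writes, more than fits in a single etcd transaction.
	ops := make([]func(Txn) error, 0, 250)
	for i := 0; i < 250; i++ {
		key := fmt.Sprintf("rule-%d", i)
		ops = append(ops, func(txn Txn) error { return txn.Save(key, "{}") })
	}
	if err := runBatched(ops, runInTxn); err != nil {
		panic(err)
	}
	fmt.Println("transactions used:", txnCount) // 250 ops / 120 per txn -> 3
}

Running the sketch prints "transactions used: 3": 250 queued operations are flushed in chunks of at most 120 per transaction, which is how the patch keeps each etcd txn under the server-side 128-op limit instead of rejecting large rule batches outright.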
@@ -776,7 +781,10 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
 			revision, err = lw.load(ctx)
 			if err != nil {
 				log.Warn("force load key failed in watch loop",
-					zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err))
+					zap.String("name", lw.name), zap.String("key", lw.key), zap.Int64("revision", revision), zap.Error(err))
+			} else {
+				log.Info("force load key successfully in watch loop",
+					zap.String("name", lw.name), zap.String("key", lw.key), zap.Int64("revision", revision))
 			}
 			continue
 		case <-ticker.C:
diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
index d1a4cf35db4..52248086249 100644
--- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go
+++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
@@ -30,11 +30,11 @@ import (
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/pkg/election"
 	"github.com/tikv/pd/pkg/errs"
-	"github.com/tikv/pd/pkg/keyspace"
 	mcsutils "github.com/tikv/pd/pkg/mcs/utils"
 	"github.com/tikv/pd/pkg/member"
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	tsopkg "github.com/tikv/pd/pkg/tso"
+	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"github.com/tikv/pd/pkg/utils/testutil"
 	"github.com/tikv/pd/pkg/utils/tsoutil"
 	"github.com/tikv/pd/server/apiv2/handlers"
@@ -761,7 +761,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
 	var (
-		keyspaceGroupNum = keyspace.MaxEtcdTxnOps
+		keyspaceGroupNum = etcdutil.MaxEtcdTxnOps
 		keyspaceGroups   = make([]*endpoint.KeyspaceGroup, 0, keyspaceGroupNum)
 		keyspaces        = make([]uint32, 0, keyspaceGroupNum)
 	)
@@ -772,7 +772,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault
 			Keyspaces: []uint32{uint32(i)},
 		})
 		keyspaces = append(keyspaces, uint32(i))
-		if len(keyspaceGroups) < keyspace.MaxEtcdTxnOps/2 && i != keyspaceGroupNum {
+		if len(keyspaceGroups) < etcdutil.MaxEtcdTxnOps/2 && i != keyspaceGroupNum {
 			continue
 		}
 		handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go
index 14bb23e383a..9819e821d29 100644
--- a/tests/server/api/rule_test.go
+++ b/tests/server/api/rule_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/schedule/labeler"
 	"github.com/tikv/pd/pkg/schedule/placement"
+	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"github.com/tikv/pd/pkg/utils/syncutil"
 	tu "github.com/tikv/pd/pkg/utils/testutil"
 	"github.com/tikv/pd/server/config"
@@ -504,8 +505,7 @@ func (suite *ruleTestSuite) checkGetAllByRegion(cluster *tests.TestCluster) {
 }

 func (suite *ruleTestSuite) TestGetAllByKey() {
-	// Fixme: after delete+set rule, the key range will be empty, so the test will fail in api mode.
-	suite.env.RunTestInPDMode(suite.checkGetAllByKey)
+	suite.env.RunTestInTwoModes(suite.checkGetAllByKey)
 }

 func (suite *ruleTestSuite) checkGetAllByKey(cluster *tests.TestCluster) {
@@ -1035,7 +1035,6 @@ func (suite *ruleTestSuite) TestDeleteAndUpdate() {
 }

 func (suite *ruleTestSuite) checkDeleteAndUpdate(cluster *tests.TestCluster) {
-	re := suite.Require()
 	leaderServer := cluster.GetLeaderServer()
 	pdAddr := leaderServer.GetAddr()
 	urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix)
@@ -1108,28 +1107,7 @@ func (suite *ruleTestSuite) checkDeleteAndUpdate(cluster *tests.TestCluster) {
 	}

 	for _, bundle := range bundles {
-		data, err := json.Marshal(bundle)
-		re.NoError(err)
-		err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re))
-		re.NoError(err)
-
-		tu.Eventually(re, func() bool {
-			respBundle := make([]placement.GroupBundle, 0)
-			err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil,
-				tu.StatusOK(re), tu.ExtractJSON(re, &respBundle))
-			re.NoError(err)
-			if len(respBundle) != len(bundle) {
-				return false
-			}
-			sort.Slice(respBundle, func(i, j int) bool { return respBundle[i].ID < respBundle[j].ID })
-			sort.Slice(bundle, func(i, j int) bool { return bundle[i].ID < bundle[j].ID })
-			for i := range respBundle {
-				if !suite.compareBundle(respBundle[i], bundle[i]) {
-					return false
-				}
-			}
-			return true
-		})
+		suite.postAndCheckRuleBundle(urlPrefix, bundle)
 	}
 }

@@ -1222,6 +1200,34 @@ func (suite *ruleTestSuite) checkConcurrencyWith(cluster *tests.TestCluster,
 	})
 }

+func (suite *ruleTestSuite) TestLargeRules() {
+	suite.env.RunTestInTwoModes(suite.checkLargeRules)
+}
+
+func (suite *ruleTestSuite) checkLargeRules(cluster *tests.TestCluster) {
+	leaderServer := cluster.GetLeaderServer()
+	pdAddr := leaderServer.GetAddr()
+	urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix)
+	genBundlesWithRulesNum := func(num int) []placement.GroupBundle {
+		bundle := []placement.GroupBundle{
+			{
+				ID:    "1",
+				Index: 1,
+				Rules: make([]*placement.Rule, 0),
+			},
+		}
+		for i := 0; i < num; i++ {
+			bundle[0].Rules = append(bundle[0].Rules, &placement.Rule{
+				ID: strconv.Itoa(i), Index: i, Role: placement.Voter, Count: 1, GroupID: "1",
+				StartKey: []byte(strconv.Itoa(i)), EndKey: []byte(strconv.Itoa(i + 1)),
+			})
+		}
+		return bundle
+	}
+	suite.postAndCheckRuleBundle(urlPrefix, genBundlesWithRulesNum(etcdutil.MaxEtcdTxnOps/2))
+	suite.postAndCheckRuleBundle(urlPrefix, genBundlesWithRulesNum(etcdutil.MaxEtcdTxnOps*2))
+}
+
 func (suite *ruleTestSuite) assertBundleEqual(re *require.Assertions, b1, b2 placement.GroupBundle) {
 	tu.Eventually(re, func() bool {
 		return suite.compareBundle(b1, b2)
@@ -1251,6 +1257,32 @@ func (suite *ruleTestSuite) compareRule(r1 *placement.Rule, r2 *placement.Rule)
 		r2.Count == r1.Count
 }

+func (suite *ruleTestSuite) postAndCheckRuleBundle(urlPrefix string, bundle []placement.GroupBundle) {
+	re := suite.Require()
+	data, err := json.Marshal(bundle)
+	re.NoError(err)
+	err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re))
+	re.NoError(err)
+
+	tu.Eventually(re, func() bool {
+		respBundle := make([]placement.GroupBundle, 0)
+		err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil,
+			tu.StatusOK(re), tu.ExtractJSON(re, &respBundle))
+		re.NoError(err)
+		if len(respBundle) != len(bundle) {
+			return false
+		}
+		sort.Slice(respBundle, func(i, j int) bool { return respBundle[i].ID < respBundle[j].ID })
+		sort.Slice(bundle, func(i, j int) bool { return bundle[i].ID < bundle[j].ID })
+		for i := range respBundle {
+			if !suite.compareBundle(respBundle[i], bundle[i]) {
+				return false
+			}
+		}
+		return true
+	})
+}
+
 type regionRuleTestSuite struct {
 	suite.Suite
 	env              *tests.SchedulingTestEnvironment