diff --git a/errors.toml b/errors.toml index a7e039564d7..9bfd4a79190 100644 --- a/errors.toml +++ b/errors.toml @@ -901,16 +901,6 @@ error = ''' reset user timestamp failed, %s ''' -["PD:tso:ErrSetLocalTSOConfig"] -error = ''' -set local tso config failed, %s -''' - -["PD:tso:ErrSyncMaxTS"] -error = ''' -sync max ts failed, %s -''' - ["PD:tso:ErrUpdateTimestamp"] error = ''' update timestamp failed, %s diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 29065c7c13d..e5c23cffde2 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -40,10 +40,8 @@ var ( // tso errors var ( - ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig")) ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator")) ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator")) - ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS")) ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp")) ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp")) ErrUpdateTimestamp = errors.Normalize("update timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrUpdateTimestamp")) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 821e1fc5d62..ea1eeeeb18a 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" @@ -315,64 +314,6 @@ func (am *AllocatorManager) GetMember() ElectionMember { return am.member } -// SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location -// to make the whole cluster know the DC-level topology for later Local TSO Allocator campaign. -func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error { - serverName := am.member.Name() - serverID := am.member.ID() - if err := am.checkDCLocationUpperLimit(dcLocation); err != nil { - log.Error("check dc-location upper limit failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID != 0), - zap.Int("upper-limit", int(math.Pow(2, MaxSuffixBits))-1), - zap.String("dc-location", dcLocation), - zap.String("server-name", serverName), - zap.Uint64("server-id", serverID), - errs.ZapError(err)) - return err - } - // The key-value pair in etcd will be like: serverID -> dcLocation - dcLocationKey := am.member.GetDCLocationPath(serverID) - resp, err := kv. - NewSlowLogTxn(am.member.Client()). - Then(clientv3.OpPut(dcLocationKey, dcLocation)). 
- Commit() - if err != nil { - return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause() - } - if !resp.Succeeded { - log.Warn("write dc-location configuration into etcd failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", dcLocation), - zap.String("server-name", serverName), - zap.Uint64("server-id", serverID)) - return errs.ErrEtcdTxnConflict.FastGenByArgs() - } - log.Info("write dc-location configuration into etcd", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", dcLocation), - zap.String("server-name", serverName), - zap.Uint64("server-id", serverID)) - go am.ClusterDCLocationChecker() - return nil -} - -func (am *AllocatorManager) checkDCLocationUpperLimit(dcLocation string) error { - clusterDCLocations, err := am.GetClusterDCLocationsFromEtcd() - if err != nil { - return err - } - // It's ok to add a new PD to the old dc-location. - if _, ok := clusterDCLocations[dcLocation]; ok { - return nil - } - // Check whether the dc-location number meets the upper limit 2**(LogicalBits-1)-1, - // which includes 1 global and 2**(LogicalBits-1) local - if len(clusterDCLocations) == int(math.Pow(2, MaxSuffixBits))-1 { - return errs.ErrSetLocalTSOConfig.FastGenByArgs("the number of dc-location meets the upper limit") - } - return nil -} - // GetClusterDCLocationsFromEtcd fetches dcLocation topology from etcd func (am *AllocatorManager) GetClusterDCLocationsFromEtcd() (clusterDCLocations map[string][]uint64, err error) { resp, err := etcdutil.EtcdKVGet( @@ -413,26 +354,6 @@ func (am *AllocatorManager) GetDCLocationInfo(dcLocation string) (DCLocationInfo return infoPtr.clone(), true } -// CleanUpDCLocation cleans up certain server's DCLocationInfo -func (am *AllocatorManager) CleanUpDCLocation() error { - serverID := am.member.ID() - dcLocationKey := am.member.GetDCLocationPath(serverID) - // remove dcLocationKey from etcd - if resp, err := kv. - NewSlowLogTxn(am.member.Client()). - Then(clientv3.OpDelete(dcLocationKey)). - Commit(); err != nil { - return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause() - } else if !resp.Succeeded { - return errs.ErrEtcdTxnConflict.FastGenByArgs() - } - log.Info("delete the dc-location key previously written in etcd", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.Uint64("server-id", serverID)) - go am.ClusterDCLocationChecker() - return nil -} - // GetClusterDCLocations returns all dc-locations of a cluster with a copy of map, // which satisfies dcLocation -> DCLocationInfo. func (am *AllocatorManager) GetClusterDCLocations() map[string]DCLocationInfo { @@ -467,20 +388,6 @@ func CalSuffixBits(maxSuffix int32) int { return int(math.Ceil(math.Log2(float64(maxSuffix + 1)))) } -func (am *AllocatorManager) getAllocatorPath(dcLocation string) string { - // For backward compatibility, the global timestamp's store path will still use the old one - if dcLocation == GlobalDCLocation { - return am.rootPath - } - return path.Join(am.getLocalTSOAllocatorPath(), dcLocation) -} - -// Add a prefix to the root path to prevent being conflicted -// with other system key paths such as leader, member, alloc_id, raft, etc. -func (am *AllocatorManager) getLocalTSOAllocatorPath() string { - return path.Join(am.rootPath, localTSOAllocatorEtcdPrefix) -} - // AllocatorDaemon is used to update every allocator's TSO and check whether we have // any new local allocator that needs to be set up. 
func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { @@ -491,19 +398,12 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { tsTicker.Reset(time.Millisecond) }) defer tsTicker.Stop() - checkerTicker := time.NewTicker(PriorityCheck) - defer checkerTicker.Stop() for { select { case <-tsTicker.C: // Update the initialized TSO Allocator to advance TSO. am.allocatorUpdater() - case <-checkerTicker.C: - // Check and maintain the cluster's meta info about dc-location distribution. - go am.ClusterDCLocationChecker() - // PS: ClusterDCLocationChecker are time consuming and low frequent to run, - // we should run them concurrently to speed up the progress. case <-ctx.Done(): log.Info("exit allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) return @@ -554,188 +454,6 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) { } } -// ClusterDCLocationChecker collects all dc-locations of a cluster, computes some related info -// and stores them into the DCLocationInfo, then finally writes them into am.mu.clusterDCLocations. -func (am *AllocatorManager) ClusterDCLocationChecker() { - defer logutil.LogPanic() - // Wait for the group leader to be elected out. - if !am.member.IsLeaderElected() { - return - } - newClusterDCLocations, err := am.GetClusterDCLocationsFromEtcd() - if err != nil { - log.Error("get cluster dc-locations from etcd failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - errs.ZapError(err)) - return - } - am.mu.Lock() - // Clean up the useless dc-locations - for dcLocation := range am.mu.clusterDCLocations { - if _, ok := newClusterDCLocations[dcLocation]; !ok { - delete(am.mu.clusterDCLocations, dcLocation) - } - } - // May be used to rollback the updating after - newDCLocations := make([]string, 0) - // Update the new dc-locations - for dcLocation, serverIDs := range newClusterDCLocations { - if _, ok := am.mu.clusterDCLocations[dcLocation]; !ok { - am.mu.clusterDCLocations[dcLocation] = &DCLocationInfo{ - ServerIDs: serverIDs, - Suffix: -1, - } - newDCLocations = append(newDCLocations, dcLocation) - } - } - // Only leader can write the TSO suffix to etcd in order to make it consistent in the cluster - if am.IsLeader() { - for dcLocation, info := range am.mu.clusterDCLocations { - if info.Suffix > 0 { - continue - } - suffix, err := am.getOrCreateLocalTSOSuffix(dcLocation) - if err != nil { - log.Warn("get or create the local tso suffix failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", dcLocation), - errs.ZapError(err)) - continue - } - if suffix > am.mu.maxSuffix { - am.mu.maxSuffix = suffix - } - am.mu.clusterDCLocations[dcLocation].Suffix = suffix - } - } else { - // Follower should check and update the am.mu.maxSuffix - maxSuffix, err := am.getMaxLocalTSOSuffix() - if err != nil { - log.Error("get the max local tso suffix from etcd failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - errs.ZapError(err)) - // Rollback the new dc-locations we update before - for _, dcLocation := range newDCLocations { - delete(am.mu.clusterDCLocations, dcLocation) - } - } else if maxSuffix > am.mu.maxSuffix { - am.mu.maxSuffix = maxSuffix - } - } - am.mu.Unlock() -} - -// getOrCreateLocalTSOSuffix will check whether we have the Local TSO suffix written into etcd. -// If not, it will write a number into etcd according to the its joining order. -// If yes, it will just return the previous persisted one. 
-func (am *AllocatorManager) getOrCreateLocalTSOSuffix(dcLocation string) (int32, error) { - // Try to get the suffix from etcd - dcLocationSuffix, err := am.getDCLocationSuffixMapFromEtcd() - if err != nil { - return -1, nil - } - var maxSuffix int32 - for curDCLocation, suffix := range dcLocationSuffix { - // If we already have the suffix persisted in etcd before, - // just use it as the result directly. - if curDCLocation == dcLocation { - return suffix, nil - } - if suffix > maxSuffix { - maxSuffix = suffix - } - } - maxSuffix++ - localTSOSuffixKey := am.GetLocalTSOSuffixPath(dcLocation) - // The Local TSO suffix is determined by the joining order of this dc-location. - localTSOSuffixValue := strconv.FormatInt(int64(maxSuffix), 10) - txnResp, err := kv.NewSlowLogTxn(am.member.Client()). - If(clientv3.Compare(clientv3.CreateRevision(localTSOSuffixKey), "=", 0)). - Then(clientv3.OpPut(localTSOSuffixKey, localTSOSuffixValue)). - Commit() - if err != nil { - return -1, errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause() - } - if !txnResp.Succeeded { - log.Warn("write local tso suffix into etcd failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", dcLocation), - zap.String("local-tso-suffix", localTSOSuffixValue), - zap.String("server-name", am.member.Name()), - zap.Uint64("server-id", am.member.ID())) - return -1, errs.ErrEtcdTxnConflict.FastGenByArgs() - } - return maxSuffix, nil -} - -func (am *AllocatorManager) getDCLocationSuffixMapFromEtcd() (map[string]int32, error) { - resp, err := etcdutil.EtcdKVGet( - am.member.Client(), - am.GetLocalTSOSuffixPathPrefix(), - clientv3.WithPrefix()) - if err != nil { - return nil, err - } - dcLocationSuffix := make(map[string]int32) - for _, kv := range resp.Kvs { - suffix, err := strconv.ParseInt(string(kv.Value), 10, 32) - if err != nil { - return nil, err - } - splitKey := strings.Split(string(kv.Key), "/") - dcLocation := splitKey[len(splitKey)-1] - dcLocationSuffix[dcLocation] = int32(suffix) - } - return dcLocationSuffix, nil -} - -func (am *AllocatorManager) getMaxLocalTSOSuffix() (int32, error) { - // Try to get the suffix from etcd - dcLocationSuffix, err := am.getDCLocationSuffixMapFromEtcd() - if err != nil { - return -1, err - } - var maxSuffix int32 - for _, suffix := range dcLocationSuffix { - if suffix > maxSuffix { - maxSuffix = suffix - } - } - return maxSuffix, nil -} - -// GetLocalTSOSuffixPathPrefix returns the etcd key prefix of the Local TSO suffix for the given dc-location. -func (am *AllocatorManager) GetLocalTSOSuffixPathPrefix() string { - return path.Join(am.rootPath, localTSOSuffixEtcdPrefix) -} - -// GetLocalTSOSuffixPath returns the etcd key of the Local TSO suffix for the given dc-location. 
-func (am *AllocatorManager) GetLocalTSOSuffixPath(dcLocation string) string { - return path.Join(am.GetLocalTSOSuffixPathPrefix(), dcLocation) -} - -// TransferAllocatorForDCLocation transfer local tso allocator to the target member for the given dcLocation -func (am *AllocatorManager) TransferAllocatorForDCLocation(dcLocation string, memberID uint64) error { - if dcLocation == GlobalDCLocation { - return fmt.Errorf("dc-location %v should be transferred by transfer leader", dcLocation) - } - dcLocationsInfo := am.GetClusterDCLocations() - _, ok := dcLocationsInfo[dcLocation] - if !ok { - return fmt.Errorf("dc-location %v haven't been discovered yet", dcLocation) - } - allocator, err := am.GetAllocator(dcLocation) - if err != nil { - return err - } - localTSOAllocator, _ := allocator.(*LocalTSOAllocator) - leaderServerID := localTSOAllocator.GetAllocatorLeader().GetMemberId() - if leaderServerID == memberID { - return nil - } - return am.transferLocalAllocator(dcLocation, memberID) -} - // HandleRequest forwards TSO allocation requests to correct TSO Allocators. func (am *AllocatorManager) HandleRequest(ctx context.Context, dcLocation string, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "AllocatorManager.HandleRequest").End() @@ -841,113 +559,6 @@ func (am *AllocatorManager) GetLocalAllocatorLeaders() (map[string]*pdpb.Member, return localAllocatorLeaderMember, nil } -func (am *AllocatorManager) getOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) { - conn, ok := am.getGRPCConn(addr) - if ok { - return conn, nil - } - tlsCfg, err := am.securityConfig.ToTLSConfig() - if err != nil { - return nil, err - } - ctxWithTimeout, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - cc, err := grpcutil.GetClientConn(ctxWithTimeout, addr, tlsCfg) - if err != nil { - return nil, err - } - am.setGRPCConn(cc, addr) - conn, _ = am.getGRPCConn(addr) - return conn, nil -} - -// GetMaxLocalTSO will sync with the current Local TSO Allocators among the cluster to get the -// max Local TSO. 
-func (am *AllocatorManager) GetMaxLocalTSO(ctx context.Context) (*pdpb.Timestamp, error) { - // Sync the max local TSO from the other Local TSO Allocators who has been initialized - clusterDCLocations := am.GetClusterDCLocations() - for dcLocation := range clusterDCLocations { - allocatorGroup, ok := am.getAllocatorGroup(dcLocation) - if !(ok && allocatorGroup.leadership.Check()) { - delete(clusterDCLocations, dcLocation) - } - } - maxTSO := &pdpb.Timestamp{} - if len(clusterDCLocations) == 0 { - return maxTSO, nil - } - globalAllocator, err := am.GetAllocator(GlobalDCLocation) - if err != nil { - return nil, err - } - if err := globalAllocator.(*GlobalTSOAllocator).SyncMaxTS(ctx, clusterDCLocations, maxTSO, false); err != nil { - return nil, err - } - return maxTSO, nil -} - -func (am *AllocatorManager) getGRPCConn(addr string) (*grpc.ClientConn, bool) { - am.localAllocatorConn.RLock() - defer am.localAllocatorConn.RUnlock() - conn, ok := am.localAllocatorConn.clientConns[addr] - return conn, ok -} - -func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) { - am.localAllocatorConn.Lock() - defer am.localAllocatorConn.Unlock() - if _, ok := am.localAllocatorConn.clientConns[addr]; ok { - newConn.Close() - log.Debug("use old connection", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("target", newConn.Target()), - zap.String("state", newConn.GetState().String())) - return - } - am.localAllocatorConn.clientConns[addr] = newConn -} - -func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID uint64) error { - nextLeaderKey := am.nextLeaderKey(dcLocation) - // Grant a etcd lease with checkStep * 1.5 - nextLeaderLease := clientv3.NewLease(am.member.Client()) - ctx, cancel := context.WithTimeout(am.member.Client().Ctx(), etcdutil.DefaultRequestTimeout) - leaseResp, err := nextLeaderLease.Grant(ctx, int64(checkStep.Seconds()*1.5)) - cancel() - if err != nil { - err = errs.ErrEtcdGrantLease.Wrap(err).GenWithStackByCause() - log.Error("failed to grant the lease of the next leader key", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", dcLocation), - zap.Uint64("serverID", serverID), - errs.ZapError(err)) - return err - } - resp, err := kv.NewSlowLogTxn(am.member.Client()). - If(clientv3.Compare(clientv3.CreateRevision(nextLeaderKey), "=", 0)). - Then(clientv3.OpPut(nextLeaderKey, fmt.Sprint(serverID), clientv3.WithLease(leaseResp.ID))). - Commit() - if err != nil { - err = errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause() - log.Error("failed to write next leader key into etcd", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", dcLocation), zap.Uint64("serverID", serverID), - errs.ZapError(err)) - return err - } - if !resp.Succeeded { - log.Warn("write next leader id into etcd unsuccessfully", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", dcLocation)) - return errs.ErrEtcdTxnConflict.GenWithStack("write next leader id into etcd unsuccessfully") - } - return nil -} - -func (am *AllocatorManager) nextLeaderKey(dcLocation string) string { - return path.Join(am.getAllocatorPath(dcLocation), "next-leader") -} - // EnableLocalTSO returns the value of AllocatorManager.enableLocalTSO. 
func (am *AllocatorManager) EnableLocalTSO() bool { return am.enableLocalTSO diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index d44297b803e..cb599875dbd 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -24,7 +24,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" @@ -33,13 +32,9 @@ import ( mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/member" - "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" - "github.com/tikv/pd/pkg/utils/tsoutil" - "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" - "google.golang.org/grpc" ) // Allocator is a Timestamp Oracle allocator. @@ -85,10 +80,6 @@ type GlobalTSOAllocator struct { // expectedPrimaryLease is used to store the expected primary lease. expectedPrimaryLease atomic.Value // store as *election.LeaderLease timestampOracle *timestampOracle - // syncRTT is the RTT duration a SyncMaxTS RPC call will cost, - // which is used to estimate the MaxTS in a Global TSO generation - // to reduce the gRPC network IO latency. - syncRTT atomic.Value // store as int64 milliseconds // pre-initialized metrics tsoAllocatorRoleGauge prometheus.Gauge } @@ -142,19 +133,6 @@ func (gta *GlobalTSOAllocator) getGroupID() uint32 { return gta.am.getGroupID() } -func (gta *GlobalTSOAllocator) setSyncRTT(rtt int64) { - gta.syncRTT.Store(rtt) - gta.getMetrics().globalTSOSyncRTTGauge.Set(float64(rtt)) -} - -func (gta *GlobalTSOAllocator) getSyncRTT() int64 { - syncRTT := gta.syncRTT.Load() - if syncRTT == nil { - return 0 - } - return syncRTT.(int64) -} - // GetTimestampPath returns the timestamp path in etcd. func (gta *GlobalTSOAllocator) GetTimestampPath() string { if gta == nil || gta.timestampOracle == nil { @@ -163,24 +141,6 @@ func (gta *GlobalTSOAllocator) GetTimestampPath() string { return gta.timestampOracle.GetTimestampPath() } -func (gta *GlobalTSOAllocator) estimateMaxTS(ctx context.Context, count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) { - physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(ctx, int64(count), 0) - if physical == 0 { - return &pdpb.Timestamp{}, false, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized") - } - estimatedMaxTSO := &pdpb.Timestamp{ - Physical: physical + time.Since(lastUpdateTime).Milliseconds() + 2*gta.getSyncRTT(), // TODO: make the coefficient of RTT configurable - Logical: logical, - } - // Precheck to make sure the logical part won't overflow after being differentiated. - // If precheckLogical returns false, it means the logical part is overflow, - // we need to wait a updatePhysicalInterval and retry the estimation later. - if !gta.precheckLogical(estimatedMaxTSO, suffixBits) { - return nil, true, nil - } - return estimatedMaxTSO, false, nil -} - // Initialize will initialize the created global TSO allocator. func (gta *GlobalTSOAllocator) Initialize(int) error { gta.tsoAllocatorRoleGauge.Set(1) @@ -208,321 +168,15 @@ func (gta *GlobalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundC // Make sure you have initialized the TSO allocator before calling this method. // Basically, there are two ways to generate a Global TSO: // 1. The old way to generate a normal TSO from memory directly, which makes the TSO service node become single point. -// 2. 
The new way to generate a Global TSO by synchronizing with all other Local TSO Allocators. -// -// And for the new way, there are two different strategies: -// 1. Collect the max Local TSO from all Local TSO Allocator leaders and write it back to them as MaxTS. -// 2. Estimate a MaxTS and try to write it to all Local TSO Allocator leaders directly to reduce the RTT. -// During the process, if the estimated MaxTS is not accurate, it will fallback to the collecting way. +// 2. Deprecated: The new way to generate a Global TSO by synchronizing with all other Local TSO Allocators. func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "GlobalTSOAllocator.GenerateTSO").End() if !gta.member.GetLeadership().Check() { gta.getMetrics().notLeaderEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr)) } - // To check if we have any dc-location configured in the cluster - dcLocationMap := gta.am.GetClusterDCLocations() - // No dc-locations configured in the cluster, use the normal Global TSO generation way. - // (without synchronization with other Local TSO Allocators) - if len(dcLocationMap) == 0 { - return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count, 0) - } - ctx1 := ctx - - // Have dc-locations configured in the cluster, use the Global TSO generation way. - // (whit synchronization with other Local TSO Allocators) - ctx, cancel := context.WithCancel(gta.ctx) - defer cancel() - for range maxRetryCount { - var ( - err error - shouldRetry, skipCheck bool - globalTSOResp pdpb.Timestamp - estimatedMaxTSO *pdpb.Timestamp - suffixBits = gta.am.GetSuffixBits() - ) - // TODO: add a switch to control whether to enable the MaxTSO estimation. - // 1. Estimate a MaxTS among all Local TSO Allocator leaders according to the RTT. - estimatedMaxTSO, shouldRetry, err = gta.estimateMaxTS(ctx1, count, suffixBits) - if err != nil { - log.Error("global tso allocator estimates MaxTS failed", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - errs.ZapError(err)) - continue - } - if shouldRetry { - time.Sleep(gta.timestampOracle.updatePhysicalInterval) - continue - } - SETTING_PHASE: - // 2. Send the MaxTSO to all Local TSO Allocators leaders to make sure the subsequent Local TSOs will be bigger than it. - // It's not safe to skip check at the first time here because the estimated maxTSO may not be big enough, - // we need to validate it first before we write it into every Local TSO Allocator's memory. - globalTSOResp = *estimatedMaxTSO - if err = gta.SyncMaxTS(ctx, dcLocationMap, &globalTSOResp, skipCheck); err != nil { - log.Error("global tso allocator synchronizes MaxTS failed", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - errs.ZapError(err)) - continue - } - // 3. If skipCheck is false and the maxTSO is bigger than estimatedMaxTSO, - // we need to redo the setting phase with the bigger one and skip the check safely. - if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) > 0 { - gta.getMetrics().globalTSOSyncEvent.Inc() - *estimatedMaxTSO = globalTSOResp - // Re-add the count and check the overflow. 
- estimatedMaxTSO.Logical += int64(count) - if !gta.precheckLogical(estimatedMaxTSO, suffixBits) { - estimatedMaxTSO.Physical += UpdateTimestampGuard.Milliseconds() - estimatedMaxTSO.Logical = int64(count) - } - skipCheck = true - goto SETTING_PHASE - } - // Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid. - if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 { - gta.getMetrics().globalTSOEstimateEvent.Inc() - } - // 4. Persist MaxTS into memory, and etcd if needed - var currentGlobalTSO *pdpb.Timestamp - if currentGlobalTSO, err = gta.getCurrentTSO(ctx1); err != nil { - log.Error("global tso allocator gets the current global tso in memory failed", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - errs.ZapError(err)) - continue - } - if tsoutil.CompareTimestamp(currentGlobalTSO, &globalTSOResp) < 0 { - gta.getMetrics().globalTSOPersistEvent.Inc() - // Update the Global TSO in memory - if err = gta.timestampOracle.resetUserTimestamp(ctx1, gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil { - gta.getMetrics().errGlobalTSOPersistEvent.Inc() - log.Error("global tso allocator update the global tso in memory failed", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - errs.ZapError(err)) - continue - } - } - // 5. Check leadership again before we returning the response. - if !gta.member.GetLeadership().Check() { - gta.getMetrics().notLeaderAnymoreEvent.Inc() - return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr)) - } - // 6. Calibrate the logical part to make the TSO unique globally by giving it a unique suffix in the whole cluster - globalTSOResp.Logical = gta.timestampOracle.calibrateLogical(globalTSOResp.GetLogical(), suffixBits) - globalTSOResp.SuffixBits = uint32(suffixBits) - return globalTSOResp, nil - } - gta.getMetrics().exceededMaxRetryEvent.Inc() - return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("global tso allocator maximum number of retries exceeded") -} -// Only used for test -var globalTSOOverflowFlag = true - -func (gta *GlobalTSOAllocator) precheckLogical(maxTSO *pdpb.Timestamp, suffixBits int) bool { - failpoint.Inject("globalTSOOverflow", func() { - if globalTSOOverflowFlag { - maxTSO.Logical = maxLogical - globalTSOOverflowFlag = false - } - }) - // Make sure the physical time is not empty again. - if maxTSO.GetPhysical() == 0 { - return false - } - // Check if the logical part will reach the overflow condition after being differentiated. - if caliLogical := gta.timestampOracle.calibrateLogical(maxTSO.Logical, suffixBits); caliLogical >= maxLogical { - log.Error("estimated logical part outside of max logical interval, please check ntp time", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - zap.Reflect("max-tso", maxTSO), errs.ZapError(errs.ErrLogicOverflow)) - gta.getMetrics().precheckLogicalOverflowEvent.Inc() - return false - } - return true -} - -const ( - dialTimeout = 3 * time.Second - rpcTimeout = 3 * time.Second - // TODO: maybe make syncMaxRetryCount configurable - syncMaxRetryCount = 2 -) - -type syncResp struct { - rpcRes *pdpb.SyncMaxTSResponse - err error - rtt time.Duration -} - -// SyncMaxTS is used to sync MaxTS with all Local TSO Allocator leaders in dcLocationMap. 
-// If maxTSO is the biggest TSO among all Local TSO Allocators, it will be written into -// each allocator and remains the same after the synchronization. -// If not, it will be replaced with the new max Local TSO and return. -func (gta *GlobalTSOAllocator) SyncMaxTS( - ctx context.Context, - dcLocationMap map[string]DCLocationInfo, - maxTSO *pdpb.Timestamp, - skipCheck bool, -) error { - defer trace.StartRegion(ctx, "GlobalTSOAllocator.SyncMaxTS").End() - originalMaxTSO := *maxTSO - for i := range syncMaxRetryCount { - // Collect all allocator leaders' client URLs - allocatorLeaders := make(map[string]*pdpb.Member) - for dcLocation := range dcLocationMap { - allocator, err := gta.am.GetAllocator(dcLocation) - if err != nil { - return err - } - allocatorLeader := allocator.(*LocalTSOAllocator).GetAllocatorLeader() - if allocatorLeader.GetMemberId() == 0 { - return errs.ErrSyncMaxTS.FastGenByArgs(fmt.Sprintf("%s does not have the local allocator leader yet", dcLocation)) - } - allocatorLeaders[dcLocation] = allocatorLeader - } - leaderURLs := make([]string, 0) - for _, allocator := range allocatorLeaders { - // Check if its client URLs are empty - if len(allocator.GetClientUrls()) < 1 { - continue - } - leaderURL := allocator.GetClientUrls()[0] - if slice.NoneOf(leaderURLs, func(i int) bool { return leaderURLs[i] == leaderURL }) { - leaderURLs = append(leaderURLs, leaderURL) - } - } - // Prepare to make RPC requests concurrently - respCh := make(chan *syncResp, len(leaderURLs)) - wg := sync.WaitGroup{} - request := &pdpb.SyncMaxTSRequest{ - Header: &pdpb.RequestHeader{ - SenderId: gta.am.member.ID(), - }, - SkipCheck: skipCheck, - MaxTs: maxTSO, - } - for _, leaderURL := range leaderURLs { - leaderConn, err := gta.am.getOrCreateGRPCConn(ctx, leaderURL) - if err != nil { - return err - } - // Send SyncMaxTSRequest to all allocator leaders concurrently. - wg.Add(1) - go func(ctx context.Context, conn *grpc.ClientConn, respCh chan<- *syncResp) { - defer logutil.LogPanic() - defer wg.Done() - syncMaxTSResp := &syncResp{} - syncCtx, cancel := context.WithTimeout(ctx, rpcTimeout) - startTime := time.Now() - syncMaxTSResp.rpcRes, syncMaxTSResp.err = pdpb.NewPDClient(conn).SyncMaxTS(syncCtx, request) - // Including RPC request -> RPC processing -> RPC response - syncMaxTSResp.rtt = time.Since(startTime) - cancel() - respCh <- syncMaxTSResp - if syncMaxTSResp.err != nil { - log.Error("sync max ts rpc failed, got an error", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - zap.String("local-allocator-leader-url", leaderConn.Target()), - errs.ZapError(err)) - return - } - if syncMaxTSResp.rpcRes.GetHeader().GetError() != nil { - log.Error("sync max ts rpc failed, got an error", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - zap.String("local-allocator-leader-url", leaderConn.Target()), - errs.ZapError(errors.New(syncMaxTSResp.rpcRes.GetHeader().GetError().String()))) - return - } - }(ctx, leaderConn, respCh) - } - wg.Wait() - close(respCh) - var ( - errList []error - syncedDCs []string - maxTSORtt time.Duration - ) - // Iterate each response to handle the error and compare MaxTSO. - for resp := range respCh { - if resp.err != nil { - errList = append(errList, resp.err) - } - // If any error occurs, just jump out of the loop. 
- if len(errList) != 0 { - break - } - if resp.rpcRes == nil { - return errs.ErrSyncMaxTS.FastGenByArgs("got nil response") - } - if skipCheck { - // Set all the Local TSOs to the maxTSO unconditionally, so the MaxLocalTS in response should be nil. - if resp.rpcRes.GetMaxLocalTs() != nil { - return errs.ErrSyncMaxTS.FastGenByArgs("got non-nil max local ts in the second sync phase") - } - syncedDCs = append(syncedDCs, resp.rpcRes.GetSyncedDcs()...) - } else { - // Compare and get the max one - if tsoutil.CompareTimestamp(resp.rpcRes.GetMaxLocalTs(), maxTSO) > 0 { - *maxTSO = *(resp.rpcRes.GetMaxLocalTs()) - if resp.rtt > maxTSORtt { - maxTSORtt = resp.rtt - } - } - syncedDCs = append(syncedDCs, resp.rpcRes.GetSyncedDcs()...) - } - } - // We need to collect all info needed to ensure the consistency of TSO. - // So if any error occurs, the synchronization process will fail directly. - if len(errList) != 0 { - return errs.ErrSyncMaxTS.FastGenWithCause(errList) - } - // Check whether all dc-locations have been considered during the synchronization and retry once if any dc-location missed. - if ok, unsyncedDCs := gta.checkSyncedDCs(dcLocationMap, syncedDCs); !ok { - log.Info("unsynced dc-locations found, will retry", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - zap.Bool("skip-check", skipCheck), - zap.Strings("synced-DCs", syncedDCs), - zap.Strings("unsynced-DCs", unsyncedDCs)) - if i < syncMaxRetryCount-1 { - // maxTSO should remain the same. - *maxTSO = originalMaxTSO - // To make sure we have the latest dc-location info - gta.am.ClusterDCLocationChecker() - continue - } - return errs.ErrSyncMaxTS.FastGenByArgs( - fmt.Sprintf("unsynced dc-locations found, skip-check: %t, synced dc-locations: %+v, unsynced dc-locations: %+v", - skipCheck, syncedDCs, unsyncedDCs)) - } - // Update the sync RTT to help estimate MaxTS later. - if maxTSORtt != 0 { - gta.setSyncRTT(maxTSORtt.Milliseconds()) - } - } - return nil -} - -func (gta *GlobalTSOAllocator) checkSyncedDCs(dcLocationMap map[string]DCLocationInfo, syncedDCs []string) (bool, []string) { - var unsyncedDCs []string - for dcLocation := range dcLocationMap { - if slice.NoneOf(syncedDCs, func(i int) bool { return syncedDCs[i] == dcLocation }) { - unsyncedDCs = append(unsyncedDCs, dcLocation) - } - } - log.Debug("check unsynced dc-locations", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - zap.Strings("unsynced-DCs", unsyncedDCs), - zap.Strings("synced-DCs", syncedDCs)) - return len(unsyncedDCs) == 0, unsyncedDCs -} - -func (gta *GlobalTSOAllocator) getCurrentTSO(ctx context.Context) (*pdpb.Timestamp, error) { - defer trace.StartRegion(ctx, "GlobalTSOAllocator.getCurrentTSO").End() - currentPhysical, currentLogical := gta.timestampOracle.getTSO() - if currentPhysical == typeutil.ZeroTime { - return &pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized") - } - return tsoutil.GenerateTimestamp(currentPhysical, uint64(currentLogical)), nil + return gta.timestampOracle.getTS(ctx, gta.member.GetLeadership(), count, 0) } // Reset is used to reset the TSO allocator. 
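Reviewer note: after this change, `GenerateTSO` no longer branches on dc-locations — every request is served by the in-memory timestamp oracle, which packs a physical millisecond clock and a logical counter into a single 64-bit TSO. Below is a minimal, self-contained sketch of that packing, assuming PD's usual 18-bit logical window (`maxLogical = 1 << 18`); `composeTS` and `extractTS` are illustrative helpers for this note, not PD's actual API:

```go
package main

import (
	"fmt"
	"time"
)

// logicalBits mirrors PD's 18-bit logical window, i.e. maxLogical = 1 << 18.
const logicalBits = 18

// composeTS packs a physical millisecond timestamp and a logical counter
// into the single uint64 wire format of a TSO.
func composeTS(physical, logical int64) uint64 {
	return uint64(physical<<logicalBits + logical)
}

// extractTS splits a TSO back into its physical and logical halves.
func extractTS(ts uint64) (physical, logical int64) {
	return int64(ts >> logicalBits), int64(ts & (1<<logicalBits - 1))
}

func main() {
	physical := time.Now().UnixMilli()
	ts := composeTS(physical, 42)
	p, l := extractTS(ts)
	fmt.Printf("ts=%d physical=%d logical=%d\n", ts, p, l)
}
```

Because the logical part is only 18 bits wide, a burst that would overflow it has to wait for the next physical update — the same overflow condition the deleted `precheckLogical` guarded against during MaxTS estimation.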
diff --git a/server/api/member_test.go b/server/api/member_test.go
index ad4812249c7..b56e84b2a1a 100644
--- a/server/api/member_test.go
+++ b/server/api/member_test.go
@@ -45,12 +45,7 @@ func TestMemberTestSuite(t *testing.T) {
 }
 
 func (suite *memberTestSuite) SetupSuite() {
-	suite.cfgs, suite.servers, suite.clean = mustNewCluster(suite.Require(), 3, func(cfg *config.Config) {
-		cfg.EnableLocalTSO = true
-		cfg.Labels = map[string]string{
-			config.ZoneLabel: "dc-1",
-		}
-	})
+	suite.cfgs, suite.servers, suite.clean = mustNewCluster(suite.Require(), 3)
 }
 
 func (suite *memberTestSuite) TearDownSuite() {
@@ -76,7 +71,6 @@ func checkListResponse(re *require.Assertions, body []byte, cfgs []*config.Confi
 		if member.GetName() != cfg.Name {
 			continue
 		}
-		re.Equal("dc-1", member.DcLocation)
 		relaxEqualStings(re, member.ClientUrls, strings.Split(cfg.ClientUrls, ","))
 		relaxEqualStings(re, member.PeerUrls, strings.Split(cfg.PeerUrls, ","))
 	}
diff --git a/server/config/config.go b/server/config/config.go
index 1ccafe7248a..ed37d899390 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -256,12 +256,6 @@ const (
 	defaultEnableTSODynamicSwitching = false
 )
 
-// Special keys for Labels
-const (
-	// ZoneLabel is the name of the key which indicates DC location of this PD server.
-	ZoneLabel = "zone"
-)
-
 var (
 	defaultEnableTelemetry = false
 	defaultRuntimeServices = []string{}
diff --git a/server/grpc_service.go b/server/grpc_service.go
index d5fd8ae3e32..3aa07e841ff 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -2756,54 +2756,11 @@ func scatterRegions(cluster *cluster.RaftCluster, regionsID []uint64, group stri
 	return percentage, nil
 }
 
-// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager.
-func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
-	// TODO: support local tso forward in api service mode in the future.
-	var err error
-	if err = s.validateInternalRequest(request.GetHeader(), false); err != nil {
-		return nil, err
-	}
-	if !s.member.IsLeader() {
-		return nil, ErrNotLeader
-	}
-	if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
-		fName := currentFunction()
-		limiter := s.GetGRPCRateLimiter()
-		if done, err := limiter.Allow(fName); err == nil {
-			defer done()
-		} else {
-			return &pdpb.GetDCLocationInfoResponse{
-				Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
-			}, nil
-		}
-	}
-	am := s.GetTSOAllocatorManager()
-	info, ok := am.GetDCLocationInfo(request.GetDcLocation())
-	if !ok {
-		am.ClusterDCLocationChecker()
-		return &pdpb.GetDCLocationInfoResponse{
-			Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
-				fmt.Sprintf("dc-location %s is not found", request.GetDcLocation())),
-		}, nil
-	}
-	resp := &pdpb.GetDCLocationInfoResponse{
+// Deprecated: dc-location support has been removed; this stub is kept only for protocol compatibility.
+func (*GrpcServer) GetDCLocationInfo(_ context.Context, _ *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
+	return &pdpb.GetDCLocationInfoResponse{
 		Header: wrapHeader(),
-		Suffix: info.Suffix,
-	}
-	// Because the number of suffix bits is changing dynamically according to the dc-location number,
-	// there is a corner case may cause the Local TSO is not unique while member changing.
-	// Example:
-	//	t1: xxxxxxxxxxxxxxx1 | 11
-	//	t2: xxxxxxxxxxxxxxx  | 111
-	// So we will force the newly added Local TSO Allocator to have a Global TSO synchronization
-	// when it becomes the Local TSO Allocator leader.
- // Please take a look at https://github.com/tikv/pd/issues/3260 for more details. - if resp.MaxTs, err = am.GetMaxLocalTSO(ctx); err != nil { - return &pdpb.GetDCLocationInfoResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil - } - return resp, nil + }, nil } // validateInternalRequest checks if server is closed, which is used to validate diff --git a/server/server.go b/server/server.go index ed61b14834f..e2c9b31580d 100644 --- a/server/server.go +++ b/server/server.go @@ -466,18 +466,6 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s) - // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. - if !s.cfg.EnableLocalTSO { - if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil { - return err - } - } - if zone, exist := s.cfg.Labels[config.ZoneLabel]; exist && zone != "" && s.cfg.EnableLocalTSO { - if err = s.tsoAllocatorManager.SetLocalTSOConfig(zone); err != nil { - return err - } - } - s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg) s.basicCluster = core.NewBasicCluster() s.cluster = cluster.NewRaftCluster(ctx, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager) @@ -1630,10 +1618,6 @@ func (s *Server) leaderLoop() { log.Error("reload config failed", errs.ZapError(err)) continue } - if !s.IsAPIServiceMode() { - // Check the cluster dc-location after the PD leader is elected - go s.tsoAllocatorManager.ClusterDCLocationChecker() - } syncer := s.cluster.GetRegionSyncer() if s.persistOptions.IsUseRegionStorage() { syncer.StartSyncWithLeader(leader.GetListenUrls()[0]) @@ -1745,10 +1729,6 @@ func (s *Server) campaignLeader() { // EnableLeader to accept the remaining service, such as GetStore, GetRegion. s.member.EnableLeader() member.ServiceMemberGauge.WithLabelValues(s.mode).Set(1) - if !s.IsAPIServiceMode() { - // Check the cluster dc-location after the PD leader is elected. - go s.tsoAllocatorManager.ClusterDCLocationChecker() - } defer resetLeaderOnce.Do(func() { // as soon as cancel the leadership keepalive, then other member have chance // to be new leader. diff --git a/tests/cluster.go b/tests/cluster.go index 6ad15e3291f..7996e289cf5 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" - "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/autoscaling" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/dashboard" @@ -41,7 +40,6 @@ import ( "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" - "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/api" "github.com/tikv/pd/server/apiv2" @@ -719,56 +717,6 @@ func (c *TestCluster) ResignLeader() error { return errors.New("no leader") } -// WaitAllocatorLeader is used to get the Local TSO Allocator leader. -// If it exceeds the maximum number of loops, it will return an empty string. 
-func (c *TestCluster) WaitAllocatorLeader(dcLocation string, ops ...WaitOption) string { - option := &WaitOp{ - retryTimes: WaitLeaderRetryTimes, - waitInterval: WaitLeaderCheckInterval, - } - for _, op := range ops { - op(option) - } - for range option.retryTimes { - counter := make(map[string]int) - running := 0 - for _, s := range c.servers { - if s.state == Running && s.GetTSOAllocatorManager().EnableLocalTSO() { - running++ - } - serverName := s.GetAllocatorLeader(dcLocation).GetName() - if serverName != "" { - counter[serverName]++ - } - } - for serverName, num := range counter { - if num == running && c.GetServer(serverName).IsAllocatorLeader(dcLocation) { - return serverName - } - } - time.Sleep(option.waitInterval) - } - return "" -} - -// WaitAllLeaders will block and wait for the election of PD leader and all Local TSO Allocator leaders. -func (c *TestCluster) WaitAllLeaders(re *require.Assertions, dcLocations map[string]string) { - c.WaitLeader() - c.CheckClusterDCLocation() - // Wait for each DC's Local TSO Allocator leader - wg := sync.WaitGroup{} - for _, dcLocation := range dcLocations { - wg.Add(1) - go func(dc string) { - testutil.Eventually(re, func() bool { - return c.WaitAllocatorLeader(dc) != "" - }) - wg.Done() - }(dcLocation) - } - wg.Wait() -} - // GetCluster returns PD cluster. func (c *TestCluster) GetCluster() *metapb.Cluster { leader := c.GetLeader() @@ -853,19 +801,6 @@ func (c *TestCluster) Destroy() { } } -// CheckClusterDCLocation will force the cluster to do the dc-location check in order to speed up the test. -func (c *TestCluster) CheckClusterDCLocation() { - wg := sync.WaitGroup{} - for _, server := range c.GetServers() { - wg.Add(1) - go func(s *TestServer) { - s.GetTSOAllocatorManager().ClusterDCLocationChecker() - wg.Done() - }(server) - } - wg.Wait() -} - // CheckTSOUnique will check whether the TSO is unique among the cluster in the past and present. func (c *TestCluster) CheckTSOUnique(ts uint64) bool { c.tsPool.Lock()
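Reviewer note: test code is the main migration surface for this removal. Suites that used to enable `cfg.EnableLocalTSO`, attach a zone label, and then block on `WaitAllLeaders`/`CheckClusterDCLocation` now only need the ordinary PD leader wait. A hedged sketch of the migrated pattern, assuming the `tests` helpers keep the signatures used in this repository (the test name and package are illustrative):

```go
package tests_test

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	"github.com/tikv/pd/tests"
)

// TestSimplifiedSetup shows the post-removal shape: no EnableLocalTSO flag,
// no zone labels, no per-dc allocator waits.
func TestSimplifiedSetup(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cluster, err := tests.NewTestCluster(ctx, 3)
	re.NoError(err)
	defer cluster.Destroy()

	re.NoError(cluster.RunInitialServers())
	// Electing the PD leader is now the only prerequisite before issuing
	// requests; there are no Local TSO Allocator leaders left to wait for.
	re.NotEmpty(cluster.WaitLeader())
}
```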