diff --git a/client/client.go b/client/client.go index 067872d2d391..bb30da1e0c06 100644 --- a/client/client.go +++ b/client/client.go @@ -136,7 +136,7 @@ type Client interface { LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) // StoreGlobalConfig set the config from etcd StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error - // WatchGlobalConfig returns an stream with all global config and updates + // WatchGlobalConfig returns a stream with all global config and updates WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) // UpdateOption updates the client option. UpdateOption(option DynamicOption, value interface{}) error diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index d5d73e90b58f..ab4bd564f6f5 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cache" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -62,6 +63,9 @@ type Leadership struct { keepAliveCtx context.Context keepAliveCancelFunc context.CancelFunc keepAliveCancelFuncLock syncutil.Mutex + // CampaignTimes is used to record the campaign times of the leader in 5min. + // To avoid the leader campaign too frequently. + CampaignTimes []cache.TTLString } // NewLeadership creates a new Leadership. @@ -105,7 +109,8 @@ func (ls *Leadership) GetLeaderKey() string { } // Campaign is used to campaign the leader with given lease and returns a leadership -func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error { +func (ls *Leadership) Campaign(ctx context.Context, leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error { + ls.CampaignTimes = append(ls.CampaignTimes, *cache.NewStringTTL(ctx, 5*time.Second, 5*time.Minute)) ls.leaderValue = leaderData // Create a new lease to campaign newLease := &lease{ diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 47248208c8a2..b4df91ffd06e 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -152,7 +152,7 @@ func (s *Server) primaryElectionLoop() { func (s *Server) campaignLeader() { log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Name())) - if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info("campaign resource manager primary meets error due to txn conflict, another server may campaign successfully", zap.String("campaign-resource-manager-primary-name", s.participant.Name())) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 5e2ed58a0096..65956b6f6cd8 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -241,7 +241,7 @@ func (s *Server) primaryElectionLoop() { func (s *Server) campaignLeader() { log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name())) - if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully", zap.String("campaign-scheduling-primary-name", s.participant.Name())) diff --git a/pkg/member/member.go b/pkg/member/member.go index 80332a65f941..dabd38a72bc1 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -42,6 +42,8 @@ const ( // The timeout to wait transfer etcd leader to complete. moveLeaderTimeout = 5 * time.Second dcLocationConfigEtcdPrefix = "dc-location" + // If the campaign times is more than this value in 5min, the PD will resign and campaign again. + campaignLeaderFrequencyTimes = 5 ) // EmbeddedEtcdMember is used for the election related logic. It implements Member interface. @@ -177,8 +179,13 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time { // CampaignLeader is used to campaign a PD member's leadership // and make it become a PD leader. -func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error { - return m.leadership.Campaign(leaseTimeout, m.MemberValue()) +func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error { + // leader should be changed when campaign leader frequently. + if len(m.leadership.CampaignTimes) > campaignLeaderFrequencyTimes { + log.Error("campaign times is too much", zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath())) + return m.ResignEtcdLeader(ctx, m.Name(), "") + } + return m.leadership.Campaign(ctx, leaseTimeout, m.MemberValue()) } // KeepLeader is used to keep the PD leader's leadership. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index b3034a868070..9b4f9e5ac248 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -196,11 +196,11 @@ func (m *Participant) GetLeadership() *election.Leadership { } // CampaignLeader is used to campaign the leadership and make it become a leader. -func (m *Participant) CampaignLeader(leaseTimeout int64) error { +func (m *Participant) CampaignLeader(ctx context.Context, leaseTimeout int64) error { if !m.campaignCheck() { return errs.ErrCheckCampaign } - return m.leadership.Campaign(leaseTimeout, m.MemberValue()) + return m.leadership.Campaign(ctx, leaseTimeout, m.MemberValue()) } // KeepLeader is used to keep the leader's leadership. diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index df0ca0affc97..3278b6223605 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo { type ElectionMember interface { // ID returns the unique ID in the election group. For example, it can be unique // server id of a cluster or the unique keyspace group replica id of the election - // group comprised of the replicas of a keyspace group. + // group composed of the replicas of a keyspace group. ID() uint64 - // ID returns the unique name in the election group. + // Name returns the unique name in the election group. Name() string // MemberValue returns the member value. MemberValue() string - // GetMember() returns the current member + // GetMember returns the current member GetMember() interface{} // Client returns the etcd client. Client() *clientv3.Client @@ -124,7 +124,7 @@ type ElectionMember interface { // KeepLeader is used to keep the leader's leadership. KeepLeader(ctx context.Context) // CampaignLeader is used to campaign the leadership and make it become a leader in an election group. - CampaignLeader(leaseTimeout int64) error + CampaignLeader(ctx context.Context, leaseTimeout int64) error // ResetLeader is used to reset the member's current leadership. // Basically it will reset the leader lease and unset leader info. ResetLeader() @@ -640,7 +640,7 @@ func (am *AllocatorManager) campaignAllocatorLeader( } } }) - if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil { + if err := allocator.CampaignAllocatorLeader(am.ctx, am.leaderLease, cmps...); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 613ceb3eafc6..a37bcc738813 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -568,7 +568,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { log.Info("start to campaign the primary", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("campaign-tso-primary-name", gta.member.Name())) - if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil { + if err := gta.am.member.CampaignLeader(gta.ctx, gta.am.leaderLease); err != nil { if errors.Is(err, errs.ErrEtcdTxnConflict) { log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 9d244d2531db..f64530fe7673 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -190,8 +190,8 @@ func (lta *LocalTSOAllocator) EnableAllocatorLeader() { } // CampaignAllocatorLeader is used to campaign a Local TSO Allocator's leadership. -func (lta *LocalTSOAllocator) CampaignAllocatorLeader(leaseTimeout int64, cmps ...clientv3.Cmp) error { - return lta.leadership.Campaign(leaseTimeout, lta.allocatorManager.member.MemberValue(), cmps...) +func (lta *LocalTSOAllocator) CampaignAllocatorLeader(ctx context.Context, leaseTimeout int64, cmps ...clientv3.Cmp) error { + return lta.leadership.Campaign(ctx, leaseTimeout, lta.allocatorManager.member.MemberValue(), cmps...) } // KeepAllocatorLeader is used to keep the PD leader's leadership. diff --git a/server/server.go b/server/server.go index 9cd7f18578ee..b6503dbc620d 100644 --- a/server/server.go +++ b/server/server.go @@ -1627,7 +1627,7 @@ func (s *Server) leaderLoop() { func (s *Server) campaignLeader() { log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name())) - if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", s.mode), zap.String("campaign-leader-name", s.Name())) diff --git a/tests/cluster.go b/tests/cluster.go index ae1ae331856c..b3465e06725f 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -155,6 +155,14 @@ func (s *TestServer) Destroy() error { return nil } +// TransferLeader resigns the leader of the server and transfers it to another server. +func (s *TestServer) TransferLeader(name string) error { + s.Lock() + defer s.Unlock() + s.server.GetMember().ResetLeader() + return s.server.GetMember().ResignEtcdLeader(s.server.Context(), s.server.Name(), name) +} + // ResignLeader resigns the leader of the server. func (s *TestServer) ResignLeader() error { s.Lock() diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index 26d4fa2a9044..41969573bb2a 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -323,6 +323,31 @@ func TestMoveLeader(t *testing.T) { } } +func TestCampaignLeaderFrequently(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 5) + defer cluster.Destroy() + re.NoError(err) + + err = cluster.RunInitialServers() + re.NoError(err) + cluster.WaitLeader() + leader := cluster.GetLeader() + re.NotEmpty(cluster.GetLeader()) + + for i := 0; i < 6; i++ { + err := cluster.GetServers()[cluster.GetLeader()].TransferLeader(cluster.GetLeader()) + re.NoError(err) + cluster.WaitLeader() + } + // leader should be changed when campaign leader frequently + cluster.WaitLeader() + re.NotEmpty(cluster.GetLeader()) + re.NotEqual(leader, cluster.GetLeader()) +} + func TestGetLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background())