diff --git a/client/client.go b/client/client.go index 56923b697e2..2d30d9fb6c4 100644 --- a/client/client.go +++ b/client/client.go @@ -136,7 +136,7 @@ type Client interface { LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) // StoreGlobalConfig set the config from etcd StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error - // WatchGlobalConfig returns an stream with all global config and updates + // WatchGlobalConfig returns a stream with all global config and updates WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) // UpdateOption updates the client option. UpdateOption(option DynamicOption, value interface{}) error diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 8cfdcf423ac..572dae132b6 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -32,7 +32,10 @@ import ( "go.uber.org/zap" ) -const watchLoopUnhealthyTimeout = 60 * time.Second +const ( + watchLoopUnhealthyTimeout = 60 * time.Second + campaignTimesRecordTimeout = 5 * time.Minute +) // GetLeader gets the corresponding leader from etcd by given leaderPath (as the key). func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) { @@ -62,20 +65,24 @@ type Leadership struct { keepAliveCtx context.Context keepAliveCancelFunc context.CancelFunc keepAliveCancelFuncLock syncutil.Mutex + // CampaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`. + // It is ordered by time to prevent the leader from campaigning too frequently. + CampaignTimes []time.Time } // NewLeadership creates a new Leadership. 
func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadership { leadership := &Leadership{ - purpose: purpose, - client: client, - leaderKey: leaderKey, + purpose: purpose, + client: client, + leaderKey: leaderKey, + CampaignTimes: make([]time.Time, 0, 10), } return leadership } // getLease gets the lease of leadership, only if leadership is valid, -// i.e the owner is a true leader, the lease is not nil. +// i.e. the owner is a true leader, the lease is not nil. func (ls *Leadership) getLease() *lease { l := ls.lease.Load() if l == nil { @@ -104,8 +111,23 @@ func (ls *Leadership) GetLeaderKey() string { return ls.leaderKey } +// addCampaignTimes drops the recorded campaign times older than `campaignTimesRecordTimeout` and then records the current time. +func (ls *Leadership) addCampaignTimes() { + for i := len(ls.CampaignTimes) - 1; i >= 0; i-- { + if time.Since(ls.CampaignTimes[i]) > campaignTimesRecordTimeout { + // the slice is sorted by time, so every entry up to and including index i has expired; + // keep only the entries after it + ls.CampaignTimes = ls.CampaignTimes[i+1:] + break + } + } + + ls.CampaignTimes = append(ls.CampaignTimes, time.Now()) +} + // Campaign is used to campaign the leader with given lease and returns a leadership func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error { + ls.addCampaignTimes() ls.leaderValue = leaderData // Create a new lease to campaign newLease := &lease{ diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 7b660c07605..2a1be3e0ca5 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -152,7 +152,7 @@ func (s *Server) primaryElectionLoop() { func (s *Server) campaignLeader() { log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Name())) - if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); 
err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info("campaign resource manager primary meets error due to txn conflict, another server may campaign successfully", zap.String("campaign-resource-manager-primary-name", s.participant.Name())) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 4304ffb218a..32b241fee91 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -241,7 +241,7 @@ func (s *Server) primaryElectionLoop() { func (s *Server) campaignLeader() { log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name())) - if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully", zap.String("campaign-scheduling-primary-name", s.participant.Name())) diff --git a/pkg/member/member.go b/pkg/member/member.go index 80332a65f94..6eddf9a7c77 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -42,6 +42,8 @@ const ( // The timeout to wait transfer etcd leader to complete. moveLeaderTimeout = 5 * time.Second dcLocationConfigEtcdPrefix = "dc-location" + // If the campaign times is more than this value in `campaignTimesRecordTimeout`, the PD will resign and campaign again. + campaignLeaderFrequencyTimes = 3 ) // EmbeddedEtcdMember is used for the election related logic. It implements Member interface. @@ -177,7 +179,15 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time { // CampaignLeader is used to campaign a PD member's leadership // and make it become a PD leader. -func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error { +// leader should be changed when campaign leader frequently. 
+func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error { + if len(m.leadership.CampaignTimes) >= campaignLeaderFrequencyTimes { + log.Warn("campaign times is too frequent, resign and campaign again", + zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath())) + // clear the recorded campaign times before resigning; NOTE(review): a nil error from the resign is returned as-is, which callers checking only `err != nil` cannot tell apart from a won campaign — confirm + m.leadership.CampaignTimes = nil + return m.ResignEtcdLeader(ctx, m.Name(), "") + } return m.leadership.Campaign(leaseTimeout, m.MemberValue()) } diff --git a/pkg/member/participant.go b/pkg/member/participant.go index b3034a86807..82cd7e05f5e 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -196,7 +196,7 @@ func (m *Participant) GetLeadership() *election.Leadership { } // CampaignLeader is used to campaign the leadership and make it become a leader. -func (m *Participant) CampaignLeader(leaseTimeout int64) error { +func (m *Participant) CampaignLeader(_ context.Context, leaseTimeout int64) error { if !m.campaignCheck() { return errs.ErrCheckCampaign } diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index df0ca0affc9..251a3aaf2e6 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo { type ElectionMember interface { // ID returns the unique ID in the election group. For example, it can be unique // server id of a cluster or the unique keyspace group replica id of the election - // group comprised of the replicas of a keyspace group. + // group composed of the replicas of a keyspace group. ID() uint64 - // ID returns the unique name in the election group. + // Name returns the unique name in the election group. Name() string // MemberValue returns the member value. MemberValue() string - // GetMember() returns the current member + // GetMember returns the current member GetMember() interface{} // Client returns the etcd client. 
Client() *clientv3.Client @@ -124,7 +124,7 @@ type ElectionMember interface { // KeepLeader is used to keep the leader's leadership. KeepLeader(ctx context.Context) // CampaignLeader is used to campaign the leadership and make it become a leader in an election group. - CampaignLeader(leaseTimeout int64) error + CampaignLeader(ctx context.Context, leaseTimeout int64) error // ResetLeader is used to reset the member's current leadership. // Basically it will reset the leader lease and unset leader info. ResetLeader() diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 613ceb3eafc..a37bcc73881 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -568,7 +568,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { log.Info("start to campaign the primary", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("campaign-tso-primary-name", gta.member.Name())) - if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil { + if err := gta.am.member.CampaignLeader(gta.ctx, gta.am.leaderLease); err != nil { if errors.Is(err, errs.ErrEtcdTxnConflict) { log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), diff --git a/server/server.go b/server/server.go index a2c99d0cbec..38064a3b92f 100644 --- a/server/server.go +++ b/server/server.go @@ -1636,7 +1636,7 @@ func (s *Server) leaderLoop() { func (s *Server) campaignLeader() { log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name())) - if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", 
s.mode), zap.String("campaign-leader-name", s.Name())) diff --git a/tests/cluster.go b/tests/cluster.go index ae1ae331856..41efc2b045d 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -155,6 +155,13 @@ func (s *TestServer) Destroy() error { return nil } +// ResetPDLeader resets the current leadership of the server's member while holding the server lock. +func (s *TestServer) ResetPDLeader() { + s.Lock() + defer s.Unlock() + s.server.GetMember().ResetLeader() +} + // ResignLeader resigns the leader of the server. func (s *TestServer) ResignLeader() error { s.Lock() diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index 26d4fa2a904..5965f9e22a6 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -323,6 +323,30 @@ func TestMoveLeader(t *testing.T) { } } +func TestCampaignLeaderFrequently(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 5) + re.NoError(err) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + re.NoError(err) + cluster.WaitLeader() + leader := cluster.GetLeader() + re.NotEmpty(cluster.GetLeader()) + + for i := 0; i < 3; i++ { + cluster.GetServers()[cluster.GetLeader()].ResetPDLeader() + cluster.WaitLeader() + } + // leader should be changed when campaign leader frequently + cluster.WaitLeader() + re.NotEmpty(cluster.GetLeader()) + re.NotEqual(leader, cluster.GetLeader()) +} + func TestGetLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background())