diff --git a/client/client.go b/client/client.go index 067872d2d39..bb30da1e0c0 100644 --- a/client/client.go +++ b/client/client.go @@ -136,7 +136,7 @@ type Client interface { LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) // StoreGlobalConfig set the config from etcd StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error - // WatchGlobalConfig returns an stream with all global config and updates + // WatchGlobalConfig returns a stream with all global config and updates WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error) // UpdateOption updates the client option. UpdateOption(option DynamicOption, value interface{}) error diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index d5d73e90b58..ab4bd564f6f 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cache" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -62,6 +63,9 @@ type Leadership struct { keepAliveCtx context.Context keepAliveCancelFunc context.CancelFunc keepAliveCancelFuncLock syncutil.Mutex + // CampaignTimes is used to record the campaign times of the leader within 5 minutes, + // so as to avoid the leader campaigning too frequently. + CampaignTimes []cache.TTLString } // NewLeadership creates a new Leadership. 
@@ -105,7 +109,8 @@ func (ls *Leadership) GetLeaderKey() string { } // Campaign is used to campaign the leader with given lease and returns a leadership -func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error { +func (ls *Leadership) Campaign(ctx context.Context, leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error { + ls.CampaignTimes = append(ls.CampaignTimes, *cache.NewStringTTL(ctx, 5*time.Second, 5*time.Minute)) ls.leaderValue = leaderData // Create a new lease to campaign newLease := &lease{ diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index c259476e44e..8f7de53e347 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -33,6 +33,8 @@ const defaultLeaseTimeout = 1 func TestLeadership(t *testing.T) { re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) defer clean() @@ -41,10 +43,10 @@ func TestLeadership(t *testing.T) { leadership2 := NewLeadership(client, "/test_leader", "test_leader_2") // leadership1 starts first and get the leadership - err := leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") + err := leadership1.Campaign(ctx, defaultLeaseTimeout, "test_leader_1") re.NoError(err) // leadership2 starts then and can not get the leadership - err = leadership2.Campaign(defaultLeaseTimeout, "test_leader_2") + err = leadership2.Campaign(ctx, defaultLeaseTimeout, "test_leader_2") re.Error(err) re.True(leadership1.Check()) @@ -60,11 +62,9 @@ func TestLeadership(t *testing.T) { // Delete the leader key and campaign for leadership1 err = leadership1.DeleteLeaderKey() re.NoError(err) - err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") + err = leadership1.Campaign(ctx, defaultLeaseTimeout, "test_leader_1") re.NoError(err) re.True(leadership1.Check()) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() go 
leadership1.Keep(ctx) // Sleep longer than the defaultLeaseTimeout @@ -76,7 +76,7 @@ func TestLeadership(t *testing.T) { // Delete the leader key and re-campaign for leadership2 err = leadership1.DeleteLeaderKey() re.NoError(err) - err = leadership2.Campaign(defaultLeaseTimeout, "test_leader_2") + err = leadership2.Campaign(ctx, defaultLeaseTimeout, "test_leader_2") re.NoError(err) re.True(leadership2.Check()) ctx, cancel = context.WithCancel(context.Background()) @@ -187,6 +187,8 @@ func TestExitWatch(t *testing.T) { func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client) func()) { re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1) defer clean() client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls) @@ -195,7 +197,7 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe leadership1 := NewLeadership(client1, leaderKey, "test_leader_1") leadership2 := NewLeadership(client2, leaderKey, "test_leader_2") - err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") + err = leadership1.Campaign(ctx, defaultLeaseTimeout, "test_leader_1") re.NoError(err) resp, err := client2.Get(context.Background(), leaderKey) re.NoError(err) @@ -221,6 +223,8 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe func TestRequestProgress(t *testing.T) { checkWatcherRequestProgress := func(injectWatchChanBlock bool) { re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() fname := testutil.InitTempFileLogger("debug") defer os.RemoveAll(fname) servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1) @@ -232,11 +236,9 @@ func TestRequestProgress(t *testing.T) { leaderKey := "/test_leader" leadership1 := NewLeadership(client1, leaderKey, "test_leader_1") leadership2 := NewLeadership(client2, leaderKey, 
"test_leader_2") - err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") + err = leadership1.Campaign(ctx, defaultLeaseTimeout, "test_leader_1") re.NoError(err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() resp, err := client2.Get(ctx, leaderKey) re.NoError(err) go func() { diff --git a/pkg/encryption/key_manager_test.go b/pkg/encryption/key_manager_test.go index 3134e714543..88db406119b 100644 --- a/pkg/encryption/key_manager_test.go +++ b/pkg/encryption/key_manager_test.go @@ -68,9 +68,11 @@ func newTestKeyFile(t *testing.T, re *require.Assertions, key ...string) (keyFil } func newTestLeader(re *require.Assertions, client *clientv3.Client) *election.Leadership { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() leader := election.NewLeadership(client, "test_leader", "test") timeout := int64(30000000) // about a year. - err := leader.Campaign(timeout, "") + err := leader.Campaign(ctx, timeout, "") re.NoError(err) return leader } diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 47248208c8a..b4df91ffd06 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -152,7 +152,7 @@ func (s *Server) primaryElectionLoop() { func (s *Server) campaignLeader() { log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Name())) - if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info("campaign resource manager primary meets error due to txn conflict, another server may campaign successfully", zap.String("campaign-resource-manager-primary-name", s.participant.Name())) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 5e2ed58a009..65956b6f6cd 100644 --- 
a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -241,7 +241,7 @@ func (s *Server) primaryElectionLoop() { func (s *Server) campaignLeader() { log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name())) - if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully", zap.String("campaign-scheduling-primary-name", s.participant.Name())) diff --git a/pkg/member/member.go b/pkg/member/member.go index 80332a65f94..dabd38a72bc 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -42,6 +42,8 @@ const ( // The timeout to wait transfer etcd leader to complete. moveLeaderTimeout = 5 * time.Second dcLocationConfigEtcdPrefix = "dc-location" + // If the number of campaigns within 5 minutes exceeds this value, the PD will resign and campaign again. + campaignLeaderFrequencyTimes = 5 ) // EmbeddedEtcdMember is used for the election related logic. It implements Member interface. @@ -177,8 +179,13 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time { // CampaignLeader is used to campaign a PD member's leadership // and make it become a PD leader. -func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error { - return m.leadership.Campaign(leaseTimeout, m.MemberValue()) +func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error { + // The leader should be changed when it campaigns too frequently. 
+ if len(m.leadership.CampaignTimes) > campaignLeaderFrequencyTimes { + log.Error("campaign times is too much", zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath())) + return m.ResignEtcdLeader(ctx, m.Name(), "") + } + return m.leadership.Campaign(ctx, leaseTimeout, m.MemberValue()) } // KeepLeader is used to keep the PD leader's leadership. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index b3034a86807..9b4f9e5ac24 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -196,11 +196,11 @@ func (m *Participant) GetLeadership() *election.Leadership { } // CampaignLeader is used to campaign the leadership and make it become a leader. -func (m *Participant) CampaignLeader(leaseTimeout int64) error { +func (m *Participant) CampaignLeader(ctx context.Context, leaseTimeout int64) error { if !m.campaignCheck() { return errs.ErrCheckCampaign } - return m.leadership.Campaign(leaseTimeout, m.MemberValue()) + return m.leadership.Campaign(ctx, leaseTimeout, m.MemberValue()) } // KeepLeader is used to keep the leader's leadership. diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index df0ca0affc9..3278b622360 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo { type ElectionMember interface { // ID returns the unique ID in the election group. For example, it can be unique // server id of a cluster or the unique keyspace group replica id of the election - // group comprised of the replicas of a keyspace group. + // group composed of the replicas of a keyspace group. ID() uint64 - // ID returns the unique name in the election group. + // Name returns the unique name in the election group. Name() string // MemberValue returns the member value. 
MemberValue() string - // GetMember() returns the current member + // GetMember returns the current member GetMember() interface{} // Client returns the etcd client. Client() *clientv3.Client @@ -124,7 +124,7 @@ type ElectionMember interface { // KeepLeader is used to keep the leader's leadership. KeepLeader(ctx context.Context) // CampaignLeader is used to campaign the leadership and make it become a leader in an election group. - CampaignLeader(leaseTimeout int64) error + CampaignLeader(ctx context.Context, leaseTimeout int64) error // ResetLeader is used to reset the member's current leadership. // Basically it will reset the leader lease and unset leader info. ResetLeader() @@ -640,7 +640,7 @@ func (am *AllocatorManager) campaignAllocatorLeader( } } }) - if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil { + if err := allocator.CampaignAllocatorLeader(am.ctx, am.leaderLease, cmps...); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 613ceb3eafc..a37bcc73881 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -568,7 +568,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { log.Info("start to campaign the primary", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("campaign-tso-primary-name", gta.member.Name())) - if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil { + if err := gta.am.member.CampaignLeader(gta.ctx, gta.am.leaderLease); err != nil { if errors.Is(err, errs.ErrEtcdTxnConflict) { log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), 
gta.getGroupID() > 0), diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 9d244d2531d..f64530fe767 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -190,8 +190,8 @@ func (lta *LocalTSOAllocator) EnableAllocatorLeader() { } // CampaignAllocatorLeader is used to campaign a Local TSO Allocator's leadership. -func (lta *LocalTSOAllocator) CampaignAllocatorLeader(leaseTimeout int64, cmps ...clientv3.Cmp) error { - return lta.leadership.Campaign(leaseTimeout, lta.allocatorManager.member.MemberValue(), cmps...) +func (lta *LocalTSOAllocator) CampaignAllocatorLeader(ctx context.Context, leaseTimeout int64, cmps ...clientv3.Cmp) error { + return lta.leadership.Campaign(ctx, leaseTimeout, lta.allocatorManager.member.MemberValue(), cmps...) } // KeepAllocatorLeader is used to keep the PD leader's leadership. diff --git a/server/server.go b/server/server.go index 9cd7f18578e..b6503dbc620 100644 --- a/server/server.go +++ b/server/server.go @@ -1627,7 +1627,7 @@ func (s *Server) leaderLoop() { func (s *Server) campaignLeader() { log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name())) - if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil { if err.Error() == errs.ErrEtcdTxnConflict.Error() { log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", s.mode), zap.String("campaign-leader-name", s.Name())) diff --git a/tests/cluster.go b/tests/cluster.go index ae1ae331856..b3465e06725 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -155,6 +155,14 @@ func (s *TestServer) Destroy() error { return nil } +// TransferLeader resigns the leader of the server and transfers it to another server. 
+func (s *TestServer) TransferLeader(name string) error { + s.Lock() + defer s.Unlock() + s.server.GetMember().ResetLeader() + return s.server.GetMember().ResignEtcdLeader(s.server.Context(), s.server.Name(), name) +} + // ResignLeader resigns the leader of the server. func (s *TestServer) ResignLeader() error { s.Lock() diff --git a/tests/cluster.go b/tests/server/member/member_test.go index 26d4fa2a904..41969573bb2 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -323,6 +323,31 @@ func TestMoveLeader(t *testing.T) { } } +func TestCampaignLeaderFrequently(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 5) + defer cluster.Destroy() + re.NoError(err) + + err = cluster.RunInitialServers() + re.NoError(err) + cluster.WaitLeader() + leader := cluster.GetLeader() + re.NotEmpty(cluster.GetLeader()) + + for i := 0; i < 6; i++ { + err := cluster.GetServers()[cluster.GetLeader()].TransferLeader(cluster.GetLeader()) + re.NoError(err) + cluster.WaitLeader() + } + // The leader should be changed when it campaigns too frequently + cluster.WaitLeader() + re.NotEmpty(cluster.GetLeader()) + re.NotEqual(leader, cluster.GetLeader()) +} + func TestGetLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background())