From fa0213c863bdb001e2032184d4376f80a08da099 Mon Sep 17 00:00:00 2001 From: husharp Date: Fri, 2 Feb 2024 15:48:57 +0800 Subject: [PATCH] cp #7566 and #7795 Signed-off-by: husharp --- errors.toml | 5 ++++ pkg/election/leadership.go | 45 +++++++++++++++++++++++++----- pkg/errs/errno.go | 17 +++++------ pkg/member/member.go | 10 ++++--- tests/server/member/member_test.go | 26 ++++++++++++++++- 5 files changed, 83 insertions(+), 20 deletions(-) diff --git a/errors.toml b/errors.toml index 6b535d51d2f..b692e1dea74 100644 --- a/errors.toml +++ b/errors.toml @@ -731,6 +731,11 @@ error = ''' cannot set invalid configuration ''' +["PD:server:ErrLeaderFrequentlyChange"] +error = ''' +leader %s frequently changed, leader-key is [%s] +''' + ["PD:server:ErrLeaderNil"] error = ''' leader is nil diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 572dae132b6..02f519dbc75 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -19,6 +19,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -33,6 +34,7 @@ import ( ) const ( + defaultCampaignTimesSlot = 10 watchLoopUnhealthyTimeout = 60 * time.Second campaignTimesRecordTimeout = 5 * time.Minute ) @@ -65,9 +67,9 @@ type Leadership struct { keepAliveCtx context.Context keepAliveCancelFunc context.CancelFunc keepAliveCancelFuncLock syncutil.Mutex - // CampaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`. + // campaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`. // It is ordered by time to prevent the leader from campaigning too frequently. - CampaignTimes []time.Time + campaignTimes []time.Time } // NewLeadership creates a new Leadership. @@ -76,7 +78,7 @@ func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadersh purpose: purpose, client: client, leaderKey: leaderKey, - CampaignTimes: make([]time.Time, 0, 10), + campaignTimes: make([]time.Time, 0, defaultCampaignTimesSlot), } return leadership } @@ -111,18 +113,37 @@ func (ls *Leadership) GetLeaderKey() string { return ls.leaderKey } +// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`. +func (ls *Leadership) GetCampaignTimesNum() int { + if ls == nil { + return 0 + } + return len(ls.campaignTimes) +} + +// ResetCampaignTimes is used to reset the campaign times of the leader. +func (ls *Leadership) ResetCampaignTimes() { + if ls == nil { + return + } + ls.campaignTimes = make([]time.Time, 0, defaultCampaignTimesSlot) +} + // addCampaignTimes is used to add the campaign times of the leader. func (ls *Leadership) addCampaignTimes() { - for i := len(ls.CampaignTimes) - 1; i >= 0; i-- { - if time.Since(ls.CampaignTimes[i]) > campaignTimesRecordTimeout { + if ls == nil { + return + } + for i := len(ls.campaignTimes) - 1; i >= 0; i-- { + if time.Since(ls.campaignTimes[i]) > campaignTimesRecordTimeout { // remove the time which is more than `campaignTimesRecordTimeout` // array is sorted by time - ls.CampaignTimes = ls.CampaignTimes[i:] + ls.campaignTimes = ls.campaignTimes[i:] break } } - ls.CampaignTimes = append(ls.CampaignTimes, time.Now()) + ls.campaignTimes = append(ls.campaignTimes, time.Now()) } // Campaign is used to campaign the leader with given lease and returns a leadership @@ -136,6 +157,16 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl lease: clientv3.NewLease(ls.client), } ls.setLease(newLease) + + failpoint.Inject("skipGrantLeader", func(val failpoint.Value) { + var member pdpb.Member + member.Unmarshal([]byte(leaderData)) + name, ok := val.(string) + if ok && member.Name == name { + failpoint.Return(errors.Errorf("failed to grant lease")) + } + }) + if err := newLease.Grant(leaseTimeout); err != nil { return err } diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 73445cdf2bb..c49db72c6a1 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -201,14 +201,15 @@ var ( // server errors var ( - ErrServiceRegistered = errors.Normalize("service with path [%s] already registered", errors.RFCCodeText("PD:server:ErrServiceRegistered")) - ErrAPIInformationInvalid = errors.Normalize("invalid api information, group %s version %s", errors.RFCCodeText("PD:server:ErrAPIInformationInvalid")) - ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty")) - ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil")) - ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd")) - ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration")) - ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted")) - ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded")) + ErrServiceRegistered = errors.Normalize("service with path [%s] already registered", errors.RFCCodeText("PD:server:ErrServiceRegistered")) + ErrAPIInformationInvalid = errors.Normalize("invalid api information, group %s version %s", errors.RFCCodeText("PD:server:ErrAPIInformationInvalid")) + ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty")) + ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil")) + ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd")) + ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration")) + ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted")) + ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded")) + ErrLeaderFrequentlyChange = errors.Normalize("leader %s frequently changed, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderFrequentlyChange")) ) // logutil errors diff --git a/pkg/member/member.go b/pkg/member/member.go index 6eddf9a7c77..ba0757e6209 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -181,12 +181,14 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time { // and make it become a PD leader. // leader should be changed when campaign leader frequently. func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error { - if len(m.leadership.CampaignTimes) >= campaignLeaderFrequencyTimes { + if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes { log.Warn("campaign times is too frequent, resign and campaign again", zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath())) - // remove all campaign times - m.leadership.CampaignTimes = nil - return m.ResignEtcdLeader(ctx, m.Name(), "") + if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil { + return err + } + m.leadership.ResetCampaignTimes() + return errs.ErrLeaderFrequentlyChange.FastGenByArgs(m.Name(), m.GetLeaderPath()) } return m.leadership.Campaign(leaseTimeout, m.MemberValue()) } diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index 5965f9e22a6..f428fb7c198 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -341,10 +341,34 @@ func TestCampaignLeaderFrequently(t *testing.T) { cluster.GetServers()[cluster.GetLeader()].ResetPDLeader() cluster.WaitLeader() } - // leader should be changed when campaign leader frequently + // PD leader should be different from before because etcd leader changed. + re.NotEmpty(cluster.GetLeader()) + re.NotEqual(leader, cluster.GetLeader()) +} + +func TestGrantLeaseFailed(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 5) + defer cluster.Destroy() + re.NoError(err) + + err = cluster.RunInitialServers() + re.NoError(err) cluster.WaitLeader() + leader := cluster.GetLeader() + re.NotEmpty(cluster.GetLeader()) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/skipGrantLeader", fmt.Sprintf("return(\"%s\")", leader))) + + for i := 0; i < 3; i++ { + cluster.GetLeaderServer().ResetPDLeader() + cluster.WaitLeader() + } + // PD leader should be different from before because etcd leader changed. re.NotEmpty(cluster.GetLeader()) re.NotEqual(leader, cluster.GetLeader()) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/skipGrantLeader")) } func TestGetLeader(t *testing.T) {