Skip to content

Commit

Permalink
cp #7566 and #7795
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Feb 2, 2024
1 parent 2dd345c commit fa0213c
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 20 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,11 @@ error = '''
cannot set invalid configuration
'''

["PD:server:ErrLeaderFrequentlyChange"]
error = '''
leader %s frequently changed, leader-key is [%s]
'''

["PD:server:ErrLeaderNil"]
error = '''
leader is nil
Expand Down
45 changes: 38 additions & 7 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand All @@ -33,6 +34,7 @@ import (
)

const (
// defaultCampaignTimesSlot is the initial capacity used when allocating the
// `campaignTimes` slice of a Leadership.
defaultCampaignTimesSlot = 10
// NOTE(review): the use of watchLoopUnhealthyTimeout is outside this view;
// presumably the duration after which the watch loop is treated as unhealthy — confirm.
watchLoopUnhealthyTimeout = 60 * time.Second
// campaignTimesRecordTimeout is the window for campaign records: when a new
// campaign time is added, records older than this are pruned.
campaignTimesRecordTimeout = 5 * time.Minute
)
Expand Down Expand Up @@ -65,9 +67,9 @@ type Leadership struct {
keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock syncutil.Mutex
// CampaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`.
// campaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`.
// It is ordered by time to prevent the leader from campaigning too frequently.
CampaignTimes []time.Time
campaignTimes []time.Time
}

// NewLeadership creates a new Leadership.
Expand All @@ -76,7 +78,7 @@ func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadersh
purpose: purpose,
client: client,
leaderKey: leaderKey,
CampaignTimes: make([]time.Time, 0, 10),
campaignTimes: make([]time.Time, 0, defaultCampaignTimesSlot),
}
return leadership
}
Expand Down Expand Up @@ -111,18 +113,37 @@ func (ls *Leadership) GetLeaderKey() string {
return ls.leaderKey
}

// GetCampaignTimesNum reports how many campaign times are currently recorded
// for the leader. A nil Leadership has no records, so it reports 0.
func (ls *Leadership) GetCampaignTimesNum() int {
	if ls != nil {
		return len(ls.campaignTimes)
	}
	return 0
}

// ResetCampaignTimes discards every recorded campaign time of the leader by
// swapping in a fresh, empty slice with capacity `defaultCampaignTimesSlot`.
// Calling it on a nil Leadership does nothing.
func (ls *Leadership) ResetCampaignTimes() {
	if ls != nil {
		fresh := make([]time.Time, 0, defaultCampaignTimesSlot)
		ls.campaignTimes = fresh
	}
}

// addCampaignTimes is used to add the campaign times of the leader.
func (ls *Leadership) addCampaignTimes() {
for i := len(ls.CampaignTimes) - 1; i >= 0; i-- {
if time.Since(ls.CampaignTimes[i]) > campaignTimesRecordTimeout {
if ls == nil {
return
}
for i := len(ls.campaignTimes) - 1; i >= 0; i-- {
if time.Since(ls.campaignTimes[i]) > campaignTimesRecordTimeout {
// remove the time which is more than `campaignTimesRecordTimeout`
// array is sorted by time
ls.CampaignTimes = ls.CampaignTimes[i:]
ls.campaignTimes = ls.campaignTimes[i:]
break
}
}

ls.CampaignTimes = append(ls.CampaignTimes, time.Now())
ls.campaignTimes = append(ls.campaignTimes, time.Now())
}

// Campaign is used to campaign the leader with given lease and returns a leadership
Expand All @@ -136,6 +157,16 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl
lease: clientv3.NewLease(ls.client),
}
ls.setLease(newLease)

failpoint.Inject("skipGrantLeader", func(val failpoint.Value) {
var member pdpb.Member
member.Unmarshal([]byte(leaderData))
name, ok := val.(string)
if ok && member.Name == name {
failpoint.Return(errors.Errorf("failed to grant lease"))
}
})

if err := newLease.Grant(leaseTimeout); err != nil {
return err
}
Expand Down
17 changes: 9 additions & 8 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,15 @@ var (

// server errors
var (
ErrServiceRegistered = errors.Normalize("service with path [%s] already registered", errors.RFCCodeText("PD:server:ErrServiceRegistered"))
ErrAPIInformationInvalid = errors.Normalize("invalid api information, group %s version %s", errors.RFCCodeText("PD:server:ErrAPIInformationInvalid"))
ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty"))
ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil"))
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
ErrServiceRegistered = errors.Normalize("service with path [%s] already registered", errors.RFCCodeText("PD:server:ErrServiceRegistered"))
ErrAPIInformationInvalid = errors.Normalize("invalid api information, group %s version %s", errors.RFCCodeText("PD:server:ErrAPIInformationInvalid"))
ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty"))
ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil"))
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
// ErrLeaderFrequentlyChange takes two format arguments, in order: the leader
// name and the leader key.
ErrLeaderFrequentlyChange = errors.Normalize("leader %s frequently changed, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderFrequentlyChange"))
)

// logutil errors
Expand Down
10 changes: 6 additions & 4 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,14 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {
// and make it become a PD leader.
// If this member campaigns too frequently, the etcd leader is resigned so that the leader changes.
func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
if len(m.leadership.CampaignTimes) >= campaignLeaderFrequencyTimes {
// Too many campaigns are on record: give up the etcd leadership instead of
// campaigning again in this call.
if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes {
log.Warn("campaign times is too frequent, resign and campaign again",
zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath()))
// remove all campaign times
m.leadership.CampaignTimes = nil
return m.ResignEtcdLeader(ctx, m.Name(), "")
// Resign first; if that fails, return the error and leave the campaign
// records untouched.
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
return err
}
// The resign succeeded, so clear the recorded campaign times.
m.leadership.ResetCampaignTimes()
// Report the frequent change to the caller rather than returning nil.
return errs.ErrLeaderFrequentlyChange.FastGenByArgs(m.Name(), m.GetLeaderPath())
}
// Normal path: campaign with this member's value as the leader data.
return m.leadership.Campaign(leaseTimeout, m.MemberValue())
}
Expand Down
26 changes: 25 additions & 1 deletion tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,34 @@ func TestCampaignLeaderFrequently(t *testing.T) {
cluster.GetServers()[cluster.GetLeader()].ResetPDLeader()
cluster.WaitLeader()
}
// leader should be changed when campaign leader frequently
// PD leader should be different from before because etcd leader changed.
re.NotEmpty(cluster.GetLeader())
re.NotEqual(leader, cluster.GetLeader())
}

// TestGrantLeaseFailed checks that the PD leader moves to another member when
// the current leader keeps failing to grant its lease.
//
// The `skipGrantLeader` failpoint is armed with the current leader's name,
// which makes Campaign return an error before the lease is granted for that
// member. After resetting the PD leader a few times, the leader is expected to
// differ from the original one.
func TestGrantLeaseFailed(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cluster, err := tests.NewTestCluster(ctx, 5)
	defer cluster.Destroy()
	re.NoError(err)

	err = cluster.RunInitialServers()
	re.NoError(err)
	cluster.WaitLeader()
	leader := cluster.GetLeader()
	// Assert on the captured name: it is the value the failpoint is armed with.
	re.NotEmpty(leader)

	const skipGrantLeader = "github.com/tikv/pd/pkg/election/skipGrantLeader"
	re.NoError(failpoint.Enable(skipGrantLeader, fmt.Sprintf("return(\"%s\")", leader)))
	// Disable the failpoint in a defer so it is released even when one of the
	// assertions below fails; otherwise it would stay enabled and leak into
	// later tests.
	defer func() {
		re.NoError(failpoint.Disable(skipGrantLeader))
	}()

	for i := 0; i < 3; i++ {
		cluster.GetLeaderServer().ResetPDLeader()
		cluster.WaitLeader()
	}
	// PD leader should be different from before because etcd leader changed.
	re.NotEmpty(cluster.GetLeader())
	re.NotEqual(leader, cluster.GetLeader())
}

func TestGetLeader(t *testing.T) {
Expand Down

0 comments on commit fa0213c

Please sign in to comment.