Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Nov 2, 2023
1 parent a1a1eea commit 05e25f3
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 17 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type Client interface {
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error)
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
// WatchGlobalConfig returns a stream with all global config and updates
WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error)
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value interface{}) error
Expand Down
7 changes: 6 additions & 1 deletion pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand Down Expand Up @@ -62,6 +63,9 @@ type Leadership struct {
keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock syncutil.Mutex
// CampaignTimes is used to record the campaign times of the leader in 5min.
// To avoid the leader campaign too frequently.
CampaignTimes []cache.TTLString
}

// NewLeadership creates a new Leadership.
Expand Down Expand Up @@ -105,7 +109,8 @@ func (ls *Leadership) GetLeaderKey() string {
}

// Campaign is used to campaign the leader with given lease and returns a leadership
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
func (ls *Leadership) Campaign(ctx context.Context, leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.CampaignTimes = append(ls.CampaignTimes, *cache.NewStringTTL(ctx, 5*time.Second, 5*time.Minute))
ls.leaderValue = leaderData
// Create a new lease to campaign
newLease := &lease{
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign resource manager primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-scheduling-primary-name", s.participant.Name()))
Expand Down
11 changes: 9 additions & 2 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
// The timeout to wait transfer etcd leader to complete.
moveLeaderTimeout = 5 * time.Second
dcLocationConfigEtcdPrefix = "dc-location"
// If the campaign times is more than this value in 5min, the PD will resign and campaign again.
campaignLeaderFrequencyTimes = 5
)

// EmbeddedEtcdMember is used for the election related logic. It implements Member interface.
Expand Down Expand Up @@ -177,8 +179,13 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {

// CampaignLeader is used to campaign a PD member's leadership
// and make it become a PD leader.
func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error {
return m.leadership.Campaign(leaseTimeout, m.MemberValue())
func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
// leader should be changed when campaign leader frequently.
if len(m.leadership.CampaignTimes) > campaignLeaderFrequencyTimes {
log.Error("campaign times is too much", zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath()))
return m.ResignEtcdLeader(ctx, m.Name(), "")
}
return m.leadership.Campaign(ctx, leaseTimeout, m.MemberValue())
}

// KeepLeader is used to keep the PD leader's leadership.
Expand Down
4 changes: 2 additions & 2 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ func (m *Participant) GetLeadership() *election.Leadership {
}

// CampaignLeader is used to campaign the leadership and make it become a leader.
func (m *Participant) CampaignLeader(leaseTimeout int64) error {
func (m *Participant) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
if !m.campaignCheck() {
return errs.ErrCheckCampaign
}
return m.leadership.Campaign(leaseTimeout, m.MemberValue())
return m.leadership.Campaign(ctx, leaseTimeout, m.MemberValue())
}

// KeepLeader is used to keep the leader's leadership.
Expand Down
10 changes: 5 additions & 5 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo {
type ElectionMember interface {
// ID returns the unique ID in the election group. For example, it can be unique
// server id of a cluster or the unique keyspace group replica id of the election
// group comprised of the replicas of a keyspace group.
// group composed of the replicas of a keyspace group.
ID() uint64
// ID returns the unique name in the election group.
// Name returns the unique name in the election group.
Name() string
// MemberValue returns the member value.
MemberValue() string
// GetMember() returns the current member
// GetMember returns the current member
GetMember() interface{}
// Client returns the etcd client.
Client() *clientv3.Client
Expand All @@ -124,7 +124,7 @@ type ElectionMember interface {
// KeepLeader is used to keep the leader's leadership.
KeepLeader(ctx context.Context)
// CampaignLeader is used to campaign the leadership and make it become a leader in an election group.
CampaignLeader(leaseTimeout int64) error
CampaignLeader(ctx context.Context, leaseTimeout int64) error
// ResetLeader is used to reset the member's current leadership.
// Basically it will reset the leader lease and unset leader info.
ResetLeader()
Expand Down Expand Up @@ -640,7 +640,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
}
}
})
if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil {
if err := allocator.CampaignAllocatorLeader(am.ctx, am.leaderLease, cmps...); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
log.Info("start to campaign the primary",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
zap.String("campaign-tso-primary-name", gta.member.Name()))
if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil {
if err := gta.am.member.CampaignLeader(gta.ctx, gta.am.leaderLease); err != nil {
if errors.Is(err, errs.ErrEtcdTxnConflict) {
log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
Expand Down
4 changes: 2 additions & 2 deletions pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ func (lta *LocalTSOAllocator) EnableAllocatorLeader() {
}

// CampaignAllocatorLeader is used to campaign a Local TSO Allocator's leadership.
func (lta *LocalTSOAllocator) CampaignAllocatorLeader(leaseTimeout int64, cmps ...clientv3.Cmp) error {
return lta.leadership.Campaign(leaseTimeout, lta.allocatorManager.member.MemberValue(), cmps...)
func (lta *LocalTSOAllocator) CampaignAllocatorLeader(ctx context.Context, leaseTimeout int64, cmps ...clientv3.Cmp) error {
return lta.leadership.Campaign(ctx, leaseTimeout, lta.allocatorManager.member.MemberValue(), cmps...)
}

// KeepAllocatorLeader is used to keep the PD leader's leadership.
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1627,7 +1627,7 @@ func (s *Server) leaderLoop() {

func (s *Server) campaignLeader() {
log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name()))
if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", s.mode),
zap.String("campaign-leader-name", s.Name()))
Expand Down
8 changes: 8 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ func (s *TestServer) Destroy() error {
return nil
}

// TransferLeader resigns the leader of the server and transfers it to another server.
func (s *TestServer) TransferLeader(name string) error {
s.Lock()
defer s.Unlock()
s.server.GetMember().ResetLeader()
return s.server.GetMember().ResignEtcdLeader(s.server.Context(), s.server.Name(), name)
}

// ResignLeader resigns the leader of the server.
func (s *TestServer) ResignLeader() error {
s.Lock()
Expand Down
25 changes: 25 additions & 0 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,31 @@ func TestMoveLeader(t *testing.T) {
}
}

func TestCampaignLeaderFrequently(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 5)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)
cluster.WaitLeader()
leader := cluster.GetLeader()
re.NotEmpty(cluster.GetLeader())

for i := 0; i < 6; i++ {
err := cluster.GetServers()[cluster.GetLeader()].TransferLeader(cluster.GetLeader())
re.NoError(err)
cluster.WaitLeader()
}
// leader should be changed when campaign leader frequently
cluster.WaitLeader()
re.NotEmpty(cluster.GetLeader())
re.NotEqual(leader, cluster.GetLeader())
}

func TestGetLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 05e25f3

Please sign in to comment.