Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

member: avoid frequent campaign times #7301

Merged
merged 5 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type Client interface {
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error)
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
// WatchGlobalConfig returns a stream with all global config and updates
WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error)
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value interface{}) error
Expand Down
32 changes: 27 additions & 5 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
"go.uber.org/zap"
)

const watchLoopUnhealthyTimeout = 60 * time.Second
const (
watchLoopUnhealthyTimeout = 60 * time.Second
campaignTimesRecordTimeout = 5 * time.Minute
)

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
Expand Down Expand Up @@ -62,20 +65,24 @@
keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock syncutil.Mutex
// CampaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`.
// It is ordered by time to prevent the leader from campaigning too frequently.
CampaignTimes []time.Time
}

// NewLeadership creates a new Leadership.
func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadership {
leadership := &Leadership{
purpose: purpose,
client: client,
leaderKey: leaderKey,
purpose: purpose,
client: client,
leaderKey: leaderKey,
CampaignTimes: make([]time.Time, 0, 10),
}
return leadership
}

// getLease gets the lease of leadership, only if leadership is valid,
// i.e the owner is a true leader, the lease is not nil.
// i.e. the owner is a true leader, the lease is not nil.
func (ls *Leadership) getLease() *lease {
l := ls.lease.Load()
if l == nil {
Expand Down Expand Up @@ -104,8 +111,23 @@
return ls.leaderKey
}

// addCampaignTimes is used to add the campaign times of the leader.
func (ls *Leadership) addCampaignTimes() {
for i := len(ls.CampaignTimes) - 1; i >= 0; i-- {
if time.Since(ls.CampaignTimes[i]) > campaignTimesRecordTimeout {
// remove the time which is more than `campaignTimesRecordTimeout`
// array is sorted by time
ls.CampaignTimes = ls.CampaignTimes[i:]
break

Check warning on line 121 in pkg/election/leadership.go

View check run for this annotation

Codecov / codecov/patch

pkg/election/leadership.go#L120-L121

Added lines #L120 - L121 were not covered by tests
}
}

ls.CampaignTimes = append(ls.CampaignTimes, time.Now())
}

// Campaign is used to campaign the leader with given lease and returns a leadership
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.addCampaignTimes()
ls.leaderValue = leaderData
// Create a new lease to campaign
newLease := &lease{
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
Copy link
Member Author

@HuSharp HuSharp Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, pass context to make sure resign can be used

if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign resource manager primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-scheduling-primary-name", s.participant.Name()))
Expand Down
12 changes: 11 additions & 1 deletion pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
// The timeout to wait transfer etcd leader to complete.
moveLeaderTimeout = 5 * time.Second
dcLocationConfigEtcdPrefix = "dc-location"
// If the campaign times is more than this value in `campaignTimesRecordTimeout`, the PD will resign and campaign again.
campaignLeaderFrequencyTimes = 3
)

// EmbeddedEtcdMember is used for the election related logic. It implements Member interface.
Expand Down Expand Up @@ -177,7 +179,15 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {

// CampaignLeader is used to campaign a PD member's leadership
// and make it become a PD leader.
func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error {
// leader should be changed when campaign leader frequently.
func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
if len(m.leadership.CampaignTimes) >= campaignLeaderFrequencyTimes {
log.Warn("campaign times is too frequent, resign and campaign again",
zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath()))
// remove all campaign times
m.leadership.CampaignTimes = nil
return m.ResignEtcdLeader(ctx, m.Name(), "")
}
return m.leadership.Campaign(leaseTimeout, m.MemberValue())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (m *Participant) GetLeadership() *election.Leadership {
}

// CampaignLeader is used to campaign the leadership and make it become a leader.
func (m *Participant) CampaignLeader(leaseTimeout int64) error {
func (m *Participant) CampaignLeader(_ context.Context, leaseTimeout int64) error {
if !m.campaignCheck() {
return errs.ErrCheckCampaign
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo {
type ElectionMember interface {
// ID returns the unique ID in the election group. For example, it can be unique
// server id of a cluster or the unique keyspace group replica id of the election
// group comprised of the replicas of a keyspace group.
// group composed of the replicas of a keyspace group.
ID() uint64
// ID returns the unique name in the election group.
// Name returns the unique name in the election group.
Name() string
// MemberValue returns the member value.
MemberValue() string
// GetMember() returns the current member
// GetMember returns the current member
GetMember() interface{}
// Client returns the etcd client.
Client() *clientv3.Client
Expand All @@ -124,7 +124,7 @@ type ElectionMember interface {
// KeepLeader is used to keep the leader's leadership.
KeepLeader(ctx context.Context)
// CampaignLeader is used to campaign the leadership and make it become a leader in an election group.
CampaignLeader(leaseTimeout int64) error
CampaignLeader(ctx context.Context, leaseTimeout int64) error
// ResetLeader is used to reset the member's current leadership.
// Basically it will reset the leader lease and unset leader info.
ResetLeader()
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
log.Info("start to campaign the primary",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
zap.String("campaign-tso-primary-name", gta.member.Name()))
if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil {
if err := gta.am.member.CampaignLeader(gta.ctx, gta.am.leaderLease); err != nil {
if errors.Is(err, errs.ErrEtcdTxnConflict) {
log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,7 @@ func (s *Server) leaderLoop() {

func (s *Server) campaignLeader() {
log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name()))
if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", s.mode),
zap.String("campaign-leader-name", s.Name()))
Expand Down
7 changes: 7 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ func (s *TestServer) Destroy() error {
return nil
}

// ResetPDLeader resigns the leader of the server.
func (s *TestServer) ResetPDLeader() {
s.Lock()
defer s.Unlock()
s.server.GetMember().ResetLeader()
}

// ResignLeader resigns the leader of the server.
func (s *TestServer) ResignLeader() error {
s.Lock()
Expand Down
24 changes: 24 additions & 0 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,30 @@ func TestMoveLeader(t *testing.T) {
}
}

func TestCampaignLeaderFrequently(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 5)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)
cluster.WaitLeader()
leader := cluster.GetLeader()
re.NotEmpty(cluster.GetLeader())

for i := 0; i < 3; i++ {
cluster.GetServers()[cluster.GetLeader()].ResetPDLeader()
cluster.WaitLeader()
}
// leader should be changed when campaign leader frequently
cluster.WaitLeader()
re.NotEmpty(cluster.GetLeader())
re.NotEqual(leader, cluster.GetLeader())
}

func TestGetLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
Loading