Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

member: avoid frequent campaign times (#7301) #7790

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type Client interface {
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error)
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
// WatchGlobalConfig returns a stream with all global config and updates
WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error)
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value interface{}) error
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,11 @@ error = '''
cannot set invalid configuration
'''

["PD:server:ErrLeaderFrequentlyChange"]
error = '''
leader %s frequently changed, leader-key is [%s]
'''

["PD:server:ErrLeaderNil"]
error = '''
leader is nil
Expand Down
63 changes: 58 additions & 5 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand All @@ -32,7 +33,11 @@ import (
"go.uber.org/zap"
)

const watchLoopUnhealthyTimeout = 60 * time.Second
const (
defaultCampaignTimesSlot = 10
watchLoopUnhealthyTimeout = 60 * time.Second
campaignTimesRecordTimeout = 5 * time.Minute
)

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
Expand Down Expand Up @@ -62,20 +67,24 @@ type Leadership struct {
keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock syncutil.Mutex
// campaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`.
// It is ordered by time to prevent the leader from campaigning too frequently.
campaignTimes []time.Time
}

// NewLeadership creates a new Leadership.
func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadership {
leadership := &Leadership{
purpose: purpose,
client: client,
leaderKey: leaderKey,
purpose: purpose,
client: client,
leaderKey: leaderKey,
campaignTimes: make([]time.Time, 0, defaultCampaignTimesSlot),
}
return leadership
}

// getLease gets the lease of leadership, only if leadership is valid,
// i.e the owner is a true leader, the lease is not nil.
// i.e. the owner is a true leader, the lease is not nil.
func (ls *Leadership) getLease() *lease {
l := ls.lease.Load()
if l == nil {
Expand Down Expand Up @@ -104,8 +113,42 @@ func (ls *Leadership) GetLeaderKey() string {
return ls.leaderKey
}

// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`.
func (ls *Leadership) GetCampaignTimesNum() int {
if ls == nil {
return 0
}
return len(ls.campaignTimes)
}

// ResetCampaignTimes is used to reset the campaign times of the leader.
func (ls *Leadership) ResetCampaignTimes() {
if ls == nil {
return
}
ls.campaignTimes = make([]time.Time, 0, defaultCampaignTimesSlot)
}

// addCampaignTimes is used to add the campaign times of the leader.
func (ls *Leadership) addCampaignTimes() {
if ls == nil {
return
}
for i := len(ls.campaignTimes) - 1; i >= 0; i-- {
if time.Since(ls.campaignTimes[i]) > campaignTimesRecordTimeout {
// remove the time which is more than `campaignTimesRecordTimeout`
// array is sorted by time
ls.campaignTimes = ls.campaignTimes[i:]
break
}
}

ls.campaignTimes = append(ls.campaignTimes, time.Now())
}

// Campaign is used to campaign the leader with given lease and returns a leadership
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.addCampaignTimes()
ls.leaderValue = leaderData
// Create a new lease to campaign
newLease := &lease{
Expand All @@ -114,6 +157,16 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl
lease: clientv3.NewLease(ls.client),
}
ls.setLease(newLease)

failpoint.Inject("skipGrantLeader", func(val failpoint.Value) {
var member pdpb.Member
member.Unmarshal([]byte(leaderData))
name, ok := val.(string)
if ok && member.Name == name {
failpoint.Return(errors.Errorf("failed to grant lease"))
}
})

if err := newLease.Grant(leaseTimeout); err != nil {
return err
}
Expand Down
17 changes: 9 additions & 8 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,15 @@ var (

// server errors
var (
ErrServiceRegistered = errors.Normalize("service with path [%s] already registered", errors.RFCCodeText("PD:server:ErrServiceRegistered"))
ErrAPIInformationInvalid = errors.Normalize("invalid api information, group %s version %s", errors.RFCCodeText("PD:server:ErrAPIInformationInvalid"))
ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty"))
ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil"))
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
ErrServiceRegistered = errors.Normalize("service with path [%s] already registered", errors.RFCCodeText("PD:server:ErrServiceRegistered"))
ErrAPIInformationInvalid = errors.Normalize("invalid api information, group %s version %s", errors.RFCCodeText("PD:server:ErrAPIInformationInvalid"))
ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty"))
ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil"))
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
ErrLeaderFrequentlyChange = errors.Normalize("leader %s frequently changed, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderFrequentlyChange"))
)

// logutil errors
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign resource manager primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-scheduling-primary-name", s.participant.Name()))
Expand Down
18 changes: 17 additions & 1 deletion pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
Expand All @@ -42,6 +43,8 @@ const (
// The timeout to wait transfer etcd leader to complete.
moveLeaderTimeout = 5 * time.Second
dcLocationConfigEtcdPrefix = "dc-location"
// If the campaign times is more than this value in `campaignTimesRecordTimeout`, the PD will resign and campaign again.
campaignLeaderFrequencyTimes = 3
)

// EmbeddedEtcdMember is used for the election related logic. It implements Member interface.
Expand Down Expand Up @@ -177,7 +180,20 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {

// CampaignLeader is used to campaign a PD member's leadership
// and make it become a PD leader.
func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error {
// leader should be changed when campaign leader frequently.
func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
failpoint.Inject("skipCampaignLeaderCheck", func() {
failpoint.Return(m.leadership.Campaign(leaseTimeout, m.MemberValue()))
})
if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes {
log.Warn("campaign times is too frequent, resign and campaign again",
zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath()))
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
return err
}
m.leadership.ResetCampaignTimes()
return errs.ErrLeaderFrequentlyChange.FastGenByArgs(m.Name(), m.GetLeaderPath())
}
return m.leadership.Campaign(leaseTimeout, m.MemberValue())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (m *Participant) GetLeadership() *election.Leadership {
}

// CampaignLeader is used to campaign the leadership and make it become a leader.
func (m *Participant) CampaignLeader(leaseTimeout int64) error {
func (m *Participant) CampaignLeader(_ context.Context, leaseTimeout int64) error {
if !m.campaignCheck() {
return errs.ErrCheckCampaign
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo {
type ElectionMember interface {
// ID returns the unique ID in the election group. For example, it can be unique
// server id of a cluster or the unique keyspace group replica id of the election
// group comprised of the replicas of a keyspace group.
// group composed of the replicas of a keyspace group.
ID() uint64
// ID returns the unique name in the election group.
// Name returns the unique name in the election group.
Name() string
// MemberValue returns the member value.
MemberValue() string
// GetMember() returns the current member
// GetMember returns the current member
GetMember() interface{}
// Client returns the etcd client.
Client() *clientv3.Client
Expand All @@ -124,7 +124,7 @@ type ElectionMember interface {
// KeepLeader is used to keep the leader's leadership.
KeepLeader(ctx context.Context)
// CampaignLeader is used to campaign the leadership and make it become a leader in an election group.
CampaignLeader(leaseTimeout int64) error
CampaignLeader(ctx context.Context, leaseTimeout int64) error
// ResetLeader is used to reset the member's current leadership.
// Basically it will reset the leader lease and unset leader info.
ResetLeader()
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
log.Info("start to campaign the primary",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
zap.String("campaign-tso-primary-name", gta.member.Name()))
if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil {
if err := gta.am.member.CampaignLeader(gta.ctx, gta.am.leaderLease); err != nil {
if errors.Is(err, errs.ErrEtcdTxnConflict) {
log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1640,7 +1640,7 @@ func (s *Server) leaderLoop() {

func (s *Server) campaignLeader() {
log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name()))
if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", s.mode),
zap.String("campaign-leader-name", s.Name()))
Expand Down
7 changes: 7 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ func (s *TestServer) Destroy() error {
return nil
}

// ResetPDLeader resigns the leader of the server.
func (s *TestServer) ResetPDLeader() {
s.Lock()
defer s.Unlock()
s.server.GetMember().ResetLeader()
}

// ResignLeader resigns the leader of the server.
func (s *TestServer) ResignLeader() error {
s.Lock()
Expand Down
4 changes: 4 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func TestLeaderTransfer(t *testing.T) {
cluster, err := tests.NewTestCluster(ctx, 2)
re.NoError(err)
defer cluster.Destroy()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck"))
}()

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() {
re := suite.Require()

re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/enableDegradedMode", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)"))

suite.ctx, suite.clean = context.WithCancel(context.Background())

Expand Down Expand Up @@ -148,6 +149,7 @@ func (suite *resourceManagerClientTestSuite) TearDownSuite() {
suite.cluster.Destroy()
suite.clean()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/enableDegradedMode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck"))
}

func (suite *resourceManagerClientTestSuite) TearDownTest() {
Expand Down
5 changes: 5 additions & 0 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@ func (suite *APIServerForwardTestSuite) TestResignTSOPrimaryForward() {
defer tc.Destroy()
tc.WaitForDefaultPrimaryServing(re)

re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck"))
}()

for j := 0; j < 10; j++ {
tc.ResignPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
tc.WaitForDefaultPrimaryServing(re)
Expand Down
10 changes: 9 additions & 1 deletion tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
re := suite.Require()
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck"))
}()

for i := 0; i < len(suite.clients); i++ {
client := suite.clients[i]
Expand Down Expand Up @@ -336,6 +340,11 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
func (suite *tsoClientTestSuite) TestRandomResignLeader() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck"))
}()

parallelAct := func() {
// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
Expand Down Expand Up @@ -373,7 +382,6 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() {
}

mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
}

func (suite *tsoClientTestSuite) TestRandomShutdown() {
Expand Down
Loading
Loading