Skip to content

Commit

Permalink
fix(consensus): improve consensus alghorithm (#1329)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Jun 9, 2024
1 parent 1e92b24 commit d419154
Show file tree
Hide file tree
Showing 54 changed files with 464 additions and 236 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ linters-settings:
- name: "get-return"
disabled: true

- name: "confusing-naming"
disabled: true

- name: "function-result-limit"
disabled: true

Expand Down
6 changes: 3 additions & 3 deletions committee/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,18 @@ func (c *committee) find(addr crypto.Address) *validator.Validator {

// IsProposer checks if the given address is the proposer for the specified round.
func (c *committee) IsProposer(addr crypto.Address, round int16) bool {
p := c.getProposer(round)
p := c.proposer(round)

return p.Address() == addr
}

// Proposer returns an instance of the proposer validator for the specified round.
// A cloned instance of the proposer is returned to avoid modification of the original object.
func (c *committee) Proposer(round int16) *validator.Validator {
return c.getProposer(round).Clone()
return c.proposer(round).Clone()
}

func (c *committee) getProposer(round int16) *validator.Validator {
func (c *committee) proposer(round int16) *validator.Validator {
pos := c.proposerPos
for i := 0; i < int(round); i++ {
pos = pos.Next
Expand Down
11 changes: 6 additions & 5 deletions consensus/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import "time"
type Config struct {
ChangeProposerTimeout time.Duration `toml:"-"`
ChangeProposerDelta time.Duration `toml:"-"`
QueryVoteTimeout time.Duration `toml:"-"`
MinimumAvailabilityScore float64 `toml:"-"`
}

func DefaultConfig() *Config {
return &Config{
ChangeProposerTimeout: 8 * time.Second,
ChangeProposerDelta: 4 * time.Second,
ChangeProposerTimeout: 5 * time.Second,
ChangeProposerDelta: 5 * time.Second,
QueryVoteTimeout: 5 * time.Second,
MinimumAvailabilityScore: 0.9,
}
}
Expand All @@ -38,7 +40,6 @@ func (conf *Config) BasicCheck() error {
}

func (conf *Config) CalculateChangeProposerTimeout(round int16) time.Duration {
return time.Duration(
conf.ChangeProposerTimeout.Milliseconds()+conf.ChangeProposerDelta.Milliseconds()*int64(round),
) * time.Millisecond
return conf.ChangeProposerTimeout +
conf.ChangeProposerDelta*time.Duration(round)
}
90 changes: 59 additions & 31 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,7 @@ func (cs *consensus) Start() {
cs.lk.Lock()
defer cs.lk.Unlock()

cs.doMoveToNewHeight()
if cs.active {
cs.queryProposal()
cs.queryVotes()
}
cs.moveToNewHeight()
}

func (cs *consensus) String() string {
Expand Down Expand Up @@ -184,10 +180,10 @@ func (cs *consensus) MoveToNewHeight() {
cs.lk.Lock()
defer cs.lk.Unlock()

cs.doMoveToNewHeight()
cs.moveToNewHeight()
}

func (cs *consensus) doMoveToNewHeight() {
func (cs *consensus) moveToNewHeight() {
stateHeight := cs.bcState.LastBlockHeight()
if cs.height != stateHeight+1 {
cs.enterNewState(cs.newHeightState)
Expand All @@ -205,6 +201,23 @@ func (cs *consensus) scheduleTimeout(duration time.Duration, height uint32, roun
}()
}

func (cs *consensus) handleTimeout(t *ticker) {
cs.lk.Lock()
defer cs.lk.Unlock()

cs.logger.Trace("handle ticker", "ticker", t)

// Old tickers might be triggered now. Ignore them.
if cs.height != t.Height || cs.round != t.Round {
cs.logger.Trace("stale ticker", "ticker", t)

return
}

cs.logger.Debug("timer expired", "ticker", t)
cs.currentState.onTimeout(t)
}

func (cs *consensus) SetProposal(p *proposal.Proposal) {
cs.lk.Lock()
defer cs.lk.Unlock()
Expand All @@ -222,7 +235,13 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) {
}

if p.Round() < cs.round {
cs.logger.Trace("expired round", "proposal", p)
cs.logger.Trace("proposal for expired round", "proposal", p)

return
}

if err := p.BasicCheck(); err != nil {
cs.logger.Warn("invalid proposal", "proposal", p, "error", err)

return
}
Expand Down Expand Up @@ -266,23 +285,6 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) {
cs.currentState.onSetProposal(p)
}

func (cs *consensus) handleTimeout(t *ticker) {
cs.lk.Lock()
defer cs.lk.Unlock()

cs.logger.Trace("handle ticker", "ticker", t)

// Old tickers might be triggered now. Ignore them.
if cs.height != t.Height || cs.round != t.Round {
cs.logger.Trace("stale ticker", "ticker", t)

return
}

cs.logger.Debug("timer expired", "ticker", t)
cs.currentState.onTimeout(t)
}

func (cs *consensus) AddVote(v *vote.Vote) {
cs.lk.Lock()
defer cs.lk.Unlock()
Expand All @@ -299,6 +301,12 @@ func (cs *consensus) AddVote(v *vote.Vote) {
return
}

if v.Round() < cs.round {
cs.logger.Trace("vote for expired round", "vote", v)

return
}

if v.Type() == vote.VoteTypeCPPreVote ||
v.Type() == vote.VoteTypeCPMainVote ||
v.Type() == vote.VoteTypeCPDecided {
Expand All @@ -318,13 +326,26 @@ func (cs *consensus) AddVote(v *vote.Vote) {
cs.logger.Info("new vote added", "vote", v)

cs.currentState.onAddVote(v)

if v.Type() == vote.VoteTypeCPDecided {
if v.Round() > cs.round {
cs.changeProposer.cpDecide(v.Round(), v.CPValue())
}
}
}
}

func (cs *consensus) proposer(round int16) *validator.Validator {
return cs.bcState.Proposer(round)
}

func (cs *consensus) IsProposer() bool {
cs.lk.RLock()
defer cs.lk.RUnlock()

return cs.isProposer()
}

func (cs *consensus) isProposer() bool {
return cs.proposer(cs.round).Address() == cs.valKey.Address()
}
Expand Down Expand Up @@ -377,7 +398,7 @@ func (cs *consensus) signAddVote(v *vote.Vote) {

func (cs *consensus) queryProposal() {
cs.broadcaster(cs.valKey.Address(),
message.NewQueryProposalMessage(cs.height, cs.valKey.Address()))
message.NewQueryProposalMessage(cs.height, cs.round, cs.valKey.Address()))
}

// queryVotes is an anti-entropy mechanism to retrieve missed votes
Expand Down Expand Up @@ -462,14 +483,21 @@ func (cs *consensus) PickRandomVote(round int16) *vote.Vote {
defer cs.lk.RUnlock()

votes := []*vote.Vote{}
if round == cs.round {
switch {
case round < cs.round:
// Past round: Only broadcast cp:decided votes
vs := cs.log.CPDecidedVoteSet(round)
votes = append(votes, vs.AllVotes()...)

case round == cs.round:
// Current round
m := cs.log.RoundMessages(round)
votes = append(votes, m.AllVotes()...)
} else {
// Only broadcast cp:decided votes
vs := cs.log.CPDecidedVoteVoteSet(round)
votes = append(votes, vs.AllVotes()...)

case round > cs.round:
// Future round
}

if len(votes) == 0 {
return nil
}
Expand Down
32 changes: 23 additions & 9 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func testConfig() *Config {
return &Config{
ChangeProposerTimeout: 1 * time.Hour, // Disabling timers
ChangeProposerDelta: 1 * time.Hour, // Disabling timers
QueryVoteTimeout: 1 * time.Hour, // Disabling timers
}
}

func setup(t *testing.T) *testData {
t.Helper()
queryVoteInitialTimeout = 2 * time.Hour

return setupWithSeed(t, testsuite.GenerateSeed())
}
Expand All @@ -89,8 +89,10 @@ func setupWithSeed(t *testing.T, seed int64) *testData {
params := param.DefaultParams()
params.CommitteeSize = 4

// to prevent triggering timers before starting the tests to avoid double entries for new heights in some tests.
getTime := util.RoundNow(params.BlockIntervalInSecond).Add(time.Duration(params.BlockIntervalInSecond) * time.Second)
// To prevent triggering timers before starting the tests and
// avoid double entries for new heights in some tests.
getTime := util.RoundNow(params.BlockIntervalInSecond).
Add(time.Duration(params.BlockIntervalInSecond) * time.Second)
genDoc := genesis.MakeGenesis(getTime, accs, vals, params)
stX, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexX]},
store.MockingStore(ts), txPool, nil)
Expand Down Expand Up @@ -401,8 +403,7 @@ func TestStart(t *testing.T) {
td := setup(t)

td.consX.Start()
td.shouldPublishQueryProposal(t, td.consX, 1)
td.shouldPublishQueryVote(t, td.consX, 1, 0)
td.checkHeightRound(t, td.consX, 1, 0)
}

func TestNotInCommittee(t *testing.T) {
Expand Down Expand Up @@ -474,14 +475,14 @@ func TestConsensusAddVote(t *testing.T) {
v6, _ := td.GenerateTestPrepareVote(1, 0)
td.consP.AddVote(v6)

assert.True(t, td.consP.HasVote(v1.Hash())) // previous round
assert.True(t, td.consP.HasVote(v2.Hash())) // next round
assert.False(t, td.consP.HasVote(v1.Hash())) // previous round
assert.True(t, td.consP.HasVote(v2.Hash())) // next round
assert.True(t, td.consP.HasVote(v3.Hash()))
assert.True(t, td.consP.HasVote(v4.Hash()))
assert.False(t, td.consP.HasVote(v5.Hash())) // valid votes for the next height
assert.False(t, td.consP.HasVote(v6.Hash())) // invalid votes

assert.Equal(t, td.consP.AllVotes(), []*vote.Vote{v1, v3, v4})
assert.Equal(t, td.consP.AllVotes(), []*vote.Vote{v3, v4})
assert.NotContains(t, td.consP.AllVotes(), v2)
}

Expand Down Expand Up @@ -604,6 +605,9 @@ func TestPickRandomVote(t *testing.T) {

rndVote1 := td.consP.PickRandomVote(1)
assert.Equal(t, rndVote1.Type(), vote.VoteTypePrepare)

rndVote2 := td.consP.PickRandomVote(2)
assert.Nil(t, rndVote2)
}

func TestSetProposalFromPreviousRound(t *testing.T) {
Expand Down Expand Up @@ -724,7 +728,17 @@ func TestProposalWithBigRound(t *testing.T) {

p := td.makeProposal(t, 1, util.MaxInt16)
td.consP.SetProposal(p)
assert.Equal(t, td.consP.log.RoundProposal(util.MaxInt16), p)
assert.Nil(t, td.consP.Proposal())
}

func TestInvalidProposal(t *testing.T) {
td := setup(t)

td.enterNewHeight(td.consP)

p := td.makeProposal(t, 1, 0)
p.SetSignature(nil) // Make proposal invalid
td.consP.SetProposal(p)
assert.Nil(t, td.consP.Proposal())
}

Expand Down
17 changes: 7 additions & 10 deletions consensus/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,25 +310,22 @@ func (cp *changeProposer) checkJust(v *vote.Vote) error {
}
}

func (cp *changeProposer) strongTermination() {
cpDecided := cp.log.CPDecidedVoteVoteSet(cp.round)
func (cp *changeProposer) cpStrongTermination() {
cpDecided := cp.log.CPDecidedVoteSet(cp.round)
if cpDecided.HasAnyVoteFor(cp.cpRound, vote.CPValueNo) {
cp.cpDecide(vote.CPValueNo)
cp.cpDecide(cp.round, vote.CPValueNo)
} else if cpDecided.HasAnyVoteFor(cp.cpRound, vote.CPValueYes) {
cp.cpDecide(vote.CPValueYes)
cp.cpDecide(cp.round, vote.CPValueYes)
}
}

func (cp *changeProposer) cpDecide(cpValue vote.CPValue) {
func (cp *changeProposer) cpDecide(round int16, cpValue vote.CPValue) {
if cpValue == vote.CPValueYes {
cp.round++
cp.round = round + 1
cp.cpDecided = 1
cp.enterNewState(cp.proposeState)
} else if cpValue == vote.CPValueNo {
roundProposal := cp.log.RoundProposal(cp.round)
if roundProposal == nil {
cp.queryProposal()
}
cp.round = round
cp.cpDecided = 0
cp.enterNewState(cp.prepareState)
}
Expand Down
6 changes: 3 additions & 3 deletions consensus/cp_decide.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (s *cpDecideState) decide() {
QCert: cert,
}
s.signAddCPDecidedVote(hash.UndefHash, s.cpRound, vote.CPValueYes, just)
s.cpDecide(vote.CPValueYes)
s.cpDecide(s.round, vote.CPValueYes)
} else if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueNo) {
// decided for no and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 0, "round", s.cpRound)
Expand All @@ -37,7 +37,7 @@ func (s *cpDecideState) decide() {
QCert: cert,
}
s.signAddCPDecidedVote(*s.cpWeakValidity, s.cpRound, vote.CPValueNo, just)
s.cpDecide(vote.CPValueNo)
s.cpDecide(s.round, vote.CPValueNo)
} else {
// conflicting votes
s.logger.Debug("conflicting main votes", "round", s.cpRound)
Expand All @@ -52,7 +52,7 @@ func (s *cpDecideState) onAddVote(v *vote.Vote) {
s.decide()
}

s.strongTermination()
s.cpStrongTermination()
}

func (*cpDecideState) name() string {
Expand Down
2 changes: 1 addition & 1 deletion consensus/cp_mainvote.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *cpMainVoteState) onAddVote(v *vote.Vote) {
s.decide()
}

s.strongTermination()
s.cpStrongTermination()
}

func (*cpMainVoteState) name() string {
Expand Down
Loading

0 comments on commit d419154

Please sign in to comment.