Skip to content

Commit

Permalink
fix(consensus): handle query for decided proposal (#1438)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Jul 26, 2024
1 parent 89938c4 commit b850baa
Show file tree
Hide file tree
Showing 22 changed files with 226 additions and 208 deletions.
77 changes: 53 additions & 24 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,6 @@ func (cs *consensus) HeightRound() (uint32, int16) {
return cs.height, cs.round
}

func (cs *consensus) Proposal() *proposal.Proposal {
cs.lk.RLock()
defer cs.lk.RUnlock()

return cs.log.RoundProposal(cs.round)
}

func (cs *consensus) HasVote(h hash.Hash) bool {
cs.lk.RLock()
defer cs.lk.RUnlock()
Expand Down Expand Up @@ -223,19 +216,15 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) {
defer cs.lk.Unlock()

if !cs.active {
cs.logger.Trace("we are not in the committee")

return
}

if p.Height() != cs.height {
cs.logger.Trace("invalid height", "proposal", p)

return
}

if p.Round() < cs.round {
cs.logger.Trace("proposal for expired round", "proposal", p)
cs.logger.Debug("proposal for expired round", "proposal", p)

return
}
Expand All @@ -258,7 +247,7 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) {
// In this case, we accept the proposal and allow nodes to continue.
// By doing so, we enable the validator to broadcast its votes and
// prevent it from being marked as absent in the block certificate.
cs.logger.Trace("block is committed for this height", "proposal", p)
cs.logger.Warn("block committed before receiving proposal", "proposal", p)
if p.Block().Hash() != cs.bcState.LastBlockHash() {
cs.logger.Warn("proposal is not for the committed block", "proposal", p)

Expand Down Expand Up @@ -290,19 +279,15 @@ func (cs *consensus) AddVote(v *vote.Vote) {
defer cs.lk.Unlock()

if !cs.active {
cs.logger.Trace("we are not in the committee")

return
}

if v.Height() != cs.height {
cs.logger.Trace("vote has invalid height", "vote", v)

return
}

if v.Round() < cs.round {
cs.logger.Trace("vote for expired round", "vote", v)
cs.logger.Debug("vote for expired round", "vote", v)

return
}
Expand Down Expand Up @@ -396,17 +381,16 @@ func (cs *consensus) signAddVote(v *vote.Vote) {
cs.broadcastVote(v)
}

// queryProposal requests any missing proposal from other validators.
func (cs *consensus) queryProposal() {
cs.broadcaster(cs.valKey.Address(),
message.NewQueryProposalMessage(cs.height, cs.round, cs.valKey.Address()))
}

// queryVotes is an anti-entropy mechanism to retrieve missed votes
// when a validator falls behind the network.
// However, invoking this method might result in unnecessary bandwidth usage.
func (cs *consensus) queryVotes() {
// queryVote requests any missing votes from other validators.
func (cs *consensus) queryVote() {
cs.broadcaster(cs.valKey.Address(),
message.NewQueryVotesMessage(cs.height, cs.round, cs.valKey.Address()))
message.NewQueryVoteMessage(cs.height, cs.round, cs.valKey.Address()))
}

func (cs *consensus) broadcastProposal(p *proposal.Proposal) {
Expand Down Expand Up @@ -477,11 +461,56 @@ func (cs *consensus) IsActive() bool {
return cs.active
}

func (cs *consensus) Proposal() *proposal.Proposal {
cs.lk.RLock()
defer cs.lk.RUnlock()

return cs.log.RoundProposal(cs.round)
}

func (cs *consensus) HandleQueryProposal(height uint32, round int16) *proposal.Proposal {
cs.lk.RLock()
defer cs.lk.RUnlock()

if !cs.active {
return nil
}

if height != cs.height {
return nil
}

if round != cs.round {
return nil
}

if cs.isProposer() {
return cs.log.RoundProposal(cs.round)
}

if cs.cpDecided == 0 {
// It is decided not to change the proposer and the proposal is locked.
// Locked proposals can be sent by all validators.
// This helps prevent a situation where the proposer goes offline after proposing the block.
return cs.log.RoundProposal(cs.round)
}

return nil
}

// TODO: Improve the performance?
func (cs *consensus) PickRandomVote(round int16) *vote.Vote {
func (cs *consensus) HandleQueryVote(height uint32, round int16) *vote.Vote {
cs.lk.RLock()
defer cs.lk.RUnlock()

if !cs.active {
return nil
}

if height != cs.height {
return nil
}

votes := []*vote.Vote{}
switch {
case round < cs.round:
Expand Down
118 changes: 91 additions & 27 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (td *testData) shouldPublishQueryVote(t *testing.T, cons *consensus, height
continue
}

m := consMsg.message.(*message.QueryVotesMessage)
m := consMsg.message.(*message.QueryVoteMessage)
assert.Equal(t, m.Height, height)
assert.Equal(t, m.Round, round)
assert.Equal(t, m.Querier, cons.valKey.Address())
Expand Down Expand Up @@ -422,6 +422,18 @@ func TestNotInCommittee(t *testing.T) {
assert.Equal(t, "new-height", cons.currentState.name())
}

func TestIsProposer(t *testing.T) {
td := setup(t)

td.commitBlockForAllStates(t) // height 1

td.enterNewHeight(td.consX)
td.enterNewHeight(td.consY)

assert.False(t, td.consX.IsProposer())
assert.True(t, td.consY.IsProposer())
}

func TestVoteWithInvalidHeight(t *testing.T) {
td := setup(t)

Expand Down Expand Up @@ -487,7 +499,7 @@ func TestConsensusAddVote(t *testing.T) {
}

// TestConsensusLateProposal tests the scenario where a slow node receives a proposal
// in precommit phase.
// after committing the block.
func TestConsensusLateProposal(t *testing.T) {
td := setup(t)

Expand All @@ -502,6 +514,26 @@ func TestConsensusLateProposal(t *testing.T) {

td.commitBlockForAllStates(t) // height 2

// Partitioned node receives proposal now
td.consP.SetProposal(p)

td.shouldPublishVote(t, td.consP, vote.VoteTypePrepare, p.Block().Hash())
}

// TestSetProposalOnPrepare tests the scenario where a slow node receives a proposal
// in prepare phase.
func TestSetProposalOnPrepare(t *testing.T) {
td := setup(t)

td.commitBlockForAllStates(t) // height 1

td.enterNewHeight(td.consP)

h := uint32(2)
r := int16(0)
p := td.makeProposal(t, h, r)
require.NotNil(t, p)

// The partitioned node receives all the votes first
td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexY)
Expand All @@ -513,9 +545,9 @@ func TestConsensusLateProposal(t *testing.T) {
td.shouldPublishVote(t, td.consP, vote.VoteTypePrecommit, p.Block().Hash())
}

// TestConsensusVeryLateProposal tests the scenario where a slow node receives a proposal
// in prepare phase.
func TestConsensusVeryLateProposal(t *testing.T) {
// TestSetProposalOnPrecommit tests the scenario where a slow node receives a proposal
// in precommit phase.
func TestSetProposalOnPrecommit(t *testing.T) {
td := setup(t)

td.commitBlockForAllStates(t) // height 1
Expand All @@ -527,29 +559,27 @@ func TestConsensusVeryLateProposal(t *testing.T) {
p := td.makeProposal(t, h, r)
require.NotNil(t, p)

td.commitBlockForAllStates(t) // height 2

// The partitioned node receives all the votes first
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexY)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexB)

td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexY)
td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexB)

td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexY)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexB)

// Partitioned node receives proposal now
td.consP.SetProposal(p)

td.shouldPublishVote(t, td.consP, vote.VoteTypePrecommit, p.Block().Hash())
td.shouldPublishBlockAnnounce(t, td.consP, p.Block().Hash())
}

func TestPickRandomVote(t *testing.T) {
func TestHandleQueryVote(t *testing.T) {
td := setup(t)

td.enterNewHeight(td.consP)
assert.Nil(t, td.consP.PickRandomVote(0))
assert.Nil(t, td.consP.HandleQueryVote(1, 0))
cpRound := int16(0)

// === make valid certificate
Expand Down Expand Up @@ -594,20 +624,51 @@ func TestPickRandomVote(t *testing.T) {
td.addCPDecidedVote(td.consP, hash.UndefHash, 1, 0, vote.CPValueYes,
&vote.JustDecided{QCert: certMainVote}, tIndexY)

assert.NotNil(t, td.consP.PickRandomVote(0))
assert.NotNil(t, td.consP.HandleQueryVote(1, 0))

// Round 1
td.enterNextRound(td.consP)
td.addPrepareVote(td.consP, td.RandHash(), 1, 1, tIndexY)

rndVote0 := td.consP.PickRandomVote(0)
assert.NotEqual(t, vote.VoteTypePrepare, rndVote0.Type(), "Should not pick prepare votes")
rndVote0 := td.consP.HandleQueryVote(1, 0)
assert.Equal(t, vote.VoteTypeCPDecided, rndVote0.Type(), "should send the decided vote for the previous round")

rndVote1 := td.consP.HandleQueryVote(1, 1)
assert.Equal(t, vote.VoteTypePrepare, rndVote1.Type(), "should send the prepare vote for the current round")

rndVote1 := td.consP.PickRandomVote(1)
assert.Equal(t, vote.VoteTypePrepare, rndVote1.Type())
rndVote2 := td.consP.HandleQueryVote(1, 2)
assert.Nil(t, rndVote2, "should not send a vote for the next round")

rndVote2 := td.consP.PickRandomVote(2)
assert.Nil(t, rndVote2)
rndVote4 := td.consP.HandleQueryVote(2, 0)
assert.Nil(t, rndVote4, "should not have a vote for the next height")
}

func TestHandleQueryProposal(t *testing.T) {
td := setup(t)

td.enterNewHeight(td.consX)
td.enterNewHeight(td.consY)

// Round 1
td.enterNextRound(td.consX)
td.enterNextRound(td.consY) // consY is the proposer

prop0 := td.consY.HandleQueryProposal(1, 0)
assert.Nil(t, prop0, "proposer should not send a proposal for the previous round")

prop1 := td.consX.HandleQueryProposal(1, 1)
assert.Nil(t, prop1, "non-proposer should not send a proposal")

prop2 := td.consY.HandleQueryProposal(1, 1)
assert.NotNil(t, prop2, "proposer should send a proposal")

td.consX.cpDecided = 0
td.consX.SetProposal(td.consY.Proposal())
prop3 := td.consX.HandleQueryProposal(1, 1)
assert.NotNil(t, prop3, "non-proposer should send a proposal on decided proposal")

prop4 := td.consX.HandleQueryProposal(2, 0)
assert.Nil(t, prop4, "should not have a proposal for the next height")
}

func TestSetProposalFromPreviousRound(t *testing.T) {
Expand Down Expand Up @@ -940,13 +1001,16 @@ func checkConsensus(td *testData, height uint32, byzVotes []*vote.Vote) (
}

case message.TypeQueryProposal:
for _, cons := range instances {
p := cons.Proposal()
if p != nil {
td.consMessages = append(td.consMessages, consMessage{
sender: cons.valKey.Address(),
message: message.NewProposalMessage(p),
})
m := rndMsg.message.(*message.QueryProposalMessage)
if m.Height == height {
for _, cons := range instances {
p := cons.HandleQueryProposal(m.Height, m.Round)
if p != nil {
td.consMessages = append(td.consMessages, consMessage{
sender: cons.valKey.Address(),
message: message.NewProposalMessage(p),
})
}
}
}

Expand Down
12 changes: 9 additions & 3 deletions consensus/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ func (*changeProposer) onSetProposal(_ *proposal.Proposal) {
}

func (cp *changeProposer) onTimeout(t *ticker) {
if t.Target == tickerTargetQueryVotes {
cp.queryVotes()
cp.scheduleTimeout(t.Duration*2, cp.height, cp.round, tickerTargetQueryVotes)
if t.Target == tickerTargetQueryVote {
cp.queryVote()
cp.scheduleTimeout(t.Duration*2, cp.height, cp.round, tickerTargetQueryVote)
}
}

Expand Down Expand Up @@ -327,6 +327,12 @@ func (cp *changeProposer) cpDecide(round int16, cpValue vote.CPValue) {
} else if cpValue == vote.CPValueNo {
cp.round = round
cp.cpDecided = 0

roundProposal := cp.log.RoundProposal(cp.round)
if roundProposal == nil {
cp.queryProposal()
}

cp.enterNewState(cp.prepareState)
}
}
2 changes: 1 addition & 1 deletion consensus/cp_prevote.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *cpPreVoteState) decide() {
just := &vote.JustInitYes{}
s.signAddCPPreVote(hash.UndefHash, s.cpRound, 1, just)
}
s.scheduleTimeout(s.config.QueryVoteTimeout, s.height, s.round, tickerTargetQueryVotes)
s.scheduleTimeout(s.config.QueryVoteTimeout, s.height, s.round, tickerTargetQueryVote)
} else {
cpMainVotes := s.log.CPMainVoteVoteSet(s.round)
switch {
Expand Down
7 changes: 4 additions & 3 deletions consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
type Reader interface {
ConsensusKey() *bls.PublicKey
AllVotes() []*vote.Vote
PickRandomVote(round int16) *vote.Vote
HandleQueryVote(height uint32, round int16) *vote.Vote
HandleQueryProposal(height uint32, round int16) *proposal.Proposal
Proposal() *proposal.Proposal
HasVote(h hash.Hash) bool
HeightRound() (uint32, int16)
Expand All @@ -29,11 +30,11 @@ type Consensus interface {

type ManagerReader interface {
Instances() []Reader
PickRandomVote(round int16) *vote.Vote
HandleQueryVote(height uint32, round int16) *vote.Vote
HandleQueryProposal(height uint32, round int16) *proposal.Proposal
Proposal() *proposal.Proposal
HeightRound() (uint32, int16)
HasActiveInstance() bool
HasProposer() bool
}

type Manager interface {
Expand Down
Loading

0 comments on commit b850baa

Please sign in to comment.