Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): handle query for decided proposal #1438

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 53 additions & 24 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,6 @@ func (cs *consensus) HeightRound() (uint32, int16) {
return cs.height, cs.round
}

func (cs *consensus) Proposal() *proposal.Proposal {
cs.lk.RLock()
defer cs.lk.RUnlock()

return cs.log.RoundProposal(cs.round)
}

func (cs *consensus) HasVote(h hash.Hash) bool {
cs.lk.RLock()
defer cs.lk.RUnlock()
Expand Down Expand Up @@ -223,19 +216,15 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) {
defer cs.lk.Unlock()

if !cs.active {
cs.logger.Trace("we are not in the committee")

return
}

if p.Height() != cs.height {
cs.logger.Trace("invalid height", "proposal", p)

return
}

if p.Round() < cs.round {
cs.logger.Trace("proposal for expired round", "proposal", p)
cs.logger.Debug("proposal for expired round", "proposal", p)

return
}
Expand All @@ -258,7 +247,7 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) {
// In this case, we accept the proposal and allow nodes to continue.
// By doing so, we enable the validator to broadcast its votes and
// prevent it from being marked as absent in the block certificate.
cs.logger.Trace("block is committed for this height", "proposal", p)
cs.logger.Warn("block committed before receiving proposal", "proposal", p)
if p.Block().Hash() != cs.bcState.LastBlockHash() {
cs.logger.Warn("proposal is not for the committed block", "proposal", p)

Expand Down Expand Up @@ -290,19 +279,15 @@ func (cs *consensus) AddVote(v *vote.Vote) {
defer cs.lk.Unlock()

if !cs.active {
cs.logger.Trace("we are not in the committee")

return
}

if v.Height() != cs.height {
cs.logger.Trace("vote has invalid height", "vote", v)

return
}

if v.Round() < cs.round {
cs.logger.Trace("vote for expired round", "vote", v)
cs.logger.Debug("vote for expired round", "vote", v)

return
}
Expand Down Expand Up @@ -396,17 +381,16 @@ func (cs *consensus) signAddVote(v *vote.Vote) {
cs.broadcastVote(v)
}

// queryProposal requests any missing proposal from other validators.
func (cs *consensus) queryProposal() {
cs.broadcaster(cs.valKey.Address(),
message.NewQueryProposalMessage(cs.height, cs.round, cs.valKey.Address()))
}

// queryVotes is an anti-entropy mechanism to retrieve missed votes
// when a validator falls behind the network.
// However, invoking this method might result in unnecessary bandwidth usage.
func (cs *consensus) queryVotes() {
// queryVote requests any missing votes from other validators.
func (cs *consensus) queryVote() {
cs.broadcaster(cs.valKey.Address(),
message.NewQueryVotesMessage(cs.height, cs.round, cs.valKey.Address()))
message.NewQueryVoteMessage(cs.height, cs.round, cs.valKey.Address()))
}

func (cs *consensus) broadcastProposal(p *proposal.Proposal) {
Expand Down Expand Up @@ -477,11 +461,56 @@ func (cs *consensus) IsActive() bool {
return cs.active
}

func (cs *consensus) Proposal() *proposal.Proposal {
cs.lk.RLock()
defer cs.lk.RUnlock()

return cs.log.RoundProposal(cs.round)
}

func (cs *consensus) HandleQueryProposal(height uint32, round int16) *proposal.Proposal {
cs.lk.RLock()
defer cs.lk.RUnlock()

if !cs.active {
return nil
}

if height != cs.height {
return nil
}

if round != cs.round {
return nil
}

if cs.isProposer() {
return cs.log.RoundProposal(cs.round)
}

if cs.cpDecided == 0 {
// It is decided not to change the proposer and the proposal is locked.
// Locked proposals can be sent by all validators.
// This helps prevent a situation where the proposer goes offline after proposing the block.
return cs.log.RoundProposal(cs.round)
}

return nil
}

// TODO: Improve the performance?
func (cs *consensus) PickRandomVote(round int16) *vote.Vote {
func (cs *consensus) HandleQueryVote(height uint32, round int16) *vote.Vote {
cs.lk.RLock()
defer cs.lk.RUnlock()

if !cs.active {
return nil
}

if height != cs.height {
return nil
}

votes := []*vote.Vote{}
switch {
case round < cs.round:
Expand Down
118 changes: 91 additions & 27 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (td *testData) shouldPublishQueryVote(t *testing.T, cons *consensus, height
continue
}

m := consMsg.message.(*message.QueryVotesMessage)
m := consMsg.message.(*message.QueryVoteMessage)
assert.Equal(t, m.Height, height)
assert.Equal(t, m.Round, round)
assert.Equal(t, m.Querier, cons.valKey.Address())
Expand Down Expand Up @@ -422,6 +422,18 @@ func TestNotInCommittee(t *testing.T) {
assert.Equal(t, "new-height", cons.currentState.name())
}

func TestIsProposer(t *testing.T) {
td := setup(t)

td.commitBlockForAllStates(t) // height 1

td.enterNewHeight(td.consX)
td.enterNewHeight(td.consY)

assert.False(t, td.consX.IsProposer())
assert.True(t, td.consY.IsProposer())
}

func TestVoteWithInvalidHeight(t *testing.T) {
td := setup(t)

Expand Down Expand Up @@ -487,7 +499,7 @@ func TestConsensusAddVote(t *testing.T) {
}

// TestConsensusLateProposal tests the scenario where a slow node receives a proposal
// in precommit phase.
// after committing the block.
func TestConsensusLateProposal(t *testing.T) {
td := setup(t)

Expand All @@ -502,6 +514,26 @@ func TestConsensusLateProposal(t *testing.T) {

td.commitBlockForAllStates(t) // height 2

// Partitioned node receives proposal now
td.consP.SetProposal(p)

td.shouldPublishVote(t, td.consP, vote.VoteTypePrepare, p.Block().Hash())
}

// TestSetProposalOnPrepare tests the scenario where a slow node receives a proposal
// in prepare phase.
func TestSetProposalOnPrepare(t *testing.T) {
td := setup(t)

td.commitBlockForAllStates(t) // height 1

td.enterNewHeight(td.consP)

h := uint32(2)
r := int16(0)
p := td.makeProposal(t, h, r)
require.NotNil(t, p)

// The partitioned node receives all the votes first
td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexY)
Expand All @@ -513,9 +545,9 @@ func TestConsensusLateProposal(t *testing.T) {
td.shouldPublishVote(t, td.consP, vote.VoteTypePrecommit, p.Block().Hash())
}

// TestConsensusVeryLateProposal tests the scenario where a slow node receives a proposal
// in prepare phase.
func TestConsensusVeryLateProposal(t *testing.T) {
// TestSetProposalOnPrecommit tests the scenario where a slow node receives a proposal
// in precommit phase.
func TestSetProposalOnPrecommit(t *testing.T) {
td := setup(t)

td.commitBlockForAllStates(t) // height 1
Expand All @@ -527,29 +559,27 @@ func TestConsensusVeryLateProposal(t *testing.T) {
p := td.makeProposal(t, h, r)
require.NotNil(t, p)

td.commitBlockForAllStates(t) // height 2

// The partitioned node receives all the votes first
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexY)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexB)

td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexY)
td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexB)

td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexY)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexB)

// Partitioned node receives proposal now
td.consP.SetProposal(p)

td.shouldPublishVote(t, td.consP, vote.VoteTypePrecommit, p.Block().Hash())
td.shouldPublishBlockAnnounce(t, td.consP, p.Block().Hash())
}

func TestPickRandomVote(t *testing.T) {
func TestHandleQueryVote(t *testing.T) {
td := setup(t)

td.enterNewHeight(td.consP)
assert.Nil(t, td.consP.PickRandomVote(0))
assert.Nil(t, td.consP.HandleQueryVote(1, 0))
cpRound := int16(0)

// === make valid certificate
Expand Down Expand Up @@ -594,20 +624,51 @@ func TestPickRandomVote(t *testing.T) {
td.addCPDecidedVote(td.consP, hash.UndefHash, 1, 0, vote.CPValueYes,
&vote.JustDecided{QCert: certMainVote}, tIndexY)

assert.NotNil(t, td.consP.PickRandomVote(0))
assert.NotNil(t, td.consP.HandleQueryVote(1, 0))

// Round 1
td.enterNextRound(td.consP)
td.addPrepareVote(td.consP, td.RandHash(), 1, 1, tIndexY)

rndVote0 := td.consP.PickRandomVote(0)
assert.NotEqual(t, vote.VoteTypePrepare, rndVote0.Type(), "Should not pick prepare votes")
rndVote0 := td.consP.HandleQueryVote(1, 0)
assert.Equal(t, vote.VoteTypeCPDecided, rndVote0.Type(), "should send the decided vote for the previous round")

rndVote1 := td.consP.HandleQueryVote(1, 1)
assert.Equal(t, vote.VoteTypePrepare, rndVote1.Type(), "should send the prepare vote for the current round")

rndVote1 := td.consP.PickRandomVote(1)
assert.Equal(t, vote.VoteTypePrepare, rndVote1.Type())
rndVote2 := td.consP.HandleQueryVote(1, 2)
assert.Nil(t, rndVote2, "should not send a vote for the next round")

rndVote2 := td.consP.PickRandomVote(2)
assert.Nil(t, rndVote2)
rndVote4 := td.consP.HandleQueryVote(2, 0)
assert.Nil(t, rndVote4, "should not have a vote for the next height")
}

func TestHandleQueryProposal(t *testing.T) {
td := setup(t)

td.enterNewHeight(td.consX)
td.enterNewHeight(td.consY)

// Round 1
td.enterNextRound(td.consX)
td.enterNextRound(td.consY) // consY is the proposer

prop0 := td.consY.HandleQueryProposal(1, 0)
assert.Nil(t, prop0, "proposer should not send a proposal for the previous round")

prop1 := td.consX.HandleQueryProposal(1, 1)
assert.Nil(t, prop1, "non-proposer should not send a proposal")

prop2 := td.consY.HandleQueryProposal(1, 1)
assert.NotNil(t, prop2, "proposer should send a proposal")

td.consX.cpDecided = 0
td.consX.SetProposal(td.consY.Proposal())
prop3 := td.consX.HandleQueryProposal(1, 1)
assert.NotNil(t, prop3, "non-proposer should send a proposal on decided proposal")

prop4 := td.consX.HandleQueryProposal(2, 0)
assert.Nil(t, prop4, "should not have a proposal for the next height")
}

func TestSetProposalFromPreviousRound(t *testing.T) {
Expand Down Expand Up @@ -940,13 +1001,16 @@ func checkConsensus(td *testData, height uint32, byzVotes []*vote.Vote) (
}

case message.TypeQueryProposal:
for _, cons := range instances {
p := cons.Proposal()
if p != nil {
td.consMessages = append(td.consMessages, consMessage{
sender: cons.valKey.Address(),
message: message.NewProposalMessage(p),
})
m := rndMsg.message.(*message.QueryProposalMessage)
if m.Height == height {
for _, cons := range instances {
p := cons.HandleQueryProposal(m.Height, m.Round)
if p != nil {
td.consMessages = append(td.consMessages, consMessage{
sender: cons.valKey.Address(),
message: message.NewProposalMessage(p),
})
}
}
}

Expand Down
12 changes: 9 additions & 3 deletions consensus/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ func (*changeProposer) onSetProposal(_ *proposal.Proposal) {
}

func (cp *changeProposer) onTimeout(t *ticker) {
if t.Target == tickerTargetQueryVotes {
cp.queryVotes()
cp.scheduleTimeout(t.Duration*2, cp.height, cp.round, tickerTargetQueryVotes)
if t.Target == tickerTargetQueryVote {
cp.queryVote()
cp.scheduleTimeout(t.Duration*2, cp.height, cp.round, tickerTargetQueryVote)
}
}

Expand Down Expand Up @@ -327,6 +327,12 @@ func (cp *changeProposer) cpDecide(round int16, cpValue vote.CPValue) {
} else if cpValue == vote.CPValueNo {
cp.round = round
cp.cpDecided = 0

roundProposal := cp.log.RoundProposal(cp.round)
if roundProposal == nil {
cp.queryProposal()
}

cp.enterNewState(cp.prepareState)
}
}
2 changes: 1 addition & 1 deletion consensus/cp_prevote.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *cpPreVoteState) decide() {
just := &vote.JustInitYes{}
s.signAddCPPreVote(hash.UndefHash, s.cpRound, 1, just)
}
s.scheduleTimeout(s.config.QueryVoteTimeout, s.height, s.round, tickerTargetQueryVotes)
s.scheduleTimeout(s.config.QueryVoteTimeout, s.height, s.round, tickerTargetQueryVote)
} else {
cpMainVotes := s.log.CPMainVoteVoteSet(s.round)
switch {
Expand Down
7 changes: 4 additions & 3 deletions consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
type Reader interface {
ConsensusKey() *bls.PublicKey
AllVotes() []*vote.Vote
PickRandomVote(round int16) *vote.Vote
HandleQueryVote(height uint32, round int16) *vote.Vote
HandleQueryProposal(height uint32, round int16) *proposal.Proposal
Proposal() *proposal.Proposal
HasVote(h hash.Hash) bool
HeightRound() (uint32, int16)
Expand All @@ -29,11 +30,11 @@ type Consensus interface {

type ManagerReader interface {
Instances() []Reader
PickRandomVote(round int16) *vote.Vote
HandleQueryVote(height uint32, round int16) *vote.Vote
HandleQueryProposal(height uint32, round int16) *proposal.Proposal
Proposal() *proposal.Proposal
HeightRound() (uint32, int16)
HasActiveInstance() bool
HasProposer() bool
}

type Manager interface {
Expand Down
Loading
Loading