diff --git a/consensus/consensus.go b/consensus/consensus.go index 0ad15935a..b68253c7c 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -141,13 +141,6 @@ func (cs *consensus) HeightRound() (uint32, int16) { return cs.height, cs.round } -func (cs *consensus) Proposal() *proposal.Proposal { - cs.lk.RLock() - defer cs.lk.RUnlock() - - return cs.log.RoundProposal(cs.round) -} - func (cs *consensus) HasVote(h hash.Hash) bool { cs.lk.RLock() defer cs.lk.RUnlock() @@ -223,19 +216,15 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) { defer cs.lk.Unlock() if !cs.active { - cs.logger.Trace("we are not in the committee") - return } if p.Height() != cs.height { - cs.logger.Trace("invalid height", "proposal", p) - return } if p.Round() < cs.round { - cs.logger.Trace("proposal for expired round", "proposal", p) + cs.logger.Debug("proposal for expired round", "proposal", p) return } @@ -258,7 +247,7 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) { // In this case, we accept the proposal and allow nodes to continue. // By doing so, we enable the validator to broadcast its votes and // prevent it from being marked as absent in the block certificate. - cs.logger.Trace("block is committed for this height", "proposal", p) + cs.logger.Warn("block committed before receiving proposal", "proposal", p) if p.Block().Hash() != cs.bcState.LastBlockHash() { cs.logger.Warn("proposal is not for the committed block", "proposal", p) @@ -290,19 +279,15 @@ func (cs *consensus) AddVote(v *vote.Vote) { defer cs.lk.Unlock() if !cs.active { - cs.logger.Trace("we are not in the committee") - return } if v.Height() != cs.height { - cs.logger.Trace("vote has invalid height", "vote", v) - return } if v.Round() < cs.round { - cs.logger.Trace("vote for expired round", "vote", v) + cs.logger.Debug("vote for expired round", "vote", v) return } @@ -396,17 +381,16 @@ func (cs *consensus) signAddVote(v *vote.Vote) { cs.broadcastVote(v) } +// queryProposal requests any missing proposal from other validators. func (cs *consensus) queryProposal() { cs.broadcaster(cs.valKey.Address(), message.NewQueryProposalMessage(cs.height, cs.round, cs.valKey.Address())) } -// queryVotes is an anti-entropy mechanism to retrieve missed votes -// when a validator falls behind the network. -// However, invoking this method might result in unnecessary bandwidth usage. -func (cs *consensus) queryVotes() { +// queryVote requests any missing votes from other validators. +func (cs *consensus) queryVote() { cs.broadcaster(cs.valKey.Address(), - message.NewQueryVotesMessage(cs.height, cs.round, cs.valKey.Address())) + message.NewQueryVoteMessage(cs.height, cs.round, cs.valKey.Address())) } func (cs *consensus) broadcastProposal(p *proposal.Proposal) { @@ -477,11 +461,56 @@ func (cs *consensus) IsActive() bool { return cs.active } +func (cs *consensus) Proposal() *proposal.Proposal { + cs.lk.RLock() + defer cs.lk.RUnlock() + + return cs.log.RoundProposal(cs.round) +} + +func (cs *consensus) HandleQueryProposal(height uint32, round int16) *proposal.Proposal { + cs.lk.RLock() + defer cs.lk.RUnlock() + + if !cs.active { + return nil + } + + if height != cs.height { + return nil + } + + if round != cs.round { + return nil + } + + if cs.isProposer() { + return cs.log.RoundProposal(cs.round) + } + + if cs.cpDecided == 0 { + // It is decided not to change the proposer and the proposal is locked. + // Locked proposals can be sent by all validators. + // This helps prevent a situation where the proposer goes offline after proposing the block. + return cs.log.RoundProposal(cs.round) + } + + return nil +} + // TODO: Improve the performance? -func (cs *consensus) PickRandomVote(round int16) *vote.Vote { +func (cs *consensus) HandleQueryVote(height uint32, round int16) *vote.Vote { cs.lk.RLock() defer cs.lk.RUnlock() + if !cs.active { + return nil + } + + if height != cs.height { + return nil + } + votes := []*vote.Vote{} switch { case round < cs.round: diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index a153f81c4..65e036b73 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -222,7 +222,7 @@ func (td *testData) shouldPublishQueryVote(t *testing.T, cons *consensus, height continue } - m := consMsg.message.(*message.QueryVotesMessage) + m := consMsg.message.(*message.QueryVoteMessage) assert.Equal(t, m.Height, height) assert.Equal(t, m.Round, round) assert.Equal(t, m.Querier, cons.valKey.Address()) @@ -422,6 +422,18 @@ func TestNotInCommittee(t *testing.T) { assert.Equal(t, "new-height", cons.currentState.name()) } +func TestIsProposer(t *testing.T) { + td := setup(t) + + td.commitBlockForAllStates(t) // height 1 + + td.enterNewHeight(td.consX) + td.enterNewHeight(td.consY) + + assert.False(t, td.consX.IsProposer()) + assert.True(t, td.consY.IsProposer()) +} + func TestVoteWithInvalidHeight(t *testing.T) { td := setup(t) @@ -487,7 +499,7 @@ func TestConsensusAddVote(t *testing.T) { } // TestConsensusLateProposal tests the scenario where a slow node receives a proposal -// in precommit phase. +// after committing the block. func TestConsensusLateProposal(t *testing.T) { td := setup(t) @@ -502,6 +514,26 @@ func TestConsensusLateProposal(t *testing.T) { td.commitBlockForAllStates(t) // height 2 + // Partitioned node receives proposal now + td.consP.SetProposal(p) + + td.shouldPublishVote(t, td.consP, vote.VoteTypePrepare, p.Block().Hash()) +} + +// TestSetProposalOnPrepare tests the scenario where a slow node receives a proposal +// in prepare phase. +func TestSetProposalOnPrepare(t *testing.T) { + td := setup(t) + + td.commitBlockForAllStates(t) // height 1 + + td.enterNewHeight(td.consP) + + h := uint32(2) + r := int16(0) + p := td.makeProposal(t, h, r) + require.NotNil(t, p) + // The partitioned node receives all the votes first td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexX) td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexY) @@ -513,9 +545,9 @@ func TestConsensusLateProposal(t *testing.T) { td.shouldPublishVote(t, td.consP, vote.VoteTypePrecommit, p.Block().Hash()) } -// TestConsensusVeryLateProposal tests the scenario where a slow node receives a proposal -// in prepare phase. -func TestConsensusVeryLateProposal(t *testing.T) { +// TestSetProposalOnPrecommit tests the scenario where a slow node receives a proposal +// in precommit phase. +func TestSetProposalOnPrecommit(t *testing.T) { td := setup(t) td.commitBlockForAllStates(t) // height 1 @@ -527,17 +559,15 @@ func TestConsensusVeryLateProposal(t *testing.T) { p := td.makeProposal(t, h, r) require.NotNil(t, p) - td.commitBlockForAllStates(t) // height 2 - // The partitioned node receives all the votes first - td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexX) - td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexY) - td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexB) - td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexX) td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexY) td.addPrepareVote(td.consP, p.Block().Hash(), h, r, tIndexB) + td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexX) + td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexY) + td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexB) + // Partitioned node receives proposal now td.consP.SetProposal(p) @@ -545,11 +575,11 @@ func TestConsensusVeryLateProposal(t *testing.T) { td.shouldPublishBlockAnnounce(t, td.consP, p.Block().Hash()) } -func TestPickRandomVote(t *testing.T) { +func TestHandleQueryVote(t *testing.T) { td := setup(t) td.enterNewHeight(td.consP) - assert.Nil(t, td.consP.PickRandomVote(0)) + assert.Nil(t, td.consP.HandleQueryVote(1, 0)) cpRound := int16(0) // === make valid certificate @@ -594,20 +624,51 @@ func TestPickRandomVote(t *testing.T) { td.addCPDecidedVote(td.consP, hash.UndefHash, 1, 0, vote.CPValueYes, &vote.JustDecided{QCert: certMainVote}, tIndexY) - assert.NotNil(t, td.consP.PickRandomVote(0)) + assert.NotNil(t, td.consP.HandleQueryVote(1, 0)) // Round 1 td.enterNextRound(td.consP) td.addPrepareVote(td.consP, td.RandHash(), 1, 1, tIndexY) - rndVote0 := td.consP.PickRandomVote(0) - assert.NotEqual(t, vote.VoteTypePrepare, rndVote0.Type(), "Should not pick prepare votes") + rndVote0 := td.consP.HandleQueryVote(1, 0) + assert.Equal(t, vote.VoteTypeCPDecided, rndVote0.Type(), "should send the decided vote for the previous round") + + rndVote1 := td.consP.HandleQueryVote(1, 1) + assert.Equal(t, vote.VoteTypePrepare, rndVote1.Type(), "should send the prepare vote for the current round") - rndVote1 := td.consP.PickRandomVote(1) - assert.Equal(t, vote.VoteTypePrepare, rndVote1.Type()) + rndVote2 := td.consP.HandleQueryVote(1, 2) + assert.Nil(t, rndVote2, "should not send a vote for the next round") - rndVote2 := td.consP.PickRandomVote(2) - assert.Nil(t, rndVote2) + rndVote4 := td.consP.HandleQueryVote(2, 0) + assert.Nil(t, rndVote4, "should not have a vote for the next height") +} + +func TestHandleQueryProposal(t *testing.T) { + td := setup(t) + + td.enterNewHeight(td.consX) + td.enterNewHeight(td.consY) + + // Round 1 + td.enterNextRound(td.consX) + td.enterNextRound(td.consY) // consY is the proposer + + prop0 := td.consY.HandleQueryProposal(1, 0) + assert.Nil(t, prop0, "proposer should not send a proposal for the previous round") + + prop1 := td.consX.HandleQueryProposal(1, 1) + assert.Nil(t, prop1, "non-proposer should not send a proposal") + + prop2 := td.consY.HandleQueryProposal(1, 1) + assert.NotNil(t, prop2, "proposer should send a proposal") + + td.consX.cpDecided = 0 + td.consX.SetProposal(td.consY.Proposal()) + prop3 := td.consX.HandleQueryProposal(1, 1) + assert.NotNil(t, prop3, "non-proposer should send a proposal on decided proposal") + + prop4 := td.consX.HandleQueryProposal(2, 0) + assert.Nil(t, prop4, "should not have a proposal for the next height") } func TestSetProposalFromPreviousRound(t *testing.T) { @@ -940,13 +1001,16 @@ func checkConsensus(td *testData, height uint32, byzVotes []*vote.Vote) ( } case message.TypeQueryProposal: - for _, cons := range instances { - p := cons.Proposal() - if p != nil { - td.consMessages = append(td.consMessages, consMessage{ - sender: cons.valKey.Address(), - message: message.NewProposalMessage(p), - }) + m := rndMsg.message.(*message.QueryProposalMessage) + if m.Height == height { + for _, cons := range instances { + p := cons.HandleQueryProposal(m.Height, m.Round) + if p != nil { + td.consMessages = append(td.consMessages, consMessage{ + sender: cons.valKey.Address(), + message: message.NewProposalMessage(p), + }) + } } } diff --git a/consensus/cp.go b/consensus/cp.go index 6c6642810..abb278568 100644 --- a/consensus/cp.go +++ b/consensus/cp.go @@ -17,9 +17,9 @@ func (*changeProposer) onSetProposal(_ *proposal.Proposal) { } func (cp *changeProposer) onTimeout(t *ticker) { - if t.Target == tickerTargetQueryVotes { - cp.queryVotes() - cp.scheduleTimeout(t.Duration*2, cp.height, cp.round, tickerTargetQueryVotes) + if t.Target == tickerTargetQueryVote { + cp.queryVote() + cp.scheduleTimeout(t.Duration*2, cp.height, cp.round, tickerTargetQueryVote) } } @@ -327,6 +327,12 @@ func (cp *changeProposer) cpDecide(round int16, cpValue vote.CPValue) { } else if cpValue == vote.CPValueNo { cp.round = round cp.cpDecided = 0 + + roundProposal := cp.log.RoundProposal(cp.round) + if roundProposal == nil { + cp.queryProposal() + } + cp.enterNewState(cp.prepareState) } } diff --git a/consensus/cp_prevote.go b/consensus/cp_prevote.go index b7af57e22..a70bf78d9 100644 --- a/consensus/cp_prevote.go +++ b/consensus/cp_prevote.go @@ -29,7 +29,7 @@ func (s *cpPreVoteState) decide() { just := &vote.JustInitYes{} s.signAddCPPreVote(hash.UndefHash, s.cpRound, 1, just) } - s.scheduleTimeout(s.config.QueryVoteTimeout, s.height, s.round, tickerTargetQueryVotes) + s.scheduleTimeout(s.config.QueryVoteTimeout, s.height, s.round, tickerTargetQueryVote) } else { cpMainVotes := s.log.CPMainVoteVoteSet(s.round) switch { diff --git a/consensus/interface.go b/consensus/interface.go index 21288778c..27543f552 100644 --- a/consensus/interface.go +++ b/consensus/interface.go @@ -10,7 +10,8 @@ import ( type Reader interface { ConsensusKey() *bls.PublicKey AllVotes() []*vote.Vote - PickRandomVote(round int16) *vote.Vote + HandleQueryVote(height uint32, round int16) *vote.Vote + HandleQueryProposal(height uint32, round int16) *proposal.Proposal Proposal() *proposal.Proposal HasVote(h hash.Hash) bool HeightRound() (uint32, int16) @@ -29,11 +30,11 @@ type Consensus interface { type ManagerReader interface { Instances() []Reader - PickRandomVote(round int16) *vote.Vote + HandleQueryVote(height uint32, round int16) *vote.Vote + HandleQueryProposal(height uint32, round int16) *proposal.Proposal Proposal() *proposal.Proposal HeightRound() (uint32, int16) HasActiveInstance() bool - HasProposer() bool } type Manager interface { diff --git a/consensus/manager.go b/consensus/manager.go index d31171179..24400c81a 100644 --- a/consensus/manager.go +++ b/consensus/manager.go @@ -74,18 +74,25 @@ func (mgr *manager) Instances() []Reader { return readers } -// PickRandomVote returns a random vote from a random consensus instance. -func (mgr *manager) PickRandomVote(round int16) *vote.Vote { +// Proposal returns the current proposal for the active round from a random consensus instance. +func (mgr *manager) Proposal() *proposal.Proposal { cons := mgr.getBestInstance() - return cons.PickRandomVote(round) + return cons.Proposal() } -// Proposal returns the proposal for a specific round from a random consensus instance. -func (mgr *manager) Proposal() *proposal.Proposal { +// HandleQueryProposal returns the proposal for a specific round from a random consensus instance. +func (mgr *manager) HandleQueryProposal(height uint32, round int16) *proposal.Proposal { cons := mgr.getBestInstance() - return cons.Proposal() + return cons.HandleQueryProposal(height, round) +} + +// HandleQueryVote returns a random vote from a random consensus instance. +func (mgr *manager) HandleQueryVote(height uint32, round int16) *vote.Vote { + cons := mgr.getBestInstance() + + return cons.HandleQueryVote(height, round) } // HeightRound retrieves the current height and round from a random consensus instance. @@ -106,18 +113,6 @@ func (mgr *manager) HasActiveInstance() bool { return false } -// HasProposer checks if any of the consensus instances is the proposer -// for the current round. -func (mgr *manager) HasProposer() bool { - for _, cons := range mgr.instances { - if cons.IsProposer() { - return true - } - } - - return false -} - // MoveToNewHeight moves all consensus instances to a new height. func (mgr *manager) MoveToNewHeight() { for _, cons := range mgr.instances { diff --git a/consensus/mock.go b/consensus/mock.go index 25aa5ded6..3baed042e 100644 --- a/consensus/mock.go +++ b/consensus/mock.go @@ -86,6 +86,10 @@ func (m *MockConsensus) Proposal() *proposal.Proposal { return m.CurProposal } +func (m *MockConsensus) HandleQueryProposal(_ uint32, _ int16) *proposal.Proposal { + return m.CurProposal +} + func (m *MockConsensus) HeightRound() (uint32, int16) { return m.Height, m.Round } @@ -94,7 +98,7 @@ func (*MockConsensus) String() string { return "" } -func (m *MockConsensus) PickRandomVote(_ int16) *vote.Vote { +func (m *MockConsensus) HandleQueryVote(_ uint32, _ int16) *vote.Vote { if len(m.Votes) == 0 { return nil } diff --git a/consensus/precommit.go b/consensus/precommit.go index e849b9008..6c18dc34d 100644 --- a/consensus/precommit.go +++ b/consensus/precommit.go @@ -24,13 +24,7 @@ func (s *precommitState) decide() { if precommitQH != nil { s.logger.Debug("pre-commit has quorum", "hash", precommitQH) - roundProposal := s.log.RoundProposal(s.round) - if roundProposal == nil { - // There is a consensus about a proposal that we don't have yet. - // Ask peers for this proposal. - s.logger.Info("query for a decided proposal", "hash", precommitQH) - s.queryProposal() - } else if s.hasVoted { + if s.hasVoted { // To ensure we have voted and won't be absent from the certificate s.enterNewState(s.commitState) } diff --git a/consensus/prepare.go b/consensus/prepare.go index 454a78d5a..6643d7286 100644 --- a/consensus/prepare.go +++ b/consensus/prepare.go @@ -66,7 +66,7 @@ func (s *prepareState) onTimeout(t *ticker) { s.queryProposal() } if s.isProposer() { - s.queryVotes() + s.queryVote() } } else if t.Target == tickerTargetChangeProposer { s.startChangingProposer() diff --git a/consensus/prepare_test.go b/consensus/prepare_test.go index ec0c3802a..75f26f0a9 100644 --- a/consensus/prepare_test.go +++ b/consensus/prepare_test.go @@ -31,7 +31,7 @@ func TestQueryProposal(t *testing.T) { td.shouldNotPublish(t, td.consP, message.TypeQueryVote) } -func TestQueryVotes(t *testing.T) { +func TestQueryVote(t *testing.T) { td := setup(t) td.commitBlockForAllStates(t) diff --git a/consensus/ticker.go b/consensus/ticker.go index 4892a4ed2..fbaba629e 100644 --- a/consensus/ticker.go +++ b/consensus/ticker.go @@ -11,7 +11,7 @@ const ( tickerTargetNewHeight = tickerTarget(1) tickerTargetChangeProposer = tickerTarget(2) tickerTargetQueryProposal = tickerTarget(3) - tickerTargetQueryVotes = tickerTarget(4) + tickerTargetQueryVote = tickerTarget(4) ) func (rs tickerTarget) String() string { @@ -22,7 +22,7 @@ func (rs tickerTarget) String() string { return "change-proposer" case tickerTargetQueryProposal: return "query-proposal" - case tickerTargetQueryVotes: + case tickerTargetQueryVote: return "query-votes" default: return "Unknown" diff --git a/sync/bundle/message/message.go b/sync/bundle/message/message.go index 9a617682c..0f9262837 100644 --- a/sync/bundle/message/message.go +++ b/sync/bundle/message/message.go @@ -107,7 +107,7 @@ func MakeMessage(t Type) Message { return &ProposalMessage{} case TypeQueryVote: - return &QueryVotesMessage{} + return &QueryVoteMessage{} case TypeVote: return &VoteMessage{} diff --git a/sync/bundle/message/query_votes.go b/sync/bundle/message/query_votes.go index 4339a9465..a013b9eb9 100644 --- a/sync/bundle/message/query_votes.go +++ b/sync/bundle/message/query_votes.go @@ -8,21 +8,21 @@ import ( "github.com/pactus-project/pactus/util/errors" ) -type QueryVotesMessage struct { +type QueryVoteMessage struct { Height uint32 `cbor:"1,keyasint"` Round int16 `cbor:"2,keyasint"` Querier crypto.Address `cbor:"3,keyasint"` } -func NewQueryVotesMessage(height uint32, round int16, querier crypto.Address) *QueryVotesMessage { - return &QueryVotesMessage{ +func NewQueryVoteMessage(height uint32, round int16, querier crypto.Address) *QueryVoteMessage { + return &QueryVoteMessage{ Height: height, Round: round, Querier: querier, } } -func (m *QueryVotesMessage) BasicCheck() error { +func (m *QueryVoteMessage) BasicCheck() error { if m.Round < 0 { return errors.Error(errors.ErrInvalidRound) } @@ -30,18 +30,18 @@ func (m *QueryVotesMessage) BasicCheck() error { return nil } -func (*QueryVotesMessage) Type() Type { +func (*QueryVoteMessage) Type() Type { return TypeQueryVote } -func (*QueryVotesMessage) TopicID() network.TopicID { +func (*QueryVoteMessage) TopicID() network.TopicID { return network.TopicIDConsensus } -func (*QueryVotesMessage) ShouldBroadcast() bool { +func (*QueryVoteMessage) ShouldBroadcast() bool { return true } -func (m *QueryVotesMessage) String() string { +func (m *QueryVoteMessage) String() string { return fmt.Sprintf("{%d/%d %s}", m.Height, m.Round, m.Querier.ShortString()) } diff --git a/sync/bundle/message/query_votes_test.go b/sync/bundle/message/query_votes_test.go index 4e9757306..c31b6fc5a 100644 --- a/sync/bundle/message/query_votes_test.go +++ b/sync/bundle/message/query_votes_test.go @@ -9,21 +9,21 @@ import ( ) func TestQueryVotesType(t *testing.T) { - m := &QueryVotesMessage{} + m := &QueryVoteMessage{} assert.Equal(t, TypeQueryVote, m.Type()) } -func TestQueryVotesMessage(t *testing.T) { +func TestQueryVoteMessage(t *testing.T) { ts := testsuite.NewTestSuite(t) t.Run("Invalid round", func(t *testing.T) { - m := NewQueryVotesMessage(0, -1, ts.RandValAddress()) + m := NewQueryVoteMessage(0, -1, ts.RandValAddress()) assert.Equal(t, errors.ErrInvalidRound, errors.Code(m.BasicCheck())) }) t.Run("OK", func(t *testing.T) { - m := NewQueryVotesMessage(100, 0, ts.RandValAddress()) + m := NewQueryVoteMessage(100, 0, ts.RandValAddress()) assert.NoError(t, m.BasicCheck()) assert.Contains(t, m.String(), "100") diff --git a/sync/firewall/firewall_test.go b/sync/firewall/firewall_test.go index 067c831dc..4c25a7e55 100644 --- a/sync/firewall/firewall_test.go +++ b/sync/firewall/firewall_test.go @@ -74,7 +74,7 @@ func setup(t *testing.T, conf *Config) *testData { } func (td *testData) testGossipBundle() []byte { - bdl := bundle.NewBundle(message.NewQueryVotesMessage(td.RandHeight(), td.RandRound(), td.RandValAddress())) + bdl := bundle.NewBundle(message.NewQueryVoteMessage(td.RandHeight(), td.RandRound(), td.RandValAddress())) bdl.Flags = util.SetFlag(bdl.Flags, bundle.BundleFlagNetworkMainnet) d, _ := bdl.Encode() @@ -326,7 +326,7 @@ func TestBannedAddress(t *testing.T) { func TestNetworkFlagsMainnet(t *testing.T) { td := setup(t, nil) - bdl := bundle.NewBundle(message.NewQueryVotesMessage(td.RandHeight(), td.RandRound(), td.RandValAddress())) + bdl := bundle.NewBundle(message.NewQueryVoteMessage(td.RandHeight(), td.RandRound(), td.RandValAddress())) bdl.Flags = util.SetFlag(bdl.Flags, bundle.BundleFlagNetworkMainnet) assert.NoError(t, td.firewall.checkBundle(bdl)) @@ -341,7 +341,7 @@ func TestNetworkFlagsTestnet(t *testing.T) { td := setup(t, nil) td.state.TestGenesis = genesis.TestnetGenesis() - bdl := bundle.NewBundle(message.NewQueryVotesMessage(td.RandHeight(), td.RandRound(), td.RandValAddress())) + bdl := bundle.NewBundle(message.NewQueryVoteMessage(td.RandHeight(), td.RandRound(), td.RandValAddress())) bdl.Flags = util.SetFlag(bdl.Flags, bundle.BundleFlagNetworkTestnet) assert.NoError(t, td.firewall.checkBundle(bdl)) @@ -356,7 +356,7 @@ func TestNetworkFlagsLocalnet(t *testing.T) { td := setup(t, nil) td.state.TestParams.BlockVersion = 0x3f // changing genesis hash - bdl := bundle.NewBundle(message.NewQueryVotesMessage(td.RandHeight(), td.RandRound(), td.RandValAddress())) + bdl := bundle.NewBundle(message.NewQueryVoteMessage(td.RandHeight(), td.RandRound(), td.RandValAddress())) bdl.Flags = util.SetFlag(bdl.Flags, bundle.BundleFlagNetworkTestnet) assert.Error(t, td.firewall.checkBundle(bdl)) diff --git a/sync/handler_proposal_test.go b/sync/handler_proposal_test.go index 791d8b9bf..fa06f6f43 100644 --- a/sync/handler_proposal_test.go +++ b/sync/handler_proposal_test.go @@ -17,6 +17,6 @@ func TestParsingProposalMessages(t *testing.T) { pid := td.RandPeerID() td.receivingNewMessage(td.sync, msg, pid) - assert.NotNil(t, td.consMgr.Proposal()) + assert.Equal(t, prop, td.consMgr.Proposal()) }) } diff --git a/sync/handler_query_proposal.go b/sync/handler_query_proposal.go index cf02a996d..6aebf5853 100644 --- a/sync/handler_query_proposal.go +++ b/sync/handler_query_proposal.go @@ -20,27 +20,7 @@ func (handler *queryProposalHandler) ParseMessage(m message.Message, _ peer.ID) msg := m.(*message.QueryProposalMessage) handler.logger.Trace("parsing QueryProposal message", "msg", msg) - if !handler.consMgr.HasActiveInstance() { - handler.logger.Debug("ignoring QueryProposal, not active", "msg", msg) - - return - } - - if !handler.consMgr.HasProposer() { - handler.logger.Debug("ignoring QueryProposal, not proposer", "msg", msg) - - return - } - - height, round := handler.consMgr.HeightRound() - if msg.Height != height || msg.Round != round { - handler.logger.Debug("ignoring QueryProposal, not same height/round", "msg", msg, - "height", height, "round", round) - - return - } - - prop := handler.consMgr.Proposal() + prop := handler.consMgr.HandleQueryProposal(msg.Height, msg.Round) if prop != nil { response := message.NewProposalMessage(prop) handler.broadcast(response) diff --git a/sync/handler_query_proposal_test.go b/sync/handler_query_proposal_test.go index 08d5e7900..1b900e738 100644 --- a/sync/handler_query_proposal_test.go +++ b/sync/handler_query_proposal_test.go @@ -11,57 +11,25 @@ func TestParsingQueryProposalMessages(t *testing.T) { td := setup(t, nil) consHeight, consRound := td.consMgr.HeightRound() - prop, _ := td.GenerateTestProposal(consHeight, 0) - pid := td.RandPeerID() - td.consMgr.SetProposal(prop) - t.Run("doesn't have active validator", func(t *testing.T) { - msg := message.NewQueryProposalMessage(consHeight, consRound, td.RandValAddress()) - td.receivingNewMessage(td.sync, msg, pid) - - td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) - }) - - td.consMocks[0].Active = true - - t.Run("not the proposer", func(t *testing.T) { + t.Run("doesn't have the proposal", func(t *testing.T) { + pid := td.RandPeerID() msg := message.NewQueryProposalMessage(consHeight, consRound, td.RandValAddress()) td.receivingNewMessage(td.sync, msg, pid) td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) }) - td.consMocks[0].Proposer = true - - t.Run("not the same height", func(t *testing.T) { - msg := message.NewQueryProposalMessage(consHeight+1, consRound, td.RandValAddress()) - td.receivingNewMessage(td.sync, msg, pid) - - td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) - }) - - t.Run("not the same round", func(t *testing.T) { - msg := message.NewQueryProposalMessage(consHeight, consRound+1, td.RandValAddress()) - td.receivingNewMessage(td.sync, msg, pid) - - td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) - }) - t.Run("should respond to the query proposal message", func(t *testing.T) { + prop, _ := td.GenerateTestProposal(consHeight, 0) + pid := td.RandPeerID() + td.consMgr.SetProposal(prop) msg := message.NewQueryProposalMessage(consHeight, consRound, td.RandValAddress()) td.receivingNewMessage(td.sync, msg, pid) bdl := td.shouldPublishMessageWithThisType(t, message.TypeProposal) assert.Equal(t, prop.Hash(), bdl.Message.(*message.ProposalMessage).Proposal.Hash()) }) - - t.Run("doesn't have the proposal", func(t *testing.T) { - td.consMocks[0].CurProposal = nil - msg := message.NewQueryProposalMessage(consHeight, consRound, td.RandValAddress()) - td.receivingNewMessage(td.sync, msg, pid) - - td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) - }) } func TestBroadcastingQueryProposalMessages(t *testing.T) { diff --git a/sync/handler_query_votes.go b/sync/handler_query_votes.go index b01508072..bb1844da5 100644 --- a/sync/handler_query_votes.go +++ b/sync/handler_query_votes.go @@ -6,42 +6,28 @@ import ( "github.com/pactus-project/pactus/sync/peerset/peer" ) -type queryVotesHandler struct { +type queryVoteHandler struct { *synchronizer } -func newQueryVotesHandler(sync *synchronizer) messageHandler { - return &queryVotesHandler{ +func newQueryVoteHandler(sync *synchronizer) messageHandler { + return &queryVoteHandler{ sync, } } -func (handler *queryVotesHandler) ParseMessage(m message.Message, _ peer.ID) { - msg := m.(*message.QueryVotesMessage) - handler.logger.Trace("parsing QueryVotes message", "msg", msg) +func (handler *queryVoteHandler) ParseMessage(m message.Message, _ peer.ID) { + msg := m.(*message.QueryVoteMessage) + handler.logger.Trace("parsing QueryVote message", "msg", msg) - if !handler.consMgr.HasActiveInstance() { - handler.logger.Debug("ignoring QueryVotes, not active", "msg", msg) - - return - } - - height, _ := handler.consMgr.HeightRound() - if msg.Height != height { - handler.logger.Debug("ignoring QueryVotes, not same height", "msg", msg, - "height", height) - - return - } - - v := handler.consMgr.PickRandomVote(msg.Round) + v := handler.consMgr.HandleQueryVote(msg.Height, msg.Round) if v != nil { response := message.NewVoteMessage(v) handler.broadcast(response) } } -func (*queryVotesHandler) PrepareBundle(m message.Message) *bundle.Bundle { +func (*queryVoteHandler) PrepareBundle(m message.Message) *bundle.Bundle { bdl := bundle.NewBundle(m) return bdl diff --git a/sync/handler_query_votes_test.go b/sync/handler_query_votes_test.go index ef6c773ff..731dc3c92 100644 --- a/sync/handler_query_votes_test.go +++ b/sync/handler_query_votes_test.go @@ -7,44 +7,35 @@ import ( "github.com/stretchr/testify/assert" ) -func TestParsingQueryVotesMessages(t *testing.T) { +func TestParsingQueryVoteMessages(t *testing.T) { td := setup(t, nil) - consensusHeight, _ := td.consMgr.HeightRound() - v1, _ := td.GenerateTestPrecommitVote(consensusHeight, 0) - td.consMgr.AddVote(v1) - pid := td.RandPeerID() - - t.Run("doesn't have active validator", func(t *testing.T) { - msg := message.NewQueryVotesMessage(consensusHeight, 1, td.RandValAddress()) + consHeight, consRound := td.consMgr.HeightRound() + t.Run("doesn't have any votes", func(t *testing.T) { + pid := td.RandPeerID() + msg := message.NewQueryVoteMessage(consHeight, consRound, td.RandValAddress()) td.receivingNewMessage(td.sync, msg, pid) td.shouldNotPublishMessageWithThisType(t, message.TypeVote) }) - td.consMocks[0].Active = true - t.Run("should respond to the query votes message", func(t *testing.T) { - msg := message.NewQueryVotesMessage(consensusHeight, 1, td.RandValAddress()) + v1, _ := td.GenerateTestPrecommitVote(consHeight, consRound) + td.consMgr.AddVote(v1) + pid := td.RandPeerID() + msg := message.NewQueryVoteMessage(consHeight, consRound, td.RandValAddress()) td.receivingNewMessage(td.sync, msg, pid) bdl := td.shouldPublishMessageWithThisType(t, message.TypeVote) assert.Equal(t, v1.Hash(), bdl.Message.(*message.VoteMessage).Vote.Hash()) }) - - t.Run("doesn't have any votes", func(t *testing.T) { - msg := message.NewQueryVotesMessage(consensusHeight+1, 1, td.RandValAddress()) - td.receivingNewMessage(td.sync, msg, pid) - - td.shouldNotPublishMessageWithThisType(t, message.TypeVote) - }) } -func TestBroadcastingQueryVotesMessages(t *testing.T) { +func TestBroadcastingQueryVoteMessages(t *testing.T) { td := setup(t, nil) consensusHeight := td.state.LastBlockHeight() + 1 - msg := message.NewQueryVotesMessage(consensusHeight, 1, td.RandValAddress()) + msg := message.NewQueryVoteMessage(consensusHeight, 1, td.RandValAddress()) td.sync.broadcast(msg) td.shouldPublishMessageWithThisType(t, message.TypeQueryVote) diff --git a/sync/handler_vote_test.go b/sync/handler_vote_test.go index 7b228e1b3..a020ed7f0 100644 --- a/sync/handler_vote_test.go +++ b/sync/handler_vote_test.go @@ -16,6 +16,6 @@ func TestParsingVoteMessages(t *testing.T) { pid := td.RandPeerID() td.receivingNewMessage(td.sync, msg, pid) - assert.Equal(t, v.Hash(), td.consMgr.PickRandomVote(0).Hash()) + assert.Contains(t, td.consMocks[0].AllVotes(), v) }) } diff --git a/sync/sync.go b/sync/sync.go index 731934cdb..db01341ff 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -96,7 +96,7 @@ func NewSynchronizer( handlers[message.TypeTransaction] = newTransactionsHandler(sync) handlers[message.TypeQueryProposal] = newQueryProposalHandler(sync) handlers[message.TypeProposal] = newProposalHandler(sync) - handlers[message.TypeQueryVote] = newQueryVotesHandler(sync) + handlers[message.TypeQueryVote] = newQueryVoteHandler(sync) handlers[message.TypeVote] = newVoteHandler(sync) handlers[message.TypeBlockAnnounce] = newBlockAnnounceHandler(sync) handlers[message.TypeBlocksRequest] = newBlocksRequestHandler(sync)