From d419154551e708290c1ea6b153b55614a3b6c614 Mon Sep 17 00:00:00 2001 From: b00f Date: Sun, 9 Jun 2024 16:37:19 +0800 Subject: [PATCH] fix(consensus): improve consensus alghorithm (#1329) --- .golangci.yml | 3 + committee/committee.go | 6 +- consensus/config.go | 11 +-- consensus/consensus.go | 90 +++++++++++++-------- consensus/consensus_test.go | 32 +++++--- consensus/cp.go | 17 ++-- consensus/cp_decide.go | 6 +- consensus/cp_mainvote.go | 2 +- consensus/cp_prevote.go | 6 +- consensus/interface.go | 2 + consensus/log/log.go | 4 +- consensus/manager.go | 12 +++ consensus/mock.go | 8 ++ consensus/voteset/binary_voteset.go | 2 +- consensus/voteset/voteset_test.go | 2 +- fastconsensus/config.go | 11 +-- fastconsensus/consensus.go | 92 ++++++++++++++-------- fastconsensus/consensus_test.go | 42 +++++++--- fastconsensus/cp.go | 23 ++++-- fastconsensus/cp_decide.go | 4 +- fastconsensus/cp_prevote.go | 6 +- fastconsensus/cp_test.go | 29 ++++++- fastconsensus/interface.go | 4 +- fastconsensus/log/log.go | 4 +- fastconsensus/manager.go | 12 +++ fastconsensus/mock.go | 8 ++ fastconsensus/precommit_test.go | 2 +- fastconsensus/prepare_test.go | 6 +- fastconsensus/propose_test.go | 2 +- fastconsensus/voteset/binary_voteset.go | 2 +- fastconsensus/voteset/voteset_test.go | 2 +- network/gater_test.go | 14 ++-- network/gossip.go | 8 +- network/notifee.go | 4 +- network/peermgr.go | 4 +- network/peermgr_test.go | 18 ++--- network/utils.go | 2 +- state/state.go | 4 +- state/validation.go | 2 +- sync/bundle/message/proposal.go | 6 +- sync/bundle/message/proposal_test.go | 9 --- sync/bundle/message/query_proposal.go | 11 ++- sync/bundle/message/query_proposal_test.go | 15 +++- sync/handler_query_proposal.go | 31 ++++++-- sync/handler_query_proposal_test.go | 40 ++++++++-- sync/handler_query_votes.go | 23 ++++-- sync/handler_query_votes_test.go | 9 +++ sync/sync.go | 2 - sync/sync_test.go | 4 +- tests/block_test.go | 2 +- tests/main_test.go | 15 ++-- txpool/txpool.go | 6 +- util/linkedmap/linkedmap.go | 15 ++-- util/testsuite/testsuite.go | 4 - 54 files changed, 464 insertions(+), 236 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 2c2000cac..c2faebab6 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -122,6 +122,9 @@ linters-settings: - name: "get-return" disabled: true + - name: "confusing-naming" + disabled: true + - name: "function-result-limit" disabled: true diff --git a/committee/committee.go b/committee/committee.go index 297b542c2..614442b5d 100644 --- a/committee/committee.go +++ b/committee/committee.go @@ -144,7 +144,7 @@ func (c *committee) find(addr crypto.Address) *validator.Validator { // IsProposer checks if the given address is the proposer for the specified round. func (c *committee) IsProposer(addr crypto.Address, round int16) bool { - p := c.getProposer(round) + p := c.proposer(round) return p.Address() == addr } @@ -152,10 +152,10 @@ func (c *committee) IsProposer(addr crypto.Address, round int16) bool { // Proposer returns an instance of the proposer validator for the specified round. // A cloned instance of the proposer is returned to avoid modification of the original object. func (c *committee) Proposer(round int16) *validator.Validator { - return c.getProposer(round).Clone() + return c.proposer(round).Clone() } -func (c *committee) getProposer(round int16) *validator.Validator { +func (c *committee) proposer(round int16) *validator.Validator { pos := c.proposerPos for i := 0; i < int(round); i++ { pos = pos.Next diff --git a/consensus/config.go b/consensus/config.go index 275cc689c..648dc7704 100644 --- a/consensus/config.go +++ b/consensus/config.go @@ -5,13 +5,15 @@ import "time" type Config struct { ChangeProposerTimeout time.Duration `toml:"-"` ChangeProposerDelta time.Duration `toml:"-"` + QueryVoteTimeout time.Duration `toml:"-"` MinimumAvailabilityScore float64 `toml:"-"` } func DefaultConfig() *Config { return &Config{ - ChangeProposerTimeout: 8 * time.Second, - ChangeProposerDelta: 4 * time.Second, + ChangeProposerTimeout: 5 * time.Second, + ChangeProposerDelta: 5 * time.Second, + QueryVoteTimeout: 5 * time.Second, MinimumAvailabilityScore: 0.9, } } @@ -38,7 +40,6 @@ func (conf *Config) BasicCheck() error { } func (conf *Config) CalculateChangeProposerTimeout(round int16) time.Duration { - return time.Duration( - conf.ChangeProposerTimeout.Milliseconds()+conf.ChangeProposerDelta.Milliseconds()*int64(round), - ) * time.Millisecond + return conf.ChangeProposerTimeout + + conf.ChangeProposerDelta*time.Duration(round) } diff --git a/consensus/consensus.go b/consensus/consensus.go index 0d7bfe90a..5286f27a1 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -118,11 +118,7 @@ func (cs *consensus) Start() { cs.lk.Lock() defer cs.lk.Unlock() - cs.doMoveToNewHeight() - if cs.active { - cs.queryProposal() - cs.queryVotes() - } + cs.moveToNewHeight() } func (cs *consensus) String() string { @@ -184,10 +180,10 @@ func (cs *consensus) MoveToNewHeight() { cs.lk.Lock() defer cs.lk.Unlock() - cs.doMoveToNewHeight() + cs.moveToNewHeight() } -func (cs *consensus) doMoveToNewHeight() { +func (cs *consensus) moveToNewHeight() { stateHeight := cs.bcState.LastBlockHeight() if cs.height != stateHeight+1 { cs.enterNewState(cs.newHeightState) @@ -205,6 +201,23 @@ func (cs *consensus) scheduleTimeout(duration time.Duration, height uint32, roun }() } +func (cs *consensus) handleTimeout(t *ticker) { + cs.lk.Lock() + defer cs.lk.Unlock() + + cs.logger.Trace("handle ticker", "ticker", t) + + // Old tickers might be triggered now. Ignore them. + if cs.height != t.Height || cs.round != t.Round { + cs.logger.Trace("stale ticker", "ticker", t) + + return + } + + cs.logger.Debug("timer expired", "ticker", t) + cs.currentState.onTimeout(t) +} + func (cs *consensus) SetProposal(p *proposal.Proposal) { cs.lk.Lock() defer cs.lk.Unlock() @@ -222,7 +235,13 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) { } if p.Round() < cs.round { - cs.logger.Trace("expired round", "proposal", p) + cs.logger.Trace("proposal for expired round", "proposal", p) + + return + } + + if err := p.BasicCheck(); err != nil { + cs.logger.Warn("invalid proposal", "proposal", p, "error", err) return } @@ -266,23 +285,6 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) { cs.currentState.onSetProposal(p) } -func (cs *consensus) handleTimeout(t *ticker) { - cs.lk.Lock() - defer cs.lk.Unlock() - - cs.logger.Trace("handle ticker", "ticker", t) - - // Old tickers might be triggered now. Ignore them. - if cs.height != t.Height || cs.round != t.Round { - cs.logger.Trace("stale ticker", "ticker", t) - - return - } - - cs.logger.Debug("timer expired", "ticker", t) - cs.currentState.onTimeout(t) -} - func (cs *consensus) AddVote(v *vote.Vote) { cs.lk.Lock() defer cs.lk.Unlock() @@ -299,6 +301,12 @@ func (cs *consensus) AddVote(v *vote.Vote) { return } + if v.Round() < cs.round { + cs.logger.Trace("vote for expired round", "vote", v) + + return + } + if v.Type() == vote.VoteTypeCPPreVote || v.Type() == vote.VoteTypeCPMainVote || v.Type() == vote.VoteTypeCPDecided { @@ -318,6 +326,12 @@ func (cs *consensus) AddVote(v *vote.Vote) { cs.logger.Info("new vote added", "vote", v) cs.currentState.onAddVote(v) + + if v.Type() == vote.VoteTypeCPDecided { + if v.Round() > cs.round { + cs.changeProposer.cpDecide(v.Round(), v.CPValue()) + } + } } } @@ -325,6 +339,13 @@ func (cs *consensus) proposer(round int16) *validator.Validator { return cs.bcState.Proposer(round) } +func (cs *consensus) IsProposer() bool { + cs.lk.RLock() + defer cs.lk.RUnlock() + + return cs.isProposer() +} + func (cs *consensus) isProposer() bool { return cs.proposer(cs.round).Address() == cs.valKey.Address() } @@ -377,7 +398,7 @@ func (cs *consensus) signAddVote(v *vote.Vote) { func (cs *consensus) queryProposal() { cs.broadcaster(cs.valKey.Address(), - message.NewQueryProposalMessage(cs.height, cs.valKey.Address())) + message.NewQueryProposalMessage(cs.height, cs.round, cs.valKey.Address())) } // queryVotes is an anti-entropy mechanism to retrieve missed votes @@ -462,14 +483,21 @@ func (cs *consensus) PickRandomVote(round int16) *vote.Vote { defer cs.lk.RUnlock() votes := []*vote.Vote{} - if round == cs.round { + switch { + case round < cs.round: + // Past round: Only broadcast cp:decided votes + vs := cs.log.CPDecidedVoteSet(round) + votes = append(votes, vs.AllVotes()...) + + case round == cs.round: + // Current round m := cs.log.RoundMessages(round) votes = append(votes, m.AllVotes()...) - } else { - // Only broadcast cp:decided votes - vs := cs.log.CPDecidedVoteVoteSet(round) - votes = append(votes, vs.AllVotes()...) + + case round > cs.round: + // Future round } + if len(votes) == 0 { return nil } diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index da95f69a5..772f1d09d 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -57,12 +57,12 @@ func testConfig() *Config { return &Config{ ChangeProposerTimeout: 1 * time.Hour, // Disabling timers ChangeProposerDelta: 1 * time.Hour, // Disabling timers + QueryVoteTimeout: 1 * time.Hour, // Disabling timers } } func setup(t *testing.T) *testData { t.Helper() - queryVoteInitialTimeout = 2 * time.Hour return setupWithSeed(t, testsuite.GenerateSeed()) } @@ -89,8 +89,10 @@ func setupWithSeed(t *testing.T, seed int64) *testData { params := param.DefaultParams() params.CommitteeSize = 4 - // to prevent triggering timers before starting the tests to avoid double entries for new heights in some tests. - getTime := util.RoundNow(params.BlockIntervalInSecond).Add(time.Duration(params.BlockIntervalInSecond) * time.Second) + // To prevent triggering timers before starting the tests and + // avoid double entries for new heights in some tests. + getTime := util.RoundNow(params.BlockIntervalInSecond). + Add(time.Duration(params.BlockIntervalInSecond) * time.Second) genDoc := genesis.MakeGenesis(getTime, accs, vals, params) stX, err := state.LoadOrNewState(genDoc, []*bls.ValidatorKey{valKeys[tIndexX]}, store.MockingStore(ts), txPool, nil) @@ -401,8 +403,7 @@ func TestStart(t *testing.T) { td := setup(t) td.consX.Start() - td.shouldPublishQueryProposal(t, td.consX, 1) - td.shouldPublishQueryVote(t, td.consX, 1, 0) + td.checkHeightRound(t, td.consX, 1, 0) } func TestNotInCommittee(t *testing.T) { @@ -474,14 +475,14 @@ func TestConsensusAddVote(t *testing.T) { v6, _ := td.GenerateTestPrepareVote(1, 0) td.consP.AddVote(v6) - assert.True(t, td.consP.HasVote(v1.Hash())) // previous round - assert.True(t, td.consP.HasVote(v2.Hash())) // next round + assert.False(t, td.consP.HasVote(v1.Hash())) // previous round + assert.True(t, td.consP.HasVote(v2.Hash())) // next round assert.True(t, td.consP.HasVote(v3.Hash())) assert.True(t, td.consP.HasVote(v4.Hash())) assert.False(t, td.consP.HasVote(v5.Hash())) // valid votes for the next height assert.False(t, td.consP.HasVote(v6.Hash())) // invalid votes - assert.Equal(t, td.consP.AllVotes(), []*vote.Vote{v1, v3, v4}) + assert.Equal(t, td.consP.AllVotes(), []*vote.Vote{v3, v4}) assert.NotContains(t, td.consP.AllVotes(), v2) } @@ -604,6 +605,9 @@ func TestPickRandomVote(t *testing.T) { rndVote1 := td.consP.PickRandomVote(1) assert.Equal(t, rndVote1.Type(), vote.VoteTypePrepare) + + rndVote2 := td.consP.PickRandomVote(2) + assert.Nil(t, rndVote2) } func TestSetProposalFromPreviousRound(t *testing.T) { @@ -724,7 +728,17 @@ func TestProposalWithBigRound(t *testing.T) { p := td.makeProposal(t, 1, util.MaxInt16) td.consP.SetProposal(p) - assert.Equal(t, td.consP.log.RoundProposal(util.MaxInt16), p) + assert.Nil(t, td.consP.Proposal()) +} + +func TestInvalidProposal(t *testing.T) { + td := setup(t) + + td.enterNewHeight(td.consP) + + p := td.makeProposal(t, 1, 0) + p.SetSignature(nil) // Make proposal invalid + td.consP.SetProposal(p) assert.Nil(t, td.consP.Proposal()) } diff --git a/consensus/cp.go b/consensus/cp.go index 7292b9b86..6c6642810 100644 --- a/consensus/cp.go +++ b/consensus/cp.go @@ -310,25 +310,22 @@ func (cp *changeProposer) checkJust(v *vote.Vote) error { } } -func (cp *changeProposer) strongTermination() { - cpDecided := cp.log.CPDecidedVoteVoteSet(cp.round) +func (cp *changeProposer) cpStrongTermination() { + cpDecided := cp.log.CPDecidedVoteSet(cp.round) if cpDecided.HasAnyVoteFor(cp.cpRound, vote.CPValueNo) { - cp.cpDecide(vote.CPValueNo) + cp.cpDecide(cp.round, vote.CPValueNo) } else if cpDecided.HasAnyVoteFor(cp.cpRound, vote.CPValueYes) { - cp.cpDecide(vote.CPValueYes) + cp.cpDecide(cp.round, vote.CPValueYes) } } -func (cp *changeProposer) cpDecide(cpValue vote.CPValue) { +func (cp *changeProposer) cpDecide(round int16, cpValue vote.CPValue) { if cpValue == vote.CPValueYes { - cp.round++ + cp.round = round + 1 cp.cpDecided = 1 cp.enterNewState(cp.proposeState) } else if cpValue == vote.CPValueNo { - roundProposal := cp.log.RoundProposal(cp.round) - if roundProposal == nil { - cp.queryProposal() - } + cp.round = round cp.cpDecided = 0 cp.enterNewState(cp.prepareState) } diff --git a/consensus/cp_decide.go b/consensus/cp_decide.go index ddcd4fa5f..8f87ad6a4 100644 --- a/consensus/cp_decide.go +++ b/consensus/cp_decide.go @@ -26,7 +26,7 @@ func (s *cpDecideState) decide() { QCert: cert, } s.signAddCPDecidedVote(hash.UndefHash, s.cpRound, vote.CPValueYes, just) - s.cpDecide(vote.CPValueYes) + s.cpDecide(s.round, vote.CPValueYes) } else if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueNo) { // decided for no and proceeds to the next round s.logger.Info("binary agreement decided", "value", 0, "round", s.cpRound) @@ -37,7 +37,7 @@ func (s *cpDecideState) decide() { QCert: cert, } s.signAddCPDecidedVote(*s.cpWeakValidity, s.cpRound, vote.CPValueNo, just) - s.cpDecide(vote.CPValueNo) + s.cpDecide(s.round, vote.CPValueNo) } else { // conflicting votes s.logger.Debug("conflicting main votes", "round", s.cpRound) @@ -52,7 +52,7 @@ func (s *cpDecideState) onAddVote(v *vote.Vote) { s.decide() } - s.strongTermination() + s.cpStrongTermination() } func (*cpDecideState) name() string { diff --git a/consensus/cp_mainvote.go b/consensus/cp_mainvote.go index cd03d9e0a..1685c9b86 100644 --- a/consensus/cp_mainvote.go +++ b/consensus/cp_mainvote.go @@ -91,7 +91,7 @@ func (s *cpMainVoteState) onAddVote(v *vote.Vote) { s.decide() } - s.strongTermination() + s.cpStrongTermination() } func (*cpMainVoteState) name() string { diff --git a/consensus/cp_prevote.go b/consensus/cp_prevote.go index 8c0b002ce..b7af57e22 100644 --- a/consensus/cp_prevote.go +++ b/consensus/cp_prevote.go @@ -1,14 +1,10 @@ package consensus import ( - "time" - "github.com/pactus-project/pactus/crypto/hash" "github.com/pactus-project/pactus/types/vote" ) -var queryVoteInitialTimeout = 2 * time.Second - type cpPreVoteState struct { *changeProposer } @@ -33,7 +29,7 @@ func (s *cpPreVoteState) decide() { just := &vote.JustInitYes{} s.signAddCPPreVote(hash.UndefHash, s.cpRound, 1, just) } - s.scheduleTimeout(queryVoteInitialTimeout, s.height, s.round, tickerTargetQueryVotes) + s.scheduleTimeout(s.config.QueryVoteTimeout, s.height, s.round, tickerTargetQueryVotes) } else { cpMainVotes := s.log.CPMainVoteVoteSet(s.round) switch { diff --git a/consensus/interface.go b/consensus/interface.go index 8da60b86e..21288778c 100644 --- a/consensus/interface.go +++ b/consensus/interface.go @@ -15,6 +15,7 @@ type Reader interface { HasVote(h hash.Hash) bool HeightRound() (uint32, int16) IsActive() bool + IsProposer() bool } type Consensus interface { @@ -32,6 +33,7 @@ type ManagerReader interface { Proposal() *proposal.Proposal HeightRound() (uint32, int16) HasActiveInstance() bool + HasProposer() bool } type Manager interface { diff --git a/consensus/log/log.go b/consensus/log/log.go index ef59b3c96..87834bf7e 100644 --- a/consensus/log/log.go +++ b/consensus/log/log.go @@ -43,7 +43,7 @@ func (log *Log) mustGetRoundMessages(round int16) *Messages { precommitVotes: voteset.NewPrecommitVoteSet(round, log.totalPower, log.validators), cpPreVotes: voteset.NewCPPreVoteVoteSet(round, log.totalPower, log.validators), cpMainVotes: voteset.NewCPMainVoteVoteSet(round, log.totalPower, log.validators), - cpDecidedVotes: voteset.NewCPDecidedVoteVoteSet(round, log.totalPower, log.validators), + cpDecidedVotes: voteset.NewCPDecidedVoteSet(round, log.totalPower, log.validators), } log.roundMessages[round] = rm } @@ -81,7 +81,7 @@ func (log *Log) CPMainVoteVoteSet(round int16) *voteset.BinaryVoteSet { return m.cpMainVotes } -func (log *Log) CPDecidedVoteVoteSet(round int16) *voteset.BinaryVoteSet { +func (log *Log) CPDecidedVoteSet(round int16) *voteset.BinaryVoteSet { m := log.mustGetRoundMessages(round) return m.cpDecidedVotes diff --git a/consensus/manager.go b/consensus/manager.go index 1eebcca44..d31171179 100644 --- a/consensus/manager.go +++ b/consensus/manager.go @@ -106,6 +106,18 @@ func (mgr *manager) HasActiveInstance() bool { return false } +// HasProposer checks if any of the consensus instances is the proposer +// for the current round. +func (mgr *manager) HasProposer() bool { + for _, cons := range mgr.instances { + if cons.IsProposer() { + return true + } + } + + return false +} + // MoveToNewHeight moves all consensus instances to a new height. func (mgr *manager) MoveToNewHeight() { for _, cons := range mgr.instances { diff --git a/consensus/mock.go b/consensus/mock.go index 09f2da8c7..49b414fdb 100644 --- a/consensus/mock.go +++ b/consensus/mock.go @@ -21,6 +21,7 @@ type MockConsensus struct { Votes []*vote.Vote CurProposal *proposal.Proposal Active bool + Proposer bool Height uint32 Round int16 } @@ -132,6 +133,13 @@ func (m *MockConsensus) IsActive() bool { return m.Active } +func (m *MockConsensus) IsProposer() bool { + m.lk.Lock() + defer m.lk.Unlock() + + return m.Proposer +} + func (m *MockConsensus) SetActive(active bool) { m.lk.Lock() defer m.lk.Unlock() diff --git a/consensus/voteset/binary_voteset.go b/consensus/voteset/binary_voteset.go index 107addb4b..c1250b184 100644 --- a/consensus/voteset/binary_voteset.go +++ b/consensus/voteset/binary_voteset.go @@ -53,7 +53,7 @@ func NewCPMainVoteVoteSet(round int16, totalPower int64, return newBinaryVoteSet(voteSet) } -func NewCPDecidedVoteVoteSet(round int16, totalPower int64, +func NewCPDecidedVoteSet(round int16, totalPower int64, validators map[crypto.Address]*validator.Validator, ) *BinaryVoteSet { voteSet := newVoteSet(round, totalPower, validators) diff --git a/consensus/voteset/voteset_test.go b/consensus/voteset/voteset_test.go index 6154b49a3..976c1ffa2 100644 --- a/consensus/voteset/voteset_test.go +++ b/consensus/voteset/voteset_test.go @@ -389,7 +389,7 @@ func TestDecidedVoteset(t *testing.T) { height := ts.RandHeight() round := ts.RandRound() just := &vote.JustInitYes{} - vs := NewCPDecidedVoteVoteSet(round, totalPower, valsMap) + vs := NewCPDecidedVoteSet(round, totalPower, valsMap) v1 := vote.NewCPDecidedVote(h, height, round, 0, vote.CPValueYes, just, valKeys[0].Address()) diff --git a/fastconsensus/config.go b/fastconsensus/config.go index 805a94294..94c2c6be6 100644 --- a/fastconsensus/config.go +++ b/fastconsensus/config.go @@ -5,13 +5,15 @@ import "time" type Config struct { ChangeProposerTimeout time.Duration `toml:"-"` ChangeProposerDelta time.Duration `toml:"-"` + QueryVoteTimeout time.Duration `toml:"-"` MinimumAvailabilityScore float64 `toml:"-"` } func DefaultConfig() *Config { return &Config{ - ChangeProposerTimeout: 8 * time.Second, - ChangeProposerDelta: 4 * time.Second, + ChangeProposerTimeout: 5 * time.Second, + ChangeProposerDelta: 5 * time.Second, + QueryVoteTimeout: 5 * time.Second, MinimumAvailabilityScore: 0.9, } } @@ -38,7 +40,6 @@ func (conf *Config) BasicCheck() error { } func (conf *Config) CalculateChangeProposerTimeout(round int16) time.Duration { - return time.Duration( - conf.ChangeProposerTimeout.Milliseconds()+conf.ChangeProposerDelta.Milliseconds()*int64(round), - ) * time.Millisecond + return conf.ChangeProposerTimeout + + conf.ChangeProposerDelta*time.Duration(round) } diff --git a/fastconsensus/consensus.go b/fastconsensus/consensus.go index d45fff1f9..69d03e51f 100644 --- a/fastconsensus/consensus.go +++ b/fastconsensus/consensus.go @@ -119,13 +119,7 @@ func (cs *consensus) Start() { cs.lk.Lock() defer cs.lk.Unlock() - cs.doMoveToNewHeight() - // We have just started the consensus (possibly restarting the node). - // Therefore, let's query the votes and proposals in case we missed any. - if cs.active { - cs.queryProposal() - cs.queryVotes() - } + cs.moveToNewHeight() } func (cs *consensus) String() string { @@ -187,10 +181,10 @@ func (cs *consensus) MoveToNewHeight() { cs.lk.Lock() defer cs.lk.Unlock() - cs.doMoveToNewHeight() + cs.moveToNewHeight() } -func (cs *consensus) doMoveToNewHeight() { +func (cs *consensus) moveToNewHeight() { stateHeight := cs.bcState.LastBlockHeight() if cs.height != stateHeight+1 { cs.enterNewState(cs.newHeightState) @@ -208,6 +202,23 @@ func (cs *consensus) scheduleTimeout(duration time.Duration, height uint32, roun }() } +func (cs *consensus) handleTimeout(t *ticker) { + cs.lk.Lock() + defer cs.lk.Unlock() + + cs.logger.Trace("handle ticker", "ticker", t) + + // Old tickers might be triggered now. Ignore them. + if cs.height != t.Height || cs.round != t.Round { + cs.logger.Trace("stale ticker", "ticker", t) + + return + } + + cs.logger.Debug("timer expired", "ticker", t) + cs.currentState.onTimeout(t) +} + func (cs *consensus) SetProposal(p *proposal.Proposal) { cs.lk.Lock() defer cs.lk.Unlock() @@ -230,6 +241,12 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) { return } + if err := p.BasicCheck(); err != nil { + cs.logger.Warn("invalid proposal", "proposal", p, "error", err) + + return + } + roundProposal := cs.log.RoundProposal(p.Round()) if roundProposal != nil { cs.logger.Trace("this round has proposal", "proposal", p) @@ -269,23 +286,6 @@ func (cs *consensus) SetProposal(p *proposal.Proposal) { cs.currentState.onSetProposal(p) } -func (cs *consensus) handleTimeout(t *ticker) { - cs.lk.Lock() - defer cs.lk.Unlock() - - cs.logger.Trace("handle ticker", "ticker", t) - - // Old tickers might be triggered now. Ignore them. - if cs.height != t.Height || cs.round != t.Round { - cs.logger.Trace("stale ticker", "ticker", t) - - return - } - - cs.logger.Debug("timer expired", "ticker", t) - cs.currentState.onTimeout(t) -} - func (cs *consensus) AddVote(v *vote.Vote) { cs.lk.Lock() defer cs.lk.Unlock() @@ -302,6 +302,12 @@ func (cs *consensus) AddVote(v *vote.Vote) { return } + if v.Round() < cs.round { + cs.logger.Trace("vote for expired round", "vote", v) + + return + } + if v.Type() == vote.VoteTypeCPPreVote || v.Type() == vote.VoteTypeCPMainVote || v.Type() == vote.VoteTypeCPDecided { @@ -321,6 +327,14 @@ func (cs *consensus) AddVote(v *vote.Vote) { cs.logger.Info("new vote added", "vote", v) cs.currentState.onAddVote(v) + + // If there is a proper and justified "Decide" vote for a subsequent round, move consensus to that round. + // This especially helps validators to catch up with the network when they restart their node. + if v.Type() == vote.VoteTypeCPDecided { + if v.Round() > cs.round { + cs.changeProposer.cpDecide(v.Round(), v.CPValue()) + } + } } } @@ -328,6 +342,13 @@ func (cs *consensus) proposer(round int16) *validator.Validator { return cs.bcState.Proposer(round) } +func (cs *consensus) IsProposer() bool { + cs.lk.RLock() + defer cs.lk.RUnlock() + + return cs.isProposer() +} + func (cs *consensus) isProposer() bool { return cs.proposer(cs.round).Address() == cs.valKey.Address() } @@ -380,7 +401,7 @@ func (cs *consensus) signAddVote(v *vote.Vote) { func (cs *consensus) queryProposal() { cs.broadcaster(cs.valKey.Address(), - message.NewQueryProposalMessage(cs.height, cs.valKey.Address())) + message.NewQueryProposalMessage(cs.height, cs.round, cs.valKey.Address())) } // queryVotes is an anti-entropy mechanism to retrieve missed votes @@ -465,14 +486,21 @@ func (cs *consensus) PickRandomVote(round int16) *vote.Vote { defer cs.lk.RUnlock() votes := []*vote.Vote{} - if round == cs.round { + switch { + case round < cs.round: + // Past round: Only broadcast cp:decided votes + vs := cs.log.CPDecidedVoteSet(round) + votes = append(votes, vs.AllVotes()...) + + case round == cs.round: + // Current round m := cs.log.RoundMessages(round) votes = append(votes, m.AllVotes()...) - } else { - // Only broadcast cp:decided votes - vs := cs.log.CPDecidedVoteVoteSet(round) - votes = append(votes, vs.AllVotes()...) + + case round > cs.round: + // Future round } + if len(votes) == 0 { return nil } diff --git a/fastconsensus/consensus_test.go b/fastconsensus/consensus_test.go index 9cd4479d8..969356731 100644 --- a/fastconsensus/consensus_test.go +++ b/fastconsensus/consensus_test.go @@ -61,12 +61,12 @@ func testConfig() *Config { return &Config{ ChangeProposerTimeout: 1 * time.Hour, // Disabling timers ChangeProposerDelta: 1 * time.Hour, // Disabling timers + QueryVoteTimeout: 1 * time.Hour, // Disabling timers } } func setup(t *testing.T) *testData { t.Helper() - queryVoteInitialTimeout = 2 * time.Hour return setupWithSeed(t, testsuite.GenerateSeed()) } @@ -93,8 +93,10 @@ func setupWithSeed(t *testing.T, seed int64) *testData { params := param.DefaultParams() params.CommitteeSize = 6 - // to prevent triggering timers before starting the tests to avoid double entries for new heights in some tests. - getTime := util.RoundNow(params.BlockIntervalInSecond).Add(time.Duration(params.BlockIntervalInSecond) * time.Second) + // To prevent triggering timers before starting the tests and + // avoid double entries for new heights in some tests. + getTime := util.RoundNow(params.BlockIntervalInSecond). + Add(time.Duration(params.BlockIntervalInSecond) * time.Second) genDoc := genesis.MakeGenesis(getTime, accs, vals, params) consMessages := make([]consMessage, 0) @@ -196,7 +198,7 @@ func (td *testData) shouldPublishProposal(t *testing.T, cons *consensus, return nil } -func (td *testData) shouldPublishQueryProposal(t *testing.T, cons *consensus, height uint32) { +func (td *testData) shouldPublishQueryProposal(t *testing.T, cons *consensus, height uint32, round int16) { t.Helper() for _, consMsg := range td.consMessages { @@ -207,6 +209,7 @@ func (td *testData) shouldPublishQueryProposal(t *testing.T, cons *consensus, he m := consMsg.message.(*message.QueryProposalMessage) assert.Equal(t, m.Height, height) + assert.Equal(t, m.Round, round) assert.Equal(t, m.Querier, cons.valKey.Address()) return @@ -378,6 +381,7 @@ func (td *testData) commitBlockForAllStates(t *testing.T) (*block.Block, *certif return blk, cert } +// makeProposal generates a signed and valid proposal for the given height and round. func (td *testData) makeProposal(t *testing.T, height uint32, round int16) *proposal.Proposal { t.Helper() @@ -486,8 +490,7 @@ func TestStart(t *testing.T) { td := setup(t) td.consX.Start() - td.shouldPublishQueryProposal(t, td.consX, 1) - td.shouldPublishQueryVote(t, td.consX, 1, 0) + td.checkHeightRound(t, td.consX, 1, 0) } func TestNotInCommittee(t *testing.T) { @@ -557,14 +560,14 @@ func TestConsensusAddVote(t *testing.T) { v6, _ := td.GenerateTestPrepareVote(1, 0) td.consP.AddVote(v6) - assert.True(t, td.consP.HasVote(v1.Hash())) // previous round - assert.True(t, td.consP.HasVote(v2.Hash())) // next round + assert.False(t, td.consP.HasVote(v1.Hash())) // previous round + assert.True(t, td.consP.HasVote(v2.Hash())) // next round assert.True(t, td.consP.HasVote(v3.Hash())) assert.True(t, td.consP.HasVote(v4.Hash())) assert.False(t, td.consP.HasVote(v5.Hash())) // valid votes for the next height assert.False(t, td.consP.HasVote(v6.Hash())) // invalid votes - assert.Equal(t, td.consP.AllVotes(), []*vote.Vote{v1, v3, v4}) + assert.Equal(t, td.consP.AllVotes(), []*vote.Vote{v3, v4}) assert.NotContains(t, td.consP.AllVotes(), v2) } @@ -591,7 +594,7 @@ func TestConsensusLateProposal(t *testing.T) { td.addPrepareVote(td.consP, blockHash, h, r, tIndexM) td.addPrepareVote(td.consP, blockHash, h, r, tIndexN) - td.shouldPublishQueryProposal(t, td.consP, h) + td.shouldPublishQueryProposal(t, td.consP, h, r) // consP receives proposal now td.consP.SetProposal(prop) @@ -630,7 +633,7 @@ func TestConsensusVeryLateProposal(t *testing.T) { td.addPrecommitVote(td.consP, blockHash, h, r, tIndexM) td.addPrecommitVote(td.consP, blockHash, h, r, tIndexN) - td.shouldPublishQueryProposal(t, td.consP, h) + td.shouldPublishQueryProposal(t, td.consP, h, r) // consP receives proposal now td.consP.SetProposal(prop) @@ -673,6 +676,9 @@ func TestPickRandomVote(t *testing.T) { rndVote1 := td.consP.PickRandomVote(r + 1) assert.Equal(t, rndVote1, v6) + + rndVote2 := td.consP.PickRandomVote(r + 2) + assert.Nil(t, rndVote2) } func TestSetProposalFromPreviousRound(t *testing.T) { @@ -793,7 +799,17 @@ func TestProposalWithBigRound(t *testing.T) { prop := td.makeProposal(t, 1, util.MaxInt16) td.consP.SetProposal(prop) - assert.Equal(t, td.consP.log.RoundProposal(util.MaxInt16), prop) + assert.Nil(t, td.consP.Proposal()) +} + +func TestInvalidProposal(t *testing.T) { + td := setup(t) + + td.enterNewHeight(td.consP) + + p := td.makeProposal(t, 1, 0) + p.SetSignature(nil) // Make proposal invalid + td.consP.SetProposal(p) assert.Nil(t, td.consP.Proposal()) } @@ -808,7 +824,7 @@ func TestCases(t *testing.T) { // {1694849103290580532, 1, "Conflicting votes, cp-round=0"}, // {1697900665869342730, 1, "Conflicting votes, cp-round=1"}, // {1697887970998950590, 1, "consP & consB: Change Proposer, consX & consY: Commit (2 block announces)"}, - {1702913410152124511, 0, ""}, + {1717870730391411396, 2, "move to the next round on decided vote"}, } for i, test := range tests { diff --git a/fastconsensus/cp.go b/fastconsensus/cp.go index ee5ef01a1..14ebcaa79 100644 --- a/fastconsensus/cp.go +++ b/fastconsensus/cp.go @@ -312,20 +312,29 @@ func (cp *changeProposer) cpCheckJust(v *vote.Vote) error { } // cpStrongTermination decides if the Change Proposer phase should be terminated. -// If there is only one proper and justified `decided` vote, the validators can +// If there is only one proper and justified `Decided` vote, the validators can // move on to the next phase. -// If the decided vote is for "No," then validators move to the precommit step and +// If the `Decided` vote is for "No", then validators move to the precommit step and // wait for committing the current proposal by gathering enough precommit votes. -// If the decided vote is for "Yes," then the validator moves to the propose step +// If the `Decided` vote is for "Yes", then the validator moves to the propose step // and starts a new round. func (cp *changeProposer) cpStrongTermination() { - cpDecided := cp.log.CPDecidedVoteVoteSet(cp.round) + cpDecided := cp.log.CPDecidedVoteSet(cp.round) if cpDecided.HasAnyVoteFor(cp.cpRound, vote.CPValueNo) { - cp.cpDecided = 0 - cp.enterNewState(cp.precommitState) + cp.cpDecide(cp.round, vote.CPValueNo) } else if cpDecided.HasAnyVoteFor(cp.cpRound, vote.CPValueYes) { - cp.round++ + cp.cpDecide(cp.round, vote.CPValueYes) + } +} + +func (cp *changeProposer) cpDecide(round int16, cpValue vote.CPValue) { + if cpValue == vote.CPValueYes { + cp.round = round + 1 cp.cpDecided = 1 cp.enterNewState(cp.proposeState) + } else if cpValue == vote.CPValueNo { + cp.round = round + cp.cpDecided = 0 + cp.enterNewState(cp.precommitState) } } diff --git a/fastconsensus/cp_decide.go b/fastconsensus/cp_decide.go index 53abbb51a..4e98d3211 100644 --- a/fastconsensus/cp_decide.go +++ b/fastconsensus/cp_decide.go @@ -29,7 +29,7 @@ func (s *cpDecideState) decide() { QCert: cert, } s.signAddCPDecidedVote(hash.UndefHash, s.cpRound, vote.CPValueYes, just) - s.cpStrongTermination() + s.cpDecide(s.round, vote.CPValueYes) } else if cpMainVotes.HasTwoFPlusOneVotesFor(s.cpRound, vote.CPValueNo) { // decided for no and proceeds to the next round s.logger.Info("binary agreement decided", "value", "no", "round", s.cpRound) @@ -40,7 +40,7 @@ func (s *cpDecideState) decide() { QCert: cert, } s.signAddCPDecidedVote(*s.cpWeakValidity, s.cpRound, vote.CPValueNo, just) - s.cpStrongTermination() + s.cpDecide(s.round, vote.CPValueNo) } else { // conflicting votes s.logger.Debug("conflicting main votes", "round", s.cpRound) diff --git a/fastconsensus/cp_prevote.go b/fastconsensus/cp_prevote.go index 0ed758092..ca01aa997 100644 --- a/fastconsensus/cp_prevote.go +++ b/fastconsensus/cp_prevote.go @@ -1,15 +1,11 @@ package fastconsensus import ( - "time" - "github.com/pactus-project/pactus/crypto/hash" "github.com/pactus-project/pactus/types/proposal" "github.com/pactus-project/pactus/types/vote" ) -var queryVoteInitialTimeout = 2 * time.Second - type cpPreVoteState struct { *changeProposer } @@ -47,7 +43,7 @@ func (s *cpPreVoteState) decide() { just := &vote.JustInitYes{} s.signAddCPPreVote(hash.UndefHash, s.cpRound, 1, just) } - s.scheduleTimeout(queryVoteInitialTimeout, s.height, s.round, tickerTargetQueryVotes) + s.scheduleTimeout(s.config.QueryVoteTimeout, s.height, s.round, tickerTargetQueryVotes) } else { cpMainVotes := s.log.CPMainVoteVoteSet(s.round) switch { diff --git a/fastconsensus/cp_test.go b/fastconsensus/cp_test.go index 377b6fed3..b4eeb3e78 100644 --- a/fastconsensus/cp_test.go +++ b/fastconsensus/cp_test.go @@ -415,7 +415,7 @@ func TestInvalidJustDecided(t *testing.T) { }) t.Run("invalid certificate", func(t *testing.T) { - v := vote.NewCPDecidedVote(td.RandHash(), h, r, 0, vote.CPValueYes, just, td.consB.valKey.Address()) + v := vote.NewCPDecidedVote(hash.UndefHash, h, r, 0, vote.CPValueYes, just, td.consB.valKey.Address()) err := td.consX.changeProposer.cpCheckJust(v) assert.ErrorIs(t, err, invalidJustificationError{ @@ -423,3 +423,30 @@ func TestInvalidJustDecided(t *testing.T) { }) }) } + +func TestMoveToNextRoundOnDecidedVoteYes(t *testing.T) { + td := setup(t) + + td.enterNewHeight(td.consP) + h := uint32(1) + r := int16(3) + + _, _, decideJust := td.makeChangeProposerJusts(t, hash.UndefHash, h, r) + td.addCPDecidedVote(td.consP, hash.UndefHash, h, r, vote.CPValueYes, decideJust, tIndexX) + + td.checkHeightRound(t, td.consP, h, r+1) +} + +func TestMoveToNextRoundOnDecidedVoteNo(t *testing.T) { + td := setup(t) + + td.enterNewHeight(td.consP) + h := uint32(1) + r := int16(3) + propHash := td.RandHash() + + _, _, decideJust := td.makeChangeProposerJusts(t, propHash, h, r) + td.addCPDecidedVote(td.consP, propHash, h, r, vote.CPValueNo, decideJust, tIndexX) + + td.checkHeightRound(t, td.consP, h, r) +} diff --git a/fastconsensus/interface.go b/fastconsensus/interface.go index adae51b8e..0637450ce 100644 --- a/fastconsensus/interface.go +++ b/fastconsensus/interface.go @@ -15,6 +15,7 @@ type Reader interface { HasVote(h hash.Hash) bool HeightRound() (uint32, int16) IsActive() bool + IsProposer() bool } type Consensus interface { @@ -32,6 +33,7 @@ type ManagerReader interface { Proposal() *proposal.Proposal HeightRound() (uint32, int16) HasActiveInstance() bool + HasProposer() bool } type Manager interface { @@ -40,6 +42,6 @@ type Manager interface { Start() error Stop() MoveToNewHeight() - AddVote(vot *vote.Vote) + AddVote(vte *vote.Vote) SetProposal(prop *proposal.Proposal) } diff --git a/fastconsensus/log/log.go b/fastconsensus/log/log.go index 56ddd3b0a..c3f75c26b 100644 --- a/fastconsensus/log/log.go +++ b/fastconsensus/log/log.go @@ -43,7 +43,7 @@ func (log *Log) mustGetRoundMessages(round int16) *Messages { precommitVotes: voteset.NewPrecommitVoteSet(round, log.totalPower, log.validators), cpPreVotes: voteset.NewCPPreVoteVoteSet(round, log.totalPower, log.validators), cpMainVotes: voteset.NewCPMainVoteVoteSet(round, log.totalPower, log.validators), - cpDecidedVotes: voteset.NewCPDecidedVoteVoteSet(round, log.totalPower, log.validators), + cpDecidedVotes: voteset.NewCPDecidedVoteSet(round, log.totalPower, log.validators), } log.roundMessages[round] = rm } @@ -81,7 +81,7 @@ func (log *Log) CPMainVoteVoteSet(round int16) *voteset.BinaryVoteSet { return m.cpMainVotes } -func (log *Log) CPDecidedVoteVoteSet(round int16) *voteset.BinaryVoteSet { +func (log *Log) CPDecidedVoteSet(round int16) *voteset.BinaryVoteSet { m := log.mustGetRoundMessages(round) return m.cpDecidedVotes diff --git a/fastconsensus/manager.go b/fastconsensus/manager.go index 62f177fb0..f1d95fa49 100644 --- a/fastconsensus/manager.go +++ b/fastconsensus/manager.go @@ -106,6 +106,18 @@ func (mgr *manager) HasActiveInstance() bool { return false } +// HasProposer checks if any of the consensus instances is the proposer +// for the current round. +func (mgr *manager) HasProposer() bool { + for _, cons := range mgr.instances { + if cons.IsProposer() { + return true + } + } + + return false +} + // MoveToNewHeight moves all consensus instances to a new height. func (mgr *manager) MoveToNewHeight() { for _, cons := range mgr.instances { diff --git a/fastconsensus/mock.go b/fastconsensus/mock.go index f7dbcc6ca..82af6c773 100644 --- a/fastconsensus/mock.go +++ b/fastconsensus/mock.go @@ -21,6 +21,7 @@ type MockConsensus struct { Votes []*vote.Vote CurProposal *proposal.Proposal Active bool + Proposer bool Height uint32 Round int16 } @@ -132,6 +133,13 @@ func (m *MockConsensus) IsActive() bool { return m.Active } +func (m *MockConsensus) IsProposer() bool { + m.lk.Lock() + defer m.lk.Unlock() + + return m.Proposer +} + func (m *MockConsensus) SetActive(active bool) { m.lk.Lock() defer m.lk.Unlock() diff --git a/fastconsensus/precommit_test.go b/fastconsensus/precommit_test.go index 2604e4000..f1e34f5aa 100644 --- a/fastconsensus/precommit_test.go +++ b/fastconsensus/precommit_test.go @@ -33,5 +33,5 @@ func TestPrecommitQueryProposal(t *testing.T) { td.addPrecommitVote(td.consP, propBlockHash, h, r, tIndexM) td.addPrecommitVote(td.consP, propBlockHash, h, r, tIndexN) - td.shouldPublishQueryProposal(t, td.consP, h) + td.shouldPublishQueryProposal(t, td.consP, h, r) } diff --git a/fastconsensus/prepare_test.go b/fastconsensus/prepare_test.go index d74b23216..49e6d6acc 100644 --- a/fastconsensus/prepare_test.go +++ b/fastconsensus/prepare_test.go @@ -29,7 +29,7 @@ func TestQueryProposal(t *testing.T) { td.enterNextRound(td.consP) td.queryProposalTimeout(td.consP) - td.shouldPublishQueryProposal(t, td.consP, h) + td.shouldPublishQueryProposal(t, td.consP, h, 1) td.shouldNotPublish(t, td.consP, message.TypeQueryVote) } @@ -87,7 +87,7 @@ func TestByzantineProposal(t *testing.T) { td.addPrepareVote(td.consP, propBlockHash, h, r, tIndexN) assert.Nil(t, td.consP.Proposal()) - td.shouldPublishQueryProposal(t, td.consP, h) + td.shouldPublishQueryProposal(t, td.consP, h, r) // Byzantine node sends second proposal to Partitioned node. trx := tx.NewTransferTx(h, td.consX.rewardAddr, @@ -99,6 +99,6 @@ func TestByzantineProposal(t *testing.T) { td.consP.SetProposal(byzProp) assert.Nil(t, td.consP.Proposal()) - td.shouldPublishQueryProposal(t, td.consP, h) + td.shouldPublishQueryProposal(t, td.consP, h, r) td.checkHeightRound(t, td.consP, h, r) } diff --git a/fastconsensus/propose_test.go b/fastconsensus/propose_test.go index cea898393..c0d699cb2 100644 --- a/fastconsensus/propose_test.go +++ b/fastconsensus/propose_test.go @@ -77,7 +77,7 @@ func TestNetworkLagging(t *testing.T) { td.addPrepareVote(td.consP, prop.Block().Hash(), h, r, tIndexY) td.queryProposalTimeout(td.consP) - td.shouldPublishQueryProposal(t, td.consP, h) + td.shouldPublishQueryProposal(t, td.consP, h, r) // Proposal is received now td.consP.SetProposal(prop) diff --git a/fastconsensus/voteset/binary_voteset.go b/fastconsensus/voteset/binary_voteset.go index 8221d9032..69dc0a75b 100644 --- a/fastconsensus/voteset/binary_voteset.go +++ b/fastconsensus/voteset/binary_voteset.go @@ -53,7 +53,7 @@ func NewCPMainVoteVoteSet(round int16, totalPower int64, return newBinaryVoteSet(voteSet) } -func NewCPDecidedVoteVoteSet(round int16, totalPower int64, +func NewCPDecidedVoteSet(round int16, totalPower int64, validators map[crypto.Address]*validator.Validator, ) *BinaryVoteSet { voteSet := newVoteSet(round, totalPower, validators) diff --git a/fastconsensus/voteset/voteset_test.go b/fastconsensus/voteset/voteset_test.go index c803e4ad1..722e1d826 100644 --- a/fastconsensus/voteset/voteset_test.go +++ b/fastconsensus/voteset/voteset_test.go @@ -409,7 +409,7 @@ func TestDecidedVoteset(t *testing.T) { height := ts.RandHeight() round := ts.RandRound() just := &vote.JustInitYes{} - vs := NewCPDecidedVoteVoteSet(round, totalPower, valsMap) + vs := NewCPDecidedVoteSet(round, totalPower, valsMap) v1 := vote.NewCPDecidedVote(h, height, round, 0, vote.CPValueYes, just, valKeys[0].Address()) diff --git a/network/gater_test.go b/network/gater_test.go index b05f795c3..bd3669d1b 100644 --- a/network/gater_test.go +++ b/network/gater_test.go @@ -71,11 +71,11 @@ func TestMaxConnection(t *testing.T) { cmaPublic := &mockConnMultiaddrs{remote: maPublic} pid := ts.RandPeerID() - net.peerMgr.PeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound) - net.peerMgr.PeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) - net.peerMgr.PeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) - net.peerMgr.PeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) - net.peerMgr.PeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) + net.peerMgr.SetPeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound) + net.peerMgr.SetPeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) + net.peerMgr.SetPeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) + net.peerMgr.SetPeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) + net.peerMgr.SetPeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) assert.True(t, net.connGater.InterceptPeerDial(pid)) assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate)) @@ -83,7 +83,7 @@ func TestMaxConnection(t *testing.T) { assert.True(t, net.connGater.InterceptAccept(cmaPrivate)) assert.True(t, net.connGater.InterceptAccept(cmaPublic)) - net.peerMgr.PeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound) + net.peerMgr.SetPeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirOutbound) assert.False(t, net.connGater.InterceptPeerDial(pid)) assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) @@ -91,7 +91,7 @@ func TestMaxConnection(t *testing.T) { assert.True(t, net.connGater.InterceptAccept(cmaPrivate)) assert.True(t, net.connGater.InterceptAccept(cmaPublic)) - net.peerMgr.PeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) + net.peerMgr.SetPeerConnected(ts.RandPeerID(), aMultiAddr, lp2pnetwork.DirInbound) assert.False(t, net.connGater.InterceptPeerDial(pid)) assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) diff --git a/network/gossip.go b/network/gossip.go index 9cd58c167..983c222c8 100644 --- a/network/gossip.go +++ b/network/gossip.go @@ -121,7 +121,7 @@ func (g *gossipService) JoinTopic(topicID TopicID, sp ShouldPropagate) error { return nil } - topic, err := g.doJoinTopic(topicID, sp) + topic, err := g.joinTopic(topicID, sp) if err != nil { return err } @@ -136,7 +136,7 @@ func (g *gossipService) JoinTopic(topicID TopicID, sp ShouldPropagate) error { return nil } - topic, err := g.doJoinTopic(topicID, sp) + topic, err := g.joinTopic(topicID, sp) if err != nil { return err } @@ -151,7 +151,7 @@ func (g *gossipService) JoinTopic(topicID TopicID, sp ShouldPropagate) error { return nil } - topic, err := g.doJoinTopic(topicID, sp) + topic, err := g.joinTopic(topicID, sp) if err != nil { return err } @@ -168,7 +168,7 @@ func (g *gossipService) TopicName(topicID TopicID) string { return fmt.Sprintf("/%s/topic/%s/v1", g.networkName, topicID.String()) } -func (g *gossipService) doJoinTopic(topicID TopicID, sp ShouldPropagate) (*lp2pps.Topic, error) { +func (g *gossipService) joinTopic(topicID TopicID, sp ShouldPropagate) (*lp2pps.Topic, error) { topicName := g.TopicName(topicID) topic, err := g.pubsub.Join(topicName) if err != nil { diff --git a/network/notifee.go b/network/notifee.go index 10f8c9fb7..d54637926 100644 --- a/network/notifee.go +++ b/network/notifee.go @@ -99,7 +99,7 @@ func (s *NotifeeService) Connected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) pid := conn.RemotePeer() s.logger.Info("connected to peer", "pid", pid, "direction", conn.Stat().Direction, "addr", conn.RemoteMultiaddr()) - s.peerMgr.PeerConnected(pid, conn.RemoteMultiaddr(), conn.Stat().Direction) + s.peerMgr.SetPeerConnected(pid, conn.RemoteMultiaddr(), conn.Stat().Direction) s.sendConnectEvent(pid, conn.RemoteMultiaddr(), conn.Stat().Direction) } @@ -107,7 +107,7 @@ func (s *NotifeeService) Disconnected(_ lp2pnetwork.Network, conn lp2pnetwork.Co pid := conn.RemotePeer() s.logger.Info("disconnected from peer", "pid", pid) - s.peerMgr.PeerDisconnected(pid) + s.peerMgr.SetPeerDisconnected(pid) s.sendDisconnectEvent(pid) } diff --git a/network/peermgr.go b/network/peermgr.go index 4e9b720f7..386c8dccc 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -81,7 +81,7 @@ func (mgr *peerMgr) Start() { func (*peerMgr) Stop() { } -func (mgr *peerMgr) PeerConnected(pid lp2ppeer.ID, ma multiaddr.Multiaddr, +func (mgr *peerMgr) SetPeerConnected(pid lp2ppeer.ID, ma multiaddr.Multiaddr, direction lp2pnet.Direction, ) { mgr.lk.Lock() @@ -116,7 +116,7 @@ func (mgr *peerMgr) setPeerConnected(pid lp2ppeer.ID, ma multiaddr.Multiaddr, } } -func (mgr *peerMgr) PeerDisconnected(pid lp2ppeer.ID) { +func (mgr *peerMgr) SetPeerDisconnected(pid lp2ppeer.ID) { mgr.lk.Lock() defer mgr.lk.Unlock() diff --git a/network/peermgr_test.go b/network/peermgr_test.go index 25e61db46..6710ffbb7 100644 --- a/network/peermgr_test.go +++ b/network/peermgr_test.go @@ -20,15 +20,15 @@ func TestNumInboundOutbound(t *testing.T) { pid2 := ts.RandPeerID() pid3 := ts.RandPeerID() - net.peerMgr.PeerConnected(pid1, addr, lp2pnet.DirInbound) - net.peerMgr.PeerConnected(pid1, addr, lp2pnet.DirInbound) // Duplicated event - net.peerMgr.PeerConnected(pid2, addr, lp2pnet.DirOutbound) - net.peerMgr.PeerConnected(pid3, addr, lp2pnet.DirOutbound) - net.peerMgr.PeerDisconnected(pid1) - net.peerMgr.PeerDisconnected(pid1) // Duplicated event - net.peerMgr.PeerDisconnected(pid2) - net.peerMgr.PeerDisconnected(ts.RandPeerID()) - net.peerMgr.PeerConnected(pid1, addr, lp2pnet.DirInbound) // Connect again + net.peerMgr.SetPeerConnected(pid1, addr, lp2pnet.DirInbound) + net.peerMgr.SetPeerConnected(pid1, addr, lp2pnet.DirInbound) // Duplicated event + net.peerMgr.SetPeerConnected(pid2, addr, lp2pnet.DirOutbound) + net.peerMgr.SetPeerConnected(pid3, addr, lp2pnet.DirOutbound) + net.peerMgr.SetPeerDisconnected(pid1) + net.peerMgr.SetPeerDisconnected(pid1) // Duplicated event + net.peerMgr.SetPeerDisconnected(pid2) + net.peerMgr.SetPeerDisconnected(ts.RandPeerID()) + net.peerMgr.SetPeerConnected(pid1, addr, lp2pnet.DirInbound) // Connect again assert.Equal(t, 1, net.NumInbound()) assert.Equal(t, 1, net.NumOutbound()) diff --git a/network/utils.go b/network/utils.go index 71cb4c811..da7b2e090 100644 --- a/network/utils.go +++ b/network/utils.go @@ -75,7 +75,7 @@ func ConnectAsync(ctx context.Context, h lp2phost.Host, addrInfo lp2ppeer.AddrIn err := ConnectSync(ctx, h, addrInfo) if log != nil { if err != nil { - log.Warn("connection failed", "addr", addrInfo.Addrs, "err", err) + log.Info("connection failed", "addr", addrInfo.Addrs, "err", err) } else { log.Debug("connection successful", "addr", addrInfo.Addrs) } diff --git a/state/state.go b/state/state.go index f506ad9ea..0d8cceb7a 100644 --- a/state/state.go +++ b/state/state.go @@ -405,7 +405,7 @@ func (st *state) ValidateBlock(blk *block.Block, round int16) error { st.lk.Lock() defer st.lk.Unlock() - if err := st.doValidateBlock(blk, round); err != nil { + if err := st.validateBlock(blk, round); err != nil { return err } @@ -450,7 +450,7 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat return errors.Error(errors.ErrInvalidBlock) } - err = st.doValidateBlock(blk, cert.Round()) + err = st.validateBlock(blk, cert.Round()) if err != nil { return err } diff --git a/state/validation.go b/state/validation.go index 253460805..da9addfda 100644 --- a/state/validation.go +++ b/state/validation.go @@ -7,7 +7,7 @@ import ( "github.com/pactus-project/pactus/util/errors" ) -func (st *state) doValidateBlock(blk *block.Block, round int16) error { +func (st *state) validateBlock(blk *block.Block, round int16) error { if blk.Header().Version() != st.params.BlockVersion { return errors.Errorf(errors.ErrInvalidBlock, "invalid version") diff --git a/sync/bundle/message/proposal.go b/sync/bundle/message/proposal.go index a8ce488f7..51d836b86 100644 --- a/sync/bundle/message/proposal.go +++ b/sync/bundle/message/proposal.go @@ -14,8 +14,10 @@ func NewProposalMessage(p *proposal.Proposal) *ProposalMessage { } } -func (m *ProposalMessage) BasicCheck() error { - return m.Proposal.BasicCheck() +func (*ProposalMessage) BasicCheck() error { + // Basic checks for the proposal are deferred to the consensus phase + // to avoid unnecessary validation for validators outside the committee. + return nil } func (*ProposalMessage) Type() Type { diff --git a/sync/bundle/message/proposal_test.go b/sync/bundle/message/proposal_test.go index f7be30a42..20b68dd40 100644 --- a/sync/bundle/message/proposal_test.go +++ b/sync/bundle/message/proposal_test.go @@ -3,8 +3,6 @@ package message import ( "testing" - "github.com/pactus-project/pactus/types/proposal" - "github.com/pactus-project/pactus/util/errors" "github.com/pactus-project/pactus/util/testsuite" "github.com/stretchr/testify/assert" ) @@ -17,13 +15,6 @@ func TestProposalType(t *testing.T) { func TestProposalMessage(t *testing.T) { ts := testsuite.NewTestSuite(t) - t.Run("Invalid proposal", func(t *testing.T) { - prop := proposal.NewProposal(0, 0, nil) - m := NewProposalMessage(prop) - - assert.Equal(t, errors.Code(m.BasicCheck()), errors.ErrInvalidBlock) - }) - t.Run("OK", func(t *testing.T) { prop, _ := ts.GenerateTestProposal(100, 0) m := NewProposalMessage(prop) diff --git a/sync/bundle/message/query_proposal.go b/sync/bundle/message/query_proposal.go index 221884eea..16d076bc6 100644 --- a/sync/bundle/message/query_proposal.go +++ b/sync/bundle/message/query_proposal.go @@ -4,21 +4,28 @@ import ( "fmt" "github.com/pactus-project/pactus/crypto" + "github.com/pactus-project/pactus/util/errors" ) type QueryProposalMessage struct { Height uint32 `cbor:"1,keyasint"` + Round int16 `cbor:"3,keyasint"` Querier crypto.Address `cbor:"2,keyasint"` } -func NewQueryProposalMessage(height uint32, querier crypto.Address) *QueryProposalMessage { +func NewQueryProposalMessage(height uint32, round int16, querier crypto.Address) *QueryProposalMessage { return &QueryProposalMessage{ Height: height, + Round: round, Querier: querier, } } -func (*QueryProposalMessage) BasicCheck() error { +func (m *QueryProposalMessage) BasicCheck() error { + if m.Round < 0 { + return errors.Error(errors.ErrInvalidRound) + } + return nil } diff --git a/sync/bundle/message/query_proposal_test.go b/sync/bundle/message/query_proposal_test.go index 1ce2c703d..9b3b16bee 100644 --- a/sync/bundle/message/query_proposal_test.go +++ b/sync/bundle/message/query_proposal_test.go @@ -3,6 +3,7 @@ package message import ( "testing" + "github.com/pactus-project/pactus/util/errors" "github.com/pactus-project/pactus/util/testsuite" "github.com/stretchr/testify/assert" ) @@ -15,6 +16,16 @@ func TestQueryProposalType(t *testing.T) { func TestQueryProposalMessage(t *testing.T) { ts := testsuite.NewTestSuite(t) - m := NewQueryProposalMessage(0, ts.RandValAddress()) - assert.NoError(t, m.BasicCheck()) + t.Run("Invalid round", func(t *testing.T) { + m := NewQueryProposalMessage(0, -1, ts.RandValAddress()) + + assert.Equal(t, errors.Code(m.BasicCheck()), errors.ErrInvalidRound) + }) + + t.Run("OK", func(t *testing.T) { + m := NewQueryProposalMessage(100, 0, ts.RandValAddress()) + + assert.NoError(t, m.BasicCheck()) + assert.Contains(t, m.String(), "100") + }) } diff --git a/sync/handler_query_proposal.go b/sync/handler_query_proposal.go index c46997bd2..ec29effd7 100644 --- a/sync/handler_query_proposal.go +++ b/sync/handler_query_proposal.go @@ -20,13 +20,30 @@ func (handler *queryProposalHandler) ParseMessage(m message.Message, _ peer.ID) msg := m.(*message.QueryProposalMessage) handler.logger.Trace("parsing QueryProposal message", "msg", msg) - height, _ := handler.consMgr.HeightRound() - if msg.Height == height { - prop := handler.consMgr.Proposal() - if prop != nil { - response := message.NewProposalMessage(prop) - handler.broadcast(response) - } + if !handler.consMgr.HasActiveInstance() { + handler.logger.Debug("ignoring QueryProposal, not active", "msg", msg) + + return nil + } + + if !handler.consMgr.HasProposer() { + handler.logger.Debug("ignoring QueryProposal, not proposer", "msg", msg) + + return nil + } + + height, round := handler.consMgr.HeightRound() + if msg.Height != height || msg.Round != round { + handler.logger.Debug("ignoring QueryProposal, not same height/round", "msg", msg, + "height", height, "round", round) + + return nil + } + + prop := handler.consMgr.Proposal() + if prop != nil { + response := message.NewProposalMessage(prop) + handler.broadcast(response) } return nil diff --git a/sync/handler_query_proposal_test.go b/sync/handler_query_proposal_test.go index d11b8cc6c..065aa2ff0 100644 --- a/sync/handler_query_proposal_test.go +++ b/sync/handler_query_proposal_test.go @@ -10,19 +10,45 @@ import ( func TestParsingQueryProposalMessages(t *testing.T) { td := setup(t, nil) - consensusHeight, _ := td.consMgr.HeightRound() - prop, _ := td.GenerateTestProposal(consensusHeight, 0) + consHeight, consRound := td.consMgr.HeightRound() + prop, _ := td.GenerateTestProposal(consHeight, 0) pid := td.RandPeerID() td.consMgr.SetProposal(prop) + t.Run("doesn't have active validator", func(t *testing.T) { + msg := message.NewQueryProposalMessage(consHeight, consRound, td.RandValAddress()) + assert.NoError(t, td.receivingNewMessage(td.sync, msg, pid)) + + td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) + }) + + td.consMocks[0].Active = true + + t.Run("not the proposer", func(t *testing.T) { + msg := message.NewQueryProposalMessage(consHeight, consRound, td.RandValAddress()) + assert.NoError(t, td.receivingNewMessage(td.sync, msg, pid)) + + td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) + }) + + td.consMocks[0].Proposer = true + t.Run("not the same height", func(t *testing.T) { - msg := message.NewQueryProposalMessage(consensusHeight+1, td.RandValAddress()) + msg := message.NewQueryProposalMessage(consHeight+1, consRound, td.RandValAddress()) assert.NoError(t, td.receivingNewMessage(td.sync, msg, pid)) td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) }) + + t.Run("not the same round", func(t *testing.T) { + msg := message.NewQueryProposalMessage(consHeight, consRound+1, td.RandValAddress()) + assert.NoError(t, td.receivingNewMessage(td.sync, msg, pid)) + + td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) + }) + t.Run("should respond to the query proposal message", func(t *testing.T) { - msg := message.NewQueryProposalMessage(consensusHeight, td.RandValAddress()) + msg := message.NewQueryProposalMessage(consHeight, consRound, td.RandValAddress()) assert.NoError(t, td.receivingNewMessage(td.sync, msg, pid)) bdl := td.shouldPublishMessageWithThisType(t, message.TypeProposal) @@ -31,7 +57,7 @@ func TestParsingQueryProposalMessages(t *testing.T) { t.Run("doesn't have the proposal", func(t *testing.T) { td.consMocks[0].CurProposal = nil - msg := message.NewQueryProposalMessage(consensusHeight, td.RandValAddress()) + msg := message.NewQueryProposalMessage(consHeight, consRound, td.RandValAddress()) assert.NoError(t, td.receivingNewMessage(td.sync, msg, pid)) td.shouldNotPublishMessageWithThisType(t, message.TypeProposal) @@ -41,8 +67,8 @@ func TestParsingQueryProposalMessages(t *testing.T) { func TestBroadcastingQueryProposalMessages(t *testing.T) { td := setup(t, nil) - consensusHeight := td.state.LastBlockHeight() + 1 - msg := message.NewQueryProposalMessage(consensusHeight, td.RandValAddress()) + consHeight, consRound := td.consMgr.HeightRound() + msg := message.NewQueryProposalMessage(consHeight, consRound, td.RandValAddress()) td.sync.broadcast(msg) td.shouldPublishMessageWithThisType(t, message.TypeQueryProposal) diff --git a/sync/handler_query_votes.go b/sync/handler_query_votes.go index 5e111ae44..d71de440e 100644 --- a/sync/handler_query_votes.go +++ b/sync/handler_query_votes.go @@ -20,13 +20,24 @@ func (handler *queryVotesHandler) ParseMessage(m message.Message, _ peer.ID) err msg := m.(*message.QueryVotesMessage) handler.logger.Trace("parsing QueryVotes message", "msg", msg) + if !handler.consMgr.HasActiveInstance() { + handler.logger.Debug("ignoring QueryVotes, not active", "msg", msg) + + return nil + } + height, _ := handler.consMgr.HeightRound() - if msg.Height == height { - v := handler.consMgr.PickRandomVote(msg.Round) - if v != nil { - response := message.NewVoteMessage(v) - handler.broadcast(response) - } + if msg.Height != height { + handler.logger.Debug("ignoring QueryVotes, not same height", "msg", msg, + "height", height) + + return nil + } + + v := handler.consMgr.PickRandomVote(msg.Round) + if v != nil { + response := message.NewVoteMessage(v) + handler.broadcast(response) } return nil diff --git a/sync/handler_query_votes_test.go b/sync/handler_query_votes_test.go index df0f472e8..741208c23 100644 --- a/sync/handler_query_votes_test.go +++ b/sync/handler_query_votes_test.go @@ -15,6 +15,15 @@ func TestParsingQueryVotesMessages(t *testing.T) { td.consMgr.AddVote(v1) pid := td.RandPeerID() + t.Run("doesn't have active validator", func(t *testing.T) { + msg := message.NewQueryVotesMessage(consensusHeight, 1, td.RandValAddress()) + assert.NoError(t, td.receivingNewMessage(td.sync, msg, pid)) + + td.shouldNotPublishMessageWithThisType(t, message.TypeVote) + }) + + td.consMocks[0].Active = true + t.Run("should respond to the query votes message", func(t *testing.T) { msg := message.NewQueryVotesMessage(consensusHeight, 1, td.RandValAddress()) assert.NoError(t, td.receivingNewMessage(td.sync, msg, pid)) diff --git a/sync/sync.go b/sync/sync.go index 9355bfb6b..bec7d7d1c 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -604,8 +604,6 @@ func (sync *synchronizer) prepareBlocks(from, count uint32) [][]byte { return blocks } -// weAreInTheCommittee checks if one of the validators is a member of the committee -// at the current height. func (*synchronizer) shouldPropagateGeneralMessage(_ *network.GossipMessage) bool { return true } diff --git a/sync/sync_test.go b/sync/sync_test.go index 4a70e0c65..4f76e6921 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -316,7 +316,7 @@ func TestTestNetFlags(t *testing.T) { td := setup(t, nil) td.addValidatorToCommittee(t, td.sync.valKeys[0].PublicKey()) - bdl := td.sync.prepareBundle(message.NewQueryProposalMessage(td.RandHeight(), td.RandValAddress())) + bdl := td.sync.prepareBundle(message.NewQueryProposalMessage(td.RandHeight(), td.RandRound(), td.RandValAddress())) require.False(t, util.IsFlagSet(bdl.Flags, bundle.BundleFlagNetworkMainnet), "invalid flag: %v", bdl) require.True(t, util.IsFlagSet(bdl.Flags, bundle.BundleFlagNetworkTestnet), "invalid flag: %v", bdl) } @@ -415,7 +415,7 @@ func TestBroadcastBlockAnnounce(t *testing.T) { func TestBundleSequenceNo(t *testing.T) { td := setup(t, nil) - msg := message.NewQueryProposalMessage(td.RandHeight(), td.RandValAddress()) + msg := message.NewQueryProposalMessage(td.RandHeight(), td.RandRound(), td.RandValAddress()) td.sync.broadcast(msg) bdl1 := td.shouldPublishMessageWithThisType(t, message.TypeQueryProposal) diff --git a/tests/block_test.go b/tests/block_test.go index 1a46495b8..0900b724d 100644 --- a/tests/block_test.go +++ b/tests/block_test.go @@ -24,7 +24,7 @@ func waitForNewBlocks(num uint32) { if lastHeight() > height { break } - time.Sleep(2 * time.Second) + time.Sleep(4 * time.Second) } } diff --git a/tests/main_test.go b/tests/main_test.go index 5c90ae792..8b212102c 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -75,13 +75,14 @@ func TestMain(m *testing.M) { tConfigs[i].TxPool.MinFeePAC = 0.000001 tConfigs[i].Store.Path = util.TempDirPath() tConfigs[i].Consensus.ChangeProposerTimeout = 4 * time.Second - tConfigs[i].Consensus.ChangeProposerDelta = 0 - tConfigs[i].Logger.Levels["default"] = "warn" - tConfigs[i].Logger.Levels["_state"] = "warn" - tConfigs[i].Logger.Levels["_sync"] = "debug" - tConfigs[i].Logger.Levels["_consensus"] = "debug" - tConfigs[i].Logger.Levels["_network"] = "debug" - tConfigs[i].Logger.Levels["_pool"] = "warn" + tConfigs[i].Consensus.ChangeProposerDelta = 4 * time.Second + tConfigs[i].Consensus.QueryVoteTimeout = 4 * time.Second + tConfigs[i].Logger.Levels["default"] = "info" + tConfigs[i].Logger.Levels["_state"] = "info" + tConfigs[i].Logger.Levels["_sync"] = "info" + tConfigs[i].Logger.Levels["_consensus"] = "info" + tConfigs[i].Logger.Levels["_network"] = "info" + tConfigs[i].Logger.Levels["_pool"] = "info" tConfigs[i].Sync.NodeNetwork = false tConfigs[i].Sync.Firewall.BannedNets = make([]string, 0) tConfigs[i].Sync.LatestBlockInterval = 10 diff --git a/txpool/txpool.go b/txpool/txpool.go index eef33572b..5da87fdab 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -74,7 +74,7 @@ func (p *txPool) AppendTx(trx *tx.Tx) error { p.lk.Lock() defer p.lk.Unlock() - return p.doAppendTx(trx) + return p.appendTx(trx) } // AppendTxAndBroadcast validates the transaction, add it into the transaction pool @@ -83,7 +83,7 @@ func (p *txPool) AppendTxAndBroadcast(trx *tx.Tx) error { p.lk.Lock() defer p.lk.Unlock() - if err := p.doAppendTx(trx); err != nil { + if err := p.appendTx(trx); err != nil { return err } @@ -94,7 +94,7 @@ func (p *txPool) AppendTxAndBroadcast(trx *tx.Tx) error { return nil } -func (p *txPool) doAppendTx(trx *tx.Tx) error { +func (p *txPool) appendTx(trx *tx.Tx) error { payloadType := trx.Payload().Type() payloadPool := p.pools[payloadType] if payloadPool.list.Has(trx.ID()) { diff --git a/util/linkedmap/linkedmap.go b/util/linkedmap/linkedmap.go index f9fd85d85..66e235ddc 100644 --- a/util/linkedmap/linkedmap.go +++ b/util/linkedmap/linkedmap.go @@ -93,7 +93,7 @@ func (lm *LinkedMap[K, V]) TailNode() *ll.Element[Pair[K, V]] { } func (lm *LinkedMap[K, V]) RemoveTail() { - lm.doRemove(lm.list.Tail, lm.list.Tail.Data.Key) + lm.remove(lm.list.Tail) } // HeadNode returns the LinkNode at the beginning (head) of the LinkedMap. @@ -107,25 +107,24 @@ func (lm *LinkedMap[K, V]) HeadNode() *ll.Element[Pair[K, V]] { } func (lm *LinkedMap[K, V]) RemoveHead() { - lm.doRemove(lm.list.Head, lm.list.Head.Data.Key) + lm.remove(lm.list.Head) } // Remove removes the key-value pair with the specified key from the LinkedMap. // It returns true if the key was found and removed, otherwise false. func (lm *LinkedMap[K, V]) Remove(key K) bool { - ln, found := lm.hashmap[key] + element, found := lm.hashmap[key] if found { - lm.list.Delete(ln) - delete(lm.hashmap, ln.Data.Key) + lm.remove(element) } return found } -// doRemove removes the key-value pair with the specified key from the LinkedMap and LinkedList. -func (lm *LinkedMap[K, V]) doRemove(element *ll.Element[Pair[K, V]], key K) { +// remove removes the specified element pair from the LinkedMap. +func (lm *LinkedMap[K, V]) remove(element *ll.Element[Pair[K, V]]) { lm.list.Delete(element) - delete(lm.hashmap, key) + delete(lm.hashmap, element.Data.Key) } // Empty checks if the LinkedMap is empty (contains no key-value pairs). diff --git a/util/testsuite/testsuite.go b/util/testsuite/testsuite.go index c59e2b686..ef5f5a624 100644 --- a/util/testsuite/testsuite.go +++ b/util/testsuite/testsuite.go @@ -526,10 +526,6 @@ func (*TestSuite) HelperSignVote(valKey *bls.ValidatorKey, v *vote.Vote) { func (*TestSuite) HelperSignProposal(valKey *bls.ValidatorKey, p *proposal.Proposal) { sig := valKey.Sign(p.SignBytes()) p.SetSignature(sig) - - if err := p.BasicCheck(); err != nil { - panic(err) - } } func (*TestSuite) HelperSignTransaction(prv crypto.PrivateKey, trx *tx.Tx) {