From d1efe2f7399999b963f442b36ce9a12f9926a603 Mon Sep 17 00:00:00 2001 From: b00f Date: Tue, 4 Jun 2024 23:59:10 +0800 Subject: [PATCH] fix(network): add block and transaction topics (#1319) --- consensus/consensus_test.go | 6 +- consensus/prepare_test.go | 2 +- network/gossip.go | 140 +++++++++++++++++++---- network/gossip_test.go | 2 +- network/interface.go | 22 ++-- network/mock.go | 6 +- network/network.go | 91 +++------------ network/network_test.go | 26 ++--- sync/bundle/message/message.go | 22 ++-- sync/bundle/message/query_votes.go | 2 +- sync/bundle/message/query_votes_test.go | 2 +- sync/bundle/message/transactions.go | 2 +- sync/bundle/message/transactions_test.go | 2 +- sync/handler_query_votes_test.go | 2 +- sync/peerset/peer_set_test.go | 8 +- sync/sync.go | 11 +- txpool/txpool_test.go | 2 +- 17 files changed, 195 insertions(+), 153 deletions(-) diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 9f7fb524d..4a211c186 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -216,7 +216,7 @@ func (td *testData) shouldPublishQueryVote(t *testing.T, cons *consensus, height for _, consMsg := range td.consMessages { if consMsg.sender != cons.valKey.Address() || - consMsg.message.Type() != message.TypeQueryVotes { + consMsg.message.Type() != message.TypeQueryVote { continue } @@ -938,8 +938,8 @@ func checkConsensus(td *testData, height uint32, byzVotes []*vote.Vote) ( case message.TypeHello, message.TypeHelloAck, - message.TypeTransactions, - message.TypeQueryVotes, + message.TypeTransaction, + message.TypeQueryVote, message.TypeBlocksRequest, message.TypeBlocksResponse: // diff --git a/consensus/prepare_test.go b/consensus/prepare_test.go index bfda8f933..03b234959 100644 --- a/consensus/prepare_test.go +++ b/consensus/prepare_test.go @@ -28,7 +28,7 @@ func TestQueryProposal(t *testing.T) { td.queryProposalTimeout(td.consP) td.shouldPublishQueryProposal(t, td.consP, h) - td.shouldNotPublish(t, td.consP, message.TypeQueryVotes) + td.shouldNotPublish(t, td.consP, message.TypeQueryVote) } func TestQueryVotes(t *testing.T) { diff --git a/network/gossip.go b/network/gossip.go index 12991a36d..9cbc270db 100644 --- a/network/gossip.go +++ b/network/gossip.go @@ -2,6 +2,7 @@ package network import ( "context" + "fmt" "sync" "time" @@ -12,14 +13,18 @@ import ( ) type gossipService struct { - ctx context.Context - wg sync.WaitGroup - host lp2phost.Host - pubsub *lp2pps.PubSub - topics []*lp2pps.Topic - subs []*lp2pps.Subscription - eventCh chan Event - logger *logger.SubLogger + ctx context.Context + wg sync.WaitGroup + host lp2phost.Host + pubsub *lp2pps.PubSub + topics []*lp2pps.Topic + subs []*lp2pps.Subscription + topicBlock *lp2pps.Topic + topicTransaction *lp2pps.Topic + topicConsensus *lp2pps.Topic + networkName string + eventCh chan Event + logger *logger.SubLogger } func newGossipService(ctx context.Context, host lp2phost.Host, eventCh chan Event, @@ -55,17 +60,49 @@ func newGossipService(ctx context.Context, host lp2phost.Host, eventCh chan Even } return &gossipService{ - ctx: ctx, - host: host, - pubsub: pubsub, - wg: sync.WaitGroup{}, - eventCh: eventCh, - logger: log, + ctx: ctx, + networkName: conf.NetworkName, + host: host, + pubsub: pubsub, + wg: sync.WaitGroup{}, + eventCh: eventCh, + logger: log, } } -// BroadcastMessage broadcasts a message to the specified topic. -func (g *gossipService) BroadcastMessage(msg []byte, topic *lp2pps.Topic) error { +// Broadcast broadcasts a message with the specified topic ID to the network. +func (g *gossipService) Broadcast(msg []byte, topicID TopicID) error { + g.logger.Trace("publishing new message", "topic", topicID) + + switch topicID { + case TopicIDBlock: + if g.topicBlock == nil { + return NotSubscribedError{TopicID: topicID} + } + + return g.publish(msg, g.topicBlock) + + case TopicIDTransaction: + if g.topicTransaction == nil { + return NotSubscribedError{TopicID: topicID} + } + + return g.publish(msg, g.topicTransaction) + + case TopicIDConsensus: + if g.topicConsensus == nil { + return NotSubscribedError{TopicID: topicID} + } + + return g.publish(msg, g.topicConsensus) + + default: + return InvalidTopicError{TopicID: topicID} + } +} + +// publish publishes a message with the specified topic to the network. +func (g *gossipService) publish(msg []byte, topic *lp2pps.Topic) error { err := topic.Publish(g.ctx, msg) if err != nil { return LibP2PError{Err: err} @@ -74,10 +111,66 @@ func (g *gossipService) BroadcastMessage(msg []byte, topic *lp2pps.Topic) error return nil } -// JoinTopic joins a topic with the given name. -// It creates a subscription to the topic and returns the joined topic. -func (g *gossipService) JoinTopic(name string, sp ShouldPropagate) (*lp2pps.Topic, error) { - topic, err := g.pubsub.Join(name) +// JoinTopic joins to the topic with the given name and subscribes to receive topic messages. +func (g *gossipService) JoinTopic(topicID TopicID, sp ShouldPropagate) error { + switch topicID { + case TopicIDBlock: + if g.topicBlock != nil { + g.logger.Warn("already subscribed to block topic") + + return nil + } + + topic, err := g.doJoinTopic(topicID, sp) + if err != nil { + return err + } + g.topicBlock = topic + + return nil + + case TopicIDTransaction: + if g.topicTransaction != nil { + g.logger.Warn("already subscribed to transaction topic") + + return nil + } + + topic, err := g.doJoinTopic(topicID, sp) + if err != nil { + return err + } + g.topicTransaction = topic + + return nil + + case TopicIDConsensus: + if g.topicConsensus != nil { + g.logger.Warn("already subscribed to consensus topic") + + return nil + } + + topic, err := g.doJoinTopic(topicID, sp) + if err != nil { + return err + } + g.topicConsensus = topic + + return nil + + default: + return InvalidTopicError{TopicID: topicID} + } +} + +func (g *gossipService) TopicName(topicID TopicID) string { + return fmt.Sprintf("/%s/topic/%s/v1", g.networkName, topicID.String()) +} + +func (g *gossipService) doJoinTopic(topicID TopicID, sp ShouldPropagate) (*lp2pps.Topic, error) { + topicName := g.TopicName(topicID) + topic, err := g.pubsub.Join(topicName) if err != nil { return nil, LibP2PError{Err: err} } @@ -87,11 +180,12 @@ func (g *gossipService) JoinTopic(name string, sp ShouldPropagate) (*lp2pps.Topi return nil, LibP2PError{Err: err} } - err = g.pubsub.RegisterTopicValidator(name, + err = g.pubsub.RegisterTopicValidator(topicName, func(_ context.Context, peerId lp2pcore.PeerID, m *lp2pps.Message) lp2pps.ValidationResult { msg := &GossipMessage{ - From: peerId, - Data: m.Data, + From: peerId, + Data: m.Data, + TopicID: topicID, } if !sp(msg) { // Consume the message first diff --git a/network/gossip_test.go b/network/gossip_test.go index 390fca82a..0b6c7e9fe 100644 --- a/network/gossip_test.go +++ b/network/gossip_test.go @@ -16,7 +16,7 @@ func TestJoinConsensusTopic(t *testing.T) { NotSubscribedError{ TopicID: TopicIDConsensus, }) - require.NoError(t, net.JoinConsensusTopic(alwaysPropagate)) + require.NoError(t, net.JoinTopic(TopicIDConsensus, alwaysPropagate)) require.NoError(t, net.Broadcast(msg, TopicIDConsensus)) } diff --git a/network/interface.go b/network/interface.go index 8afbdb5b9..96d6db774 100644 --- a/network/interface.go +++ b/network/interface.go @@ -9,14 +9,18 @@ import ( type TopicID int const ( - TopicIDGeneral TopicID = 1 - TopicIDConsensus TopicID = 2 + TopicIDBlock TopicID = 1 + TopicIDTransaction TopicID = 2 + TopicIDConsensus TopicID = 3 ) func (t TopicID) String() string { switch t { - case TopicIDGeneral: - return "general" + case TopicIDBlock: + return "block" + + case TopicIDTransaction: + return "transaction" case TopicIDConsensus: return "consensus" @@ -63,8 +67,9 @@ type Event interface { // GossipMessage represents message from PubSub module. // `From` is the ID of the peer that we received a message from. type GossipMessage struct { - From lp2pcore.PeerID - Data []byte + From lp2pcore.PeerID + Data []byte + TopicID TopicID } func (*GossipMessage) Type() EventType { @@ -123,9 +128,8 @@ type Network interface { EventChannel() <-chan Event Broadcast([]byte, TopicID) error SendTo([]byte, lp2pcore.PeerID) error - JoinGeneralTopic(shouldPropagate ShouldPropagate) error - JoinConsensusTopic(shouldPropagate ShouldPropagate) error - CloseConnection(pid lp2pcore.PeerID) + JoinTopic(TopicID, ShouldPropagate) error + CloseConnection(lp2pcore.PeerID) SelfID() lp2pcore.PeerID NumConnectedPeers() int NumInbound() int diff --git a/network/mock.go b/network/mock.go index cf9111077..3a7d8d283 100644 --- a/network/mock.go +++ b/network/mock.go @@ -48,11 +48,7 @@ func (mock *MockNetwork) EventChannel() <-chan Event { return mock.EventCh } -func (*MockNetwork) JoinGeneralTopic(_ ShouldPropagate) error { - return nil -} - -func (*MockNetwork) JoinConsensusTopic(_ ShouldPropagate) error { +func (*MockNetwork) JoinTopic(_ TopicID, _ ShouldPropagate) error { return nil } diff --git a/network/network.go b/network/network.go index 22cc69cbf..f7bf64a38 100644 --- a/network/network.go +++ b/network/network.go @@ -7,7 +7,6 @@ import ( "time" lp2p "github.com/libp2p/go-libp2p" - lp2pps "github.com/libp2p/go-libp2p-pubsub" lp2pcore "github.com/libp2p/go-libp2p/core" lp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" lp2phost "github.com/libp2p/go-libp2p/core/host" @@ -30,21 +29,19 @@ import ( var _ Network = &network{} type network struct { - ctx context.Context - cancel context.CancelFunc - config *Config - host lp2phost.Host - mdns *mdnsService - dht *dhtService - peerMgr *peerMgr - connGater *ConnectionGater - stream *streamService - gossip *gossipService - notifee *NotifeeService - generalTopic *lp2pps.Topic - consensusTopic *lp2pps.Topic - eventChannel chan Event - logger *logger.SubLogger + ctx context.Context + cancel context.CancelFunc + config *Config + host lp2phost.Host + mdns *mdnsService + dht *dhtService + peerMgr *peerMgr + connGater *ConnectionGater + stream *streamService + gossip *gossipService + notifee *NotifeeService + eventChannel chan Event + logger *logger.SubLogger } func loadOrCreateKey(path string) (lp2pcrypto.PrivKey, error) { @@ -378,67 +375,11 @@ func (n *network) SendTo(msg []byte, pid lp2pcore.PeerID) error { } func (n *network) Broadcast(msg []byte, topicID TopicID) error { - n.logger.Trace("publishing new message", "topic", topicID) - switch topicID { - case TopicIDGeneral: - if n.generalTopic == nil { - return NotSubscribedError{TopicID: topicID} - } - - return n.gossip.BroadcastMessage(msg, n.generalTopic) - - case TopicIDConsensus: - if n.consensusTopic == nil { - return NotSubscribedError{TopicID: topicID} - } - - return n.gossip.BroadcastMessage(msg, n.consensusTopic) - - default: - return InvalidTopicError{TopicID: topicID} - } -} - -func (n *network) JoinGeneralTopic(sp ShouldPropagate) error { - if n.generalTopic != nil { - n.logger.Debug("already subscribed to general topic") - - return nil - } - topic, err := n.gossip.JoinTopic(n.generalTopicName(), sp) - if err != nil { - return err - } - n.generalTopic = topic - - return nil -} - -func (n *network) JoinConsensusTopic(sp ShouldPropagate) error { - if n.consensusTopic != nil { - n.logger.Debug("already subscribed to consensus topic") - - return nil - } - topic, err := n.gossip.JoinTopic(n.consensusTopicName(), sp) - if err != nil { - return err - } - n.consensusTopic = topic - - return nil -} - -func (n *network) generalTopicName() string { - return n.TopicName("general") -} - -func (n *network) consensusTopicName() string { - return n.TopicName("consensus") + return n.gossip.Broadcast(msg, topicID) } -func (n *network) TopicName(topic string) string { - return fmt.Sprintf("/%s/topic/%s/v1", n.config.NetworkName, topic) +func (n *network) JoinTopic(topicID TopicID, sp ShouldPropagate) error { + return n.gossip.JoinTopic(topicID, sp) } func (n *network) CloseConnection(pid lp2ppeer.ID) { diff --git a/network/network_test.go b/network/network_test.go index 6606ffe03..2ed02c9a2 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -109,7 +109,7 @@ func TestStoppingNetwork(t *testing.T) { assert.NoError(t, err) assert.NoError(t, net.Start()) - assert.NoError(t, net.JoinGeneralTopic(alwaysPropagate)) + assert.NoError(t, net.JoinTopic(TopicIDBlock, alwaysPropagate)) // Should stop peacefully net.Stop() @@ -195,16 +195,16 @@ func TestNetwork(t *testing.T) { lp2p.ForceReachabilityPrivate(), }) - assert.NoError(t, networkB.JoinGeneralTopic(alwaysPropagate)) - assert.NoError(t, networkP.JoinGeneralTopic(alwaysPropagate)) - assert.NoError(t, networkM.JoinGeneralTopic(alwaysPropagate)) - assert.NoError(t, networkN.JoinGeneralTopic(alwaysPropagate)) - assert.NoError(t, networkX.JoinGeneralTopic(alwaysPropagate)) + assert.NoError(t, networkB.JoinTopic(TopicIDBlock, alwaysPropagate)) + assert.NoError(t, networkP.JoinTopic(TopicIDBlock, alwaysPropagate)) + assert.NoError(t, networkM.JoinTopic(TopicIDBlock, alwaysPropagate)) + assert.NoError(t, networkN.JoinTopic(TopicIDBlock, alwaysPropagate)) + assert.NoError(t, networkX.JoinTopic(TopicIDBlock, alwaysPropagate)) - assert.NoError(t, networkB.JoinConsensusTopic(alwaysPropagate)) - assert.NoError(t, networkP.JoinConsensusTopic(alwaysPropagate)) - assert.NoError(t, networkM.JoinConsensusTopic(alwaysPropagate)) - assert.NoError(t, networkN.JoinConsensusTopic(alwaysPropagate)) + assert.NoError(t, networkB.JoinTopic(TopicIDConsensus, alwaysPropagate)) + assert.NoError(t, networkP.JoinTopic(TopicIDConsensus, alwaysPropagate)) + assert.NoError(t, networkM.JoinTopic(TopicIDConsensus, alwaysPropagate)) + assert.NoError(t, networkN.JoinTopic(TopicIDConsensus, alwaysPropagate)) // Network X doesn't join the consensus topic time.Sleep(2 * time.Second) @@ -254,7 +254,7 @@ func TestNetwork(t *testing.T) { t.Run("Gossip: all nodes receive general gossip messages", func(t *testing.T) { msg := ts.RandBytes(64) - require.NoError(t, networkP.Broadcast(msg, TopicIDGeneral)) + require.NoError(t, networkP.Broadcast(msg, TopicIDBlock)) eB := shouldReceiveEvent(t, networkB, EventTypeGossip).(*GossipMessage) eM := shouldReceiveEvent(t, networkM, EventTypeGossip).(*GossipMessage) @@ -314,8 +314,8 @@ func TestNetwork(t *testing.T) { t.Run("Ignore broadcasting identical messages", func(t *testing.T) { msg := ts.RandBytes(64) - require.NoError(t, networkM.Broadcast(msg, TopicIDGeneral)) - require.NoError(t, networkN.Broadcast(msg, TopicIDGeneral)) + require.NoError(t, networkM.Broadcast(msg, TopicIDBlock)) + require.NoError(t, networkN.Broadcast(msg, TopicIDBlock)) eX := shouldReceiveEvent(t, networkX, EventTypeGossip).(*GossipMessage) diff --git a/sync/bundle/message/message.go b/sync/bundle/message/message.go index 39541d475..a6dc7803e 100644 --- a/sync/bundle/message/message.go +++ b/sync/bundle/message/message.go @@ -42,10 +42,10 @@ type Type int32 const ( TypeHello = Type(1) TypeHelloAck = Type(2) - TypeTransactions = Type(3) + TypeTransaction = Type(3) TypeQueryProposal = Type(4) TypeProposal = Type(5) - TypeQueryVotes = Type(6) + TypeQueryVote = Type(6) TypeVote = Type(7) TypeBlockAnnounce = Type(8) TypeBlocksRequest = Type(9) @@ -54,13 +54,17 @@ const ( func (t Type) TopicID() network.TopicID { switch t { - case TypeTransactions, TypeBlockAnnounce: + case TypeBlockAnnounce: + + return network.TopicIDBlock + + case TypeTransaction: - return network.TopicIDGeneral + return network.TopicIDTransaction case TypeQueryProposal, TypeProposal, - TypeQueryVotes, + TypeQueryVote, TypeVote: return network.TopicIDConsensus @@ -86,7 +90,7 @@ func (t Type) String() string { case TypeHelloAck: return "hello-ack" - case TypeTransactions: + case TypeTransaction: return "txs" case TypeQueryProposal: @@ -95,7 +99,7 @@ func (t Type) String() string { case TypeProposal: return "proposal" - case TypeQueryVotes: + case TypeQueryVote: return "query-votes" case TypeVote: @@ -123,7 +127,7 @@ func MakeMessage(t Type) Message { case TypeHelloAck: return &HelloAckMessage{} - case TypeTransactions: + case TypeTransaction: return &TransactionsMessage{} case TypeQueryProposal: @@ -132,7 +136,7 @@ func MakeMessage(t Type) Message { case TypeProposal: return &ProposalMessage{} - case TypeQueryVotes: + case TypeQueryVote: return &QueryVotesMessage{} case TypeVote: diff --git a/sync/bundle/message/query_votes.go b/sync/bundle/message/query_votes.go index 703805f0d..502f4d49b 100644 --- a/sync/bundle/message/query_votes.go +++ b/sync/bundle/message/query_votes.go @@ -30,7 +30,7 @@ func (m *QueryVotesMessage) BasicCheck() error { } func (*QueryVotesMessage) Type() Type { - return TypeQueryVotes + return TypeQueryVote } func (m *QueryVotesMessage) String() string { diff --git a/sync/bundle/message/query_votes_test.go b/sync/bundle/message/query_votes_test.go index 5f6c16ed8..af08e95b8 100644 --- a/sync/bundle/message/query_votes_test.go +++ b/sync/bundle/message/query_votes_test.go @@ -10,7 +10,7 @@ import ( func TestQueryVotesType(t *testing.T) { m := &QueryVotesMessage{} - assert.Equal(t, m.Type(), TypeQueryVotes) + assert.Equal(t, m.Type(), TypeQueryVote) } func TestQueryVotesMessage(t *testing.T) { diff --git a/sync/bundle/message/transactions.go b/sync/bundle/message/transactions.go index 83dd87d95..e4d1b573a 100644 --- a/sync/bundle/message/transactions.go +++ b/sync/bundle/message/transactions.go @@ -32,7 +32,7 @@ func (m *TransactionsMessage) BasicCheck() error { } func (*TransactionsMessage) Type() Type { - return TypeTransactions + return TypeTransaction } func (m *TransactionsMessage) String() string { diff --git a/sync/bundle/message/transactions_test.go b/sync/bundle/message/transactions_test.go index 67cbdb4b8..8c45563eb 100644 --- a/sync/bundle/message/transactions_test.go +++ b/sync/bundle/message/transactions_test.go @@ -11,7 +11,7 @@ import ( func TestTransactionsType(t *testing.T) { m := &TransactionsMessage{} - assert.Equal(t, m.Type(), TypeTransactions) + assert.Equal(t, m.Type(), TypeTransaction) } func TestTransactionsMessage(t *testing.T) { diff --git a/sync/handler_query_votes_test.go b/sync/handler_query_votes_test.go index 513a4b258..df0f472e8 100644 --- a/sync/handler_query_votes_test.go +++ b/sync/handler_query_votes_test.go @@ -38,5 +38,5 @@ func TestBroadcastingQueryVotesMessages(t *testing.T) { msg := message.NewQueryVotesMessage(consensusHeight, 1, td.RandValAddress()) td.sync.broadcast(msg) - td.shouldPublishMessageWithThisType(t, message.TypeQueryVotes) + td.shouldPublishMessageWithThisType(t, message.TypeQueryVote) } diff --git a/sync/peerset/peer_set_test.go b/sync/peerset/peer_set_test.go index 1b6f1da5e..1679fd831 100644 --- a/sync/peerset/peer_set_test.go +++ b/sync/peerset/peer_set_test.go @@ -86,7 +86,7 @@ func TestPeerSet(t *testing.T) { peerSet.IncreaseInvalidBundlesCounter(pid1) peerSet.IncreaseReceivedBundlesCounter(pid1) peerSet.IncreaseReceivedBytesCounter(pid1, message.TypeBlocksResponse, 100) - peerSet.IncreaseReceivedBytesCounter(pid1, message.TypeTransactions, 150) + peerSet.IncreaseReceivedBytesCounter(pid1, message.TypeTransaction, 150) peerSet.IncreaseSentCounters(message.TypeBlocksRequest, 200, nil) peerSet.IncreaseSentCounters(message.TypeBlocksRequest, 250, &pid1) @@ -94,7 +94,7 @@ func TestPeerSet(t *testing.T) { receivedBytes := make(map[message.Type]int64) receivedBytes[message.TypeBlocksResponse] = 100 - receivedBytes[message.TypeTransactions] = 150 + receivedBytes[message.TypeTransaction] = 150 sentBytes := make(map[message.Type]int64) sentBytes[message.TypeBlocksRequest] = 450 @@ -102,12 +102,12 @@ func TestPeerSet(t *testing.T) { assert.Equal(t, peer1.InvalidBundles, 1) assert.Equal(t, peer1.ReceivedBundles, 1) assert.Equal(t, peer1.ReceivedBytes[message.TypeBlocksResponse], int64(100)) - assert.Equal(t, peer1.ReceivedBytes[message.TypeTransactions], int64(150)) + assert.Equal(t, peer1.ReceivedBytes[message.TypeTransaction], int64(150)) assert.Equal(t, peer1.SentBytes[message.TypeBlocksRequest], int64(250)) assert.Equal(t, peerSet.TotalReceivedBytes(), int64(250)) assert.Equal(t, peerSet.ReceivedBytesMessageType(message.TypeBlocksResponse), int64(100)) - assert.Equal(t, peerSet.ReceivedBytesMessageType(message.TypeTransactions), int64(150)) + assert.Equal(t, peerSet.ReceivedBytesMessageType(message.TypeTransaction), int64(150)) assert.Equal(t, peerSet.ReceivedBytes(), receivedBytes) assert.Equal(t, peerSet.TotalSentBytes(), int64(450)) assert.Equal(t, peerSet.SentBytesMessageType(message.TypeBlocksRequest), int64(450)) diff --git a/sync/sync.go b/sync/sync.go index 796ffd118..3a34eb5ef 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -88,10 +88,10 @@ func NewSynchronizer( handlers[message.TypeHello] = newHelloHandler(sync) handlers[message.TypeHelloAck] = newHelloAckHandler(sync) - handlers[message.TypeTransactions] = newTransactionsHandler(sync) + handlers[message.TypeTransaction] = newTransactionsHandler(sync) handlers[message.TypeQueryProposal] = newQueryProposalHandler(sync) handlers[message.TypeProposal] = newProposalHandler(sync) - handlers[message.TypeQueryVotes] = newQueryVotesHandler(sync) + handlers[message.TypeQueryVote] = newQueryVotesHandler(sync) handlers[message.TypeVote] = newVoteHandler(sync) handlers[message.TypeBlockAnnounce] = newBlockAnnounceHandler(sync) handlers[message.TypeBlocksRequest] = newBlocksRequestHandler(sync) @@ -103,11 +103,14 @@ func NewSynchronizer( } func (sync *synchronizer) Start() error { - if err := sync.network.JoinGeneralTopic(sync.shouldPropagateGeneralMessage); err != nil { + if err := sync.network.JoinTopic(network.TopicIDBlock, sync.shouldPropagateGeneralMessage); err != nil { + return err + } + if err := sync.network.JoinTopic(network.TopicIDTransaction, sync.shouldPropagateGeneralMessage); err != nil { return err } // TODO: Not joining consensus topic when we are syncing - if err := sync.network.JoinConsensusTopic(sync.shouldPropagateConsensusMessage); err != nil { + if err := sync.network.JoinTopic(network.TopicIDConsensus, sync.shouldPropagateConsensusMessage); err != nil { return err } diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index 6f5ddd108..66d9833ea 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -60,7 +60,7 @@ func (td *testData) shouldPublishTransaction(t *testing.T, id tx.ID) { case msg := <-td.ch: logger.Info("shouldPublishTransaction", "msg", msg) - if msg.Type() == message.TypeTransactions { + if msg.Type() == message.TypeTransaction { m := msg.(*message.TransactionsMessage) assert.Equal(t, m.Transactions[0].ID(), id)