Skip to content

Commit

Permalink
fix(network): add block and transaction topics (#1319)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Jun 4, 2024
1 parent 9ea4764 commit d1efe2f
Show file tree
Hide file tree
Showing 17 changed files with 195 additions and 153 deletions.
6 changes: 3 additions & 3 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (td *testData) shouldPublishQueryVote(t *testing.T, cons *consensus, height

for _, consMsg := range td.consMessages {
if consMsg.sender != cons.valKey.Address() ||
consMsg.message.Type() != message.TypeQueryVotes {
consMsg.message.Type() != message.TypeQueryVote {
continue
}

Expand Down Expand Up @@ -938,8 +938,8 @@ func checkConsensus(td *testData, height uint32, byzVotes []*vote.Vote) (
case
message.TypeHello,
message.TypeHelloAck,
message.TypeTransactions,
message.TypeQueryVotes,
message.TypeTransaction,
message.TypeQueryVote,
message.TypeBlocksRequest,
message.TypeBlocksResponse:
//
Expand Down
2 changes: 1 addition & 1 deletion consensus/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestQueryProposal(t *testing.T) {
td.queryProposalTimeout(td.consP)

td.shouldPublishQueryProposal(t, td.consP, h)
td.shouldNotPublish(t, td.consP, message.TypeQueryVotes)
td.shouldNotPublish(t, td.consP, message.TypeQueryVote)
}

func TestQueryVotes(t *testing.T) {
Expand Down
140 changes: 117 additions & 23 deletions network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -12,14 +13,18 @@ import (
)

type gossipService struct {
ctx context.Context
wg sync.WaitGroup
host lp2phost.Host
pubsub *lp2pps.PubSub
topics []*lp2pps.Topic
subs []*lp2pps.Subscription
eventCh chan Event
logger *logger.SubLogger
ctx context.Context
wg sync.WaitGroup
host lp2phost.Host
pubsub *lp2pps.PubSub
topics []*lp2pps.Topic
subs []*lp2pps.Subscription
topicBlock *lp2pps.Topic
topicTransaction *lp2pps.Topic
topicConsensus *lp2pps.Topic
networkName string
eventCh chan Event
logger *logger.SubLogger
}

func newGossipService(ctx context.Context, host lp2phost.Host, eventCh chan Event,
Expand Down Expand Up @@ -55,17 +60,49 @@ func newGossipService(ctx context.Context, host lp2phost.Host, eventCh chan Even
}

return &gossipService{
ctx: ctx,
host: host,
pubsub: pubsub,
wg: sync.WaitGroup{},
eventCh: eventCh,
logger: log,
ctx: ctx,
networkName: conf.NetworkName,
host: host,
pubsub: pubsub,
wg: sync.WaitGroup{},
eventCh: eventCh,
logger: log,
}
}

// BroadcastMessage broadcasts a message to the specified topic.
func (g *gossipService) BroadcastMessage(msg []byte, topic *lp2pps.Topic) error {
// Broadcast broadcasts a message with the specified topic ID to the network.
func (g *gossipService) Broadcast(msg []byte, topicID TopicID) error {
g.logger.Trace("publishing new message", "topic", topicID)

switch topicID {
case TopicIDBlock:
if g.topicBlock == nil {
return NotSubscribedError{TopicID: topicID}
}

return g.publish(msg, g.topicBlock)

case TopicIDTransaction:
if g.topicTransaction == nil {
return NotSubscribedError{TopicID: topicID}
}

return g.publish(msg, g.topicTransaction)

case TopicIDConsensus:
if g.topicConsensus == nil {
return NotSubscribedError{TopicID: topicID}
}

return g.publish(msg, g.topicConsensus)

default:
return InvalidTopicError{TopicID: topicID}
}
}

// publish publishes a message with the specified topic to the network.
func (g *gossipService) publish(msg []byte, topic *lp2pps.Topic) error {
err := topic.Publish(g.ctx, msg)
if err != nil {
return LibP2PError{Err: err}
Expand All @@ -74,10 +111,66 @@ func (g *gossipService) BroadcastMessage(msg []byte, topic *lp2pps.Topic) error
return nil
}

// JoinTopic joins a topic with the given name.
// It creates a subscription to the topic and returns the joined topic.
func (g *gossipService) JoinTopic(name string, sp ShouldPropagate) (*lp2pps.Topic, error) {
topic, err := g.pubsub.Join(name)
// JoinTopic joins to the topic with the given name and subscribes to receive topic messages.
func (g *gossipService) JoinTopic(topicID TopicID, sp ShouldPropagate) error {
switch topicID {
case TopicIDBlock:
if g.topicBlock != nil {
g.logger.Warn("already subscribed to block topic")

return nil
}

topic, err := g.doJoinTopic(topicID, sp)
if err != nil {
return err
}
g.topicBlock = topic

return nil

case TopicIDTransaction:
if g.topicTransaction != nil {
g.logger.Warn("already subscribed to transaction topic")

return nil
}

topic, err := g.doJoinTopic(topicID, sp)
if err != nil {
return err
}
g.topicTransaction = topic

return nil

case TopicIDConsensus:
if g.topicConsensus != nil {
g.logger.Warn("already subscribed to consensus topic")

return nil
}

topic, err := g.doJoinTopic(topicID, sp)
if err != nil {
return err
}
g.topicConsensus = topic

return nil

default:
return InvalidTopicError{TopicID: topicID}
}
}

func (g *gossipService) TopicName(topicID TopicID) string {
return fmt.Sprintf("/%s/topic/%s/v1", g.networkName, topicID.String())
}

func (g *gossipService) doJoinTopic(topicID TopicID, sp ShouldPropagate) (*lp2pps.Topic, error) {
topicName := g.TopicName(topicID)
topic, err := g.pubsub.Join(topicName)
if err != nil {
return nil, LibP2PError{Err: err}
}
Expand All @@ -87,11 +180,12 @@ func (g *gossipService) JoinTopic(name string, sp ShouldPropagate) (*lp2pps.Topi
return nil, LibP2PError{Err: err}
}

err = g.pubsub.RegisterTopicValidator(name,
err = g.pubsub.RegisterTopicValidator(topicName,
func(_ context.Context, peerId lp2pcore.PeerID, m *lp2pps.Message) lp2pps.ValidationResult {
msg := &GossipMessage{
From: peerId,
Data: m.Data,
From: peerId,
Data: m.Data,
TopicID: topicID,
}
if !sp(msg) {
// Consume the message first
Expand Down
2 changes: 1 addition & 1 deletion network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestJoinConsensusTopic(t *testing.T) {
NotSubscribedError{
TopicID: TopicIDConsensus,
})
require.NoError(t, net.JoinConsensusTopic(alwaysPropagate))
require.NoError(t, net.JoinTopic(TopicIDConsensus, alwaysPropagate))
require.NoError(t, net.Broadcast(msg, TopicIDConsensus))
}

Expand Down
22 changes: 13 additions & 9 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ import (
type TopicID int

const (
TopicIDGeneral TopicID = 1
TopicIDConsensus TopicID = 2
TopicIDBlock TopicID = 1
TopicIDTransaction TopicID = 2
TopicIDConsensus TopicID = 3
)

func (t TopicID) String() string {
switch t {
case TopicIDGeneral:
return "general"
case TopicIDBlock:
return "block"

case TopicIDTransaction:
return "transaction"

case TopicIDConsensus:
return "consensus"
Expand Down Expand Up @@ -63,8 +67,9 @@ type Event interface {
// GossipMessage represents message from PubSub module.
// `From` is the ID of the peer that we received a message from.
type GossipMessage struct {
From lp2pcore.PeerID
Data []byte
From lp2pcore.PeerID
Data []byte
TopicID TopicID
}

func (*GossipMessage) Type() EventType {
Expand Down Expand Up @@ -123,9 +128,8 @@ type Network interface {
EventChannel() <-chan Event
Broadcast([]byte, TopicID) error
SendTo([]byte, lp2pcore.PeerID) error
JoinGeneralTopic(shouldPropagate ShouldPropagate) error
JoinConsensusTopic(shouldPropagate ShouldPropagate) error
CloseConnection(pid lp2pcore.PeerID)
JoinTopic(TopicID, ShouldPropagate) error
CloseConnection(lp2pcore.PeerID)
SelfID() lp2pcore.PeerID
NumConnectedPeers() int
NumInbound() int
Expand Down
6 changes: 1 addition & 5 deletions network/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ func (mock *MockNetwork) EventChannel() <-chan Event {
return mock.EventCh
}

func (*MockNetwork) JoinGeneralTopic(_ ShouldPropagate) error {
return nil
}

func (*MockNetwork) JoinConsensusTopic(_ ShouldPropagate) error {
func (*MockNetwork) JoinTopic(_ TopicID, _ ShouldPropagate) error {
return nil
}

Expand Down
91 changes: 16 additions & 75 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

lp2p "github.com/libp2p/go-libp2p"
lp2pps "github.com/libp2p/go-libp2p-pubsub"
lp2pcore "github.com/libp2p/go-libp2p/core"
lp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
lp2phost "github.com/libp2p/go-libp2p/core/host"
Expand All @@ -30,21 +29,19 @@ import (
var _ Network = &network{}

type network struct {
ctx context.Context
cancel context.CancelFunc
config *Config
host lp2phost.Host
mdns *mdnsService
dht *dhtService
peerMgr *peerMgr
connGater *ConnectionGater
stream *streamService
gossip *gossipService
notifee *NotifeeService
generalTopic *lp2pps.Topic
consensusTopic *lp2pps.Topic
eventChannel chan Event
logger *logger.SubLogger
ctx context.Context
cancel context.CancelFunc
config *Config
host lp2phost.Host
mdns *mdnsService
dht *dhtService
peerMgr *peerMgr
connGater *ConnectionGater
stream *streamService
gossip *gossipService
notifee *NotifeeService
eventChannel chan Event
logger *logger.SubLogger
}

func loadOrCreateKey(path string) (lp2pcrypto.PrivKey, error) {
Expand Down Expand Up @@ -378,67 +375,11 @@ func (n *network) SendTo(msg []byte, pid lp2pcore.PeerID) error {
}

func (n *network) Broadcast(msg []byte, topicID TopicID) error {
n.logger.Trace("publishing new message", "topic", topicID)
switch topicID {
case TopicIDGeneral:
if n.generalTopic == nil {
return NotSubscribedError{TopicID: topicID}
}

return n.gossip.BroadcastMessage(msg, n.generalTopic)

case TopicIDConsensus:
if n.consensusTopic == nil {
return NotSubscribedError{TopicID: topicID}
}

return n.gossip.BroadcastMessage(msg, n.consensusTopic)

default:
return InvalidTopicError{TopicID: topicID}
}
}

func (n *network) JoinGeneralTopic(sp ShouldPropagate) error {
if n.generalTopic != nil {
n.logger.Debug("already subscribed to general topic")

return nil
}
topic, err := n.gossip.JoinTopic(n.generalTopicName(), sp)
if err != nil {
return err
}
n.generalTopic = topic

return nil
}

func (n *network) JoinConsensusTopic(sp ShouldPropagate) error {
if n.consensusTopic != nil {
n.logger.Debug("already subscribed to consensus topic")

return nil
}
topic, err := n.gossip.JoinTopic(n.consensusTopicName(), sp)
if err != nil {
return err
}
n.consensusTopic = topic

return nil
}

func (n *network) generalTopicName() string {
return n.TopicName("general")
}

func (n *network) consensusTopicName() string {
return n.TopicName("consensus")
return n.gossip.Broadcast(msg, topicID)
}

func (n *network) TopicName(topic string) string {
return fmt.Sprintf("/%s/topic/%s/v1", n.config.NetworkName, topic)
func (n *network) JoinTopic(topicID TopicID, sp ShouldPropagate) error {
return n.gossip.JoinTopic(topicID, sp)
}

func (n *network) CloseConnection(pid lp2ppeer.ID) {
Expand Down
Loading

0 comments on commit d1efe2f

Please sign in to comment.