Skip to content

Commit

Permalink
fix(network): use goroutines for sending streams (#1365)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Jun 25, 2024
1 parent 94a9bb2 commit 17dc771
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 66 deletions.
3 changes: 2 additions & 1 deletion consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ func (cs *consensus) startChangingProposer() {
// If it is not decided yet.
// TODO: can we remove this condition in new consensus model?
if cs.cpDecided == -1 {
cs.logger.Info("changing proposer started", "cpRound", cs.cpRound)
cs.logger.Info("changing proposer started",
"cpRound", cs.cpRound, "proposer", cs.proposer(cs.round).Address())
cs.enterNewState(cs.cpPreVoteState)
}
}
3 changes: 2 additions & 1 deletion fastconsensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,8 @@ func (cs *consensus) startChangingProposer() {
// If it is not decided yet.
// TODO: can we remove this condition in new consensus model?
if cs.cpDecided == -1 {
cs.logger.Info("changing proposer started", "cpRound", cs.cpRound)
cs.logger.Info("changing proposer started",
"cpRound", cs.cpRound, "proposer", cs.proposer(cs.round).Address())
cs.enterNewState(cs.cpPreVoteState)
}
}
Expand Down
3 changes: 1 addition & 2 deletions network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ func (g *gossipService) onReceiveMessage(m *lp2pps.Message) {
return
}

g.logger.Debug("receiving new gossip message",
"source", m.GetFrom(), "from", m.ReceivedFrom)
g.logger.Debug("receiving new gossip message", "source", m.GetFrom())
event := &GossipMessage{
From: m.ReceivedFrom,
Data: m.Data,
Expand Down
10 changes: 4 additions & 6 deletions network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package network
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -12,21 +11,20 @@ func TestJoinConsensusTopic(t *testing.T) {

msg := []byte("test-consensus-topic")

require.ErrorIs(t, net.Broadcast(msg, TopicIDConsensus),
require.ErrorIs(t, net.gossip.Broadcast(msg, TopicIDConsensus),
NotSubscribedError{
TopicID: TopicIDConsensus,
})
require.NoError(t, net.JoinTopic(TopicIDConsensus, alwaysPropagate))
require.NoError(t, net.Broadcast(msg, TopicIDConsensus))
require.NoError(t, net.gossip.Broadcast(msg, TopicIDConsensus))
}

func TestInvalidTopic(t *testing.T) {
net, err := NewNetwork(testConfig())
assert.NoError(t, err)
net := makeTestNetwork(t, testConfig(), nil)

msg := []byte("test-invalid-topic")

require.ErrorIs(t, net.Broadcast(msg, -1),
require.ErrorIs(t, net.gossip.Broadcast(msg, -1),
InvalidTopicError{
TopicID: TopicID(-1),
})
Expand Down
4 changes: 2 additions & 2 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ type Network interface {
Stop()
Protect(lp2pcore.PeerID, string)
EventChannel() <-chan Event
Broadcast([]byte, TopicID) error
SendTo([]byte, lp2pcore.PeerID) error
Broadcast([]byte, TopicID)
SendTo([]byte, lp2pcore.PeerID)
JoinTopic(TopicID, ShouldPropagate) error
CloseConnection(lp2pcore.PeerID)
SelfID() lp2pcore.PeerID
Expand Down
2 changes: 1 addition & 1 deletion network/mdns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestMDNS(t *testing.T) {
time.Sleep(250 * time.Millisecond)

msg := []byte("test-mdns")
assert.NoError(t, net1.SendTo(msg, net2.SelfID()))
net1.SendTo(msg, net2.SelfID())

se := shouldReceiveEvent(t, net2, EventTypeStream).(*StreamMessage)
assert.Equal(t, se.From, net1.SelfID())
Expand Down
12 changes: 2 additions & 10 deletions network/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type MockNetwork struct {
EventCh chan Event
ID lp2ppeer.ID
OtherNets []*MockNetwork
SendError error
}

func MockingNetwork(ts *testsuite.TestSuite, id lp2ppeer.ID) *MockNetwork {
Expand Down Expand Up @@ -56,25 +55,18 @@ func (mock *MockNetwork) SelfID() lp2ppeer.ID {
return mock.ID
}

func (mock *MockNetwork) SendTo(data []byte, pid lp2pcore.PeerID) error {
if mock.SendError != nil {
return mock.SendError
}
func (mock *MockNetwork) SendTo(data []byte, pid lp2pcore.PeerID) {
mock.PublishCh <- PublishData{
Data: data,
Target: &pid,
}

return nil
}

func (mock *MockNetwork) Broadcast(data []byte, _ TopicID) error {
func (mock *MockNetwork) Broadcast(data []byte, _ TopicID) {
mock.PublishCh <- PublishData{
Data: data,
Target: nil, // Send to all
}

return nil
}

func (mock *MockNetwork) SendToOthers(data []byte, target *lp2ppeer.ID) {
Expand Down
24 changes: 18 additions & 6 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,14 +368,26 @@ func (n *network) Protect(pid lp2pcore.PeerID, tag string) {
n.host.ConnManager().Protect(pid, tag)
}

func (n *network) SendTo(msg []byte, pid lp2pcore.PeerID) error {
n.logger.Trace("Sending new message", "to", pid)

return n.stream.SendRequest(msg, pid)
// SendTo sends a message to a specific peer identified by pid asynchronously.
// It uses a goroutine to ensure that if sending is blocked, receiving messages won't be blocked.
func (n *network) SendTo(msg []byte, pid lp2pcore.PeerID) {
go func() {
err := n.stream.SendRequest(msg, pid)
if err != nil {
n.logger.Warn("error on sending msg", "pid", pid, "error", err)
}
}()
}

func (n *network) Broadcast(msg []byte, topicID TopicID) error {
return n.gossip.Broadcast(msg, topicID)
// Broadcast sends a message to all peers subscribed to a specific topic asynchronously.
// It uses a goroutine to ensure that if broadcasting is blocked, receiving messages won't be blocked.
func (n *network) Broadcast(msg []byte, topicID TopicID) {
go func() {
err := n.gossip.Broadcast(msg, topicID)
if err != nil {
n.logger.Warn("error on broadcasting msg", "error", err)
}
}()
}

func (n *network) JoinTopic(topicID TopicID, sp ShouldPropagate) error {
Expand Down
20 changes: 10 additions & 10 deletions network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestNetwork(t *testing.T) {

msg := ts.RandBytes(64)

require.NoError(t, networkP.Broadcast(msg, TopicIDBlock))
networkP.Broadcast(msg, TopicIDBlock)

eB := shouldReceiveEvent(t, networkB, EventTypeGossip).(*GossipMessage)
eM := shouldReceiveEvent(t, networkM, EventTypeGossip).(*GossipMessage)
Expand All @@ -278,7 +278,7 @@ func TestNetwork(t *testing.T) {

msg := ts.RandBytes(64)

require.NoError(t, networkP.Broadcast(msg, TopicIDConsensus))
networkP.Broadcast(msg, TopicIDConsensus)

eB := shouldReceiveEvent(t, networkB, EventTypeGossip).(*GossipMessage)
eM := shouldReceiveEvent(t, networkM, EventTypeGossip).(*GossipMessage)
Expand All @@ -296,7 +296,7 @@ func TestNetwork(t *testing.T) {
require.NoError(t, networkM.host.Connect(networkM.ctx, *publicAddrInfo))

msgM := ts.RandBytes(64)
require.NoError(t, networkM.SendTo(msgM, networkP.SelfID()))
networkM.SendTo(msgM, networkP.SelfID())
eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage)
assert.Equal(t, eP.From, networkM.SelfID())
assert.Equal(t, readData(t, eP.Reader, len(msgM)), msgM)
Expand All @@ -308,7 +308,7 @@ func TestNetwork(t *testing.T) {
require.NoError(t, networkX.host.Connect(networkX.ctx, *publicAddrInfo))

msgX := ts.RandBytes(64)
require.NoError(t, networkX.SendTo(msgX, networkP.SelfID()))
networkX.SendTo(msgX, networkP.SelfID())
eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage)
assert.Equal(t, eP.From, networkX.SelfID())
assert.Equal(t, readData(t, eP.Reader, len(msgX)), msgX)
Expand All @@ -319,7 +319,7 @@ func TestNetwork(t *testing.T) {

msgB := ts.RandBytes(64)

require.NoError(t, networkB.SendTo(msgB, networkP.SelfID()))
networkB.SendTo(msgB, networkP.SelfID())
eB := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage)
assert.Equal(t, eB.From, networkB.SelfID())
assert.Equal(t, readData(t, eB.Reader, len(msgB)), msgB)
Expand All @@ -330,8 +330,8 @@ func TestNetwork(t *testing.T) {

msg := ts.RandBytes(64)

require.NoError(t, networkM.Broadcast(msg, TopicIDBlock))
require.NoError(t, networkN.Broadcast(msg, TopicIDBlock))
networkM.Broadcast(msg, TopicIDBlock)
networkN.Broadcast(msg, TopicIDBlock)

eX := shouldReceiveEvent(t, networkX, EventTypeGossip).(*GossipMessage)

Expand All @@ -346,7 +346,7 @@ func TestNetwork(t *testing.T) {
t.Log(t.Name())

msgM := ts.RandBytes(64)
require.Error(t, networkM.SendTo(msgM, networkX.SelfID()))
networkM.SendTo(msgM, networkX.SelfID())
})

// TODO: How to test this?
Expand All @@ -366,7 +366,7 @@ func TestNetwork(t *testing.T) {
networkB.CloseConnection(networkP.SelfID())
e := shouldReceiveEvent(t, networkB, EventTypeDisconnect).(*DisconnectEvent)
assert.Equal(t, e.PeerID, networkP.SelfID())
require.Error(t, networkB.SendTo(msgB, networkP.SelfID()))
networkB.SendTo(msgB, networkP.SelfID())
})

t.Run("Reachability Status", func(t *testing.T) {
Expand Down Expand Up @@ -443,7 +443,7 @@ func testConnection(t *testing.T, networkP, networkB *network) {

msg := []byte("test-msg")

require.NoError(t, networkP.SendTo(msg, networkB.SelfID()))
networkP.SendTo(msg, networkB.SelfID())
e := shouldReceiveEvent(t, networkB, EventTypeStream).(*StreamMessage)
assert.Equal(t, e.From, networkP.SelfID())
assert.Equal(t, readData(t, e.Reader, len(msg)), msg)
Expand Down
2 changes: 1 addition & 1 deletion network/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *streamService) SendRequest(msg []byte, pid lp2peer.ID) error {
}

// To prevent a broken stream from being open forever.
ctxWithTimeout, cancel := context.WithTimeout(s.ctx, 1*time.Minute)
ctxWithTimeout, cancel := context.WithTimeout(s.ctx, 20*time.Second)
defer cancel()

// Attempt to open a new stream to the target peer assuming there's already direct a connection
Expand Down
18 changes: 5 additions & 13 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,10 @@ func (sync *synchronizer) sendTo(msg message.Message, to peer.ID) {
if bdl != nil {
data, _ := bdl.Encode()

err := sync.network.SendTo(data, to)
if err != nil {
sync.logger.Warn("error on sending the bundle", "bundle", bdl, "to", to, "error", err)

return
}

sync.network.SendTo(data, to)
sync.peerSet.UpdateLastSent(to)
sync.peerSet.IncreaseSentCounters(msg.Type(), int64(len(data)), &to)

sync.logger.Debug("bundle sent", "bundle", bdl, "to", to)
}
}
Expand All @@ -219,13 +214,10 @@ func (sync *synchronizer) broadcast(msg message.Message) {
bdl.Flags = util.SetFlag(bdl.Flags, bundle.BundleFlagBroadcasted)

data, _ := bdl.Encode()
err := sync.network.Broadcast(data, msg.Type().TopicID())
if err != nil {
sync.logger.Error("error on broadcasting bundle", "bundle", bdl, "error", err)
} else {
sync.logger.Info("broadcasting new bundle", "bundle", bdl)
}
sync.network.Broadcast(data, msg.Type().TopicID())
sync.peerSet.IncreaseSentCounters(msg.Type(), int64(len(data)), nil)

sync.logger.Debug("bundle broadcasted", "bundle", bdl)
}
}

Expand Down
13 changes: 0 additions & 13 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,19 +374,6 @@ func TestDownload(t *testing.T) {

assert.False(t, td.sync.peerSet.HasOpenSession(pid))
})

t.Run("testing send failure", func(t *testing.T) {
td := setup(t, conf)

pid := td.RandPeerID()
td.network.SendError = fmt.Errorf("send error")

blk, cert := td.GenerateTestBlock(td.RandHeight())
baMsg := message.NewBlockAnnounceMessage(blk, cert)
assert.NoError(t, td.receivingNewMessage(td.sync, baMsg, pid))

td.shouldNotPublishMessageWithThisType(t, message.TypeBlocksRequest)
})
}

func TestBroadcastBlockAnnounce(t *testing.T) {
Expand Down

0 comments on commit 17dc771

Please sign in to comment.