Skip to content

Commit

Permalink
fix(sync): fix concurrent map read-write crash (#1112)
Browse files Browse the repository at this point in the history
  • Loading branch information
themantre committed Feb 18, 2024
1 parent 776b31e commit 8fe0ff2
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 46 deletions.
15 changes: 4 additions & 11 deletions committee/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,14 @@ type committee struct {
proposerPos *linkedlist.Element[*validator.Validator]
}

func cloneValidator(val *validator.Validator) *validator.Validator {
cloned := new(validator.Validator)
*cloned = *val

return cloned
}

func NewCommittee(validators []*validator.Validator, committeeSize int,
proposerAddress crypto.Address,
) (Committee, error) {
validatorList := linkedlist.New[*validator.Validator]()
var proposerPos *linkedlist.Element[*validator.Validator]

for _, val := range validators {
el := validatorList.InsertAtTail(cloneValidator(val))
el := validatorList.InsertAtTail(val.Clone())
if val.Address() == proposerAddress {
proposerPos = el
}
Expand Down Expand Up @@ -69,7 +62,7 @@ func (c *committee) Update(lastRound int16, joined []*validator.Validator) {
for _, val := range joined {
committeeVal := c.find(val.Address())
if committeeVal == nil {
c.validatorList.InsertBefore(cloneValidator(val), c.proposerPos)
c.validatorList.InsertBefore(val.Clone(), c.proposerPos)
} else {
committeeVal.UpdateLastSortitionHeight(val.LastSortitionHeight())

Expand Down Expand Up @@ -121,7 +114,7 @@ func (c *committee) Validators() []*validator.Validator {
vals := make([]*validator.Validator, c.validatorList.Length())
i := 0
c.iterate(func(v *validator.Validator) bool {
vals[i] = cloneValidator(v)
vals[i] = v.Clone()
i++

return false
Expand Down Expand Up @@ -159,7 +152,7 @@ func (c *committee) IsProposer(addr crypto.Address, round int16) bool {
// Proposer returns an instance of the proposer validator for the specified round.
// A cloned instance of the proposer is returned to avoid modification of the original object.
func (c *committee) Proposer(round int16) *validator.Validator {
return cloneValidator(c.proposer(round))
return c.proposer(round).Clone()
}

func (c *committee) proposer(round int16) *validator.Validator {
Expand Down
5 changes: 4 additions & 1 deletion network/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ func (mgr *peerMgr) CheckConnectivity() {
}
}

mgr.logger.Debug("check connectivity", "peers", connectedPeers)
mgr.logger.Debug("check connectivity",
"peers", len(connectedPeers),
"inbound", mgr.numInbound,
"outbound", mgr.numOutbound)

switch {
case len(connectedPeers) > mgr.maxConns:
Expand Down
8 changes: 4 additions & 4 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ func (s *store) TotalAccounts() int32 {
}

func (s *store) IterateAccounts(consumer func(crypto.Address, *account.Account) (stop bool)) {
s.lk.Lock()
defer s.lk.Unlock()
s.lk.RLock()
defer s.lk.RUnlock()

s.accountStore.iterateAccounts(consumer)
}
Expand Down Expand Up @@ -321,8 +321,8 @@ func (s *store) TotalValidators() int32 {
}

func (s *store) IterateValidators(consumer func(*validator.Validator) (stop bool)) {
s.lk.Lock()
defer s.lk.Unlock()
s.lk.RLock()
defer s.lk.RUnlock()

s.validatorStore.iterateValidators(consumer)
}
Expand Down
21 changes: 12 additions & 9 deletions sync/peerset/peer_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ import (
"github.com/pactus-project/pactus/util"
)

// TODO:
// - Add tests for peerset
// - Is it thread safe (GetPeer and IteratePeers) ??

type PeerSet struct {
lk sync.RWMutex

Expand Down Expand Up @@ -404,6 +400,9 @@ func (ps *PeerSet) StartedAt() time.Time {
}

func (ps *PeerSet) IteratePeers(consumer func(peer *Peer) (stop bool)) {
ps.lk.RLock()
defer ps.lk.RUnlock()

for _, p := range ps.peers {
stopped := consumer(p)
if stopped {
Expand All @@ -412,13 +411,17 @@ func (ps *PeerSet) IteratePeers(consumer func(peer *Peer) (stop bool)) {
}
}

func (ps *PeerSet) IterateSessions(consumer func(s *session.Session) (stop bool)) {
func (ps *PeerSet) Sessions() []*session.Session {
ps.lk.RLock()
defer ps.lk.RUnlock()

sessions := make([]*session.Session, 0, len(ps.sessions))

for _, ssn := range ps.sessions {
stopped := consumer(ssn)
if stopped {
return
}
sessions = append(sessions, ssn)
}

return sessions
}

// GetRandomPeer selects a random peer from the peer set based on their weights.
Expand Down
35 changes: 20 additions & 15 deletions sync/peerset/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func TestPeerSet(t *testing.T) {
pk3, _ := ts.RandBLSKeyPair()
pk4, _ := ts.RandBLSKeyPair()
pk5, _ := ts.RandBLSKeyPair()
pid1 := peer.ID("peer1")
pid2 := peer.ID("peer2")
pid3 := peer.ID("peer3")
pid1 := ts.RandPeerID()
pid2 := ts.RandPeerID()
pid3 := ts.RandPeerID()
peerSet.UpdateInfo(pid1, "Moniker1", "Agent1",
[]*bls.PublicKey{pk1, pk2}, service.New(service.Network))
peerSet.UpdateInfo(pid2, "Moniker2", "Agent2",
Expand All @@ -40,20 +40,18 @@ func TestPeerSet(t *testing.T) {

t.Run("Testing Iterate peers", func(t *testing.T) {
// Verify that the peer list contains the expected peers
expectedPeerIDs := []peer.ID{pid1, pid2, pid3}
found := false
peerSet.IteratePeers(func(p *Peer) bool {
found := false
for _, expectedID := range expectedPeerIDs {
if p.PeerID == expectedID {
found = true
if p.PeerID == pid2 {
found = true

break
}
return true
}
assert.True(t, found, "Peer with ID %s not found in the peer list", p.PeerID)

return false
})

assert.True(t, found, "Peer with ID %s not found in the peer list", pid2)
})

t.Run("Testing GetPeer", func(t *testing.T) {
Expand Down Expand Up @@ -143,18 +141,20 @@ func TestPeerSet(t *testing.T) {
})

t.Run("Testing RemovePeer", func(t *testing.T) {
peerSet.RemovePeer(peer.ID("unknown"))
peerSet.RemovePeer(ts.RandPeerID())
assert.Equal(t, peerSet.Len(), 3)

peerSet.RemovePeer(peer.ID("peer2"))
peerSet.RemovePeer(pid2)
assert.Equal(t, peerSet.Len(), 2)
})
}

func TestOpenSession(t *testing.T) {
ts := testsuite.NewTestSuite(t)

ps := NewPeerSet(time.Minute)

pid := peer.ID("peer1")
pid := ts.RandPeerID()
ssn := ps.OpenSession(pid, 100, 1)

assert.NotNil(t, ssn)
Expand All @@ -164,8 +164,9 @@ func TestOpenSession(t *testing.T) {
assert.Equal(t, session.Open, ssn.Status)
assert.LessOrEqual(t, ssn.LastActivity, time.Now())
assert.True(t, ps.HasOpenSession(pid))
assert.False(t, ps.HasOpenSession("peer2"))
assert.False(t, ps.HasOpenSession(ts.RandPeerID()))
assert.Equal(t, 1, ps.NumberOfSessions())
assert.Contains(t, ps.Sessions(), ssn)
}

func TestFindSession(t *testing.T) {
Expand Down Expand Up @@ -298,19 +299,23 @@ func TestGetRandomPeerConnected(t *testing.T) {

pidBanned := peer.ID("known")
pidConnected := peer.ID("connected")
pidDisconnected := peer.ID("disconnected")
pidKnown := peer.ID("banned")
peerSet.UpdateInfo(pidBanned, "moniker", "agent", nil, service.New())
peerSet.UpdateInfo(pidConnected, "moniker", "agent", nil, service.New())
peerSet.UpdateInfo(pidDisconnected, "moniker", "agent", nil, service.New())
peerSet.UpdateInfo(pidKnown, "moniker", "agent", nil, service.New())

peerSet.UpdateStatus(pidBanned, StatusCodeBanned)
peerSet.UpdateStatus(pidConnected, StatusCodeConnected)
peerSet.UpdateStatus(pidDisconnected, StatusCodeDisconnected)
peerSet.UpdateStatus(pidKnown, StatusCodeKnown)

p := peerSet.GetRandomPeer()

assert.NotEqual(t, p.PeerID, pidBanned)
assert.NotEqual(t, p.PeerID, pidConnected)
assert.NotEqual(t, p.PeerID, pidDisconnected)
assert.Equal(t, p.PeerID, pidKnown)
}

Expand Down
11 changes: 5 additions & 6 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,21 +373,20 @@ func (sync *synchronizer) updateBlockchain() {
// Check if we have any expired sessions
sync.peerSet.SetExpiredSessionsAsUncompleted()

sync.peerSet.IterateSessions(func(ssn *session.Session) bool {
// Try to re-download the blocks for uncompleted sessions
sessions := sync.peerSet.Sessions()
for _, ssn := range sessions {
if ssn.Status == session.Uncompleted {
sync.logger.Info("uncompleted block request, re-download",
"sid", ssn.SessionID, "pid", ssn.PeerID,
"stats", sync.peerSet.SessionStats())

// Try to re-download the blocks from this closed session
sent := sync.sendBlockRequestToRandomPeer(ssn.From, ssn.Count, true)
if !sent {
return true
break
}
}

return false
})
}

// First, let's check if we have any open sessions.
// If there are any open sessions, we should wait for them to be closed.
Expand Down

0 comments on commit 8fe0ff2

Please sign in to comment.