Skip to content

Commit

Permalink
Refactor makeSubscriptionsToBootnodes and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AnkushinDaniil committed Jan 22, 2025
1 parent 7ae5c6c commit 6b301dd
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
50 changes: 33 additions & 17 deletions l1/l1.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c *Client) Run(ctx context.Context) error {
})
if c.network.BootnodeRegistry != emptyBootnodeRegistry {
errs.Go(func() error {
return c.makeSubscribtionsToBootnodes(ctx, buffer)
return c.makeSubscriptionsToBootnodes(ctx, buffer)
})
}
return errs.Wait()
Expand Down Expand Up @@ -229,41 +229,57 @@ func (c *Client) makeSubscriptionToStateUpdates(ctx context.Context, buffer int)
}
}

func (c *Client) makeSubscribtionsToBootnodes(ctx context.Context, buffer int) error {
func (c *Client) makeSubscriptionsToBootnodes(ctx context.Context, buffer int) error {
defer close(c.eventsToP2P)

addresses, err := c.l1.GetIPAddresses(ctx, c.network.BootnodeRegistry)
if err != nil {
if err := c.processInitialAddresses(ctx); err != nil {
return err
}

for _, address := range addresses {
select {
case c.eventsToP2P <- p2p.BootnodeRegistryEvent{
EventType: p2p.Add,
Address: address,
}:
case <-ctx.Done():
return ctx.Err()
}
}

addedChan := make(chan *contract.BootnodeRegistryIPAdded, buffer)
addedSub, err := c.subscribeToBootnodeAddition(ctx, addedChan)
if err != nil {
return err
return fmt.Errorf("failed to setup addition subscription: %w", err)
}
defer addedSub.Unsubscribe()

removedChan := make(chan *contract.BootnodeRegistryIPRemoved, buffer)
removedSub, err := c.subscribeToBootnodeRemoval(ctx, removedChan)
if err != nil {
return err
return fmt.Errorf("failed to setup removal subscription: %w", err)
}
defer removedSub.Unsubscribe()

c.log.Debugw("Successfully subscribed to bootnode registry events")

// Handle events
return c.handleBootnodeEvents(ctx, addedSub, removedSub, addedChan, removedChan)
}

func (c *Client) processInitialAddresses(ctx context.Context) error {
addresses, err := c.l1.GetIPAddresses(ctx, c.network.BootnodeRegistry)
if err != nil {
return fmt.Errorf("failed to fetch bootnode registry addresses: %w", err)
}

for _, address := range addresses {
select {
case c.eventsToP2P <- p2p.BootnodeRegistryEvent{
EventType: p2p.Add,
Address: address,
}:
case <-ctx.Done():
return ctx.Err()

Check warning on line 272 in l1/l1.go

View check run for this annotation

Codecov / codecov/patch

l1/l1.go#L271-L272

Added lines #L271 - L272 were not covered by tests
}
}
return nil
}

func (c *Client) handleBootnodeEvents(
ctx context.Context, addedSub, removedSub event.Subscription,
addedChan chan *contract.BootnodeRegistryIPAdded,
removedChan chan *contract.BootnodeRegistryIPRemoved,
) error {
for {
select {
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions l1/l1_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func TestMakeSubscribtionsToBootnodes(t *testing.T) {
}).
Times(1)

require.NoError(t, client.makeSubscribtionsToBootnodes(ctx, 1))
require.ErrorIs(t, client.makeSubscriptionsToBootnodes(ctx, 1), context.DeadlineExceeded)

expectedAddressesToAdd := make(map[string]struct{}, len(addressesToAdd)+len(storedAddresses))
for _, addr := range addressesToAdd {
Expand Down Expand Up @@ -662,7 +662,7 @@ func TestUnreliableSubscriptionToBootnodes(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), tc.timeOut)
defer cancel()
err := client.makeSubscribtionsToBootnodes(ctx, 1)
err := client.makeSubscriptionsToBootnodes(ctx, 1)
require.ErrorIs(t, err, tc.expectedErr)
})
}
Expand Down
2 changes: 1 addition & 1 deletion l1/l1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestEventListener(t *testing.T) {
})

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
require.NoError(t, client.Run(ctx))
require.ErrorIs(t, client.Run(ctx), context.DeadlineExceeded)
cancel()

require.Equal(t, &core.L1Head{
Expand Down

0 comments on commit 6b301dd

Please sign in to comment.