Skip to content

Commit

Permalink
fix: proper subscribe to shards.test fleet (#49)
Browse files Browse the repository at this point in the history
* fix: proper subscribe to shards.test fleet

* test: waku initialization test

* test: fleets

* test: parseEnrProtocols

* test: watch transport connection status
  • Loading branch information
igor-sirotin authored Jun 23, 2024
1 parent bd52c08 commit df9939c
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 77 deletions.
8 changes: 0 additions & 8 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,6 @@ func ParseArguments() {
flag.BoolVar(&wakuDnsDiscovery, "waku.dnsdiscovery", true, "Enable DNS discovery")
flag.Parse()

//// NOTE: wakuv2.test ENRtree returns invalid URLs, define static nodes
//if fleet == "wakuv2.test" {
// fleet = ""
// wakuStaticNodes = append(wakuStaticNodes, "/dns4/node-01.do-ams3.wakuv2.test.status.im/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ")
// wakuStaticNodes = append(wakuStaticNodes, "/dns4/node-01.gc-us-central1-a.wakuv2.test.status.im/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS")
// wakuStaticNodes = append(wakuStaticNodes, "/dns4/node-01.ac-cn-hongkong-c.wakuv2.test.status.im/tcp/8000/wss/p2p/16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm")
//}

initialAction = strings.Join(flag.Args(), " ")
}

Expand Down
54 changes: 0 additions & 54 deletions internal/transport/encryption_test.go

This file was deleted.

42 changes: 42 additions & 0 deletions internal/transport/fleet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package transport

import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)

type FleetName string

const (
ShardsStaging FleetName = "shards.staging"
ShardsTest FleetName = "shards.test"
WakuSandbox FleetName = "waku.sandbox"
WakuTest FleetName = "waku.test"
)

const (
DefaultClusterID = 16
DefaultShardID = 64
)

var fleets = map[FleetName]string{
WakuSandbox: "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im",
ShardsTest: "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.test.shards.nodes.status.im",
}

func FleetENRTree(fleet FleetName) (string, bool) {
enr, ok := fleets[fleet]
return enr, ok
}

func (f FleetName) IsSharded() bool {
return f == ShardsStaging || f == ShardsTest
}

func (f FleetName) DefaultPubsubTopic() string {
if f.IsSharded() {
return protocol.NewStaticShardingPubsubTopic(DefaultClusterID, DefaultShardID).String()
}

return relay.DefaultWakuTopic
}
34 changes: 34 additions & 0 deletions internal/transport/fleet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package transport

import (
"testing"

"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/require"
)

func TestFleets(t *testing.T) {
enr, ok := FleetENRTree(WakuSandbox)
require.True(t, ok)
require.Equal(t, fleets[WakuSandbox], enr)

enr, ok = FleetENRTree(ShardsTest)
require.True(t, ok)
require.Equal(t, fleets[ShardsTest], enr)

_, ok = FleetENRTree(WakuTest)
require.False(t, ok) // We know this fleet, but it's not supported
}

func TestFleetSharded(t *testing.T) {
require.True(t, ShardsTest.IsSharded())
require.True(t, ShardsStaging.IsSharded())
require.False(t, WakuSandbox.IsSharded())
require.False(t, WakuTest.IsSharded())
require.False(t, FleetName(gofakeit.LetterN(5)).IsSharded())
}

func TestFleetDefaultPubsubTopic(t *testing.T) {
require.Equal(t, "/waku/2/default-waku/proto", WakuSandbox.DefaultPubsubTopic())
require.Equal(t, "/waku/2/rs/16/64", ShardsTest.DefaultPubsubTopic())
}
59 changes: 44 additions & 15 deletions internal/transport/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package transport
import (
"context"
"encoding/hex"
"net"
"strings"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/six78/2-story-points-cli/internal/config"
pp "github.com/six78/2-story-points-cli/pkg/protocol"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/node"
wp "github.com/waku-org/go-waku/waku/v2/payload"
Expand All @@ -20,17 +22,10 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"net"
"strings"
"time"
)

var fleets = map[string]string{
"wakuv2.prod": "enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im",
"wakuv2.test": "enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im",
"waku.sandbox": "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im",
"shards.test": "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.test.shards.nodes.status.im",
}
"github.com/six78/2-story-points-cli/internal/config"
pp "github.com/six78/2-story-points-cli/pkg/protocol"
)

type Node struct {
waku *node.WakuNode
Expand All @@ -50,7 +45,7 @@ func NewNode(ctx context.Context, logger *zap.Logger) *Node {
waku: nil,
ctx: ctx,
logger: logger,
pubsubTopic: relay.DefaultWakuTopic,
pubsubTopic: FleetName(config.Fleet()).DefaultPubsubTopic(),
wakuConnectionStatus: nil,
roomCache: NewRoomCache(logger),
lightMode: config.WakuLightMode(),
Expand Down Expand Up @@ -100,6 +95,14 @@ func (n *Node) Initialize() error {
)
}

fleet := FleetName(config.Fleet())

if fleet.IsSharded() {
options = append(options,
node.WithClusterID(DefaultClusterID),
)
}

options = append(options, node.DefaultWakuNodeOptions...)

wakuNode, err := node.New(options...)
Expand Down Expand Up @@ -128,6 +131,13 @@ func (n *Node) Start() error {

n.logger.Info("waku started", zap.String("peerID", n.waku.ID()))

if !config.WakuLightMode() {
err = n.subscribeToPubsubTopic()
if err != nil {
return errors.Wrap(err, "failed to subscribe to pubsub topic")
}
}

if config.WakuDiscV5() {
n.logger.Debug("starting discoveryV5")
err = n.waku.DiscV5().Start(context.Background())
Expand Down Expand Up @@ -181,7 +191,7 @@ func parseEnrProtocols(v wakuenr.WakuEnrBitfield) string {
}

func discoverNodes(ctx context.Context, logger *zap.Logger) ([]dnsdisc.DiscoveredNode, error) {
enrTree, ok := fleets[config.Fleet()]
enrTree, ok := FleetENRTree(FleetName(config.Fleet()))
if !ok {
return nil, errors.Errorf("unknown fleet %s", config.Fleet())
}
Expand Down Expand Up @@ -353,6 +363,25 @@ func (n *Node) watchConnectionStatus() {
}
}

func (n *Node) subscribeToPubsubTopic() error {
filter := protocol.NewContentFilter(n.pubsubTopic)
_, err := n.waku.Relay().Subscribe(n.ctx, filter)
if err != nil {
return errors.Wrap(err, "failed to subscribe to pubsub topic")
}

go func() {
<-n.ctx.Done()

err := n.waku.Relay().Unsubscribe(n.ctx, filter)
if err != nil {
n.logger.Warn("failed to unsubscribe from relay", zap.Error(err))
}
}()

return nil
}

func (n *Node) SubscribeToMessages(room *pp.Room) (*MessagesSubscription, error) {
n.logger.Debug("subscribing to room")

Expand All @@ -361,7 +390,7 @@ func (n *Node) SubscribeToMessages(room *pp.Room) (*MessagesSubscription, error)
return nil, errors.Wrap(err, "failed to build content topic")
}

contentFilter := protocol.NewContentFilter(relay.DefaultWakuTopic, contentTopic)
contentFilter := protocol.NewContentFilter(n.pubsubTopic, contentTopic)

var in chan *protocol.Envelope
var unsubscribe func()
Expand Down
128 changes: 128 additions & 0 deletions internal/transport/waku_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package transport

import (
"context"
"reflect"
"testing"
"time"

"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/waku/v2/node"
wakuenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"go.uber.org/zap"

"github.com/six78/2-story-points-cli/internal/testcommon"
pp "github.com/six78/2-story-points-cli/pkg/protocol"
)

func TestWakuSuite(t *testing.T) {
suite.Run(t, new(WakuSuite))
}

type WakuSuite struct {
testcommon.Suite
node *Node
cancel func()
}

func (s *WakuSuite) SetupSuite() {
var ctx context.Context
ctx, s.cancel = context.WithCancel(context.Background())

logger, err := zap.NewDevelopment()
s.Require().NoError(err)

// Skip initialization, for this test we only need roomCache and logger
s.node = NewNode(ctx, logger)
}

func (s *WakuSuite) TearDownSuite() {
s.cancel()
}

func (s *WakuSuite) TestPublicEncryption() {
room, err := pp.NewRoom()
s.Require().NoError(err)

payload := make([]byte, 100)
gofakeit.Slice(payload)

message, err := s.node.buildWakuMessage(room, payload)
s.Require().NoError(err)

err = s.node.encryptPublicPayload(room, message)
s.Require().NoError(err)

decryptedPayload, err := decryptMessage(room, message)
s.Require().NoError(err)

s.Require().Equal(payload, decryptedPayload)
}

func (s *WakuSuite) TestWakuInitialize() {
err := s.node.Initialize()
s.Require().NoError(err)
}

func (s *WakuSuite) TestParseEnrProtocols() {
p := parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000000))
s.Require().Empty(p)

p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000001))
s.Require().Equal("relay", p)

p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000010))
s.Require().Equal("store", p)

p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000011))
s.Require().Equal("store,relay", p)

p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000100))
s.Require().Equal("filter", p)

p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00001000))
s.Require().Equal("lightpush", p)

p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00001111))
s.Require().Equal("lightpush,filter,store,relay", p)
}

func (s *WakuSuite) TestWatchConnectionStatus() {
err := s.node.Initialize()
s.Require().NoError(err)

sub := s.node.SubscribeToConnectionStatus()

finished := make(chan struct{})

go func() {
s.node.watchConnectionStatus()
close(finished)
}()

sent := node.ConnStatus{}
err = gofakeit.Struct(&sent)
s.Require().NoError(err)

s.node.wakuConnectionStatus <- sent

select {
case received := <-sub:
s.Require().Equal(sent.IsOnline, received.IsOnline)
s.Require().Equal(sent.HasHistory, received.HasHistory)
s.Require().Equal(len(sent.Peers), received.PeersCount)
s.Require().True(reflect.DeepEqual(received, s.node.ConnectionStatus()))
case <-time.After(500 * time.Millisecond):
s.Require().Fail("timeout waiting for connection status")
}

close(s.node.wakuConnectionStatus)

select {
case <-finished:
break
case <-time.After(500 * time.Millisecond):
s.Require().Fail("timeout waiting for connection status watch finish")
}
}

0 comments on commit df9939c

Please sign in to comment.