diff --git a/internal/config/config.go b/internal/config/config.go index 90541f0..2d5adcc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -107,14 +107,6 @@ func ParseArguments() { flag.BoolVar(&wakuDnsDiscovery, "waku.dnsdiscovery", true, "Enable DNS discovery") flag.Parse() - //// NOTE: wakuv2.test ENRtree returns invalid URLs, define static nodes - //if fleet == "wakuv2.test" { - // fleet = "" - // wakuStaticNodes = append(wakuStaticNodes, "/dns4/node-01.do-ams3.wakuv2.test.status.im/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ") - // wakuStaticNodes = append(wakuStaticNodes, "/dns4/node-01.gc-us-central1-a.wakuv2.test.status.im/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS") - // wakuStaticNodes = append(wakuStaticNodes, "/dns4/node-01.ac-cn-hongkong-c.wakuv2.test.status.im/tcp/8000/wss/p2p/16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm") - //} - initialAction = strings.Join(flag.Args(), " ") } diff --git a/internal/transport/encryption_test.go b/internal/transport/encryption_test.go deleted file mode 100644 index 284ab66..0000000 --- a/internal/transport/encryption_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package transport - -import ( - "context" - "github.com/brianvoe/gofakeit/v6" - pp "github.com/six78/2-story-points-cli/pkg/protocol" - "github.com/stretchr/testify/suite" - "go.uber.org/zap" - "testing" -) - -func TestEncryptionSuite(t *testing.T) { - suite.Run(t, new(EncryptionSuite)) -} - -type EncryptionSuite struct { - suite.Suite - node *Node - cancel func() -} - -func (s *EncryptionSuite) SetupSuite() { - var ctx context.Context - ctx, s.cancel = context.WithCancel(context.Background()) - - logger, err := zap.NewDevelopment() - s.Require().NoError(err) - - // Skip initialization, for this test we only need roomCache and logger - s.node = NewNode(ctx, logger) -} - -func (s *EncryptionSuite) TearDownSuite() { - s.cancel() -} - -func (s *EncryptionSuite) TestPublicEncryption() { - room, err := pp.NewRoom() - s.Require().NoError(err) - - payload := make([]byte, 100) - gofakeit.Slice(payload) - - message, err := s.node.buildWakuMessage(room, payload) - s.Require().NoError(err) - - err = s.node.encryptPublicPayload(room, message) - s.Require().NoError(err) - - decryptedPayload, err := decryptMessage(room, message) - s.Require().NoError(err) - - s.Require().Equal(payload, decryptedPayload) -} diff --git a/internal/transport/fleet.go b/internal/transport/fleet.go new file mode 100644 index 0000000..89ebe32 --- /dev/null +++ b/internal/transport/fleet.go @@ -0,0 +1,42 @@ +package transport + +import ( + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" +) + +type FleetName string + +const ( + ShardsStaging FleetName = "shards.staging" + ShardsTest FleetName = "shards.test" + WakuSandbox FleetName = "waku.sandbox" + WakuTest FleetName = "waku.test" +) + +const ( + DefaultClusterID = 16 + DefaultShardID = 64 +) + +var fleets = map[FleetName]string{ + WakuSandbox: "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im", + ShardsTest: "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.test.shards.nodes.status.im", +} + +func FleetENRTree(fleet FleetName) (string, bool) { + enr, ok := fleets[fleet] + return enr, ok +} + +func (f FleetName) IsSharded() bool { + return f == ShardsStaging || f == ShardsTest +} + +func (f FleetName) DefaultPubsubTopic() string { + if f.IsSharded() { + return protocol.NewStaticShardingPubsubTopic(DefaultClusterID, DefaultShardID).String() + } + + return relay.DefaultWakuTopic +} diff --git a/internal/transport/fleet_test.go b/internal/transport/fleet_test.go new file mode 100644 index 0000000..f847d85 --- /dev/null +++ b/internal/transport/fleet_test.go @@ -0,0 +1,34 @@ +package transport + +import ( + "testing" + + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/require" +) + +func TestFleets(t *testing.T) { + enr, ok := FleetENRTree(WakuSandbox) + require.True(t, ok) + require.Equal(t, fleets[WakuSandbox], enr) + + enr, ok = FleetENRTree(ShardsTest) + require.True(t, ok) + require.Equal(t, fleets[ShardsTest], enr) + + _, ok = FleetENRTree(WakuTest) + require.False(t, ok) // We know this fleet, but it's not supported +} + +func TestFleetSharded(t *testing.T) { + require.True(t, ShardsTest.IsSharded()) + require.True(t, ShardsStaging.IsSharded()) + require.False(t, WakuSandbox.IsSharded()) + require.False(t, WakuTest.IsSharded()) + require.False(t, FleetName(gofakeit.LetterN(5)).IsSharded()) +} + +func TestFleetDefaultPubsubTopic(t *testing.T) { + require.Equal(t, "/waku/2/default-waku/proto", WakuSandbox.DefaultPubsubTopic()) + require.Equal(t, "/waku/2/rs/16/64", ShardsTest.DefaultPubsubTopic()) +} diff --git a/internal/transport/waku.go b/internal/transport/waku.go index 5107902..846e6f5 100644 --- a/internal/transport/waku.go +++ b/internal/transport/waku.go @@ -3,12 +3,14 @@ package transport import ( "context" "encoding/hex" + "net" + "strings" + "time" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" - "github.com/six78/2-story-points-cli/internal/config" - pp "github.com/six78/2-story-points-cli/pkg/protocol" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/node" wp "github.com/waku-org/go-waku/waku/v2/payload" @@ -20,17 +22,10 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "net" - "strings" - "time" -) -var fleets = map[string]string{ - "wakuv2.prod": "enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im", - "wakuv2.test": "enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im", - "waku.sandbox": "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im", - "shards.test": "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.test.shards.nodes.status.im", -} + "github.com/six78/2-story-points-cli/internal/config" + pp "github.com/six78/2-story-points-cli/pkg/protocol" +) type Node struct { waku *node.WakuNode @@ -50,7 +45,7 @@ func NewNode(ctx context.Context, logger *zap.Logger) *Node { waku: nil, ctx: ctx, logger: logger, - pubsubTopic: relay.DefaultWakuTopic, + pubsubTopic: FleetName(config.Fleet()).DefaultPubsubTopic(), wakuConnectionStatus: nil, roomCache: NewRoomCache(logger), lightMode: config.WakuLightMode(), @@ -100,6 +95,14 @@ func (n *Node) Initialize() error { ) } + fleet := FleetName(config.Fleet()) + + if fleet.IsSharded() { + options = append(options, + node.WithClusterID(DefaultClusterID), + ) + } + options = append(options, node.DefaultWakuNodeOptions...) wakuNode, err := node.New(options...) @@ -128,6 +131,13 @@ func (n *Node) Start() error { n.logger.Info("waku started", zap.String("peerID", n.waku.ID())) + if !config.WakuLightMode() { + err = n.subscribeToPubsubTopic() + if err != nil { + return errors.Wrap(err, "failed to subscribe to pubsub topic") + } + } + if config.WakuDiscV5() { n.logger.Debug("starting discoveryV5") err = n.waku.DiscV5().Start(context.Background()) @@ -181,7 +191,7 @@ func parseEnrProtocols(v wakuenr.WakuEnrBitfield) string { } func discoverNodes(ctx context.Context, logger *zap.Logger) ([]dnsdisc.DiscoveredNode, error) { - enrTree, ok := fleets[config.Fleet()] + enrTree, ok := FleetENRTree(FleetName(config.Fleet())) if !ok { return nil, errors.Errorf("unknown fleet %s", config.Fleet()) } @@ -353,6 +363,25 @@ func (n *Node) watchConnectionStatus() { } } +func (n *Node) subscribeToPubsubTopic() error { + filter := protocol.NewContentFilter(n.pubsubTopic) + _, err := n.waku.Relay().Subscribe(n.ctx, filter) + if err != nil { + return errors.Wrap(err, "failed to subscribe to pubsub topic") + } + + go func() { + <-n.ctx.Done() + + err := n.waku.Relay().Unsubscribe(n.ctx, filter) + if err != nil { + n.logger.Warn("failed to unsubscribe from relay", zap.Error(err)) + } + }() + + return nil +} + func (n *Node) SubscribeToMessages(room *pp.Room) (*MessagesSubscription, error) { n.logger.Debug("subscribing to room") @@ -361,7 +390,7 @@ func (n *Node) SubscribeToMessages(room *pp.Room) (*MessagesSubscription, error) return nil, errors.Wrap(err, "failed to build content topic") } - contentFilter := protocol.NewContentFilter(relay.DefaultWakuTopic, contentTopic) + contentFilter := protocol.NewContentFilter(n.pubsubTopic, contentTopic) var in chan *protocol.Envelope var unsubscribe func() diff --git a/internal/transport/waku_test.go b/internal/transport/waku_test.go new file mode 100644 index 0000000..b10a055 --- /dev/null +++ b/internal/transport/waku_test.go @@ -0,0 +1,128 @@ +package transport + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/suite" + "github.com/waku-org/go-waku/waku/v2/node" + wakuenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "go.uber.org/zap" + + "github.com/six78/2-story-points-cli/internal/testcommon" + pp "github.com/six78/2-story-points-cli/pkg/protocol" +) + +func TestWakuSuite(t *testing.T) { + suite.Run(t, new(WakuSuite)) +} + +type WakuSuite struct { + testcommon.Suite + node *Node + cancel func() +} + +func (s *WakuSuite) SetupSuite() { + var ctx context.Context + ctx, s.cancel = context.WithCancel(context.Background()) + + logger, err := zap.NewDevelopment() + s.Require().NoError(err) + + // Skip initialization, for this test we only need roomCache and logger + s.node = NewNode(ctx, logger) +} + +func (s *WakuSuite) TearDownSuite() { + s.cancel() +} + +func (s *WakuSuite) TestPublicEncryption() { + room, err := pp.NewRoom() + s.Require().NoError(err) + + payload := make([]byte, 100) + gofakeit.Slice(payload) + + message, err := s.node.buildWakuMessage(room, payload) + s.Require().NoError(err) + + err = s.node.encryptPublicPayload(room, message) + s.Require().NoError(err) + + decryptedPayload, err := decryptMessage(room, message) + s.Require().NoError(err) + + s.Require().Equal(payload, decryptedPayload) +} + +func (s *WakuSuite) TestWakuInitialize() { + err := s.node.Initialize() + s.Require().NoError(err) +} + +func (s *WakuSuite) TestParseEnrProtocols() { + p := parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000000)) + s.Require().Empty(p) + + p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000001)) + s.Require().Equal("relay", p) + + p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000010)) + s.Require().Equal("store", p) + + p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000011)) + s.Require().Equal("store,relay", p) + + p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00000100)) + s.Require().Equal("filter", p) + + p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00001000)) + s.Require().Equal("lightpush", p) + + p = parseEnrProtocols(wakuenr.WakuEnrBitfield(0b00001111)) + s.Require().Equal("lightpush,filter,store,relay", p) +} + +func (s *WakuSuite) TestWatchConnectionStatus() { + err := s.node.Initialize() + s.Require().NoError(err) + + sub := s.node.SubscribeToConnectionStatus() + + finished := make(chan struct{}) + + go func() { + s.node.watchConnectionStatus() + close(finished) + }() + + sent := node.ConnStatus{} + err = gofakeit.Struct(&sent) + s.Require().NoError(err) + + s.node.wakuConnectionStatus <- sent + + select { + case received := <-sub: + s.Require().Equal(sent.IsOnline, received.IsOnline) + s.Require().Equal(sent.HasHistory, received.HasHistory) + s.Require().Equal(len(sent.Peers), received.PeersCount) + s.Require().True(reflect.DeepEqual(received, s.node.ConnectionStatus())) + case <-time.After(500 * time.Millisecond): + s.Require().Fail("timeout waiting for connection status") + } + + close(s.node.wakuConnectionStatus) + + select { + case <-finished: + break + case <-time.After(500 * time.Millisecond): + s.Require().Fail("timeout waiting for connection status watch finish") + } +}