Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(zeromq): add publisher raw tx #1672

Merged
merged 8 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 0 additions & 56 deletions www/zmq/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package zmq

import (
"errors"
"fmt"
"net/url"
"strings"
)

type Config struct {
Expand All @@ -26,62 +23,9 @@ func DefaultConfig() *Config {
}

func (c *Config) BasicCheck() error {
if c.ZmqPubBlockInfo != "" {
if err := validateTopicSocket(c.ZmqPubBlockInfo); err != nil {
return err
}
}

if c.ZmqPubTxInfo != "" {
if err := validateTopicSocket(c.ZmqPubTxInfo); err != nil {
return err
}
}

if c.ZmqPubRawBlock != "" {
if err := validateTopicSocket(c.ZmqPubRawBlock); err != nil {
return err
}
}

if c.ZmqPubRawTx != "" {
if err := validateTopicSocket(c.ZmqPubRawTx); err != nil {
return err
}
}

if c.ZmqPubHWM < 0 {
return fmt.Errorf("invalid publisher hwm %d", c.ZmqPubHWM)
}

return nil
}

func validateTopicSocket(socket string) error {
addr, err := url.Parse(socket)
if err != nil {
return errors.New("failed to parse URL: " + err.Error())
}

if addr.Scheme != "tcp" {
return errors.New("invalid scheme: zeromq socket schema")
}

if addr.Host == "" {
return errors.New("invalid host: host is empty")
}

parts := strings.Split(addr.Host, ":")
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return errors.New("invalid host: missing or malformed host/port")
}

port := parts[1]
for _, r := range port {
if r < '0' || r > '9' {
return errors.New("invalid port: non-numeric characters detected")
}
}

return nil
}
21 changes: 0 additions & 21 deletions www/zmq/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,6 @@ func TestBasicCheck(t *testing.T) {
},
expectErr: false,
},
{
name: "Invalid scheme",
config: &Config{
ZmqPubBlockInfo: "udp://127.0.0.1:28332",
},
expectErr: true,
},
{
name: "Missing port",
config: &Config{
ZmqPubBlockInfo: "tcp://127.0.0.1",
},
expectErr: true,
},
{
name: "Empty host",
config: &Config{
ZmqPubTxInfo: "tcp://:28332",
},
expectErr: true,
},
{
name: "Negative ZmqPubHWM",
config: &Config{
Expand Down
2 changes: 1 addition & 1 deletion www/zmq/publisher_block_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (b *blockInfoPub) onNewBlock(blk *block.Block) {
return
}

b.logger.Debug("zmq published message success",
b.logger.Debug("ZMQ published the message successfully",
"publisher", b.TopicName(),
"block_height", blk.Height())

Expand Down
2 changes: 1 addition & 1 deletion www/zmq/publisher_raw_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (r *rawBlockPub) onNewBlock(blk *block.Block) {
return
}

r.logger.Debug("zmq published message success",
r.logger.Debug("ZMQ published the message successfully",
"publisher", r.TopicName(),
"block_height", blk.Height())

Expand Down
27 changes: 24 additions & 3 deletions www/zmq/publisher_raw_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,28 @@
}
}

func (*rawTxPub) onNewBlock(_ *block.Block) {
// TODO implement me
panic("implement me")
func (r *rawTxPub) onNewBlock(blk *block.Block) {
for _, tx := range blk.Transactions() {
buf, err := tx.Bytes()
if err != nil {
r.logger.Error("failed to serializing raw tx", "err", err, "topic", r.TopicName())

return
}

Check warning on line 30 in www/zmq/publisher_raw_tx.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/publisher_raw_tx.go#L27-L30

Added lines #L27 - L30 were not covered by tests

rawMsg := r.makeTopicMsg(buf, blk.Height())
message := zmq4.NewMsg(rawMsg)

if err := r.zmqSocket.Send(message); err != nil {
r.logger.Error("zmq publish message error", "err", err, "publisher", r.TopicName())

return
}

Check warning on line 39 in www/zmq/publisher_raw_tx.go

View check run for this annotation

Codecov / codecov/patch

www/zmq/publisher_raw_tx.go#L36-L39

Added lines #L36 - L39 were not covered by tests

r.logger.Debug("ZMQ published the message successfully",
"publisher", r.TopicName(),
"block_height", blk.Height())

r.seqNo++
}
}
65 changes: 65 additions & 0 deletions www/zmq/publisher_raw_tx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package zmq

import (
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/types/tx"
"github.com/pactus-project/pactus/util/testsuite"
"github.com/stretchr/testify/require"
)

func TestRawTxPublisher(t *testing.T) {
port := testsuite.FindFreePort()
addr := fmt.Sprintf("tcp://localhost:%d", port)
conf := DefaultConfig()
conf.ZmqPubRawTx = addr

td := setup(t, conf)
defer td.closeServer()

td.server.Publishers()

sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))

err := sub.Dial(addr)
require.NoError(t, err)

err = sub.SetOption(zmq4.OptionSubscribe, string(TopicRawTransaction.Bytes()))
require.NoError(t, err)

blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())

td.eventCh <- blk

for i := 0; i < len(blk.Transactions()); i++ {
received, err := sub.Recv()
require.NoError(t, err)

require.NotNil(t, received.Frames)
require.GreaterOrEqual(t, len(received.Frames), 1)

msg := received.Frames[0]

topic := msg[:2]
rawTx := msg[2 : len(msg)-8]

blockNumberOffset := len(msg) - 8
height := binary.BigEndian.Uint32(msg[blockNumberOffset : blockNumberOffset+4])
seqNo := binary.BigEndian.Uint32(msg[len(msg)-4:])

txn, err := tx.FromBytes(rawTx)
require.NoError(t, err)
require.NotNil(t, txn)

require.Equal(t, TopicRawTransaction.Bytes(), topic)
require.Equal(t, height, blk.Height())
require.Equal(t, uint32(i), seqNo)
require.NotEqual(t, 0, txn.SerializeSize())
}

require.NoError(t, sub.Close())
}
111 changes: 111 additions & 0 deletions www/zmq/publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package zmq

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/types/tx"
"github.com/pactus-project/pactus/util/testsuite"
"github.com/stretchr/testify/require"
)

func TestPublisherOnSameSockets(t *testing.T) {
port := testsuite.FindFreePort()
addr := fmt.Sprintf("tcp://localhost:%d", port)
conf := DefaultConfig()
conf.ZmqPubRawTx = addr
conf.ZmqPubTxInfo = addr
conf.ZmqPubRawBlock = addr
conf.ZmqPubBlockInfo = addr

td := setup(t, conf)
defer td.closeServer()

td.server.Publishers()

sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))

err := sub.Dial(addr)
require.NoError(t, err)

err = sub.SetOption(zmq4.OptionSubscribe, string(TopicTransactionInfo.Bytes()))
require.NoError(t, err)

err = sub.SetOption(zmq4.OptionSubscribe, string(TopicRawTransaction.Bytes()))
require.NoError(t, err)

err = sub.SetOption(zmq4.OptionSubscribe, string(TopicBlockInfo.Bytes()))
require.NoError(t, err)

err = sub.SetOption(zmq4.OptionSubscribe, string(TopicRawBlock.Bytes()))
require.NoError(t, err)

blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())

td.eventCh <- blk

for i := 0; i < (len(blk.Transactions())*2)+2; i++ {
received, err := sub.Recv()
require.NoError(t, err)

require.NotNil(t, received.Frames)
require.GreaterOrEqual(t, len(received.Frames), 1)

msg := received.Frames[0]

topic := TopicFromBytes(msg[:2])
blockNumberOffset := len(msg) - 8
height := binary.BigEndian.Uint32(msg[blockNumberOffset : blockNumberOffset+4])
seqNo := binary.BigEndian.Uint32(msg[len(msg)-4:])
t.Logf("[%s] %d", topic, seqNo)

require.Equal(t, height, blk.Height())

switch topic {
case TopicRawTransaction:
rawTx := msg[2 : len(msg)-8]

txn, err := tx.FromBytes(rawTx)

require.NoError(t, err)
require.NotNil(t, txn)
require.Equal(t, TopicRawTransaction, topic)
require.NotEqual(t, 0, txn.SerializeSize())
case TopicTransactionInfo:
txHash := msg[2:34]
id, err := hash.FromBytes(txHash)

require.NoError(t, err)
require.NotNil(t, id)
require.Equal(t, TopicTransactionInfo, topic)

case TopicRawBlock:
blockHeader := msg[2 : len(msg)-8]
buf := bytes.NewBuffer(blockHeader)
header := new(block.Header)

require.NoError(t, header.Decode(buf))
require.NotNil(t, header)
require.Equal(t, TopicRawBlock, topic)
require.Equal(t, header.PrevBlockHash(), blk.Header().PrevBlockHash())
require.Equal(t, header.StateRoot(), blk.Header().StateRoot())
case TopicBlockInfo:
proposerBytes := msg[2:23]
timestamp := binary.BigEndian.Uint32(msg[23:27])
txCount := binary.BigEndian.Uint16(msg[27:29])

require.Equal(t, TopicBlockInfo, topic)
require.Equal(t, blk.Header().ProposerAddress().Bytes(), proposerBytes)
require.Equal(t, blk.Header().UnixTime(), timestamp)
require.Equal(t, uint16(len(blk.Transactions())), txCount)
}
}

require.NoError(t, sub.Close())
}
2 changes: 1 addition & 1 deletion www/zmq/publisher_tx_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (t *txInfoPub) onNewBlock(blk *block.Block) {
continue
}

t.logger.Debug("zmq published message success",
t.logger.Debug("ZMQ published the message successfully",
"publisher", t.TopicName(),
"block_height", blk.Height(),
"tx_hash", txn.ID().String(),
Expand Down
8 changes: 8 additions & 0 deletions www/zmq/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ func (t Topic) Bytes() []byte {

return b
}

func TopicFromBytes(b []byte) Topic {
if len(b) < 2 {
return 0
}

return Topic(binary.BigEndian.Uint16(b))
}
18 changes: 18 additions & 0 deletions www/zmq/topic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package zmq

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestTopicFromBytes(t *testing.T) {
validRawTopic := TopicRawTransaction.Bytes()
invalidRawTopic := make([]byte, 0)

topic := TopicFromBytes(validRawTopic)
require.Equal(t, TopicRawTransaction, topic)

topic = TopicFromBytes(invalidRawTopic)
require.Equal(t, 0, int(topic))
}
Loading