Skip to content

Commit

Permalink
Handle Message Limits
Browse files Browse the repository at this point in the history
  • Loading branch information
nisdas committed Jan 15, 2025
1 parent 34ff4c3 commit 3b35284
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 41 deletions.
34 changes: 24 additions & 10 deletions beacon-chain/p2p/encoder/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ import (

var _ NetworkEncoding = (*SszNetworkEncoder)(nil)

// MaxGossipSize allowed for gossip messages.
var MaxGossipSize = params.BeaconConfig().GossipMaxSize // 10 Mib.
var MaxChunkSize = params.BeaconConfig().MaxChunkSize // 10 Mib.
// MaxPayloadSize allowed for gossip and rpc messages.
var MaxPayloadSize = params.BeaconConfig().MaxPayloadSize // 10 Mib.

// This pool defines the sync pool for our buffered snappy writers, so that they
// can be constantly reused.
Expand All @@ -43,8 +42,8 @@ func (_ SszNetworkEncoder) EncodeGossip(w io.Writer, msg fastssz.Marshaler) (int
if err != nil {
return 0, err
}
if uint64(len(b)) > MaxGossipSize {
return 0, errors.Errorf("gossip message exceeds max gossip size: %d bytes > %d bytes", len(b), MaxGossipSize)
if uint64(len(b)) > MaxPayloadSize {
return 0, errors.Errorf("gossip message exceeds max gossip size: %d bytes > %d bytes", len(b), MaxPayloadSize)
}
b = snappy.Encode(nil /*dst*/, b)
return w.Write(b)
Expand All @@ -60,11 +59,11 @@ func (_ SszNetworkEncoder) EncodeWithMaxLength(w io.Writer, msg fastssz.Marshale
if err != nil {
return 0, err
}
if uint64(len(b)) > MaxChunkSize {
if uint64(len(b)) > MaxPayloadSize {
return 0, fmt.Errorf(
"size of encoded message is %d which is larger than the provided max limit of %d",
len(b),
MaxChunkSize,
MaxPayloadSize,
)
}
// write varint first
Expand All @@ -81,7 +80,10 @@ func doDecode(b []byte, to fastssz.Unmarshaler) error {

// DecodeGossip decodes the bytes to the protobuf gossip message provided.
func (_ SszNetworkEncoder) DecodeGossip(b []byte, to fastssz.Unmarshaler) error {
b, err := DecodeSnappy(b, MaxGossipSize)
if uint64(len(b)) > MaxCompressedLen(MaxPayloadSize) {
return fmt.Errorf("gossip message exceeds maximum compressed limit: %d", MaxCompressedLen(MaxPayloadSize))
}
b, err := DecodeSnappy(b, MaxPayloadSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -111,11 +113,11 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to fastssz.Unmarshal
if err != nil {
return err
}
if msgLen > MaxChunkSize {
if msgLen > MaxPayloadSize {
return fmt.Errorf(
"remaining bytes %d goes over the provided max limit of %d",
msgLen,
MaxChunkSize,
MaxPayloadSize,
)
}
msgMax, err := e.MaxLength(msgLen)
Expand Down Expand Up @@ -156,6 +158,18 @@ func (_ SszNetworkEncoder) MaxLength(length uint64) (int, error) {
return maxLen, nil
}

// MaxCompressedLen returns the maximum compressed size for a given payload size.
//
// Spec pseudocode definition:
// def max_compressed_len(n: uint64) -> uint64:
//
// # Worst-case compressed length for a given payload of size n when using snappy:
// # https://github.com/google/snappy/blob/32ded457c0b1fe78ceb8397632c416568d6714a0/snappy.cc#L218C1-L218C47
// return uint64(32 + n + n / 6)
func MaxCompressedLen(n uint64) uint64 {
return 32 + n + n/6
}

// Writes a bytes value through a snappy buffered writer.
func writeSnappyBuffer(w io.Writer, b []byte) (int, error) {
bufWriter := newBufferedWriter(w)
Expand Down
35 changes: 20 additions & 15 deletions beacon-chain/p2p/encoder/ssz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,19 @@ func TestSszNetworkEncoder_FailsSnappyLength(t *testing.T) {
e := &encoder.SszNetworkEncoder{}
att := &ethpb.Fork{}
data := make([]byte, 32)
binary.PutUvarint(data, encoder.MaxGossipSize+32)
binary.PutUvarint(data, encoder.MaxPayloadSize+1)
err := e.DecodeGossip(data, att)
require.ErrorContains(t, "snappy message exceeds max size", err)
}

func TestSszNetworkEncoder_ExceedsMaxCompressedLimit(t *testing.T) {
e := &encoder.SszNetworkEncoder{}
att := &ethpb.Fork{}
data := make([]byte, encoder.MaxCompressedLen(encoder.MaxPayloadSize)+1)
err := e.DecodeGossip(data, att)
require.ErrorContains(t, "gossip message exceeds maximum compressed limit", err)
}

func testRoundTripWithLength(t *testing.T, e *encoder.SszNetworkEncoder) {
buf := new(bytes.Buffer)
msg := &ethpb.Fork{
Expand Down Expand Up @@ -604,31 +612,28 @@ func TestSszNetworkEncoder_EncodeWithMaxLength(t *testing.T) {
e := &encoder.SszNetworkEncoder{}
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
encoder.MaxChunkSize = uint64(5)
encoder.MaxPayloadSize = uint64(5)
params.OverrideBeaconNetworkConfig(c)
_, err := e.EncodeWithMaxLength(buf, msg)
wanted := fmt.Sprintf("which is larger than the provided max limit of %d", encoder.MaxChunkSize)
wanted := fmt.Sprintf("which is larger than the provided max limit of %d", encoder.MaxPayloadSize)
assert.ErrorContains(t, wanted, err)
}

func TestSszNetworkEncoder_DecodeWithMaxLength(t *testing.T) {
buf := new(bytes.Buffer)
msg := &ethpb.Fork{
PreviousVersion: []byte("fooo"),
CurrentVersion: []byte("barr"),
Epoch: 4242,
}
e := &encoder.SszNetworkEncoder{}
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
maxChunkSize := uint64(5)
encoder.MaxChunkSize = maxChunkSize
params.OverrideBeaconNetworkConfig(c)
_, err := e.EncodeGossip(buf, msg)
maxPayloadSize := uint64(5)
encoder.MaxPayloadSize = maxPayloadSize
_, err := buf.Write(gogo.EncodeVarint(maxPayloadSize + 1))
require.NoError(t, err)
_, err = buf.Write(make([]byte, maxPayloadSize+1))
require.NoError(t, err)
params.OverrideBeaconNetworkConfig(c)
decoded := &ethpb.Fork{}
err = e.DecodeWithMaxLength(buf, decoded)
wanted := fmt.Sprintf("goes over the provided max limit of %d", maxChunkSize)
wanted := fmt.Sprintf("goes over the provided max limit of %d", maxPayloadSize)
assert.ErrorContains(t, wanted, err)
}

Expand All @@ -639,8 +644,8 @@ func TestSszNetworkEncoder_DecodeWithMultipleFrames(t *testing.T) {
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
// 4 * 1 Mib
maxChunkSize := uint64(1 << 22)
encoder.MaxChunkSize = maxChunkSize
maxPayloadSize := uint64(1 << 22)
encoder.MaxPayloadSize = maxPayloadSize
params.OverrideBeaconNetworkConfig(c)
_, err := e.EncodeWithMaxLength(buf, st.ToProtoUnsafe().(*ethpb.BeaconState))
require.NoError(t, err)
Expand Down
9 changes: 4 additions & 5 deletions beacon-chain/p2p/message_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func MsgID(genesisValidatorsRoot []byte, pmsg *pubsubpb.Message) string {
if fEpoch >= params.BeaconConfig().AltairForkEpoch {
return postAltairMsgID(pmsg, fEpoch)
}
decodedData, err := encoder.DecodeSnappy(pmsg.Data, params.BeaconConfig().GossipMaxSize)
decodedData, err := encoder.DecodeSnappy(pmsg.Data, params.BeaconConfig().MaxPayloadSize)
if err != nil {
combinedData := append(params.BeaconConfig().MessageDomainInvalidSnappy[:], pmsg.Data...)
h := hash.Hash(combinedData)
Expand All @@ -77,10 +77,9 @@ func postAltairMsgID(pmsg *pubsubpb.Message, fEpoch primitives.Epoch) string {
topicLen := len(topic)
topicLenBytes := bytesutil.Uint64ToBytesLittleEndian(uint64(topicLen)) // topicLen cannot be negative

// beyond Bellatrix epoch, allow 10 Mib gossip data size
gossipPubSubSize := params.BeaconConfig().GossipMaxSize
gossipPayloadSize := params.BeaconConfig().MaxPayloadSize

decodedData, err := encoder.DecodeSnappy(pmsg.Data, gossipPubSubSize)
decodedData, err := encoder.DecodeSnappy(pmsg.Data, gossipPayloadSize)
if err != nil {
totalLength, err := math.AddInt(
len(params.BeaconConfig().MessageDomainInvalidSnappy),
Expand All @@ -95,7 +94,7 @@ func postAltairMsgID(pmsg *pubsubpb.Message, fEpoch primitives.Epoch) string {
copy(msg, "invalid")
return bytesutil.UnsafeCastToString(msg)
}
if uint64(totalLength) > gossipPubSubSize {
if uint64(totalLength) > gossipPayloadSize {
// this should never happen
msg := make([]byte, 20)
copy(msg, "invalid")
Expand Down
15 changes: 14 additions & 1 deletion beacon-chain/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
mathutil "github.com/prysmaticlabs/prysm/v5/math"
pbrpc "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)

Expand Down Expand Up @@ -141,7 +143,7 @@ func (s *Service) pubsubOptions() []pubsub.Option {
}),
pubsub.WithSubscriptionFilter(s),
pubsub.WithPeerOutboundQueueSize(int(s.cfg.QueueSize)),
pubsub.WithMaxMessageSize(int(params.BeaconConfig().GossipMaxSize)),
pubsub.WithMaxMessageSize(int(MaxMessageSize())), // lint:ignore uintcast -- Max Message Size is a config value and is naturally bounded by networking limitations.
pubsub.WithValidateQueueSize(int(s.cfg.QueueSize)),
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
Expand Down Expand Up @@ -235,3 +237,14 @@ func ExtractGossipDigest(topic string) ([4]byte, error) {
}
return bytesutil.ToBytes4(digest), nil
}

// MaxMessageSize returns the maximum allowed compressed message size.
//
// Spec pseudocode definition:
// def max_message_size() -> uint64:
//
// # Allow 1024 bytes for framing and encoding overhead but at least 1MiB in case MAX_PAYLOAD_SIZE is small.
// return max(max_compressed_len(MAX_PAYLOAD_SIZE) + 1024, 1024 * 1024)
func MaxMessageSize() uint64 {
return mathutil.Max(encoder.MaxCompressedLen(params.BeaconConfig().MaxPayloadSize)+1024, 1024*1024)
}
4 changes: 1 addition & 3 deletions beacon-chain/rpc/eth/config/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,7 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "5", v)
case "MIN_EPOCHS_FOR_BLOCK_REQUESTS":
assert.Equal(t, "33024", v)
case "GOSSIP_MAX_SIZE":
assert.Equal(t, "10485760", v)
case "MAX_CHUNK_SIZE":
case "MAX_PAYLOAD_SIZE":
assert.Equal(t, "10485760", v)
case "ATTESTATION_SUBNET_COUNT":
assert.Equal(t, "64", v)
Expand Down
3 changes: 1 addition & 2 deletions config/params/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,7 @@ type BeaconChainConfig struct {
NumberOfColumns uint64 `yaml:"NUMBER_OF_COLUMNS" spec:"true"` // NumberOfColumns in the extended data matrix.

// Networking Specific Parameters
GossipMaxSize uint64 `yaml:"GOSSIP_MAX_SIZE" spec:"true"` // GossipMaxSize is the maximum allowed size of uncompressed gossip messages.
MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE" spec:"true"` // MaxChunkSize is the maximum allowed size of uncompressed req/resp chunked responses.
MaxPayloadSize uint64 `yaml:"MAX_PAYLOAD_SIZE" spec:"true"` // MAX_PAYLOAD_SIZE is the maximum allowed size of uncompressed payload in gossip messages and rpc chunks.
AttestationSubnetCount uint64 `yaml:"ATTESTATION_SUBNET_COUNT" spec:"true"` // AttestationSubnetCount is the number of attestation subnets used in the gossipsub protocol.
AttestationPropagationSlotRange primitives.Slot `yaml:"ATTESTATION_PROPAGATION_SLOT_RANGE" spec:"true"` // AttestationPropagationSlotRange is the maximum number of slots during which an attestation can be propagated.
MaxRequestBlocks uint64 `yaml:"MAX_REQUEST_BLOCKS" spec:"true"` // MaxRequestBlocks is the maximum number of blocks in a single request.
Expand Down
3 changes: 1 addition & 2 deletions config/params/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,7 @@ func ConfigToYaml(cfg *BeaconChainConfig) []byte {
fmt.Sprintf("ATTESTATION_SUBNET_PREFIX_BITS: %d", cfg.AttestationSubnetPrefixBits),
fmt.Sprintf("SUBNETS_PER_NODE: %d", cfg.SubnetsPerNode),
fmt.Sprintf("NODE_ID_BITS: %d", cfg.NodeIdBits),
fmt.Sprintf("GOSSIP_MAX_SIZE: %d", cfg.GossipMaxSize),
fmt.Sprintf("MAX_CHUNK_SIZE: %d", cfg.MaxChunkSize),
fmt.Sprintf("MAX_PAYLOAD_SIZE: %d", cfg.MaxPayloadSize),
fmt.Sprintf("ATTESTATION_SUBNET_COUNT: %d", cfg.AttestationSubnetCount),
fmt.Sprintf("ATTESTATION_PROPAGATION_SLOT_RANGE: %d", cfg.AttestationPropagationSlotRange),
fmt.Sprintf("MAX_REQUEST_BLOCKS: %d", cfg.MaxRequestBlocks),
Expand Down
1 change: 0 additions & 1 deletion config/params/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ var placeholderFields = []string{
"MAX_BLOB_COMMITMENTS_PER_BLOCK", // Compile time constant on BeaconBlockBodyDeneb.blob_kzg_commitments.
"MAX_BYTES_PER_TRANSACTION", // Used for ssz of EL transactions. Unused in Prysm.
"MAX_EXTRA_DATA_BYTES", // Compile time constant on ExecutionPayload.extra_data.
"MAX_PAYLOAD_SIZE",
"MAX_REQUEST_BLOB_SIDECARS_FULU",
"MAX_REQUEST_PAYLOADS", // Compile time constant on BeaconBlockBody.ExecutionRequests
"MAX_TRANSACTIONS_PER_PAYLOAD", // Compile time constant on ExecutionPayload.transactions.
Expand Down
3 changes: 1 addition & 2 deletions config/params/mainnet_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ var mainnetBeaconConfig = &BeaconChainConfig{
MinEpochsForDataColumnSidecarsRequest: 4096,

// Values related to networking parameters.
GossipMaxSize: 10 * 1 << 20, // 10 MiB
MaxChunkSize: 10 * 1 << 20, // 10 MiB
MaxPayloadSize: 10 * 1 << 20, // 10 MiB
AttestationSubnetCount: 64,
AttestationPropagationSlotRange: 32,
MaxRequestBlocks: 1 << 10, // 1024
Expand Down

0 comments on commit 3b35284

Please sign in to comment.