Handle Networking Message Limits Better (#14799)

* Handle Message Limits

* Changelog

* Ignore them for now

* Sort fields

* Update to Latest Versions
This commit is contained in:
Nishant Das
2025-03-17 22:15:20 +08:00
committed by GitHub
parent 5c24978702
commit 629568c796
12 changed files with 79 additions and 52 deletions

View File

@@ -365,9 +365,9 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
integrity = "sha256-b7ZTT+olF+VXEJYNTV5jggNtCkt9dOejm1i2VE+zy+0=",
strip_prefix = "holesky-874c199423ccd180607320c38cbaca05d9a1573a",
url = "https://github.com/eth-clients/holesky/archive/874c199423ccd180607320c38cbaca05d9a1573a.tar.gz", # 2024-06-18
integrity = "sha256-YVFFrCmjoGZ3fXMWpsCpSsYbANy1grnqYwOLKIg2SsA=",
strip_prefix = "holesky-32a72e21c6e53c262f27d50dd540cb654517d03a",
url = "https://github.com/eth-clients/holesky/archive/32a72e21c6e53c262f27d50dd540cb654517d03a.tar.gz", # 2025-03-17
)
http_archive(
@@ -381,9 +381,9 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
integrity = "sha256-cY/UgpCcYEhQf7JefD65FI8tn/A+rAvKhcm2/qiVdqY=",
strip_prefix = "sepolia-f2c219a93c4491cee3d90c18f2f8e82aed850eab",
url = "https://github.com/eth-clients/sepolia/archive/f2c219a93c4491cee3d90c18f2f8e82aed850eab.tar.gz", # 2024-09-19
integrity = "sha256-b5F7Wg9LLMqGRIpP2uqb/YsSFVn2ynzlV7g/Nb1EFLk=",
strip_prefix = "sepolia-562d9938f08675e9ba490a1dfba21fb05843f39f",
url = "https://github.com/eth-clients/sepolia/archive/562d9938f08675e9ba490a1dfba21fb05843f39f.tar.gz", # 2025-03-17
)
http_archive(
@@ -397,9 +397,9 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
integrity = "sha256-o51qGunwjDib2kxxdQfjbyHGdpTVQCd4ff2KE30IToc=",
strip_prefix = "hoodi-db5bfa8c65caeef23afa225797167e956477a3ee",
url = "https://github.com/eth-clients/hoodi/archive/db5bfa8c65caeef23afa225797167e956477a3ee.tar.gz",
integrity = "sha256-dPiEWUd8QvbYGwGtIm0QtCekitVLOLsW5rpQIGzz8PU=",
strip_prefix = "hoodi-828c2c940e1141092bd4bb979cef547ea926d272",
url = "https://github.com/eth-clients/hoodi/archive/828c2c940e1141092bd4bb979cef547ea926d272.tar.gz",
)
http_archive(

View File

@@ -15,9 +15,8 @@ import (
var _ NetworkEncoding = (*SszNetworkEncoder)(nil)
// MaxGossipSize allowed for gossip messages.
var MaxGossipSize = params.BeaconConfig().GossipMaxSize // 10 Mib.
var MaxChunkSize = params.BeaconConfig().MaxChunkSize // 10 Mib.
// MaxPayloadSize allowed for gossip and rpc messages.
var MaxPayloadSize = params.BeaconConfig().MaxPayloadSize // 10 Mib.
// This pool defines the sync pool for our buffered snappy writers, so that they
// can be constantly reused.
@@ -43,8 +42,8 @@ func (_ SszNetworkEncoder) EncodeGossip(w io.Writer, msg fastssz.Marshaler) (int
if err != nil {
return 0, err
}
if uint64(len(b)) > MaxGossipSize {
return 0, errors.Errorf("gossip message exceeds max gossip size: %d bytes > %d bytes", len(b), MaxGossipSize)
if uint64(len(b)) > MaxPayloadSize {
return 0, errors.Errorf("gossip message exceeds max gossip size: %d bytes > %d bytes", len(b), MaxPayloadSize)
}
b = snappy.Encode(nil /*dst*/, b)
return w.Write(b)
@@ -60,11 +59,11 @@ func (_ SszNetworkEncoder) EncodeWithMaxLength(w io.Writer, msg fastssz.Marshale
if err != nil {
return 0, err
}
if uint64(len(b)) > MaxChunkSize {
if uint64(len(b)) > MaxPayloadSize {
return 0, fmt.Errorf(
"size of encoded message is %d which is larger than the provided max limit of %d",
len(b),
MaxChunkSize,
MaxPayloadSize,
)
}
// write varint first
@@ -81,7 +80,10 @@ func doDecode(b []byte, to fastssz.Unmarshaler) error {
// DecodeGossip decodes the bytes to the protobuf gossip message provided.
func (_ SszNetworkEncoder) DecodeGossip(b []byte, to fastssz.Unmarshaler) error {
b, err := DecodeSnappy(b, MaxGossipSize)
if uint64(len(b)) > MaxCompressedLen(MaxPayloadSize) {
return fmt.Errorf("gossip message exceeds maximum compressed limit: %d", MaxCompressedLen(MaxPayloadSize))
}
b, err := DecodeSnappy(b, MaxPayloadSize)
if err != nil {
return err
}
@@ -111,11 +113,11 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to fastssz.Unmarshal
if err != nil {
return err
}
if msgLen > MaxChunkSize {
if msgLen > MaxPayloadSize {
return fmt.Errorf(
"remaining bytes %d goes over the provided max limit of %d",
msgLen,
MaxChunkSize,
MaxPayloadSize,
)
}
msgMax, err := e.MaxLength(msgLen)
@@ -156,6 +158,18 @@ func (_ SszNetworkEncoder) MaxLength(length uint64) (int, error) {
return maxLen, nil
}
// MaxCompressedLen returns the maximum compressed size for a given payload size.
//
// Spec pseudocode definition:
// def max_compressed_len(n: uint64) -> uint64:
//
// # Worst-case compressed length for a given payload of size n when using snappy:
// # https://github.com/google/snappy/blob/32ded457c0b1fe78ceb8397632c416568d6714a0/snappy.cc#L218C1-L218C47
// return uint64(32 + n + n / 6)
func MaxCompressedLen(n uint64) uint64 {
return 32 + n + n/6
}
// Writes a bytes value through a snappy buffered writer.
func writeSnappyBuffer(w io.Writer, b []byte) (int, error) {
bufWriter := newBufferedWriter(w)

View File

@@ -555,11 +555,19 @@ func TestSszNetworkEncoder_FailsSnappyLength(t *testing.T) {
e := &encoder.SszNetworkEncoder{}
att := &ethpb.Fork{}
data := make([]byte, 32)
binary.PutUvarint(data, encoder.MaxGossipSize+32)
binary.PutUvarint(data, encoder.MaxPayloadSize+1)
err := e.DecodeGossip(data, att)
require.ErrorContains(t, "snappy message exceeds max size", err)
}
func TestSszNetworkEncoder_ExceedsMaxCompressedLimit(t *testing.T) {
e := &encoder.SszNetworkEncoder{}
att := &ethpb.Fork{}
data := make([]byte, encoder.MaxCompressedLen(encoder.MaxPayloadSize)+1)
err := e.DecodeGossip(data, att)
require.ErrorContains(t, "gossip message exceeds maximum compressed limit", err)
}
func testRoundTripWithLength(t *testing.T, e *encoder.SszNetworkEncoder) {
buf := new(bytes.Buffer)
msg := &ethpb.Fork{
@@ -604,31 +612,28 @@ func TestSszNetworkEncoder_EncodeWithMaxLength(t *testing.T) {
e := &encoder.SszNetworkEncoder{}
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
encoder.MaxChunkSize = uint64(5)
encoder.MaxPayloadSize = uint64(5)
params.OverrideBeaconNetworkConfig(c)
_, err := e.EncodeWithMaxLength(buf, msg)
wanted := fmt.Sprintf("which is larger than the provided max limit of %d", encoder.MaxChunkSize)
wanted := fmt.Sprintf("which is larger than the provided max limit of %d", encoder.MaxPayloadSize)
assert.ErrorContains(t, wanted, err)
}
func TestSszNetworkEncoder_DecodeWithMaxLength(t *testing.T) {
buf := new(bytes.Buffer)
msg := &ethpb.Fork{
PreviousVersion: []byte("fooo"),
CurrentVersion: []byte("barr"),
Epoch: 4242,
}
e := &encoder.SszNetworkEncoder{}
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
maxChunkSize := uint64(5)
encoder.MaxChunkSize = maxChunkSize
params.OverrideBeaconNetworkConfig(c)
_, err := e.EncodeGossip(buf, msg)
maxPayloadSize := uint64(5)
encoder.MaxPayloadSize = maxPayloadSize
_, err := buf.Write(gogo.EncodeVarint(maxPayloadSize + 1))
require.NoError(t, err)
_, err = buf.Write(make([]byte, maxPayloadSize+1))
require.NoError(t, err)
params.OverrideBeaconNetworkConfig(c)
decoded := &ethpb.Fork{}
err = e.DecodeWithMaxLength(buf, decoded)
wanted := fmt.Sprintf("goes over the provided max limit of %d", maxChunkSize)
wanted := fmt.Sprintf("goes over the provided max limit of %d", maxPayloadSize)
assert.ErrorContains(t, wanted, err)
}
@@ -639,8 +644,8 @@ func TestSszNetworkEncoder_DecodeWithMultipleFrames(t *testing.T) {
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
// 4 * 1 Mib
maxChunkSize := uint64(1 << 22)
encoder.MaxChunkSize = maxChunkSize
maxPayloadSize := uint64(1 << 22)
encoder.MaxPayloadSize = maxPayloadSize
params.OverrideBeaconNetworkConfig(c)
_, err := e.EncodeWithMaxLength(buf, st.ToProtoUnsafe().(*ethpb.BeaconState))
require.NoError(t, err)

View File

@@ -50,7 +50,7 @@ func MsgID(genesisValidatorsRoot []byte, pmsg *pubsubpb.Message) string {
if fEpoch >= params.BeaconConfig().AltairForkEpoch {
return postAltairMsgID(pmsg, fEpoch)
}
decodedData, err := encoder.DecodeSnappy(pmsg.Data, params.BeaconConfig().GossipMaxSize)
decodedData, err := encoder.DecodeSnappy(pmsg.Data, params.BeaconConfig().MaxPayloadSize)
if err != nil {
combinedData := append(params.BeaconConfig().MessageDomainInvalidSnappy[:], pmsg.Data...)
h := hash.Hash(combinedData)
@@ -77,10 +77,9 @@ func postAltairMsgID(pmsg *pubsubpb.Message, fEpoch primitives.Epoch) string {
topicLen := len(topic)
topicLenBytes := bytesutil.Uint64ToBytesLittleEndian(uint64(topicLen)) // topicLen cannot be negative
// beyond Bellatrix epoch, allow 10 Mib gossip data size
gossipPubSubSize := params.BeaconConfig().GossipMaxSize
gossipPayloadSize := params.BeaconConfig().MaxPayloadSize
decodedData, err := encoder.DecodeSnappy(pmsg.Data, gossipPubSubSize)
decodedData, err := encoder.DecodeSnappy(pmsg.Data, gossipPayloadSize)
if err != nil {
totalLength, err := math.AddInt(
len(params.BeaconConfig().MessageDomainInvalidSnappy),
@@ -95,7 +94,7 @@ func postAltairMsgID(pmsg *pubsubpb.Message, fEpoch primitives.Epoch) string {
copy(msg, "invalid")
return bytesutil.UnsafeCastToString(msg)
}
if uint64(totalLength) > gossipPubSubSize {
if uint64(totalLength) > gossipPayloadSize {
// this should never happen
msg := make([]byte, 20)
copy(msg, "invalid")

View File

@@ -11,9 +11,11 @@ import (
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
mathutil "github.com/prysmaticlabs/prysm/v5/math"
pbrpc "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)
@@ -141,7 +143,7 @@ func (s *Service) pubsubOptions() []pubsub.Option {
}),
pubsub.WithSubscriptionFilter(s),
pubsub.WithPeerOutboundQueueSize(int(s.cfg.QueueSize)),
pubsub.WithMaxMessageSize(int(params.BeaconConfig().GossipMaxSize)),
pubsub.WithMaxMessageSize(int(MaxMessageSize())), // lint:ignore uintcast -- Max Message Size is a config value and is naturally bounded by networking limitations.
pubsub.WithValidateQueueSize(int(s.cfg.QueueSize)),
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
@@ -235,3 +237,14 @@ func ExtractGossipDigest(topic string) ([4]byte, error) {
}
return bytesutil.ToBytes4(digest), nil
}
// MaxMessageSize returns the maximum allowed compressed message size.
//
// Spec pseudocode definition:
// def max_message_size() -> uint64:
//
// # Allow 1024 bytes for framing and encoding overhead but at least 1MiB in case MAX_PAYLOAD_SIZE is small.
// return max(max_compressed_len(MAX_PAYLOAD_SIZE) + 1024, 1024 * 1024)
func MaxMessageSize() uint64 {
return mathutil.Max(encoder.MaxCompressedLen(params.BeaconConfig().MaxPayloadSize)+1024, 1024*1024)
}

View File

@@ -200,7 +200,7 @@ func TestGetSpec(t *testing.T) {
data, ok := resp.Data.(map[string]interface{})
require.Equal(t, true, ok)
assert.Equal(t, 170, len(data))
assert.Equal(t, 169, len(data))
for k, v := range data {
t.Run(k, func(t *testing.T) {
switch k {
@@ -481,9 +481,7 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "5", v)
case "MIN_EPOCHS_FOR_BLOCK_REQUESTS":
assert.Equal(t, "33024", v)
case "GOSSIP_MAX_SIZE":
assert.Equal(t, "10485760", v)
case "MAX_CHUNK_SIZE":
case "MAX_PAYLOAD_SIZE":
assert.Equal(t, "10485760", v)
case "ATTESTATION_SUBNET_COUNT":
assert.Equal(t, "64", v)

View File

@@ -0,0 +1,3 @@
### Changed
- Update Gossip and RPC message limits to comply with the spec.

View File

@@ -275,8 +275,7 @@ type BeaconChainConfig struct {
DataColumnSidecarSubnetCount uint64 `yaml:"DATA_COLUMN_SIDECAR_SUBNET_COUNT" spec:"true"` // DataColumnSidecarSubnetCount is the number of data column sidecar subnets used in the gossipsub protocol
// Networking Specific Parameters
GossipMaxSize uint64 `yaml:"GOSSIP_MAX_SIZE" spec:"true"` // GossipMaxSize is the maximum allowed size of uncompressed gossip messages.
MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE" spec:"true"` // MaxChunkSize is the maximum allowed size of uncompressed req/resp chunked responses.
MaxPayloadSize uint64 `yaml:"MAX_PAYLOAD_SIZE" spec:"true"` // MAX_PAYLOAD_SIZE is the maximum allowed size of uncompressed payload in gossip messages and rpc chunks.
AttestationSubnetCount uint64 `yaml:"ATTESTATION_SUBNET_COUNT" spec:"true"` // AttestationSubnetCount is the number of attestation subnets used in the gossipsub protocol.
AttestationPropagationSlotRange primitives.Slot `yaml:"ATTESTATION_PROPAGATION_SLOT_RANGE" spec:"true"` // AttestationPropagationSlotRange is the maximum number of slots during which an attestation can be propagated.
MaxRequestBlocks uint64 `yaml:"MAX_REQUEST_BLOCKS" spec:"true"` // MaxRequestBlocks is the maximum number of blocks in a single request.

View File

@@ -228,8 +228,7 @@ func ConfigToYaml(cfg *BeaconChainConfig) []byte {
fmt.Sprintf("ATTESTATION_SUBNET_PREFIX_BITS: %d", cfg.AttestationSubnetPrefixBits),
fmt.Sprintf("SUBNETS_PER_NODE: %d", cfg.SubnetsPerNode),
fmt.Sprintf("NODE_ID_BITS: %d", cfg.NodeIdBits),
fmt.Sprintf("GOSSIP_MAX_SIZE: %d", cfg.GossipMaxSize),
fmt.Sprintf("MAX_CHUNK_SIZE: %d", cfg.MaxChunkSize),
fmt.Sprintf("MAX_PAYLOAD_SIZE: %d", cfg.MaxPayloadSize),
fmt.Sprintf("ATTESTATION_SUBNET_COUNT: %d", cfg.AttestationSubnetCount),
fmt.Sprintf("ATTESTATION_PROPAGATION_SLOT_RANGE: %d", cfg.AttestationPropagationSlotRange),
fmt.Sprintf("MAX_REQUEST_BLOCKS: %d", cfg.MaxRequestBlocks),

View File

@@ -37,7 +37,6 @@ var placeholderFields = []string{
"EIP7732_FORK_VERSION",
"EPOCHS_PER_SHUFFLING_PHASE",
"MAX_BLOBS_PER_BLOCK_FULU",
"MAX_PAYLOAD_SIZE",
"MAX_REQUEST_BLOB_SIDECARS_FULU",
"MAX_REQUEST_PAYLOADS", // Compile time constant on BeaconBlockBody.ExecutionRequests
"NUMBER_OF_CUSTODY_GROUPS",

View File

@@ -315,8 +315,7 @@ var mainnetBeaconConfig = &BeaconChainConfig{
MinEpochsForDataColumnSidecarsRequest: 4096,
// Values related to networking parameters.
GossipMaxSize: 10 * 1 << 20, // 10 MiB
MaxChunkSize: 10 * 1 << 20, // 10 MiB
MaxPayloadSize: 10 * 1 << 20, // 10 MiB
AttestationSubnetCount: 64,
AttestationPropagationSlotRange: 32,
MaxRequestBlocks: 1 << 10, // 1024

View File

@@ -49,7 +49,6 @@ func HoodiConfig() *BeaconChainConfig {
cfg.FuluForkVersion = []byte{0x70, 0x00, 0x09, 0x10}
cfg.TerminalTotalDifficulty = "0"
cfg.DepositContractAddress = "0x00000000219ab540356cBB839Cbe05303d7705Fa"
cfg.MinGenesisActiveValidatorCount = 1000000
cfg.InitializeForkSchedule()
return cfg
}