mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Ensures the rate limitation is respected for by root blob and data column sidecars requests. (#15981)
* Set default value of `--blob-batch-limit` to 384. So, using default values, `--blob-batch-limit * --blob-batch-limit-burst-factor = 384*3 = MAX_REQUEST_BLOB_SIDECARS = 1152.` * `blobSidecarByRootRPCHandler`: Add rate limiting. Bacause now the rate limiter validation is done before the request validation, adapt `TestBlobsByRootValidation` consequently and add new specific tests for `validateBlobByRootRequest` to cover the now untested case. * Set default value of `--data-column-batch-limit-burst-factor` to 4. So, using default values, `--data-column-batch-limit * --data-column-batch-limit-burst-factor = 4096*2 = MAX_REQUEST_DATA_COLUMN_SIDECARS_ELECTRA = 16384`. * `validateDataColumnsByRootRequest`: Take a count instead of idents. * `dataColumnSidecarByRootRPCHandler`: Add rate limiting.
This commit is contained in:
@@ -37,6 +37,11 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
|
||||
}
|
||||
|
||||
blobIdents := *ref
|
||||
|
||||
if err := s.rateLimiter.validateRequest(stream, uint64(len(blobIdents))); err != nil {
|
||||
return errors.Wrap(err, "rate limiter validate request")
|
||||
}
|
||||
|
||||
cs := s.cfg.clock.CurrentSlot()
|
||||
remotePeer := stream.Conn().RemotePeer()
|
||||
if err := validateBlobByRootRequest(blobIdents, cs); err != nil {
|
||||
@@ -44,6 +49,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
||||
return err
|
||||
}
|
||||
|
||||
// Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups.
|
||||
sort.Sort(blobIdents)
|
||||
|
||||
|
||||
@@ -238,7 +238,7 @@ func TestBlobsByRootValidation(t *testing.T) {
|
||||
{
|
||||
name: "exceeds req max",
|
||||
nblocks: int(params.BeaconConfig().MaxRequestBlobSidecars) + 1,
|
||||
err: p2pTypes.ErrMaxBlobReqExceeded,
|
||||
err: p2pTypes.ErrRateLimited,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
@@ -270,3 +270,64 @@ func TestBlobsByRootOK(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateBlobByRootRequest(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.BeaconConfig()
|
||||
|
||||
// Helper function to create blob identifiers
|
||||
createBlobIdents := func(count int) p2pTypes.BlobSidecarsByRootReq {
|
||||
idents := make([]*ethpb.BlobIdentifier, count)
|
||||
for i := 0; i < count; i++ {
|
||||
idents[i] = ðpb.BlobIdentifier{
|
||||
BlockRoot: make([]byte, 32),
|
||||
Index: uint64(i),
|
||||
}
|
||||
}
|
||||
return idents
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
blobIdents p2pTypes.BlobSidecarsByRootReq
|
||||
slot types.Slot
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "pre-Electra: at max limit",
|
||||
blobIdents: createBlobIdents(int(cfg.MaxRequestBlobSidecars)),
|
||||
slot: util.SlotAtEpoch(t, cfg.ElectraForkEpoch-1),
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "pre-Electra: exceeds max limit by 1",
|
||||
blobIdents: createBlobIdents(int(cfg.MaxRequestBlobSidecars) + 1),
|
||||
slot: util.SlotAtEpoch(t, cfg.ElectraForkEpoch-1),
|
||||
expectedErr: p2pTypes.ErrMaxBlobReqExceeded,
|
||||
},
|
||||
{
|
||||
name: "Electra: at max limit",
|
||||
blobIdents: createBlobIdents(int(cfg.MaxRequestBlobSidecarsElectra)),
|
||||
slot: util.SlotAtEpoch(t, cfg.ElectraForkEpoch),
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "Electra: exceeds Electra max limit by 1",
|
||||
blobIdents: createBlobIdents(int(cfg.MaxRequestBlobSidecarsElectra) + 1),
|
||||
slot: util.SlotAtEpoch(t, cfg.ElectraForkEpoch),
|
||||
expectedErr: p2pTypes.ErrMaxBlobReqExceeded,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := validateBlobByRootRequest(tt.blobIdents, tt.slot)
|
||||
if tt.expectedErr != nil {
|
||||
require.ErrorIs(t, err, tt.expectedErr)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,8 +49,18 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
|
||||
|
||||
SetRPCStreamDeadlines(stream)
|
||||
|
||||
// Count the total number of requested data column sidecars.
|
||||
totalRequested := 0
|
||||
for _, ident := range requestedColumnIdents {
|
||||
totalRequested += len(ident.Columns)
|
||||
}
|
||||
|
||||
if err := s.rateLimiter.validateRequest(stream, uint64(totalRequested)); err != nil {
|
||||
return errors.Wrap(err, "rate limiter validate request")
|
||||
}
|
||||
|
||||
// Penalize peers that send invalid requests.
|
||||
if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil {
|
||||
if err := validateDataColumnsByRootRequest(totalRequested); err != nil {
|
||||
s.downscorePeer(remotePeer, "dataColumnSidecarByRootRPCHandlerValidationError")
|
||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
||||
return errors.Wrap(err, "validate data columns by root request")
|
||||
@@ -154,13 +164,8 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
|
||||
}
|
||||
|
||||
// validateDataColumnsByRootRequest checks if the request for data column sidecars is valid.
|
||||
func validateDataColumnsByRootRequest(colIdents types.DataColumnsByRootIdentifiers) error {
|
||||
total := uint64(0)
|
||||
for _, id := range colIdents {
|
||||
total += uint64(len(id.Columns))
|
||||
}
|
||||
|
||||
if total > params.BeaconConfig().MaxRequestDataColumnSidecars {
|
||||
func validateDataColumnsByRootRequest(count int) error {
|
||||
if uint64(count) > params.BeaconConfig().MaxRequestDataColumnSidecars {
|
||||
return types.ErrMaxDataColumnReqExceeded
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"math"
|
||||
"sync"
|
||||
@@ -12,10 +11,10 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
|
||||
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
@@ -36,7 +35,10 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
|
||||
params.BeaconConfig().InitializeForkSchedule()
|
||||
ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot)
|
||||
require.NoError(t, err)
|
||||
ctx := context.Background()
|
||||
ctx := t.Context()
|
||||
|
||||
protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1) + "/" + encoder.ProtocolSuffixSSZSnappy
|
||||
|
||||
t.Run("wrong message type", func(t *testing.T) {
|
||||
service := &Service{}
|
||||
err := service.dataColumnSidecarByRootRPCHandler(t.Context(), nil, nil)
|
||||
@@ -50,9 +52,7 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
|
||||
localP2P := p2ptest.NewTestP2P(t)
|
||||
service := &Service{cfg: &config{p2p: localP2P}}
|
||||
|
||||
protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1)
|
||||
service := &Service{cfg: &config{p2p: localP2P}, rateLimiter: newRateLimiter(localP2P)}
|
||||
remoteP2P := p2ptest.NewTestP2P(t)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
@@ -83,12 +83,6 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("nominal", func(t *testing.T) {
|
||||
resetFlags := flags.Get()
|
||||
gFlags := new(flags.GlobalFlags)
|
||||
gFlags.DataColumnBatchLimit = 2
|
||||
flags.Init(gFlags)
|
||||
defer flags.Init(resetFlags)
|
||||
|
||||
// Setting the ticker to 0 will cause the ticker to panic.
|
||||
// Setting it to the minimum value instead.
|
||||
refTickerDelay := tickerDelay
|
||||
@@ -151,7 +145,6 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
|
||||
rateLimiter: newRateLimiter(localP2P),
|
||||
}
|
||||
|
||||
protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1)
|
||||
remoteP2P := p2ptest.NewTestP2P(t)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
@@ -226,68 +219,22 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestValidateDataColumnsByRootRequest(t *testing.T) {
|
||||
const max = 10
|
||||
|
||||
params.SetupTestConfigCleanup(t)
|
||||
config := params.BeaconConfig()
|
||||
maxCols := uint64(10) // Set a small value for testing
|
||||
config.MaxRequestDataColumnSidecars = maxCols
|
||||
params.OverrideBeaconConfig(config)
|
||||
cfg := params.BeaconConfig()
|
||||
cfg.MaxRequestDataColumnSidecars = max
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
colIdents types.DataColumnsByRootIdentifiers
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "Invalid request - multiple identifiers exceed max",
|
||||
colIdents: types.DataColumnsByRootIdentifiers{
|
||||
{
|
||||
BlockRoot: make([]byte, fieldparams.RootLength),
|
||||
Columns: make([]uint64, maxCols/2+1),
|
||||
},
|
||||
{
|
||||
BlockRoot: make([]byte, fieldparams.RootLength),
|
||||
Columns: make([]uint64, maxCols/2+1),
|
||||
},
|
||||
},
|
||||
expectedErr: types.ErrMaxDataColumnReqExceeded,
|
||||
},
|
||||
{
|
||||
name: "Valid request - less than max",
|
||||
colIdents: types.DataColumnsByRootIdentifiers{
|
||||
{
|
||||
BlockRoot: make([]byte, fieldparams.RootLength),
|
||||
Columns: make([]uint64, maxCols-1),
|
||||
},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "Valid request - multiple identifiers sum to max",
|
||||
colIdents: types.DataColumnsByRootIdentifiers{
|
||||
{
|
||||
BlockRoot: make([]byte, fieldparams.RootLength),
|
||||
Columns: make([]uint64, maxCols/2),
|
||||
},
|
||||
{
|
||||
BlockRoot: make([]byte, fieldparams.RootLength),
|
||||
Columns: make([]uint64, maxCols/2),
|
||||
},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
t.Run("invalid", func(t *testing.T) {
|
||||
err := validateDataColumnsByRootRequest(max + 1)
|
||||
require.ErrorIs(t, err, types.ErrMaxDataColumnReqExceeded)
|
||||
})
|
||||
|
||||
// Run tests
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := validateDataColumnsByRootRequest(tt.colIdents)
|
||||
if tt.expectedErr == nil {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.ErrorIs(t, err, tt.expectedErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
t.Run("valid", func(t *testing.T) {
|
||||
err := validateDataColumnsByRootRequest(max)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDataColumnsRPCMinValidSlot(t *testing.T) {
|
||||
|
||||
@@ -20,7 +20,7 @@ func TestMain(m *testing.M) {
|
||||
BlobBatchLimit: 32,
|
||||
BlobBatchLimitBurstFactor: 2,
|
||||
DataColumnBatchLimit: 4096,
|
||||
DataColumnBatchLimitBurstFactor: 2,
|
||||
DataColumnBatchLimitBurstFactor: 4,
|
||||
})
|
||||
defer func() {
|
||||
flags.Init(resetFlags)
|
||||
|
||||
Reference in New Issue
Block a user