diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root.go b/beacon-chain/sync/rpc_blob_sidecars_by_root.go index 63b31f113c..cbb75cacc0 100644 --- a/beacon-chain/sync/rpc_blob_sidecars_by_root.go +++ b/beacon-chain/sync/rpc_blob_sidecars_by_root.go @@ -37,6 +37,11 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface } blobIdents := *ref + + if err := s.rateLimiter.validateRequest(stream, uint64(len(blobIdents))); err != nil { + return errors.Wrap(err, "rate limiter validate request") + } + cs := s.cfg.clock.CurrentSlot() remotePeer := stream.Conn().RemotePeer() if err := validateBlobByRootRequest(blobIdents, cs); err != nil { @@ -44,6 +49,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) return err } + // Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups. sort.Sort(blobIdents) diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go index e011455e0f..5fc6660cc7 100644 --- a/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go +++ b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go @@ -238,7 +238,7 @@ func TestBlobsByRootValidation(t *testing.T) { { name: "exceeds req max", nblocks: int(params.BeaconConfig().MaxRequestBlobSidecars) + 1, - err: p2pTypes.ErrMaxBlobReqExceeded, + err: p2pTypes.ErrRateLimited, }, } for _, c := range cases { @@ -270,3 +270,64 @@ func TestBlobsByRootOK(t *testing.T) { }) } } + +func TestValidateBlobByRootRequest(t *testing.T) { + params.SetupTestConfigCleanup(t) + cfg := params.BeaconConfig() + + // Helper function to create blob identifiers + createBlobIdents := func(count int) p2pTypes.BlobSidecarsByRootReq { + idents := make([]*ethpb.BlobIdentifier, count) + for i := 0; i < count; i++ { + idents[i] = ðpb.BlobIdentifier{ + BlockRoot: make([]byte, 32), + Index: uint64(i), + } + } + return idents + } + + tests := []struct { + name string + blobIdents p2pTypes.BlobSidecarsByRootReq + slot types.Slot + expectedErr error + }{ + { + name: "pre-Electra: at max limit", + blobIdents: createBlobIdents(int(cfg.MaxRequestBlobSidecars)), + slot: util.SlotAtEpoch(t, cfg.ElectraForkEpoch-1), + expectedErr: nil, + }, + { + name: "pre-Electra: exceeds max limit by 1", + blobIdents: createBlobIdents(int(cfg.MaxRequestBlobSidecars) + 1), + slot: util.SlotAtEpoch(t, cfg.ElectraForkEpoch-1), + expectedErr: p2pTypes.ErrMaxBlobReqExceeded, + }, + { + name: "Electra: at max limit", + blobIdents: createBlobIdents(int(cfg.MaxRequestBlobSidecarsElectra)), + slot: util.SlotAtEpoch(t, cfg.ElectraForkEpoch), + expectedErr: nil, + }, + { + name: "Electra: exceeds Electra max limit by 1", + blobIdents: createBlobIdents(int(cfg.MaxRequestBlobSidecarsElectra) + 1), + slot: util.SlotAtEpoch(t, cfg.ElectraForkEpoch), + expectedErr: p2pTypes.ErrMaxBlobReqExceeded, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateBlobByRootRequest(tt.blobIdents, tt.slot) + if tt.expectedErr != nil { + require.ErrorIs(t, err, tt.expectedErr) + return + } + + require.NoError(t, err) + }) + } +} diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go index 2c0f86ac20..983000ea3c 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go @@ -49,8 +49,18 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int SetRPCStreamDeadlines(stream) + // Count the total number of requested data column sidecars. + totalRequested := 0 + for _, ident := range requestedColumnIdents { + totalRequested += len(ident.Columns) + } + + if err := s.rateLimiter.validateRequest(stream, uint64(totalRequested)); err != nil { + return errors.Wrap(err, "rate limiter validate request") + } + // Penalize peers that send invalid requests. - if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil { + if err := validateDataColumnsByRootRequest(totalRequested); err != nil { s.downscorePeer(remotePeer, "dataColumnSidecarByRootRPCHandlerValidationError") s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) return errors.Wrap(err, "validate data columns by root request") @@ -154,13 +164,8 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int } // validateDataColumnsByRootRequest checks if the request for data column sidecars is valid. -func validateDataColumnsByRootRequest(colIdents types.DataColumnsByRootIdentifiers) error { - total := uint64(0) - for _, id := range colIdents { - total += uint64(len(id.Columns)) - } - - if total > params.BeaconConfig().MaxRequestDataColumnSidecars { +func validateDataColumnsByRootRequest(count int) error { + if uint64(count) > params.BeaconConfig().MaxRequestDataColumnSidecars { return types.ErrMaxDataColumnReqExceeded } diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root_test.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root_test.go index a0ba567985..a4ccae99a9 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_root_test.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root_test.go @@ -1,7 +1,6 @@ package sync import ( - "context" "io" "math" "sync" @@ -12,10 +11,10 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem" testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" - "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" @@ -36,7 +35,10 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) { params.BeaconConfig().InitializeForkSchedule() ctxMap, err := ContextByteVersionsForValRoot(params.BeaconConfig().GenesisValidatorsRoot) require.NoError(t, err) - ctx := context.Background() + ctx := t.Context() + + protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1) + "/" + encoder.ProtocolSuffixSSZSnappy + t.Run("wrong message type", func(t *testing.T) { service := &Service{} err := service.dataColumnSidecarByRootRPCHandler(t.Context(), nil, nil) @@ -50,9 +52,7 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) { params.OverrideBeaconConfig(cfg) localP2P := p2ptest.NewTestP2P(t) - service := &Service{cfg: &config{p2p: localP2P}} - - protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1) + service := &Service{cfg: &config{p2p: localP2P}, rateLimiter: newRateLimiter(localP2P)} remoteP2P := p2ptest.NewTestP2P(t) var wg sync.WaitGroup @@ -83,12 +83,6 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) { }) t.Run("nominal", func(t *testing.T) { - resetFlags := flags.Get() - gFlags := new(flags.GlobalFlags) - gFlags.DataColumnBatchLimit = 2 - flags.Init(gFlags) - defer flags.Init(resetFlags) - // Setting the ticker to 0 will cause the ticker to panic. // Setting it to the minimum value instead. refTickerDelay := tickerDelay @@ -151,7 +145,6 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) { rateLimiter: newRateLimiter(localP2P), } - protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1) remoteP2P := p2ptest.NewTestP2P(t) var wg sync.WaitGroup @@ -226,68 +219,22 @@ func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) { } func TestValidateDataColumnsByRootRequest(t *testing.T) { + const max = 10 + params.SetupTestConfigCleanup(t) - config := params.BeaconConfig() - maxCols := uint64(10) // Set a small value for testing - config.MaxRequestDataColumnSidecars = maxCols - params.OverrideBeaconConfig(config) + cfg := params.BeaconConfig() + cfg.MaxRequestDataColumnSidecars = max + params.OverrideBeaconConfig(cfg) - tests := []struct { - name string - colIdents types.DataColumnsByRootIdentifiers - expectedErr error - }{ - { - name: "Invalid request - multiple identifiers exceed max", - colIdents: types.DataColumnsByRootIdentifiers{ - { - BlockRoot: make([]byte, fieldparams.RootLength), - Columns: make([]uint64, maxCols/2+1), - }, - { - BlockRoot: make([]byte, fieldparams.RootLength), - Columns: make([]uint64, maxCols/2+1), - }, - }, - expectedErr: types.ErrMaxDataColumnReqExceeded, - }, - { - name: "Valid request - less than max", - colIdents: types.DataColumnsByRootIdentifiers{ - { - BlockRoot: make([]byte, fieldparams.RootLength), - Columns: make([]uint64, maxCols-1), - }, - }, - expectedErr: nil, - }, - { - name: "Valid request - multiple identifiers sum to max", - colIdents: types.DataColumnsByRootIdentifiers{ - { - BlockRoot: make([]byte, fieldparams.RootLength), - Columns: make([]uint64, maxCols/2), - }, - { - BlockRoot: make([]byte, fieldparams.RootLength), - Columns: make([]uint64, maxCols/2), - }, - }, - expectedErr: nil, - }, - } + t.Run("invalid", func(t *testing.T) { + err := validateDataColumnsByRootRequest(max + 1) + require.ErrorIs(t, err, types.ErrMaxDataColumnReqExceeded) + }) - // Run tests - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := validateDataColumnsByRootRequest(tt.colIdents) - if tt.expectedErr == nil { - require.NoError(t, err) - } else { - require.ErrorIs(t, err, tt.expectedErr) - } - }) - } + t.Run("valid", func(t *testing.T) { + err := validateDataColumnsByRootRequest(max) + require.NoError(t, err) + }) } func TestDataColumnsRPCMinValidSlot(t *testing.T) { diff --git a/beacon-chain/sync/sync_test.go b/beacon-chain/sync/sync_test.go index 9615255ae4..dc3c222e2e 100644 --- a/beacon-chain/sync/sync_test.go +++ b/beacon-chain/sync/sync_test.go @@ -20,7 +20,7 @@ func TestMain(m *testing.M) { BlobBatchLimit: 32, BlobBatchLimitBurstFactor: 2, DataColumnBatchLimit: 4096, - DataColumnBatchLimitBurstFactor: 2, + DataColumnBatchLimitBurstFactor: 4, }) defer func() { flags.Init(resetFlags) diff --git a/changelog/manu-rate-limit.md b/changelog/manu-rate-limit.md new file mode 100644 index 0000000000..b2e16c5cc5 --- /dev/null +++ b/changelog/manu-rate-limit.md @@ -0,0 +1,2 @@ +### Fixed +- Ensures the rate limitation is respected for by root blob and data column sidecars requests. \ No newline at end of file diff --git a/cmd/beacon-chain/flags/base.go b/cmd/beacon-chain/flags/base.go index a338572bd1..681fec23fe 100644 --- a/cmd/beacon-chain/flags/base.go +++ b/cmd/beacon-chain/flags/base.go @@ -204,7 +204,7 @@ var ( BlobBatchLimit = &cli.IntFlag{ Name: "blob-batch-limit", Usage: "The amount of blobs the local peer is bounded to request and respond to in a batch.", - Value: 192, + Value: 384, } // BlobBatchLimitBurstFactor specifies the factor by which blob batch size may increase. BlobBatchLimitBurstFactor = &cli.IntFlag{ @@ -222,7 +222,7 @@ var ( DataColumnBatchLimitBurstFactor = &cli.IntFlag{ Name: "data-column-batch-limit-burst-factor", Usage: "The factor by which data column batch limit may increase on burst.", - Value: 2, + Value: 4, } // DisableDebugRPCEndpoints disables the debug Beacon API namespace. DisableDebugRPCEndpoints = &cli.BoolFlag{