Files
prysm/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
Manu NALEPA 00cf1f2507 Implement PeerDAS sync (#15564)
* PeerDAS: Implement sync

* Fix Potuz's comment.

* Fix Potuz's comment.

* Fix Potuz's comment.

* Fix Satyajit's comment.

* Partially fix Potuz's comment.

* Fix Potuz's comment.

* Fix Potuz's comment.

* Fix Potuz's comment.

* Fix Potuz's comment.

* Fix Potuz's comment.

* Fix Potuz's comment.

* Fix Potuz's comment.

* Add tests for `sendDataColumnSidecarsRequest`.

* Fix Satyajit's comment.

* Implement `TestSendDataColumnSidecarsRequest`.

* Implement `TestFetchDataColumnSidecarsFromPeers`.

* Implement `TestUpdateResults`.

* Implement `TestSelectPeers`.

* Implement `TestCategorizeIndices`.

* Fix James' comment.

* Fix James's comment.

* Fix James' commit.

* Fix James' comment.

* Fix James' comment.

* Fix flakiness in `TestSelectPeers`.

* Update cmd/beacon-chain/flags/config.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* Fix Preston's comment.

* Fix James's comment.

* Implement `TestFetchDataColumnSidecars`.

* Revert "Fix Potuz's comment."

This reverts commit c45230b455.

* Fix Potuz's comment.

* Revert "Fix James' comment."

This reverts commit a3f919205a.

* Fix James' comment.

* Fix Preston's comment.

* Fix James' comment.

* `selectPeers`: Avoid map with key but empty value.

* Fix typo.

* Fix Potuz's comment.

* Fix Potuz's comment.

* Fix James' comment.

* Add DataColumnStorage and SubscribeAllDataSubnets flag.

* Add extra flags

* Fix Potuz's and Preston's comment.

* Add rate limiter check.

---------

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2025-08-18 14:36:07 +00:00

184 lines
5.9 KiB
Go

package sync
import (
"context"
"fmt"
"math"
"slices"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
"github.com/OffchainLabs/prysm/v6/time/slots"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var (
notDataColumnsByRootIdentifiersError = errors.New("not data columns by root identifiers")
tickerDelay = time.Second
)
// dataColumnSidecarByRootRPCHandler handles the data column sidecars by root RPC request.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler")
defer span.End()
batchSize := flags.Get().DataColumnBatchLimit
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Check if the message type is the one expected.
ref, ok := msg.(types.DataColumnsByRootIdentifiers)
if !ok {
return notDataColumnsByRootIdentifiersError
}
requestedColumnIdents := ref
remotePeer := stream.Conn().RemotePeer()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
// Penalize peers that send invalid requests.
if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil {
s.downscorePeer(remotePeer, "dataColumnSidecarByRootRPCHandlerValidationError")
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return errors.Wrap(err, "validate data columns by root request")
}
requestedColumnsByRoot := make(map[[fieldparams.RootLength]byte][]uint64)
for _, columnIdent := range requestedColumnIdents {
var root [fieldparams.RootLength]byte
copy(root[:], columnIdent.BlockRoot)
requestedColumnsByRoot[root] = append(requestedColumnsByRoot[root], columnIdent.Columns...)
}
// Sort by column index for each root.
for _, columns := range requestedColumnsByRoot {
slices.Sort(columns)
}
// Format nice logs.
requestedColumnsByRootLog := make(map[string]interface{})
for root, columns := range requestedColumnsByRoot {
rootStr := fmt.Sprintf("%#x", root)
requestedColumnsByRootLog[rootStr] = "all"
if uint64(len(columns)) != numberOfColumns {
requestedColumnsByRootLog[rootStr] = columns
}
}
// Compute the oldest slot we'll allow a peer to request, based on the current slot.
minReqSlot, err := dataColumnsRPCMinValidSlot(s.cfg.clock.CurrentSlot())
if err != nil {
return errors.Wrapf(err, "data columns RPC min valid slot")
}
log := log.WithFields(logrus.Fields{
"peer": remotePeer,
"columns": requestedColumnsByRootLog,
})
defer closeStream(stream, log)
var ticker *time.Ticker
if len(requestedColumnIdents) > batchSize {
ticker = time.NewTicker(tickerDelay)
}
log.Debug("Serving data column sidecar by root request")
count := 0
for _, ident := range requestedColumnIdents {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
return errors.Wrap(err, "context error")
}
root := bytesutil.ToBytes32(ident.BlockRoot)
columns := ident.Columns
// Throttle request processing to no more than batchSize/sec.
for range columns {
if ticker != nil && count != 0 && count%batchSize == 0 {
<-ticker.C
}
count++
}
s.rateLimiter.add(stream, int64(len(columns)))
// Retrieve the requested sidecars from the store.
verifiedRODataColumns, err := s.cfg.dataColumnStorage.Get(root, columns)
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return errors.Wrap(err, "get data column sidecars")
}
for _, verifiedRODataColumn := range verifiedRODataColumns {
// Filter out data column sidecars that are too old.
if verifiedRODataColumn.SignedBlockHeader.Header.Slot < minReqSlot {
continue
}
SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.clock, s.cfg.p2p.Encoding(), verifiedRODataColumn.DataColumnSidecar); chunkErr != nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
return chunkErr
}
}
}
return nil
}
// validateDataColumnsByRootRequest checks if the request for data column sidecars is valid.
func validateDataColumnsByRootRequest(colIdents types.DataColumnsByRootIdentifiers) error {
total := uint64(0)
for _, id := range colIdents {
total += uint64(len(id.Columns))
}
if total > params.BeaconConfig().MaxRequestDataColumnSidecars {
return types.ErrMaxDataColumnReqExceeded
}
return nil
}
// dataColumnsRPCMinValidSlot returns the minimum slot that a peer can request data column sidecars for.
func dataColumnsRPCMinValidSlot(currentSlot primitives.Slot) (primitives.Slot, error) {
// Avoid overflow if we're running on a config where fulu is set to far future epoch.
if !params.FuluEnabled() {
return primitives.Slot(math.MaxUint64), nil
}
beaconConfig := params.BeaconConfig()
minReqEpochs := beaconConfig.MinEpochsForDataColumnSidecarsRequest
minStartEpoch := beaconConfig.FuluForkEpoch
currEpoch := slots.ToEpoch(currentSlot)
if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStartEpoch {
minStartEpoch = currEpoch - minReqEpochs
}
epochStart, err := slots.EpochStart(minStartEpoch)
if err != nil {
return 0, errors.Wrapf(err, "epoch start for epoch %d", minStartEpoch)
}
return epochStart, nil
}