PeerDAS: Implement reconstruction. (#14036)

* Wrap errors, add logs.

* `missingColumnRequest`: Fix the blobs <-> data columns mix-up (see the shared-identifier sketch after this list).

* `ColumnIndices`: Return `map[uint64]bool` instead of `[fieldparams.NumberOfColumns]bool`.

* `DataColumnSidecars`: `interfaces.SignedBeaconBlock` ==> `interfaces.ReadOnlySignedBeaconBlock`.

We don't need any of the non-read-only methods.

* Fix comments.

* `handleUnblidedBlock` ==> `handleUnblindedBlock`.

* `SaveDataColumn`: Move log from debug to trace.

Previously, a debug log was printed whenever we attempted to save an
already existing data column sidecar.

This case can be quite common now that data column reconstruction is enabled.

* `sampling_data_columns.go` --> `data_columns_sampling.go`.

* Reconstruct data columns.
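On the `missingColumnRequest` mix-up noted above: the same request/identifier type is reused for both blobs and data columns, so only the meaning of `Index` distinguishes them (the diff itself notes the type naming should eventually become more generic). A minimal sketch; the values are illustrative:

// The BlobSidecarsByRootReq / BlobIdentifier types serve blobs and data
// columns alike; Index is a blob index in one case and a column index in
// the other.
blobReq := &eth.BlobIdentifier{BlockRoot: root[:], Index: 2}    // blob 2 of the block
columnReq := &eth.BlobIdentifier{BlockRoot: root[:], Index: 97} // data column 97 of the block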
Author: Manu NALEPA
Date: 2024-05-29 10:03:21 +02:00
Parent: a2cf84dc30
Commit: 55c2e0b6a9
19 changed files with 687 additions and 307 deletions

View File

@@ -170,7 +170,7 @@ func RecoverMatrix(cellFromCoordinate map[cellCoordinate]cKzg4844.Cell, blobCoun
// DataColumnSidecars computes the data column sidecars from the signed block and blobs.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/das-core.md#recover_matrix
func DataColumnSidecars(signedBlock interfaces.SignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
func DataColumnSidecars(signedBlock interfaces.ReadOnlySignedBeaconBlock, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
blobsCount := len(blobs)
if blobsCount == 0 {
return nil, nil
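To illustrate the interface change: the database's `Block` accessor (shown later in this diff) already returns an `interfaces.ReadOnlySignedBeaconBlock`, so its result can now be passed straight to `DataColumnSidecars`. A minimal sketch, assuming only the two signatures shown in this diff; the helper itself is hypothetical:

// buildSidecars is a hypothetical helper, not part of this diff.
func buildSidecars(ctx context.Context, s *Store, root [32]byte, blobs []cKzg4844.Blob) ([]*ethpb.DataColumnSidecar, error) {
	// Block returns an interfaces.ReadOnlySignedBeaconBlock; DataColumnSidecars
	// only reads from the block, so no mutable interface is needed.
	// (A nil, nil result — block not found — would need handling in real code.)
	signedBlock, err := s.Block(ctx, root)
	if err != nil {
		return nil, errors.Wrap(err, "block")
	}
	return peerdas.DataColumnSidecars(signedBlock, blobs)
}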

View File

@@ -230,10 +230,12 @@ func (bs *BlobStorage) SaveDataColumn(column blocks.VerifiedRODataColumn) error
if err != nil {
return err
}
if exists {
log.Debug("Ignoring a duplicate data column sidecar save attempt")
log.Trace("Ignoring a duplicate data column sidecar save attempt")
return nil
}
if bs.pruner != nil {
hRoot, err := column.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
@@ -399,38 +401,58 @@ func (bs *BlobStorage) Indices(root [32]byte) ([fieldparams.MaxBlobsPerBlock]boo
}
// ColumnIndices retrieves the stored column indices from our filesystem.
func (bs *BlobStorage) ColumnIndices(root [32]byte) ([fieldparams.NumberOfColumns]bool, error) {
var mask [fieldparams.NumberOfColumns]bool
func (bs *BlobStorage) ColumnIndices(root [32]byte) (map[uint64]bool, error) {
custody := make(map[uint64]bool, fieldparams.NumberOfColumns)
// Get all the files in the directory.
rootDir := blobNamer{root: root}.dir()
entries, err := afero.ReadDir(bs.fs, rootDir)
if err != nil {
// If the directory does not exist, we do not custody any columns.
if os.IsNotExist(err) {
return mask, nil
return nil, nil
}
return mask, err
return nil, errors.Wrap(err, "read directory")
}
for i := range entries {
if entries[i].IsDir() {
// Iterate over all the entries in the directory.
for _, entry := range entries {
// If the entry is a directory, skip it.
if entry.IsDir() {
continue
}
name := entries[i].Name()
// If the entry does not have the correct extension, skip it.
name := entry.Name()
if !strings.HasSuffix(name, sszExt) {
continue
}
// The file should be in the `<index>.<extension>` format.
// Skip the file if it does not match the format.
parts := strings.Split(name, ".")
if len(parts) != 2 {
continue
}
u, err := strconv.ParseUint(parts[0], 10, 64)
// Get the column index from the file name.
columnIndexStr := parts[0]
columnIndex, err := strconv.ParseUint(columnIndexStr, 10, 64)
if err != nil {
return mask, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
return nil, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
}
if u >= fieldparams.NumberOfColumns {
return mask, errors.Wrapf(errIndexOutOfBounds, "invalid index %d", u)
// If the column index is out of bounds, return an error.
if columnIndex >= fieldparams.NumberOfColumns {
return nil, errors.Wrapf(errIndexOutOfBounds, "invalid index %d", columnIndex)
}
mask[u] = true
// Mark the column index as in custody.
custody[columnIndex] = true
}
return mask, nil
return custody, nil
}
// Clear deletes all files on the filesystem.
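A usage sketch of the map-based `ColumnIndices` above (hypothetical caller; the sync package consumes it the same way later in this diff). A nil map is returned when nothing is stored, and both `len` and key lookups are safe on a nil map in Go, so callers need no special case:

// storedColumns maps column index -> true for every column on disk.
storedColumns, err := bs.ColumnIndices(root)
if err != nil {
	return errors.Wrap(err, "column indices")
}
storedCount := len(storedColumns) // 0 for a nil map
for column := uint64(0); column < fieldparams.NumberOfColumns; column++ {
	if !storedColumns[column] {
		// Column is missing from disk; e.g. add it to a by-root request.
	}
}
_ = storedCount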

View File

@@ -23,10 +23,10 @@ import (
bolt "go.etcd.io/bbolt"
)
// used to represent errors for inconsistent slot ranges.
// Used to represent errors for inconsistent slot ranges.
var errInvalidSlotRange = errors.New("invalid end slot and start slot provided")
// Block retrieval by root.
// Block retrieval by root. Return nil if block is not found.
func (s *Store) Block(ctx context.Context, blockRoot [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.Block")
defer span.End()

View File

@@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/sirupsen/logrus"
)
func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
@@ -20,17 +21,15 @@ func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
}
var validPeers []peer.ID
for _, pid := range peers {
remoteCount, err := s.CustodyCountFromRemotePeer(pid)
if err != nil {
return nil, err
}
remoteCount := s.CustodyCountFromRemotePeer(pid)
nodeId, err := ConvertPeerIDToNodeID(pid)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "convert peer ID to node ID")
}
remoteCustodiedColumns, err := peerdas.CustodyColumns(nodeId, remoteCount)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "custody columns")
}
invalidPeer := false
for c := range custodiedColumns {
@@ -49,24 +48,36 @@ func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
return validPeers, nil
}
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) (uint64, error) {
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of subnets.
peerCustodyCount := params.BeaconConfig().CustodyRequirement
// Retrieve the ENR of the peer.
peerRecord, err := s.peers.ENR(pid)
if err != nil {
return 0, errors.Wrap(err, "ENR")
log.WithError(err).WithField("peerID", pid).Error("Failed to retrieve ENR for peer")
return peerCustodyCount
}
peerCustodiedSubnetCount := params.BeaconConfig().CustodyRequirement
if peerRecord != nil {
// Load the `custody_subnet_count`
custodyBytes := make([]byte, 8)
custodyObj := CustodySubnetCount(custodyBytes)
if err := peerRecord.Load(&custodyObj); err != nil {
return 0, errors.Wrap(err, "load custody_subnet_count")
}
actualCustodyCount := ssz.UnmarshallUint64(custodyObj)
if actualCustodyCount > peerCustodiedSubnetCount {
peerCustodiedSubnetCount = actualCustodyCount
}
if peerRecord == nil {
// This is expected for inbound peers, so we don't log an error here.
log.WithField("peerID", pid).Debug("No ENR found for peer")
return peerCustodyCount
}
return peerCustodiedSubnetCount, nil
// Load the `custody_subnet_count` entry from the peer's ENR.
custodyObj := CustodySubnetCount(make([]byte, 8))
if err := peerRecord.Load(&custodyObj); err != nil {
log.WithError(err).WithField("peerID", pid).Error("Cannot load the custody_subnet_count from peer")
return peerCustodyCount
}
// Unmarshal the custody count from the peer's ENR.
peerCustodyCountFromRecord := ssz.UnmarshallUint64(custodyObj)
log.WithFields(logrus.Fields{
"peerID": pid,
"custodyCount": peerCustodyCountFromRecord,
}).Debug("Custody count read from peer's ENR")
return peerCustodyCountFromRecord
}
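The method now degrades gracefully instead of surfacing errors; a hedged sketch of the fallback paths (the helper names here are hypothetical, for illustration only):

// Fallback semantics of CustodyCountFromRemotePeer, sketched standalone.
count := params.BeaconConfig().CustodyRequirement // default assumption
if record := lookupENR(pid); record != nil {       // lookupENR is hypothetical
	if advertised, ok := loadCustodySubnetCount(record); ok { // hypothetical decode
		count = advertised // peer advertises custody_subnet_count in its ENR
	}
}
// Every failure path keeps count at CustodyRequirement instead of
// returning an error, which is why the signature dropped its error value.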

View File

@@ -113,6 +113,6 @@ type MetadataProvider interface {
}
type CustodyHandler interface {
CustodyCountFromRemotePeer(peer.ID) (uint64, error)
CustodyCountFromRemotePeer(peer.ID) uint64
GetValidCustodyPeers([]peer.ID) ([]peer.ID, error)
}

View File

@@ -184,8 +184,8 @@ func (_ *FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Di
return true, 0
}
func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) {
return 0, nil
func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) uint64 {
return 0
}
func (_ *FakeP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {

View File

@@ -427,8 +427,8 @@ func (_ *TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Di
return true, 0
}
func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) (uint64, error) {
return 0, nil
func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) uint64 {
return 0
}
func (_ *TestP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {

View File

@@ -7,6 +7,8 @@ go_library(
"block_batcher.go",
"broadcast_bls_changes.go",
"context.go",
"data_columns_reconstruct.go",
"data_columns_sampling.go",
"deadlines.go",
"decode_pubsub.go",
"doc.go",
@@ -32,7 +34,6 @@ go_library(
"rpc_ping.go",
"rpc_send_request.go",
"rpc_status.go",
"sampling_data_columns.go",
"service.go",
"subscriber.go",
"subscriber_beacon_aggregate_proof.go",
@@ -129,6 +130,7 @@ go_library(
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_btcsuite_btcd_btcec_v2//:go_default_library",
"@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//common/math:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",

View File

@@ -0,0 +1,187 @@
package sync
import (
"context"
"fmt"
"time"
cKzg4844 "github.com/ethereum/c-kzg-4844/bindings/go"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
)
// recoverBlobs recovers the blobs from the data column sidecars.
func recoverBlobs(
dataColumnSideCars []*ethpb.DataColumnSidecar,
columnsCount int,
blockRoot [fieldparams.RootLength]byte,
) ([]cKzg4844.Blob, error) {
recoveredBlobs := make([]cKzg4844.Blob, 0, fieldparams.MaxBlobsPerBlock)
for blobIndex := 0; blobIndex < fieldparams.MaxBlobsPerBlock; blobIndex++ {
start := time.Now()
cellsId := make([]uint64, 0, columnsCount)
cKzgCells := make([]cKzg4844.Cell, 0, columnsCount)
for _, sidecar := range dataColumnSideCars {
// Build the cell ids.
cellsId = append(cellsId, sidecar.ColumnIndex)
// Get the cell.
column := sidecar.DataColumn
cell := column[blobIndex]
// Transform the cell as a cKzg cell.
var ckzgCell cKzg4844.Cell
for i := 0; i < cKzg4844.FieldElementsPerCell; i++ {
copy(ckzgCell[i][:], cell[32*i:32*(i+1)])
}
cKzgCells = append(cKzgCells, ckzgCell)
}
// Recover the blob.
recoveredCells, err := cKzg4844.RecoverAllCells(cellsId, cKzgCells)
if err != nil {
return nil, errors.Wrapf(err, "recover all cells for blob %d", blobIndex)
}
recoveredBlob, err := cKzg4844.CellsToBlob(recoveredCells)
if err != nil {
return nil, errors.Wrapf(err, "cells to blob for blob %d", blobIndex)
}
recoveredBlobs = append(recoveredBlobs, recoveredBlob)
log.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"index": blobIndex,
"root": fmt.Sprintf("%x", blockRoot),
}).Debug("Recovered blob")
}
return recoveredBlobs, nil
}
// getSignedBlock retrieves the signed block corresponding to the given root.
// If the block is not available, it waits for it.
func (s *Service) getSignedBlock(
ctx context.Context,
blockRoot [fieldparams.RootLength]byte,
) (interfaces.ReadOnlySignedBeaconBlock, error) {
blocksChannel := make(chan *feed.Event, 1)
blockSub := s.cfg.blockNotifier.BlockFeed().Subscribe(blocksChannel)
defer blockSub.Unsubscribe()
// Get the signedBlock corresponding to this root.
signedBlock, err := s.cfg.beaconDB.Block(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "block")
}
// If the block is here, return it.
if signedBlock != nil {
return signedBlock, nil
}
// Wait for the block to be available.
for {
select {
case blockEvent := <-blocksChannel:
// Check the type of the event.
data, ok := blockEvent.Data.(*statefeed.BlockProcessedData)
if !ok || data == nil {
continue
}
// Check if the block is the one we are looking for.
if data.BlockRoot != blockRoot {
continue
}
// This is the block we are looking for.
return data.SignedBlock, nil
case err := <-blockSub.Err():
return nil, errors.Wrap(err, "block subscriber error")
case <-ctx.Done():
return nil, errors.New("context canceled")
}
}
}
func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColumn blocks.VerifiedRODataColumn) error {
// Lock to prevent concurrent reconstruction.
s.dataColumnsReconstructionLock.Lock()
defer s.dataColumnsReconstructionLock.Unlock()
// Get the block root.
blockRoot := verifiedRODataColumn.BlockRoot()
// Get the columns we store.
storedColumnsIndices, err := s.cfg.blobStorage.ColumnIndices(blockRoot)
if err != nil {
return errors.Wrap(err, "columns indices")
}
storedColumnsCount := len(storedColumnsIndices)
numberOfColumns := fieldparams.NumberOfColumns
// If less than half of the columns are stored, reconstruction is not possible.
// If all columns are stored, no need to reconstruct.
if storedColumnsCount < numberOfColumns/2 || storedColumnsCount == numberOfColumns {
return nil
}
// Load the data columns sidecars.
dataColumnSideCars := make([]*ethpb.DataColumnSidecar, 0, storedColumnsCount)
for index := range storedColumnsIndices {
dataColumnSidecar, err := s.cfg.blobStorage.GetColumn(blockRoot, index)
if err != nil {
return errors.Wrap(err, "get column")
}
dataColumnSideCars = append(dataColumnSideCars, dataColumnSidecar)
}
// Recover blobs.
recoveredBlobs, err := recoverBlobs(dataColumnSideCars, storedColumnsCount, blockRoot)
if err != nil {
return errors.Wrap(err, "recover blobs")
}
// Get the signed block.
signedBlock, err := s.getSignedBlock(ctx, blockRoot)
if err != nil {
return errors.Wrap(err, "get signed block")
}
// Reconstruct the data columns sidecars.
dataColumnSidecars, err := peerdas.DataColumnSidecars(signedBlock, recoveredBlobs)
if err != nil {
return errors.Wrap(err, "data column sidecars")
}
// Save the data columns sidecars in the database.
for _, dataColumnSidecar := range dataColumnSidecars {
roDataColumn, err := blocks.NewRODataColumnWithRoot(dataColumnSidecar, blockRoot)
if err != nil {
return errors.Wrap(err, "new read-only data column with root")
}
verifiedRoDataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
if err := s.cfg.blobStorage.SaveDataColumn(verifiedRoDataColumn); err != nil {
return errors.Wrap(err, "save column")
}
}
log.WithField("root", fmt.Sprintf("%x", blockRoot)).Debug("Data columns reconstructed successfully")
return nil
}
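A worked check of the reconstruction window above, as a sketch. It assumes `fieldparams.NumberOfColumns` is 128 (the PeerDAS extended column count): any amount from half upward is enough to recover everything, and a full set needs no work:

// Illustrative values only; 128 columns is an assumption of this sketch.
const numberOfColumns = 128
for _, stored := range []int{63, 64, 127, 128} {
	reconstruct := stored >= numberOfColumns/2 && stored < numberOfColumns
	fmt.Println(stored, reconstruct) // 63 false, 64 true, 127 true, 128 false
}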

View File

@@ -0,0 +1,303 @@
package sync
import (
"context"
"fmt"
"sort"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/sirupsen/logrus"
)
// randomIntegers returns a map of `count` distinct random integers in the range [0, max).
func randomIntegers(count uint64, max uint64) map[uint64]bool {
result := make(map[uint64]bool, count)
randGenerator := rand.NewGenerator()
for uint64(len(result)) < count {
n := randGenerator.Uint64() % max
result[n] = true
}
return result
}
// sortedListFromMap returns a sorted list of keys from a map.
func sortedListFromMap(m map[uint64]bool) []uint64 {
result := make([]uint64, 0, len(m))
for k := range m {
result = append(result, k)
}
sort.Slice(result, func(i, j int) bool {
return result[i] < result[j]
})
return result
}
// extractNodeID extracts the node ID from a peer ID.
func extractNodeID(pid peer.ID) ([32]byte, error) {
var nodeID [32]byte
// Retrieve the public key object of the peer under "crypto" form.
pubkeyObjCrypto, err := pid.ExtractPublicKey()
if err != nil {
return nodeID, errors.Wrap(err, "extract public key")
}
// Extract the bytes representation of the public key.
compressedPubKeyBytes, err := pubkeyObjCrypto.Raw()
if err != nil {
return nodeID, errors.Wrap(err, "public key raw")
}
// Retrieve the public key object of the peer under "SECP256K1" form.
pubKeyObjSecp256k1, err := btcec.ParsePubKey(compressedPubKeyBytes)
if err != nil {
return nodeID, errors.Wrap(err, "parse public key")
}
// Concatenate the X and Y coordinates represented in bytes.
buf := make([]byte, 64)
math.ReadBits(pubKeyObjSecp256k1.X(), buf[:32])
math.ReadBits(pubKeyObjSecp256k1.Y(), buf[32:])
// Get the node ID by hashing the concatenated X and Y coordinates.
nodeIDBytes := crypto.Keccak256(buf)
copy(nodeID[:], nodeIDBytes)
return nodeID, nil
}
// sampleDataColumnFromPeer samples data columns from a peer.
// It returns the missing columns after sampling.
func (s *Service) sampleDataColumnFromPeer(
pid peer.ID,
columnsToSample map[uint64]bool,
requestedRoot [fieldparams.RootLength]byte,
) (map[uint64]bool, error) {
// Define missing columns.
missingColumns := make(map[uint64]bool, len(columnsToSample))
for index := range columnsToSample {
missingColumns[index] = true
}
// Retrieve the custody count of the peer.
peerCustodiedSubnetCount := s.cfg.p2p.CustodyCountFromRemotePeer(pid)
// Extract the node ID from the peer ID.
nodeID, err := extractNodeID(pid)
if err != nil {
return nil, errors.Wrap(err, "extract node ID")
}
// Determine which columns the peer should custody.
peerCustodiedColumns, err := peerdas.CustodyColumns(nodeID, peerCustodiedSubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}
peerCustodiedColumnsList := sortedListFromMap(peerCustodiedColumns)
// Compute the intersection of the columns to sample and the columns the peer should custody.
peerRequestedColumns := make(map[uint64]bool, len(columnsToSample))
for column := range columnsToSample {
if peerCustodiedColumns[column] {
peerRequestedColumns[column] = true
}
}
peerRequestedColumnsList := sortedListFromMap(peerRequestedColumns)
// Get the data column identifiers to sample from this peer.
dataColumnIdentifiers := make(types.BlobSidecarsByRootReq, 0, len(peerRequestedColumns))
for index := range peerRequestedColumns {
dataColumnIdentifiers = append(dataColumnIdentifiers, &eth.BlobIdentifier{
BlockRoot: requestedRoot[:],
Index: index,
})
}
// Return early if there are no data columns to sample.
if len(dataColumnIdentifiers) == 0 {
log.WithFields(logrus.Fields{
"peerID": pid,
"custodiedColumns": peerCustodiedColumnsList,
"requestedColumns": peerRequestedColumnsList,
}).Debug("Peer does not custody any of the requested columns")
return columnsToSample, nil
}
// Sample data columns.
roDataColumns, err := SendDataColumnSidecarByRoot(s.ctx, s.cfg.clock, s.cfg.p2p, pid, s.ctxMap, &dataColumnIdentifiers)
if err != nil {
return nil, errors.Wrap(err, "send data column sidecar by root")
}
peerRetrievedColumns := make(map[uint64]bool, len(roDataColumns))
// Remove retrieved items from rootsByDataColumnIndex.
for _, roDataColumn := range roDataColumns {
retrievedColumn := roDataColumn.ColumnIndex
actualRoot := roDataColumn.BlockRoot()
if actualRoot != requestedRoot {
// TODO: Should we decrease the peer score here?
log.WithFields(logrus.Fields{
"peerID": pid,
"requestedRoot": fmt.Sprintf("%#x", requestedRoot),
"actualRoot": fmt.Sprintf("%#x", actualRoot),
}).Warning("Actual root does not match requested root")
continue
}
peerRetrievedColumns[retrievedColumn] = true
if !columnsToSample[retrievedColumn] {
// TODO: Should we decrease the peer score here?
log.WithFields(logrus.Fields{
"peerID": pid,
"retrievedColumn": retrievedColumn,
"requestedColumns": peerRequestedColumnsList,
}).Warning("Retrieved column is was not requested")
}
delete(missingColumns, retrievedColumn)
}
peerRetrievedColumnsList := sortedListFromMap(peerRetrievedColumns)
remainingMissingColumnsList := sortedListFromMap(missingColumns)
log.WithFields(logrus.Fields{
"peerID": pid,
"custodiedColumns": peerCustodiedColumnsList,
"requestedColumns": peerRequestedColumnsList,
"retrievedColumns": peerRetrievedColumnsList,
"remainingMissingColumns": remainingMissingColumnsList,
}).Debug("Peer data column sampling summary")
return missingColumns, nil
}
// sampleDataColumns samples data columns from active peers.
func (s *Service) sampleDataColumns(requestedRoot [fieldparams.RootLength]byte, samplesCount uint64) error {
// Determine `samplesCount` random column indexes.
requestedColumns := randomIntegers(samplesCount, params.BeaconConfig().NumberOfColumns)
missingColumns := make(map[uint64]bool, len(requestedColumns))
for index := range requestedColumns {
missingColumns[index] = true
}
// Get the active peers from the p2p service.
activePeers := s.cfg.p2p.Peers().Active()
var err error
// Sampling is done sequentially peer by peer.
// TODO: Add parallelism if (probably) needed.
for _, pid := range activePeers {
// Early exit if all needed columns are already sampled. (This is the happy path.)
if len(missingColumns) == 0 {
break
}
// Sample data columns from the peer.
missingColumns, err = s.sampleDataColumnFromPeer(pid, missingColumns, requestedRoot)
if err != nil {
return errors.Wrap(err, "sample data column from peer")
}
}
requestedColumnsList := sortedListFromMap(requestedColumns)
if len(missingColumns) == 0 {
log.WithField("requestedColumns", requestedColumnsList).Debug("Successfully sampled all requested columns")
return nil
}
missingColumnsList := sortedListFromMap(missingColumns)
log.WithFields(logrus.Fields{
"requestedColumns": requestedColumnsList,
"missingColumns": missingColumnsList,
}).Warning("Failed to sample some requested columns")
return nil
}
func (s *Service) dataColumnSampling(ctx context.Context) {
// Create a subscription to the state feed.
stateChannel := make(chan *feed.Event, 1)
stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateChannel)
// Unsubscribe from the state feed when the function returns.
defer stateSub.Unsubscribe()
for {
select {
case e := <-stateChannel:
if e.Type != statefeed.BlockProcessed {
continue
}
data, ok := e.Data.(*statefeed.BlockProcessedData)
if !ok {
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
continue
}
if !data.Verified {
// We only process blocks that have been verified
log.Error("Data is not verified")
continue
}
if data.SignedBlock.Version() < version.Deneb {
log.Debug("Pre Deneb block, skipping data column sampling")
continue
}
// Get the commitments for this block.
commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
continue
}
// Skip if there are no commitments.
if len(commitments) == 0 {
log.Debug("No commitments in block, skipping data column sampling")
continue
}
dataColumnSamplingCount := params.BeaconConfig().SamplesPerSlot
// Sample data columns.
if err := s.sampleDataColumns(data.BlockRoot, dataColumnSamplingCount); err != nil {
log.WithError(err).Error("Failed to sample data columns")
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state feed failed")
}
}
}
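A usage sketch of the two helpers at the top of this file (the counts are illustrative; `SamplesPerSlot` is read from config in the real flow): draw distinct random column indices, then sort them for stable logging. Note that `randomIntegers` deduplicates through the map and keeps drawing until it has `count` distinct values, so `count` must not exceed `max`:

// Hypothetical standalone usage of randomIntegers and sortedListFromMap.
requested := randomIntegers(8, 128)       // 8 distinct indices in [0, 128)
fmt.Println(sortedListFromMap(requested)) // e.g. [3 17 42 55 88 91 120 127]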

View File

@@ -34,6 +34,7 @@ go_library(
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",

View File

@@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
@@ -25,6 +26,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
@@ -314,30 +316,56 @@ func missingBlobRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2pt
return req, nil
}
func missingColumnRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) {
r := blk.Root()
if blk.Version() < version.Deneb {
func (s *Service) missingColumnRequest(roBlock blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) {
// No columns for pre-Deneb blocks.
if roBlock.Version() < version.Deneb {
return nil, nil
}
cmts, err := blk.Block().Body().BlobKzgCommitments()
// Get the block root.
blockRoot := roBlock.Root()
// Get the commitments from the block.
commitments, err := roBlock.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithField("root", r).Error("Error reading commitments from checkpoint sync origin block")
return nil, err
return nil, errors.Wrap(err, "failed to get blob KZG commitments")
}
if len(cmts) == 0 {
// Return early if there are no commitments.
if len(commitments) == 0 {
return nil, nil
}
onDisk, err := store.ColumnIndices(r)
// Check which columns are already on disk.
storedColumns, err := store.ColumnIndices(blockRoot)
if err != nil {
return nil, errors.Wrapf(err, "error checking existing blobs for checkpoint sync block root %#x", r)
return nil, errors.Wrapf(err, "error checking existing blobs for checkpoint sync block root %#x", blockRoot)
}
req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(cmts))
for i := range cmts {
if onDisk[i] {
continue
// Get the number of columns we should custody.
custodyRequirement := params.BeaconConfig().CustodyRequirement
if features.Get().EnablePeerDAS {
custodyRequirement = fieldparams.NumberOfColumns
}
// Get our node ID.
nodeID := s.cfg.P2P.NodeID()
// Get the custodied columns.
custodiedColumns, err := peerdas.CustodyColumns(nodeID, custodyRequirement)
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}
// Build blob sidecars by root requests based on missing columns.
req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(commitments))
for columnIndex := range custodiedColumns {
isColumnAvailable := storedColumns[columnIndex]
if !isColumnAvailable {
req = append(req, &eth.BlobIdentifier{BlockRoot: blockRoot[:], Index: columnIndex})
}
req = append(req, &eth.BlobIdentifier{BlockRoot: r[:], Index: uint64(i)})
}
return req, nil
}
@@ -408,7 +436,7 @@ func (s *Service) fetchOriginColumns(pids []peer.ID) error {
if err != nil {
return err
}
req, err := missingColumnRequest(rob, s.cfg.BlobStorage)
req, err := s.missingColumnRequest(rob, s.cfg.BlobStorage)
if err != nil {
return err
}

View File

@@ -264,20 +264,34 @@ func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (
}
func (s *Service) constructPendingColumnRequest(root [32]byte) (types.BlobSidecarsByRootReq, error) {
stored, err := s.cfg.blobStorage.ColumnIndices(root)
// Retrieve the storedColumns columns for the current root.
storedColumns, err := s.cfg.blobStorage.ColumnIndices(root)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "column indices")
}
// Compute how many subnets we should custody.
custodiedSubnetCount := params.BeaconConfig().CustodyRequirement
if flags.Get().SubscribeToAllSubnets {
custodiedSubnetCount = params.BeaconConfig().DataColumnSidecarSubnetCount
}
// Retrieve the columns we should custody.
custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodiedSubnetCount)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "custody columns")
}
return requestsForMissingColumnIndices(stored, custodiedColumns, root), nil
// Build the request for the missing columns.
req := make(types.BlobSidecarsByRootReq, 0, len(custodiedColumns))
for column := range custodiedColumns {
isColumnStored := storedColumns[column]
if !isColumnStored {
req = append(req, &eth.BlobIdentifier{Index: column, BlockRoot: root[:]})
}
}
return req, nil
}
// requestsForMissingIndices constructs a slice of BlobIdentifiers that are missing from
@@ -292,13 +306,3 @@ func requestsForMissingIndices(storedIndices [fieldparams.MaxBlobsPerBlock]bool,
}
return ids
}
func requestsForMissingColumnIndices(storedIndices [fieldparams.NumberOfColumns]bool, wantedIndices map[uint64]bool, root [32]byte) []*eth.BlobIdentifier {
var ids []*eth.BlobIdentifier
for i := range wantedIndices {
if !storedIndices[i] {
ids = append(ids, &eth.BlobIdentifier{Index: i, BlockRoot: root[:]})
}
}
return ids
}

View File

@@ -28,10 +28,13 @@ import (
func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
log := log.WithField("handler", p2p.DataColumnSidecarsByRootName[1:]) // slice the leading slash off the name var
// We use the same type as for blobs, since they share the same data structure.
// TODO: Make the type naming more generic to be extensible to data columns
ref, ok := msg.(*types.BlobSidecarsByRootReq)
@@ -39,19 +42,25 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
return errors.New("message is not type BlobSidecarsByRootReq")
}
columnIdents := *ref
if err := validateDataColummnsByRootRequest(columnIdents); err != nil {
requestedColumnIdents := *ref
if err := validateDataColummnsByRootRequest(requestedColumnIdents); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return err
return errors.Wrap(err, "validate data columns by root request")
}
// Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups.
sort.Sort(columnIdents)
sort.Sort(requestedColumnIdents)
requestedColumnsList := make([]uint64, 0, len(requestedColumnIdents))
for _, ident := range requestedColumnIdents {
requestedColumnsList = append(requestedColumnsList, ident.Index)
}
// TODO: Customize data column batches too
batchSize := flags.Get().BlobBatchLimit
var ticker *time.Ticker
if len(columnIdents) > batchSize {
if len(requestedColumnIdents) > batchSize {
ticker = time.NewTicker(time.Second)
}
@@ -69,25 +78,50 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
}
custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodiedSubnets)
if err != nil {
log.WithError(err).Errorf("unexpected error retrieving the node id")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
return errors.Wrap(err, "custody columns")
}
for i := range columnIdents {
custodiedColumnsList := make([]uint64, 0, len(custodiedColumns))
for column := range custodiedColumns {
custodiedColumnsList = append(custodiedColumnsList, column)
}
// Sort the custodied columns by index.
sort.Slice(custodiedColumnsList, func(i, j int) bool {
return custodiedColumnsList[i] < custodiedColumnsList[j]
})
log.WithFields(logrus.Fields{
"custodied": custodiedColumnsList,
"requested": requestedColumnsList,
"custodiedCount": len(custodiedColumnsList),
"requestedCount": len(requestedColumnsList),
}).Debug("Received data column sidecar by root request")
for i := range requestedColumnIdents {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
return err
return errors.Wrap(err, "context error")
}
// Throttle request processing to no more than batchSize/sec.
if ticker != nil && i != 0 && i%batchSize == 0 {
<-ticker.C
select {
case <-ticker.C:
log.Debug("Throttling data column sidecar request")
case <-ctx.Done():
log.Debug("Context closed, exiting routine")
return nil
}
}
s.rateLimiter.add(stream, 1)
root, idx := bytesutil.ToBytes32(columnIdents[i].BlockRoot), columnIdents[i].Index
root, idx := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index
isCustodied := custodiedColumns[idx]
if !isCustodied {
@@ -124,7 +158,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
log.WithError(err).Errorf("unexpected db error retrieving data column, root=%x, index=%d", root, idx)
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
return errors.Wrap(err, "get column")
}
break
@@ -137,7 +171,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
if sc.SignedBlockHeader.Header.Slot < minReqSlot {
s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrDataColumnLTMinRequest.Error(), stream)
log.WithError(types.ErrDataColumnLTMinRequest).
Debugf("requested data column for block %#x before minimum_request_epoch", columnIdents[i].BlockRoot)
Debugf("requested data column for block %#x before minimum_request_epoch", requestedColumnIdents[i].BlockRoot)
return types.ErrDataColumnLTMinRequest
}
@@ -149,6 +183,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
return chunkErr
}
}
closeStream(stream, log)
return nil
}
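The batch throttle above admits one batch per tick while staying cancellable; a standalone sketch of the pattern, with hypothetical `items`, `handle`, and `batchSize`:

// Process items in batches of batchSize, pausing one tick between batches.
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for i, item := range items {
	if i != 0 && i%batchSize == 0 {
		select {
		case <-ticker.C: // wait for the next tick before starting a new batch
		case <-ctx.Done():
			return nil // remain responsive to cancellation while throttled
		}
	}
	handle(item)
}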

View File

@@ -1,219 +0,0 @@
package sync
import (
"context"
"sort"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/sirupsen/logrus"
)
// reandomIntegers returns a map of `count` random integers in the range [0, max[.
func randomIntegers(count uint64, max uint64) map[uint64]bool {
result := make(map[uint64]bool, count)
randGenerator := rand.NewGenerator()
for uint64(len(result)) < count {
n := randGenerator.Uint64() % max
result[n] = true
}
return result
}
func (s *Service) sampleDataColumns(requestedRoot [fieldparams.RootLength]byte, samplesCount uint64) (map[uint64]bool, error) {
// Determine `samplesCount` random column indexes.
missingIndices := randomIntegers(samplesCount, params.BeaconConfig().NumberOfColumns)
// Get the active peers from the p2p service.
activePeers := s.cfg.p2p.Peers().Active()
// Sampling is done sequentially peer by peer.
// TODO: Add parallelism if (probably) needed.
for _, pid := range activePeers {
// Early exit if all needed columns are already sampled.
// This is the happy path.
if len(missingIndices) == 0 {
return nil, nil
}
peerCustodiedSubnetCount, err := s.cfg.p2p.CustodyCountFromRemotePeer(pid)
if err != nil {
return nil, err
}
// Retrieve the public key object of the peer under "crypto" form.
pubkeyObjCrypto, err := pid.ExtractPublicKey()
if err != nil {
return nil, errors.Wrap(err, "extract public key")
}
// Extract the bytes representation of the public key.
compressedPubKeyBytes, err := pubkeyObjCrypto.Raw()
if err != nil {
return nil, errors.Wrap(err, "public key raw")
}
// Retrieve the public key object of the peer under "SECP256K1" form.
pubKeyObjSecp256k1, err := btcec.ParsePubKey(compressedPubKeyBytes)
if err != nil {
return nil, errors.Wrap(err, "parse public key")
}
// Concatenate the X and Y coordinates represented in bytes.
buf := make([]byte, 64)
math.ReadBits(pubKeyObjSecp256k1.X(), buf[:32])
math.ReadBits(pubKeyObjSecp256k1.Y(), buf[32:])
// Get the peer ID by hashing the concatenated X and Y coordinates.
peerIDBytes := crypto.Keccak256(buf)
var peerID [32]byte
copy(peerID[:], peerIDBytes)
// Determine which columns the peer should custody.
peerCustodiedColumns, err := peerdas.CustodyColumns(peerID, peerCustodiedSubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}
// Determine how many columns are yet missing.
missingColumnsCount := len(missingIndices)
// Get the data column identifiers to sample from this particular peer.
dataColumnIdentifiers := make(types.BlobSidecarsByRootReq, 0, missingColumnsCount)
for index := range missingIndices {
if peerCustodiedColumns[index] {
dataColumnIdentifiers = append(dataColumnIdentifiers, &eth.BlobIdentifier{
BlockRoot: requestedRoot[:],
Index: index,
})
}
}
// Skip the peer if there are no data columns to sample.
if len(dataColumnIdentifiers) == 0 {
continue
}
// Sample data columns.
roDataColumns, err := SendDataColumnSidecarByRoot(s.ctx, s.cfg.clock, s.cfg.p2p, pid, s.ctxMap, &dataColumnIdentifiers)
if err != nil {
return nil, errors.Wrap(err, "send data column sidecar by root")
}
// Remove retrieved items from rootsByDataColumnIndex.
for _, roDataColumn := range roDataColumns {
index := roDataColumn.ColumnIndex
actualRoot := roDataColumn.BlockRoot()
if actualRoot != requestedRoot {
return nil, errors.Errorf("actual root (%#x) does not match requested root (%#x)", actualRoot, requestedRoot)
}
delete(missingIndices, index)
}
}
// We tried all our active peers and some columns are still missing.
// This is the unhappy path.
return missingIndices, nil
}
func (s *Service) dataColumnSampling(ctx context.Context) {
// Create a subscription to the state feed.
stateChannel := make(chan *feed.Event, 1)
stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateChannel)
// Unsubscribe from the state feed when the function returns.
defer stateSub.Unsubscribe()
for {
select {
case e := <-stateChannel:
if e.Type != statefeed.BlockProcessed {
continue
}
data, ok := e.Data.(*statefeed.BlockProcessedData)
if !ok {
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
continue
}
if !data.Verified {
// We only process blocks that have been verified
log.Error("Data is not verified")
continue
}
if data.SignedBlock.Version() < version.Deneb {
log.Debug("Pre Deneb block, skipping data column sampling")
continue
}
// Get the commitments for this block.
commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
continue
}
// Skip if there are no commitments.
if len(commitments) == 0 {
log.Debug("No commitments in block, skipping data column sampling")
continue
}
dataColumnSamplingCount := params.BeaconConfig().SamplesPerSlot
// Sample data columns.
missingColumns, err := s.sampleDataColumns(data.BlockRoot, dataColumnSamplingCount)
if err != nil {
log.WithError(err).Error("Failed to sample data columns")
continue
}
missingColumnsCount := len(missingColumns)
missingColumnsList := make([]uint64, 0, missingColumnsCount)
for column := range missingColumns {
missingColumnsList = append(missingColumnsList, column)
}
// Sort the missing columns list.
sort.Slice(missingColumnsList, func(i, j int) bool {
return missingColumnsList[i] < missingColumnsList[j]
})
if missingColumnsCount > 0 {
log.WithFields(logrus.Fields{
"missingColumns": missingColumnsList,
"sampledColumnsCount": dataColumnSamplingCount,
}).Warning("Failed to sample some data columns")
continue
}
log.WithField("sampledColumnsCount", dataColumnSamplingCount).Info("Successfully sampled all data columns")
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state feed failed")
}
}
}

View File

@@ -164,6 +164,7 @@ type Service struct {
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
availableBlocker coverage.AvailableBlocker
dataColumnsReconstructionLock sync.Mutex
ctxMap ContextByteVersions
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -29,5 +30,10 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
},
})
// Reconstruct the data columns if needed.
if err := s.reconstructDataColumns(ctx, dc); err != nil {
return errors.Wrap(err, "reconstruct data columns")
}
return nil
}

go.mod
View File

@@ -18,6 +18,7 @@ require (
github.com/emicklei/dot v0.11.0
github.com/ethereum/c-kzg-4844 v1.0.2-0.20240507203752-26d3b4156f7a
github.com/ethereum/go-ethereum v1.13.5
github.com/ferranbt/fastssz v0.0.0-20210120143747-11b9eff30ea9
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5
github.com/fsnotify/fsnotify v1.6.0
github.com/ghodss/yaml v1.0.0
@@ -137,7 +138,6 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 // indirect
github.com/elastic/gosigar v0.14.3 // indirect
github.com/ferranbt/fastssz v0.0.0-20210120143747-11b9eff30ea9 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/getsentry/sentry-go v0.25.0 // indirect

View File

@@ -126,7 +126,6 @@ func GethPragueTime(genesisTime uint64, cfg *clparams.BeaconChainConfig) *uint64
// like in an e2e test. The parameters are minimal but the full value is returned unmarshaled so that it can be
// customized as desired.
func GethTestnetGenesis(genesisTime uint64, cfg *clparams.BeaconChainConfig) *core.Genesis {
shanghaiTime := GethShanghaiTime(genesisTime, cfg)
cancunTime := GethCancunTime(genesisTime, cfg)
pragueTime := GethPragueTime(genesisTime, cfg)