PeerDAS: Re-enable full node joining the main fork (#14475)

* `columnErrBuilder`: Uses `Wrap` instead of `Join`.

Reason: `Join` inserts a newline between the joined errors, which makes the log quite unreadable.

* `validateDataColumn`: Improve log.

* `areDataColumnsAvailable`: Improve log.

* `SendDataColumnSidecarByRoot` ==> `SendDataColumnSidecarsByRootRequest`.

* `handleDA`: Refactor error message.

* `sendRecentBeaconBlocksRequest` ==> `sendBeaconBlocksRequest`.

Reason: There is no notion at all of "recent" in the function.

If the caller decides to call this function only with "recent" blocks, that's fine.
However, the function itself will know nothing about the "recentness" of these blocks.

* `sendBatchRootRequest`: Improve comments.

* `sendBeaconBlocksRequest`: Avoid `else` usage and use a map of `bool` instead of a map of `struct{}` (see the membership-test sketch after this list).

* `wrapAndReportValidation`: Remove `agent` from log.

Reason: This prevents the log from fitting on one line, and it is not really useful for debugging.

* `validateAggregateAndProof`: Add comments.

* `GetValidCustodyPeers`: Fix typo.

* `GetValidCustodyPeers` ==> `DataColumnsAdmissibleCustodyPeers`.

* `CustodyHandler` ==> `DataColumnsHandler`.

* `CustodyCountFromRemotePeer` ==> `DataColumnsCustodyCountFromRemotePeer`.

* Implement `DataColumnsAdmissibleSubnetSamplingPeers`.

* Use `SubnetSamplingSize` instead of `CustodySubnetCount` where needed.

* Revert "`wrapAndReportValidation`: Remove `agent` from log."

This reverts commit 55db351102.
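
Regarding the `struct{}` to `bool` map change above: a minimal sketch of why the `bool` map reads better at the lookup site (the root value is a placeholder):

```go
package main

import "fmt"

func main() {
	root := [32]byte{0x01} // placeholder block root

	// With map[[32]byte]struct{}, membership needs the two-value lookup form.
	requestedSet := map[[32]byte]struct{}{root: {}}
	_, ok := requestedSet[root]
	fmt.Println(ok) // true

	// With map[[32]byte]bool, the zero value already answers the question,
	// so the caller can test membership directly inside a condition.
	requestedBool := map[[32]byte]bool{root: true}
	fmt.Println(requestedBool[root]) // true
}
```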
Manu NALEPA
2024-09-24 17:37:54 +02:00
parent d9a1f96f6e
commit 4d3e389082
17 changed files with 197 additions and 108 deletions

View File

@@ -787,7 +787,7 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si
missingIndices = uint64MapToSortedSlice(missingMap)
}
return errors.Wrapf(ctx.Err(), "context deadline waiting for data column sidecars slot: %d, BlockRoot: %#x, missing %v", block.Slot(), root, missingIndices)
return errors.Wrapf(ctx.Err(), "data column sidecars slot: %d, BlockRoot: %#x, missing %v", block.Slot(), root, missingIndices)
}
}
}

View File

@@ -75,6 +75,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Ignoring already synced block")
return nil
}
receivedTime := time.Now()
s.blockBeingSynced.set(blockRoot)
defer s.blockBeingSynced.unset(blockRoot)
@@ -83,6 +84,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
if err != nil {
return err
}
preState, err := s.getBlockPreState(ctx, blockCopy.Block())
if err != nil {
return errors.Wrap(err, "could not get block's prestate")
@@ -93,10 +95,12 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
if err != nil {
return err
}
daWaitedTime, err := s.handleDA(ctx, blockCopy, blockRoot, avs)
if err != nil {
return err
}
// Defragment the state before continuing block processing.
s.defragmentState(postState)
@@ -239,7 +243,7 @@ func (s *Service) handleDA(
}
} else {
if err := s.isDataAvailable(ctx, blockRoot, block); err != nil {
return 0, errors.Wrap(err, "could not validate blob data availability")
return 0, errors.Wrap(err, "is data available")
}
}
daWaitedTime := time.Since(daStartTime)

View File

@@ -9,18 +9,35 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
)
// GetValidCustodyPeers returns a list of peers that custody a super set of the local node's custody columns.
func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
// DataColumnsAdmissibleCustodyPeers returns a list of peers that custody a super set of the local node's custody columns.
func (s *Service) DataColumnsAdmissibleCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
localCustodySubnetCount := peerdas.CustodySubnetCount()
return s.dataColumnsAdmissiblePeers(peers, localCustodySubnetCount)
}
// DataColumnsAdmissibleSubnetSamplingPeers returns a list of peers that custody a super set of the local node's sampling columns.
func (s *Service) DataColumnsAdmissibleSubnetSamplingPeers(peers []peer.ID) ([]peer.ID, error) {
localSubnetSamplingSize := peerdas.SubnetSamplingSize()
return s.dataColumnsAdmissiblePeers(peers, localSubnetSamplingSize)
}
// dataColumnsAdmissiblePeers computes the first columns of the local node corresponding to `subnetCount`, then
// filters out `peers` that do not custody a super set of these columns.
func (s *Service) dataColumnsAdmissiblePeers(peers []peer.ID, subnetCount uint64) ([]peer.ID, error) {
// Get the total number of columns.
numberOfColumns := params.BeaconConfig().NumberOfColumns
localCustodySubnetCount := peerdas.CustodySubnetCount()
localCustodyColumns, err := peerdas.CustodyColumns(s.NodeID(), localCustodySubnetCount)
// Retrieve the local node ID.
localNodeId := s.NodeID()
// Retrieve the needed columns.
neededColumns, err := peerdas.CustodyColumns(localNodeId, subnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody columns for local node")
}
localCustotyColumnsCount := uint64(len(localCustodyColumns))
// Get the number of needed columns.
localneededColumnsCount := uint64(len(neededColumns))
// Find the valid peers.
validPeers := make([]peer.ID, 0, len(peers))
@@ -28,7 +45,7 @@ func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
loop:
for _, pid := range peers {
// Get the custody subnets count of the remote peer.
remoteCustodySubnetCount := s.CustodyCountFromRemotePeer(pid)
remoteCustodySubnetCount := s.DataColumnsCustodyCountFromRemotePeer(pid)
// Get the remote node ID from the peer ID.
remoteNodeID, err := ConvertPeerIDToNodeID(pid)
@@ -44,8 +61,8 @@ loop:
remoteCustodyColumnsCount := uint64(len(remoteCustodyColumns))
// If the remote peer custodies less columns than the local node, skip it.
if remoteCustodyColumnsCount < localCustotyColumnsCount {
// If the remote peer custodies less columns than the local node needs, skip it.
if remoteCustodyColumnsCount < localneededColumnsCount {
continue
}
@@ -57,7 +74,7 @@ loop:
}
// Filter out invalid peers.
for c := range localCustodyColumns {
for c := range neededColumns {
if !remoteCustodyColumns[c] {
continue loop
}
@@ -101,8 +118,8 @@ func (s *Service) custodyCountFromRemotePeerEnr(pid peer.ID) uint64 {
return custodyCount
}
// CustodyCountFromRemotePeer retrieves the custody count from a remote peer.
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
// DataColumnsCustodyCountFromRemotePeer retrieves the custody count from a remote peer.
func (s *Service) DataColumnsCustodyCountFromRemotePeer(pid peer.ID) uint64 {
// Try to get the custody count from the peer's metadata.
metadata, err := s.peers.Metadata(pid)
if err != nil {

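The heart of `dataColumnsAdmissiblePeers` above is a superset check over column sets. A hedged sketch using plain `map[uint64]bool` sets (the function and parameter names below are illustrative, not taken from the diff):

```go
// columnsCover reports whether the remote peer's custody columns include every
// column the local node needs. Like the filter above, it compares set sizes
// first as a cheap rejection before checking each needed column individually.
func columnsCover(needed, remote map[uint64]bool) bool {
	if len(remote) < len(needed) {
		return false // the remote peer cannot possibly cover all needed columns
	}

	for column := range needed {
		if !remote[column] {
			return false // at least one needed column is not custodied by the peer
		}
	}

	return true
}
```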
View File

@@ -47,7 +47,7 @@ func createPeer(t *testing.T, privateKeyOffset int, custodyCount uint64) (*enr.R
return record, peerID, privateKey
}
func TestGetValidCustodyPeers(t *testing.T) {
func TestDataColumnsAdmissibleCustodyPeers(t *testing.T) {
genesisValidatorRoot := make([]byte, 32)
for i := 0; i < 32; i++ {
@@ -98,14 +98,14 @@ func TestGetValidCustodyPeers(t *testing.T) {
service.peers.Add(peer3Record, peer3ID, nil, network.DirOutbound)
service.peers.Add(peer4Record, peer4ID, nil, network.DirOutbound)
actual, err := service.GetValidCustodyPeers([]peer.ID{peer1ID, peer2ID, peer3ID, peer4ID})
actual, err := service.DataColumnsAdmissibleCustodyPeers([]peer.ID{peer1ID, peer2ID, peer3ID, peer4ID})
require.NoError(t, err)
expected := []peer.ID{peer1ID, peer2ID}
require.DeepSSZEqual(t, expected, actual)
}
func TestCustodyCountFromRemotePeer(t *testing.T) {
func TestDataColumnsCustodyCountFromRemotePeer(t *testing.T) {
const (
expectedENR uint64 = 7
expectedMetadata uint64 = 8
@@ -191,7 +191,7 @@ func TestCustodyCountFromRemotePeer(t *testing.T) {
}
// Retrieve the custody count from the remote peer.
actual := service.CustodyCountFromRemotePeer(pid)
actual := service.DataColumnsCustodyCountFromRemotePeer(pid)
// Verify the result.
require.Equal(t, tc.expected, actual)

View File

@@ -30,7 +30,7 @@ type P2P interface {
ConnectionHandler
PeersProvider
MetadataProvider
CustodyHandler
DataColumnsHandler
}
type Acceser interface {
@@ -113,7 +113,8 @@ type MetadataProvider interface {
MetadataSeq() uint64
}
type CustodyHandler interface {
CustodyCountFromRemotePeer(peer.ID) uint64
GetValidCustodyPeers([]peer.ID) ([]peer.ID, error)
type DataColumnsHandler interface {
DataColumnsCustodyCountFromRemotePeer(peer.ID) uint64
DataColumnsAdmissibleCustodyPeers([]peer.ID) ([]peer.ID, error)
DataColumnsAdmissibleSubnetSamplingPeers([]peer.ID) ([]peer.ID, error)
}

View File

@@ -185,10 +185,14 @@ func (*FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Disc
return true, 0
}
func (*FakeP2P) CustodyCountFromRemotePeer(peer.ID) uint64 {
func (*FakeP2P) DataColumnsCustodyCountFromRemotePeer(peer.ID) uint64 {
return 0
}
func (*FakeP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
func (*FakeP2P) DataColumnsAdmissibleCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
return peers, nil
}
func (*FakeP2P) DataColumnsAdmissibleSubnetSamplingPeers(peers []peer.ID) ([]peer.ID, error) {
return peers, nil
}

View File

@@ -445,7 +445,7 @@ func (*TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Disc
return true, 0
}
func (s *TestP2P) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
func (s *TestP2P) DataColumnsCustodyCountFromRemotePeer(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of subnets.
custodyRequirement := params.BeaconConfig().CustodyRequirement
@@ -464,6 +464,10 @@ func (s *TestP2P) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
return custodyCount
}
func (*TestP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
func (*TestP2P) DataColumnsAdmissibleCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
return peers, nil
}
func (*TestP2P) DataColumnsAdmissibleSubnetSamplingPeers(peers []peer.ID) ([]peer.ID, error) {
return peers, nil
}

View File

@@ -155,7 +155,7 @@ func (d *dataColumnSampler1D) refreshPeerInfo() {
d.prunePeerInfo(activePeers)
for _, pid := range activePeers {
csc := d.p2p.CustodyCountFromRemotePeer(pid)
csc := d.p2p.DataColumnsCustodyCountFromRemotePeer(pid)
columns, ok := d.columnFromPeer[pid]
columnsCount := uint64(len(columns))
@@ -428,7 +428,7 @@ func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
}
// Send the request to the peer.
roDataColumns, err := SendDataColumnSidecarByRoot(ctx, d.clock, d.p2p, pid, d.ctxMap, &req)
roDataColumns, err := SendDataColumnSidecarsByRootRequest(ctx, d.clock, d.p2p, pid, d.ctxMap, &req)
if err != nil {
log.WithError(err).Error("Failed to send data column sidecar by root")
return nil

View File

@@ -717,7 +717,7 @@ loop:
}
// Get the custody columns count from the peer.
custodyCount := f.p2p.CustodyCountFromRemotePeer(peer)
custodyCount := f.p2p.DataColumnsCustodyCountFromRemotePeer(peer)
// Get the custody columns from the peer.
remoteCustodyColumns, err := peerdas.CustodyColumns(nodeID, custodyCount)

View File

@@ -444,12 +444,12 @@ func (s *Service) fetchOriginColumns(pids []peer.ID) error {
return nil
}
shufflePeers(pids)
pids, err = s.cfg.P2P.GetValidCustodyPeers(pids)
pids, err = s.cfg.P2P.DataColumnsAdmissibleCustodyPeers(pids)
if err != nil {
return err
}
for i := range pids {
sidecars, err := sync.SendDataColumnSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req)
sidecars, err := sync.SendDataColumnSidecarsByRootRequest(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req)
if err != nil {
continue
}

View File

@@ -205,13 +205,13 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea
}
if coreTime.PeerDASIsActive(b.Block().Slot()) {
request, err := s.pendingDataColumnRequestForBlock(blkRoot, b)
request, err := s.buildRequestsForMissingDataColumns(blkRoot, b)
if err != nil {
return err
}
if len(request) > 0 {
peers := s.getBestPeers()
peers, err = s.cfg.p2p.GetValidCustodyPeers(peers)
peers, err = s.cfg.p2p.DataColumnsAdmissibleCustodyPeers(peers)
if err != nil {
return err
}
@@ -219,7 +219,7 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea
if peerCount == 0 {
return errors.Wrapf(errNoPeersForPending, "block root=%#x", blkRoot)
}
if err := s.sendAndSaveDataColumnSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil {
if err := s.requestAndSaveDataColumnSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil {
return err
}
}
@@ -314,8 +314,7 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
// Remove duplicates (if any) from the list of roots.
roots = dedupRoots(roots)
// Reversly iterate through the list of roots to request blocks, and filter out roots that are already
// seen in pending blocks or being synced.
// Filters out in place roots that are already seen in pending blocks or being synced.
func() {
s.pendingQueueLock.RLock()
defer s.pendingQueueLock.RUnlock()
@@ -347,9 +346,9 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
if peerDASIsActive {
var err error
bestPeers, err = s.cfg.p2p.GetValidCustodyPeers(bestPeers)
bestPeers, err = s.cfg.p2p.DataColumnsAdmissibleSubnetSamplingPeers(bestPeers)
if err != nil {
return errors.Wrap(err, "get valid custody peers")
return errors.Wrap(err, "data columns admissible subnet sampling peers")
}
}
@@ -380,7 +379,7 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
}
// Send the request to the peer.
if err := s.sendRecentBeaconBlocksRequest(ctx, &req, pid); err != nil {
if err := s.sendBeaconBlocksRequest(ctx, &req, pid); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Debug("Could not send recent block request")
}

View File

@@ -24,15 +24,22 @@ import (
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
// sendRecentBeaconBlocksRequest sends a recent beacon blocks request to a peer to get
// those corresponding blocks from that peer.
func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *types.BeaconBlockByRootsReq, id peer.ID) error {
// sendBeaconBlocksRequest sends the `requests` beacon blocks by root requests to
// the peer with the given `id`. For each received block, it inserts the block into the
// pending queue. Then, for each received block, it checks if all corresponding blobs
// or data columns are stored, and, if not, sends the corresponding sidecar requests
// and stores the received sidecars.
func (s *Service) sendBeaconBlocksRequest(
ctx context.Context,
requests *types.BeaconBlockByRootsReq,
id peer.ID,
) error {
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
requestedRoots := make(map[[32]byte]struct{})
requestedRoots := make(map[[fieldparams.RootLength]byte]bool)
for _, root := range *requests {
requestedRoots[root] = struct{}{}
requestedRoots[root] = true
}
blks, err := SendBeaconBlocksByRootRequest(ctx, s.cfg.clock, s.cfg.p2p, id, requests, func(blk interfaces.ReadOnlySignedBeaconBlock) error {
@@ -40,49 +47,70 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t
if err != nil {
return err
}
if _, ok := requestedRoots[blkRoot]; !ok {
if ok := requestedRoots[blkRoot]; !ok {
return fmt.Errorf("received unexpected block with root %x", blkRoot)
}
s.pendingQueueLock.Lock()
defer s.pendingQueueLock.Unlock()
if err := s.insertBlockToPendingQueue(blk.Block().Slot(), blk, blkRoot); err != nil {
return err
}
return nil
})
// The following part deals with blobs and data columns (if any).
for _, blk := range blks {
// Skip blocks before deneb because they have no blob.
// Skip blocks before deneb because they have neither blobs nor data columns.
if blk.Version() < version.Deneb {
continue
}
blkRoot, err := blk.Block().HashTreeRoot()
if err != nil {
return err
}
if coreTime.PeerDASIsActive(blk.Block().Slot()) {
request, err := s.pendingDataColumnRequestForBlock(blkRoot, blk)
blockSlot := blk.Block().Slot()
peerDASIsActive := coreTime.PeerDASIsActive(blockSlot)
if peerDASIsActive {
// For the block, check if we store all the data columns we should custody,
// and build the corresponding data column sidecar requests if needed.
requests, err := s.buildRequestsForMissingDataColumns(blkRoot, blk)
if err != nil {
return errors.Wrap(err, "pending data column request for block")
}
if len(request) == 0 {
// We already store all the data columns we should custody, nothing to request.
if len(requests) == 0 {
continue
}
if err := s.sendAndSaveDataColumnSidecars(ctx, request, id, blk); err != nil {
if err := s.requestAndSaveDataColumnSidecars(ctx, requests, id, blk); err != nil {
return errors.Wrap(err, "send and save data column sidecars")
}
} else {
request, err := s.pendingBlobsRequestForBlock(blkRoot, blk)
if err != nil {
return errors.Wrap(err, "pending blobs request for block")
}
if len(request) == 0 {
continue
}
if err := s.sendAndSaveBlobSidecars(ctx, request, id, blk); err != nil {
return errors.Wrap(err, "send and save blob sidecars")
}
continue
}
request, err := s.pendingBlobsRequestForBlock(blkRoot, blk)
if err != nil {
return errors.Wrap(err, "pending blobs request for block")
}
if len(request) == 0 {
continue
}
if err := s.sendAndSaveBlobSidecars(ctx, request, id, blk); err != nil {
return errors.Wrap(err, "send and save blob sidecars")
}
}
return err
}
@@ -187,12 +215,19 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
return nil
}
func (s *Service) sendAndSaveDataColumnSidecars(ctx context.Context, request types.DataColumnSidecarsByRootReq, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error {
// requestAndSaveDataColumnSidecars sends a data column sidecars by root request
// to a peer and saves the received sidecars.
func (s *Service) requestAndSaveDataColumnSidecars(
ctx context.Context,
request types.DataColumnSidecarsByRootReq,
peerID peer.ID,
block interfaces.ReadOnlySignedBeaconBlock,
) error {
if len(request) == 0 {
return nil
}
sidecars, err := SendDataColumnSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, s.ctxMap, &request)
sidecars, err := SendDataColumnSidecarsByRootRequest(ctx, s.cfg.clock, s.cfg.p2p, peerID, s.ctxMap, &request)
if err != nil {
return err
}
@@ -201,6 +236,7 @@ func (s *Service) sendAndSaveDataColumnSidecars(ctx context.Context, request typ
if err != nil {
return err
}
for _, sidecar := range sidecars {
if err := verify.ColumnAlignsWithBlock(sidecar, RoBlock, s.newColumnVerifier); err != nil {
return err
@@ -214,6 +250,7 @@ func (s *Service) sendAndSaveDataColumnSidecars(ctx context.Context, request typ
return err
}
}
return nil
}
@@ -237,18 +274,55 @@ func (s *Service) pendingBlobsRequestForBlock(root [32]byte, b interfaces.ReadOn
return blobIdentifiers, nil
}
func (s *Service) pendingDataColumnRequestForBlock(root [32]byte, b interfaces.ReadOnlySignedBeaconBlock) (types.DataColumnSidecarsByRootReq, error) {
if b.Version() < version.Deneb {
return nil, nil // Block before deneb has no blob.
}
cc, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, err
}
if len(cc) == 0 {
// buildRequestsForMissingDataColumns looks at the data columns we should custody and have via subnet sampling
// but don't actually store for a given block, and constructs the corresponding data column sidecars by root requests.
func (s *Service) buildRequestsForMissingDataColumns(root [32]byte, block interfaces.ReadOnlySignedBeaconBlock) (types.DataColumnSidecarsByRootReq, error) {
// Blocks before deneb have neither blobs nor data columns.
if block.Version() < version.Deneb {
return nil, nil
}
return s.constructPendingColumnRequest(root)
// Get the blob commitments from the block.
commitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "blob KZG commitments")
}
// Nothing to build if there are no commitments.
if len(commitments) == 0 {
return nil, nil
}
// Retrieve the columns we store for the current root.
storedColumns, err := s.cfg.blobStorage.ColumnIndices(root)
if err != nil {
return nil, errors.Wrap(err, "column indices")
}
// Retrieve the columns we should custody.
nodeID := s.cfg.p2p.NodeID()
custodySubnetCount := peerdas.SubnetSamplingSize()
custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}
custodyColumnCount := len(custodyColumns)
// Build the request for the columns we should custody but don't actually store.
req := make(types.DataColumnSidecarsByRootReq, 0, custodyColumnCount)
for column := range custodyColumns {
isColumnStored := storedColumns[column]
if !isColumnStored {
req = append(req, &eth.DataColumnIdentifier{
BlockRoot: root[:],
ColumnIndex: column,
})
}
}
return req, nil
}
// constructPendingBlobsRequest creates a request for BlobSidecars by root, considering blobs already in DB.
@@ -264,34 +338,6 @@ func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (
return requestsForMissingIndices(stored, commitments, root), nil
}
func (s *Service) constructPendingColumnRequest(root [32]byte) (types.DataColumnSidecarsByRootReq, error) {
// Retrieve the storedColumns columns for the current root.
storedColumns, err := s.cfg.blobStorage.ColumnIndices(root)
if err != nil {
return nil, errors.Wrap(err, "column indices")
}
// Retrieve the columns we should custody.
custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), peerdas.CustodySubnetCount())
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}
// Build the request for the missing columns.
req := make(types.DataColumnSidecarsByRootReq, 0, len(custodiedColumns))
for column := range custodiedColumns {
isColumnStored := storedColumns[column]
if !isColumnStored {
req = append(req, &eth.DataColumnIdentifier{
BlockRoot: root[:],
ColumnIndex: column,
})
}
}
return req, nil
}
// requestsForMissingIndices constructs a slice of BlobIdentifiers that are missing from
// local storage, based on a mapping that represents which indices are locally stored,
// and the highest expected index.

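A simplified sketch of the request-building logic that `buildRequestsForMissingDataColumns` introduces above; the types are stand-ins for the Prysm request and identifier types, and the column sets are assumed to be `map[uint64]bool` as elsewhere in this diff:

```go
// columnIdentifier is a simplified stand-in for eth.DataColumnIdentifier.
type columnIdentifier struct {
	blockRoot   [32]byte
	columnIndex uint64
}

// missingColumnRequests returns one identifier per column the node should
// custody (via subnet sampling) but does not yet store for the given block root.
func missingColumnRequests(root [32]byte, needed, stored map[uint64]bool) []columnIdentifier {
	requests := make([]columnIdentifier, 0, len(needed))

	for column := range needed {
		if stored[column] {
			continue // the column is already on disk, nothing to request
		}

		requests = append(requests, columnIdentifier{blockRoot: root, columnIndex: column})
	}

	return requests
}
```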
View File

@@ -253,7 +253,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
})
p1.Connect(p2)
require.NoError(t, r.sendRecentBeaconBlocksRequest(context.Background(), &expectedRoots, p2.PeerID()))
require.NoError(t, r.sendBeaconBlocksRequest(context.Background(), &expectedRoots, p2.PeerID()))
if util.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
@@ -328,7 +328,7 @@ func TestRecentBeaconBlocks_RPCRequestSent_IncorrectRoot(t *testing.T) {
})
p1.Connect(p2)
require.ErrorContains(t, "received unexpected block with root", r.sendRecentBeaconBlocksRequest(context.Background(), &expectedRoots, p2.PeerID()))
require.ErrorContains(t, "received unexpected block with root", r.sendBeaconBlocksRequest(context.Background(), &expectedRoots, p2.PeerID()))
}
func TestRecentBeaconBlocksRPCHandler_HandleZeroBlocks(t *testing.T) {

View File

@@ -209,7 +209,9 @@ func SendBlobSidecarByRoot(
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), max)
}
func SendDataColumnSidecarByRoot(
// SendDataColumnSidecarsByRootRequest sends a request for data column sidecars by root
// and returns the fetched data column sidecars.
func SendDataColumnSidecarsByRootRequest(
ctx context.Context,
tor blockchain.TemporalOracle,
p2pApi p2p.P2P,

View File

@@ -118,6 +118,9 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
if seen {
return pubsub.ValidationIgnore, nil
}
// Verify the block being voted on is in the beacon chain.
// If not, store this attestation in the map of pending attestations.
if !s.validateBlockInAttestation(ctx, m) {
return pubsub.ValidationIgnore, nil
}
@@ -223,6 +226,8 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed
return s.validateWithBatchVerifier(ctx, "aggregate", set)
}
// validateBlockInAttestation checks if the block being voted on is in the beaconDB.
// If not, it stores this attestation in the map of pending attestations.
func (s *Service) validateBlockInAttestation(ctx context.Context, satt ethpb.SignedAggregateAttAndProof) bool {
// Verify the block being voted and the processed state is in beaconDB. The block should have passed validation if it's in the beaconDB.
blockRoot := bytesutil.ToBytes32(satt.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot)

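A rough sketch of the flow the new comments describe, using hypothetical types (the real service keeps pending aggregates in its own pending-attestation structures):

```go
// aggregate is a hypothetical stand-in for a signed aggregate attestation.
type aggregate struct {
	blockRoot [32]byte // root of the block being voted on
}

// handleAggregate returns true when the voted-on block is already known, so
// validation can continue; otherwise it parks the aggregate until the block arrives.
func handleAggregate(knownBlocks map[[32]byte]bool, pending map[[32]byte][]aggregate, att aggregate) bool {
	if knownBlocks[att.blockRoot] {
		return true // block is in the beacon chain: proceed with full validation
	}

	pending[att.blockRoot] = append(pending[att.blockRoot], att) // store as pending
	return false                                                 // ignore for now; revisited once the block is processed
}
```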
View File

@@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -66,8 +67,9 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
if dataColumnsIgnoreSlotMultiple != 0 && blockSlot%dataColumnsIgnoreSlotMultiple == 0 {
log.WithFields(logrus.Fields{
"slot": blockSlot,
"topic": msg.Topic,
"slot": blockSlot,
"columnIndex": ds.ColumnIndex,
"blockRoot": fmt.Sprintf("%#x", ds.BlockRoot()),
}).Warning("Voluntary ignore data column sidecar gossip")
return pubsub.ValidationIgnore, err
@@ -99,11 +101,17 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
return pubsub.ValidationIgnore, err
}
if err := verifier.SidecarParentSeen(s.hasBadBlock); err != nil {
// If we haven't seen the parent, request it asynchronously.
go func() {
if err := s.sendBatchRootRequest(context.Background(), [][32]byte{ds.ParentRoot()}, rand.NewGenerator()); err != nil {
customCtx := context.Background()
parentRoot := ds.ParentRoot()
roots := [][fieldparams.RootLength]byte{parentRoot}
randGenerator := rand.NewGenerator()
if err := s.sendBatchRootRequest(customCtx, roots, randGenerator); err != nil {
log.WithError(err).WithFields(logging.DataColumnFields(ds)).Debug("Failed to send batch root request")
}
}()
return pubsub.ValidationIgnore, err
}
if err := verifier.SidecarParentValid(s.hasBadBlock); err != nil {

View File

@@ -2,7 +2,6 @@ package verification
import (
"context"
goErrors "errors"
"github.com/pkg/errors"
forkchoicetypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/types"
@@ -329,5 +328,5 @@ func columnToSignatureData(d blocks.RODataColumn) SignatureData {
}
func columnErrBuilder(baseErr error) error {
return goErrors.Join(ErrColumnInvalid, baseErr)
return errors.Wrap(baseErr, ErrColumnInvalid.Error())
}