Files
prysm/beacon-chain/sync/subscriber_beacon_blocks.go
Manu NALEPA 8191bb5711 Construct data column sidecars from the execution layer in parallel and add metrics (#16115)
**What type of PR is this?**
Optimisation

**What does this PR do? Why is it needed?**
While constructing data column sidecars from the execution layer is very
cheap compared to reconstructing data column sidecars from data column
sidecars, it is still efficient to run this construction in parallel.

(**Reminder:** Using `getBlobsV2`, all the cell proofs are present, but
only 64 (out of 128) cells are present. Recomputing the missing cells is
cheap, while reconstructing the missing proofs is expensive.)

This PR:
- adds some metrics
- ensures the construction is done in parallel

**Other notes for review**
Please read commit by commit

The red vertical lines represent the limit between before and after this
pull request
<img width="1575" height="603" alt="image"
src="https://github.com/user-attachments/assets/24811b1b-8e3c-4bf5-ac82-f920d385573a"
/>

The last commit transforms the bottom-right histogram into a summary, since
it no longer makes sense to have a histogram for values.

Please check "hide whitespace" so this PR is easier to review:
<img width="229" height="196" alt="image"
src="https://github.com/user-attachments/assets/548cb2f4-b6f4-41d1-b3b3-4d4c8554f390"
/>

Updated metrics:



Now, for every **non missed slot**, for a block **with at least one
commitment**, we have either:
```
[2025-12-10 10:02:12.93] DEBUG sync: Constructed data column sidecars from the execution client count=118 indices=0-5,7-16,18-27,29-35,37-46,48-49,51-82,84-100,102-106,108-125,127 iteration=0 proposerIndex=855082 root=0xf8f44e7d4cbc209b2ff2796c07fcf91e85ab45eebe145c4372017a18b25bf290 slot=1928961 type=BeaconBlock
```

or
```
[2025-12-10 10:02:25.69] DEBUG sync: No data column sidecars constructed from the execution client iteration=2 proposerIndex=1093657 root=0x64c2f6c31e369cd45f2edaf5524b64f4869e8148cd29fb84b5b8866be529eea3 slot=1928962 type=DataColumnSidecar
```
<img width="1581" height="957" alt="image"
src="https://github.com/user-attachments/assets/514dbdae-ef14-47e2-9127-502ac6d26bc0"
/>
<img width="1596" height="916" alt="image"
src="https://github.com/user-attachments/assets/343d4710-4191-49e8-98be-afe70d5ffe1c"
/>



**Acknowledgements**
- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description to this PR with sufficient context for
reviewers to understand this PR.
2025-12-16 16:27:32 +00:00

352 lines
11 KiB
Go

package sync
import (
"context"
"fmt"
"os"
"path"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition/interop"
"github.com/OffchainLabs/prysm/v7/config/features"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/io/file"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
// beaconBlockSubscriber handles a beacon block message: it wraps the message as a
// signed block, marks its (slot, proposer index) pair as seen, starts processing
// sidecars from the execution client in a separate goroutine, and passes the block
// to the chain service. When the chain service rejects the block, the offending
// roots are flagged as bad and the error is returned. Otherwise, pending
// attestations for the block root are processed.
func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) error {
	signed, err := blocks.NewSignedBeaconBlock(msg)
	if err != nil {
		return err
	}

	if nilErr := blocks.BeaconBlockIsNil(signed); nilErr != nil {
		return nilErr
	}

	block := signed.Block()
	s.setSeenBlockIndexSlot(block.Slot(), block.ProposerIndex())

	root, err := block.HashTreeRoot()
	if err != nil {
		return err
	}

	roBlock, err := blocks.NewROBlockWithRoot(signed, root)
	if err != nil {
		return errors.Wrap(err, "new ro block with root")
	}

	// Sidecar processing runs concurrently with block reception.
	go s.processSidecarsFromExecutionFromBlock(ctx, roBlock)

	receiveErr := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil)
	if receiveErr == nil {
		if attErr := s.processPendingAttsForBlock(ctx, root); attErr != nil {
			return errors.Wrap(attErr, "process pending atts for block")
		}

		return nil
	}

	if blockchain.IsInvalidBlock(receiveErr) {
		var zeroRoot [32]byte

		invalidRoot := blockchain.InvalidBlockRoot(receiveErr)
		if invalidRoot == zeroRoot {
			// No specific invalid root was reported: dump the block and flag the block itself.
			// TODO(13721): Remove this once we can deprecate the flag.
			interop.WriteBlockToDisk(signed, true /*failed*/)
			saveInvalidBlockToTemp(signed)
			s.setBadBlock(ctx, root)
		} else {
			s.setBadBlock(ctx, invalidRoot) // Setting head block as bad.
		}
	}

	// Set the returned invalid ancestors as bad.
	for _, ancestorRoot := range blockchain.InvalidAncestorRoots(receiveErr) {
		s.setBadBlock(ctx, ancestorRoot)
	}

	return receiveErr
}
// processSidecarsFromExecutionFromBlock dispatches sidecar processing based on the block version:
// data column sidecars from Fulu onwards, blob sidecars from Deneb up to (but excluding) Fulu,
// and nothing for earlier versions. Errors are logged, not returned.
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) {
	switch blockVersion := roBlock.Version(); {
	case blockVersion >= version.Fulu:
		err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock))
		if err != nil {
			log.WithError(err).Error("Failed to process data column sidecars from execution")
		}

	case blockVersion >= version.Deneb:
		s.processBlobSidecarsFromExecution(ctx, roBlock)
	}
}
// processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client,
// builds corresponding sidecars, broadcasts the ones not yet on disk over P2P, and then hands them to
// subscribeBlob. Failures are logged; nothing is returned to the caller.
func (s *Service) processBlobSidecarsFromExecution(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
	// The slot start time is only used to annotate the debug log below, so a failure
	// here is logged but does not abort the processing.
	startTime, err := slots.StartTime(s.cfg.clock.GenesisTime(), block.Block().Slot())
	hasStartTime := err == nil
	if err != nil {
		log.WithError(err).Error("Failed to convert slot to time")
	}

	blockRoot, err := block.Block().HashTreeRoot()
	if err != nil {
		log.WithError(err).Error("Failed to calculate block root")
		return
	}

	if s.cfg.blobStorage == nil {
		return
	}

	summary := s.cfg.blobStorage.Summary(blockRoot)
	cmts, err := block.Block().Body().BlobKzgCommitments()
	if err != nil {
		log.WithError(err).Error("Failed to read commitments from block")
		return
	}

	// Count the blobs that the storage summary already reports for this block.
	for i := range cmts {
		if summary.HasIndex(uint64(i)) {
			blobExistedInDBTotal.Inc()
		}
	}

	// Reconstruct blob sidecars from the EL
	blobSidecars, err := s.cfg.executionReconstructor.ReconstructBlobSidecars(ctx, block, blockRoot, summary.HasIndex)
	if err != nil {
		log.WithError(err).Error("Failed to reconstruct blob sidecars")
		return
	}

	if len(blobSidecars) == 0 {
		return
	}

	// Refresh indices as new blobs may have been added to the db
	summary = s.cfg.blobStorage.Summary(blockRoot)

	// Broadcast blob sidecars first, then pass them to subscribeBlob.
	for _, sidecar := range blobSidecars {
		// Don't broadcast the blob if it has appeared on disk.
		if summary.HasIndex(sidecar.Index) {
			continue
		}

		if err := s.cfg.p2p.BroadcastBlob(ctx, sidecar.Index, sidecar.BlobSidecar); err != nil {
			log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to broadcast blob sidecar")
		}
	}

	for _, sidecar := range blobSidecars {
		if summary.HasIndex(sidecar.Index) {
			continue
		}

		if err := s.subscribeBlob(ctx, sidecar); err != nil {
			log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to receive blob")
			continue
		}

		blobRecoveredFromELTotal.Inc()

		fields := blobFields(sidecar.ROBlob)
		// Only report the elapsed time when the slot start time is known; subtracting
		// a zero start time would log a meaningless duration.
		if hasStartTime {
			fields["sinceSlotStartTime"] = s.cfg.clock.Now().Sub(startTime)
		}

		log.WithFields(fields).Debug("Processed blob sidecar from EL")
	}
}
// processDataColumnSidecarsFromExecution retrieves (if available) data column sidecars data from the execution client,
// builds corresponding sidecars, and broadcasts and receives the ones that are sampled by this node and not yet seen.
//
// The work is keyed by the source root through columnSidecarsExecSingleFlight
// (NOTE(review): presumably a singleflight group, so concurrent calls for the same root share one run — confirm).
// The whole attempt is bounded by half a slot. While the execution client returns no sidecars, the construction
// is retried every 250ms until either all sampled sidecars have been seen or the context is done.
//
// It returns nil when there is nothing to do or on success, and a wrapped error (or the context error) otherwise.
func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, source peerdas.ConstructionPopulator) error {
	key := fmt.Sprintf("%#x", source.Root())

	_, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (any, error) {
		const delay = 250 * time.Millisecond
		secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second

		commitments, err := source.Commitments()
		if err != nil {
			return nil, errors.Wrap(err, "blob kzg commitments")
		}

		// Exit early if there are no commitments.
		if len(commitments) == 0 {
			return nil, nil
		}

		// Retrieve the indices of sidecars we should sample.
		columnIndicesToSample, err := s.columnIndicesToSample()
		if err != nil {
			return nil, errors.Wrap(err, "column indices to sample")
		}

		ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
		defer cancel()

		log := log.WithFields(logrus.Fields{
			"root":          key,
			"slot":          source.Slot(),
			"proposerIndex": source.ProposerIndex(),
			"type":          source.Type(),
		})

		var constructedSidecarCount uint64
		for iteration := uint64(0); ; /*no stop condition*/ iteration++ {
			log = log.WithField("iteration", iteration)

			// Exit early if all sidecars to sample have been seen.
			if s.haveAllSidecarsBeenSeen(source.Slot(), source.ProposerIndex(), columnIndicesToSample) {
				if iteration > 0 && constructedSidecarCount == 0 {
					log.Debug("No data column sidecars constructed from the execution client")
				}

				return nil, nil
			}

			// Count one attempt per call, not one per retry.
			if iteration == 0 {
				dataColumnsRecoveredFromELAttempts.Inc()
			}

			// Try to construct data column sidecars from the execution client.
			constructedSidecars, err := s.cfg.executionReconstructor.ConstructDataColumnSidecars(ctx, source)
			if err != nil {
				return nil, errors.Wrap(err, "reconstruct data column sidecars")
			}

			// No sidecars are retrieved from the EL, retry later
			constructedSidecarCount = uint64(len(constructedSidecars))
			if constructedSidecarCount == 0 {
				// Wait before retrying, but return as soon as the context is cancelled
				// or times out instead of sleeping through it.
				retryTimer := time.NewTimer(delay)
				select {
				case <-ctx.Done():
					retryTimer.Stop()
					return nil, ctx.Err()
				case <-retryTimer.C:
				}

				continue
			}

			dataColumnsRecoveredFromELTotal.Inc()

			// Boundary check.
			if constructedSidecarCount != fieldparams.NumberOfColumns {
				return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", constructedSidecarCount, fieldparams.NumberOfColumns)
			}

			unseenIndices, err := s.broadcastAndReceiveUnseenDataColumnSidecars(ctx, source.Slot(), source.ProposerIndex(), columnIndicesToSample, constructedSidecars)
			if err != nil {
				return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars")
			}

			log.WithFields(logrus.Fields{
				"count":   len(unseenIndices),
				"indices": helpers.SortedPrettySliceFromMap(unseenIndices),
			}).Debug("Constructed data column sidecars from the execution client")

			dataColumnSidecarsObtainedViaELCount.Observe(float64(len(unseenIndices)))

			return nil, nil
		}
	})

	return err
}
// broadcastAndReceiveUnseenDataColumnSidecars keeps, among `sidecars`, those whose index is in
// `neededIndices` and has not been seen yet for (slot, proposerIndex). It broadcasts them, then
// receives them, and returns the set of indices that were kept.
func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
	ctx context.Context,
	slot primitives.Slot,
	proposerIndex primitives.ValidatorIndex,
	neededIndices map[uint64]bool,
	sidecars []blocks.VerifiedRODataColumn,
) (map[uint64]bool, error) {
	// Select the sidecars to broadcast and receive.
	var (
		pending        = make([]blocks.VerifiedRODataColumn, 0, len(sidecars))
		pendingIndices = make(map[uint64]bool, len(sidecars))
	)

	for i := range sidecars {
		columnIndex := sidecars[i].Index

		// Drop sidecars we don't need, and among the needed ones, those already seen.
		if !neededIndices[columnIndex] || s.hasSeenDataColumnIndex(slot, proposerIndex, columnIndex) {
			continue
		}

		pending = append(pending, sidecars[i])
		pendingIndices[columnIndex] = true
	}

	// Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
	err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, pending)
	if err != nil {
		return nil, errors.Wrap(err, "broadcast data column sidecars")
	}

	// Receive data column sidecars.
	err = s.receiveDataColumnSidecars(ctx, pending)
	if err != nil {
		return nil, errors.Wrap(err, "receive data column sidecars")
	}

	return pendingIndices, nil
}
// haveAllSidecarsBeenSeen reports whether every data column index in `indices` has already been
// seen for the given slot and proposer index. It returns true when `indices` is empty.
func (s *Service) haveAllSidecarsBeenSeen(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, indices map[uint64]bool) bool {
	for columnIndex := range indices {
		if s.hasSeenDataColumnIndex(slot, proposerIndex, columnIndex) {
			continue
		}

		// A single unseen index is enough to answer.
		return false
	}

	return true
}
// columnIndicesToSample returns the set of data column indices this node should sample,
// derived from its node ID and its sampling size.
func (s *Service) columnIndicesToSample() (map[uint64]bool, error) {
	p2pService := s.cfg.p2p

	// Our own node ID.
	nodeID := p2pService.NodeID()

	// Custody group count for the node.
	custodyGroupCount, err := p2pService.CustodyGroupCount(s.ctx)
	if err != nil {
		return nil, errors.Wrap(err, "custody group count")
	}

	// The sampling size is the larger of the custody group count and the samples per slot.
	// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#custody-sampling
	samplingSize := max(custodyGroupCount, params.BeaconConfig().SamplesPerSlot)

	// Peer info for the node.
	info, _, err := peerdas.Info(nodeID, samplingSize)
	if err != nil {
		return nil, errors.Wrap(err, "peer info")
	}

	return info.CustodyColumns, nil
}
// saveInvalidBlockToTemp writes the SSZ encoding of `block` to a file named
// "beacon_block_<slot>.ssz" in the OS temp directory. It does nothing unless the
// SaveInvalidBlock feature flag is set. Failures are logged, not returned.
func saveInvalidBlockToTemp(block interfaces.ReadOnlySignedBeaconBlock) {
	if enabled := features.Get().SaveInvalidBlock; !enabled {
		return
	}

	fp := path.Join(os.TempDir(), fmt.Sprintf("beacon_block_%d.ssz", block.Block().Slot()))
	log.Warnf("Writing invalid block to disk at %s", fp)

	enc, err := block.MarshalSSZ()
	if err != nil {
		log.WithError(err).Error("Failed to ssz encode block")
		return
	}

	err = file.WriteFile(fp, enc)
	if err != nil {
		log.WithError(err).Error("Failed to write to disk")
	}
}