Add the --disable-get-blobs-v2 flag and fixes #16171 (#16155)

**What type of PR is this?**
Feature + Bugfix

**What does this PR do? Why is it needed?**
Starting at Fusaka, the beacon node can pull blobs with the
`engine_getBlobsV2` API from the execution layer.
This reduces by a lot the burden on the beacon node. However, the beacon
node should be able to work 100% correctly without this execution layer
help.

This PR introduces the `--disable-get-blobs-v2` flag to simulate a 0%
success rate of this engine API.

This PR also fixes:
- https://github.com/OffchainLabs/prysm/issues/16171

Please read commit by commit with commit messages.

**How to test it:**
For the `--disable-get-blobs-v2` part:

Run the beacon node with the `--disable-get-blobs-v2` flag in DEBUG
mode.
For every block with commitments, the following log should be displayed:
```
[2025-12-19 15:36:25.49] DEBUG sync: No data column sidecars constructed from the execution client ...
```

And the following log should **never** be displayed:
```
[2026-01-05 10:19:00.55] DEBUG sync: Constructed data column sidecars from the execution client count=...
```

For the #16171 part:
- No ERROR log showed in the linked issue should never be displayed.

**Acknowledgements**
- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
This commit is contained in:
Manu NALEPA
2026-01-05 23:29:15 +01:00
committed by GitHub
parent c5b3d3531c
commit 792fa22099
10 changed files with 88 additions and 33 deletions

View File

@@ -40,6 +40,7 @@ go_library(
"//beacon-chain/state/state-native:go_default_library", "//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library", "//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library", "//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library", "//config/fieldparams:go_default_library",
"//config/params:go_default_library", "//config/params:go_default_library",
"//consensus-types/blocks:go_default_library", "//consensus-types/blocks:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types" "github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification" "github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams" fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params" "github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks" "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -538,6 +539,10 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash)
return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV2)) return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV2))
} }
if flags.Get().DisableGetBlobsV2 {
return []*pb.BlobAndProofV2{}, nil
}
result := make([]*pb.BlobAndProofV2, len(versionedHashes)) result := make([]*pb.BlobAndProofV2, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV2, versionedHashes) err := s.rpcClient.CallContext(ctx, &result, GetBlobsV2, versionedHashes)

View File

@@ -48,7 +48,14 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return errors.Wrap(err, "new ro block with root") return errors.Wrap(err, "new ro block with root")
} }
go s.processSidecarsFromExecutionFromBlock(ctx, roBlock) go func() {
if err := s.processSidecarsFromExecutionFromBlock(ctx, roBlock); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"slot": block.Slot(),
}).Error("Failed to process sidecars from execution from block")
}
}()
if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil { if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil {
if blockchain.IsInvalidBlock(err) { if blockchain.IsInvalidBlock(err) {
@@ -69,28 +76,37 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
} }
return err return err
} }
if err := s.processPendingAttsForBlock(ctx, root); err != nil { if err := s.processPendingAttsForBlock(ctx, root); err != nil {
return errors.Wrap(err, "process pending atts for block") return errors.Wrap(err, "process pending atts for block")
} }
return nil return nil
} }
// processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client, // processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary. // builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) { func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) error {
if roBlock.Version() >= version.Fulu { if roBlock.Version() >= version.Fulu {
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil { if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil {
log.WithError(err).Error("Failed to process data column sidecars from execution") // Do not log if the context was cancelled on purpose.
return // (Still log other context errors such as deadlines exceeded).
if errors.Is(err, context.Canceled) {
return nil
}
return errors.Wrap(err, "process data column sidecars from execution")
} }
return return nil
} }
if roBlock.Version() >= version.Deneb { if roBlock.Version() >= version.Deneb {
s.processBlobSidecarsFromExecution(ctx, roBlock) s.processBlobSidecarsFromExecution(ctx, roBlock)
return return nil
} }
return nil
} }
// processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client, // processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client,
@@ -168,7 +184,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
key := fmt.Sprintf("%#x", source.Root()) key := fmt.Sprintf("%#x", source.Root())
if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (any, error) { if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (any, error) {
const delay = 250 * time.Millisecond const delay = 250 * time.Millisecond
secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
commitments, err := source.Commitments() commitments, err := source.Commitments()
if err != nil { if err != nil {
@@ -186,9 +201,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, errors.Wrap(err, "column indices to sample") return nil, errors.Wrap(err, "column indices to sample")
} }
ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
defer cancel()
log := log.WithFields(logrus.Fields{ log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()), "root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(), "slot": source.Slot(),
@@ -209,6 +221,11 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, nil return nil, nil
} }
// Return if the context is done.
if ctx.Err() != nil {
return nil, ctx.Err()
}
if iteration == 0 { if iteration == 0 {
dataColumnsRecoveredFromELAttempts.Inc() dataColumnsRecoveredFromELAttempts.Inc()
} }
@@ -220,20 +237,10 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
} }
// No sidecars are retrieved from the EL, retry later // No sidecars are retrieved from the EL, retry later
constructedSidecarCount = uint64(len(constructedSidecars)) constructedCount := uint64(len(constructedSidecars))
if constructedSidecarCount == 0 {
if ctx.Err() != nil {
return nil, ctx.Err()
}
time.Sleep(delay)
continue
}
dataColumnsRecoveredFromELTotal.Inc()
// Boundary check. // Boundary check.
if constructedSidecarCount != fieldparams.NumberOfColumns { if constructedSidecarCount > 0 && constructedSidecarCount != fieldparams.NumberOfColumns {
return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", constructedSidecarCount, fieldparams.NumberOfColumns) return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", constructedSidecarCount, fieldparams.NumberOfColumns)
} }
@@ -242,14 +249,24 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars") return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars")
} }
log.WithFields(logrus.Fields{ if constructedCount > 0 {
"count": len(unseenIndices), dataColumnsRecoveredFromELTotal.Inc()
"indices": helpers.SortedPrettySliceFromMap(unseenIndices),
}).Debug("Constructed data column sidecars from the execution client")
dataColumnSidecarsObtainedViaELCount.Observe(float64(len(unseenIndices))) log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
"proposerIndex": source.ProposerIndex(),
"iteration": iteration,
"type": source.Type(),
"count": len(unseenIndices),
"indices": helpers.SortedPrettySliceFromMap(unseenIndices),
}).Debug("Constructed data column sidecars from the execution client")
return nil, nil return nil, nil
}
// Wait before retrying.
time.Sleep(delay)
} }
}); err != nil { }); err != nil {
return err return err
@@ -284,6 +301,11 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
unseenIndices[sidecar.Index] = true unseenIndices[sidecar.Index] = true
} }
// Exit early if there are no nothing to broadcast or receive.
if len(unseenSidecars) == 0 {
return nil, nil
}
// Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking). // Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil { if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil {
return nil, errors.Wrap(err, "broadcast data column sidecars") return nil, errors.Wrap(err, "broadcast data column sidecars")

View File

@@ -194,7 +194,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
}, },
seenBlobCache: lruwrpr.New(1), seenBlobCache: lruwrpr.New(1),
} }
s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock) err := s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
require.NoError(t, err)
require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs)) require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
}) })
} }
@@ -293,7 +294,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
roBlock, err := blocks.NewROBlock(sb) roBlock, err := blocks.NewROBlock(sb)
require.NoError(t, err) require.NoError(t, err)
s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock) err = s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
require.NoError(t, err)
require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns)) require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns))
}) })
} }

View File

@@ -25,12 +25,12 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
} }
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil { if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
return errors.Wrap(err, "receive data column sidecar") return wrapDataColumnError(sidecar, "receive data column sidecar", err)
} }
wg.Go(func() error { wg.Go(func() error {
if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil { if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil {
return errors.Wrap(err, "process data column sidecars from reconstruction") return wrapDataColumnError(sidecar, "process data column sidecars from reconstruction", err)
} }
return nil return nil
@@ -38,7 +38,13 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
wg.Go(func() error { wg.Go(func() error {
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromSidecar(sidecar)); err != nil { if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromSidecar(sidecar)); err != nil {
return errors.Wrap(err, "process data column sidecars from execution") if errors.Is(err, context.Canceled) {
// Do not log if the context was cancelled on purpose.
// (Still log other context errors such as deadlines exceeded).
return nil
}
return wrapDataColumnError(sidecar, "process data column sidecars from execution", err)
} }
return nil return nil
@@ -110,3 +116,7 @@ func (s *Service) allDataColumnSubnets(_ primitives.Slot) map[uint64]bool {
return allSubnets return allSubnets
} }
func wrapDataColumnError(sidecar blocks.VerifiedRODataColumn, message string, err error) error {
return fmt.Errorf("%s - slot %d, root %s: %w", message, sidecar.SignedBlockHeader.Header.Slot, fmt.Sprintf("%#x", sidecar.BlockRoot()), err)
}

View File

@@ -0,0 +1,2 @@
### Added
- `--disable-get-blobs-v2` flag.

View File

@@ -356,4 +356,9 @@ var (
Usage: "A comma-separated list of exponents (of 2) in decreasing order, defining the state diff hierarchy levels. The last exponent must be greater than or equal to 5.", Usage: "A comma-separated list of exponents (of 2) in decreasing order, defining the state diff hierarchy levels. The last exponent must be greater than or equal to 5.",
Value: cli.NewIntSlice(21, 18, 16, 13, 11, 9, 5), Value: cli.NewIntSlice(21, 18, 16, 13, 11, 9, 5),
} }
// DisableGetBlobsV2 disables the engine_getBlobsV2 usage.
DisableGetBlobsV2 = &cli.BoolFlag{
Name: "disable-get-blobs-v2",
Usage: "Disables the engine_getBlobsV2 usage.",
}
) )

View File

@@ -17,6 +17,7 @@ type GlobalFlags struct {
SubscribeToAllSubnets bool SubscribeToAllSubnets bool
Supernode bool Supernode bool
SemiSupernode bool SemiSupernode bool
DisableGetBlobsV2 bool
MinimumSyncPeers int MinimumSyncPeers int
MinimumPeersPerSubnet int MinimumPeersPerSubnet int
MaxConcurrentDials int MaxConcurrentDials int
@@ -72,6 +73,11 @@ func ConfigureGlobalFlags(ctx *cli.Context) error {
cfg.SemiSupernode = true cfg.SemiSupernode = true
} }
if ctx.Bool(DisableGetBlobsV2.Name) {
log.Warning("Disabling `engine_getBlobsV2` API")
cfg.DisableGetBlobsV2 = true
}
// State-diff-exponents // State-diff-exponents
cfg.StateDiffExponents = ctx.IntSlice(StateDiffExponents.Name) cfg.StateDiffExponents = ctx.IntSlice(StateDiffExponents.Name)
if features.Get().EnableStateDiff { if features.Get().EnableStateDiff {

View File

@@ -148,6 +148,7 @@ var appFlags = []cli.Flag{
flags.SlasherDirFlag, flags.SlasherDirFlag,
flags.SlasherFlag, flags.SlasherFlag,
flags.JwtId, flags.JwtId,
flags.DisableGetBlobsV2,
storage.BlobStoragePathFlag, storage.BlobStoragePathFlag,
storage.DataColumnStoragePathFlag, storage.DataColumnStoragePathFlag,
storage.BlobStorageLayout, storage.BlobStorageLayout,

View File

@@ -169,6 +169,7 @@ var appHelpFlagGroups = []flagGroup{
flags.ExecutionJWTSecretFlag, flags.ExecutionJWTSecretFlag,
flags.JwtId, flags.JwtId,
flags.InteropMockEth1DataVotesFlag, flags.InteropMockEth1DataVotesFlag,
flags.DisableGetBlobsV2,
}, },
}, },
{ // Flags relevant to configuring beacon chain monitoring. { // Flags relevant to configuring beacon chain monitoring.