From 792fa22099bd6140fd09c30310e9fcb34cc76ea0 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 5 Jan 2026 23:29:15 +0100 Subject: [PATCH] Add the `--disable-get-blobs-v2` flag and fixes #16171 (#16155) **What type of PR is this?** Feature + Bugfix **What does this PR do? Why is it needed?** Starting at Fusaka, the beacon node can pull blobs with the `engine_getBlobsV2` API from the execution layer. This reduces by a lot the burden on the beacon node. However, the beacon node should be able to work 100% correctly without this execution layer help. This PR introduces the `--disable-get-blobs-v2` flag to simulate a 0% success rate of this engine API. This PR also fixes: - https://github.com/OffchainLabs/prysm/issues/16171 Please read commit by commit with commit messages. **How to test it:** For the `--disable-get-blobs-v2` part: Run the beacon node with the `--disable-get-blobs-v2` flag in DEBUG mode. For every block with commitments, the following log should be displayed: ``` [2025-12-19 15:36:25.49] DEBUG sync: No data column sidecars constructed from the execution client ... ``` And the following log should **never** be displayed: ``` [2026-01-05 10:19:00.55] DEBUG sync: Constructed data column sidecars from the execution client count=... ``` For the #16171 part: - No ERROR log showed in the linked issue should never be displayed. **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable). --- beacon-chain/execution/BUILD.bazel | 1 + beacon-chain/execution/engine_client.go | 5 ++ beacon-chain/sync/subscriber_beacon_blocks.go | 78 ++++++++++++------- .../sync/subscriber_beacon_blocks_test.go | 6 +- .../sync/subscriber_data_column_sidecar.go | 16 +++- changelog/manu_disable_get_blobs_v2.md | 2 + cmd/beacon-chain/flags/base.go | 5 ++ cmd/beacon-chain/flags/config.go | 6 ++ cmd/beacon-chain/main.go | 1 + cmd/beacon-chain/usage.go | 1 + 10 files changed, 88 insertions(+), 33 deletions(-) create mode 100644 changelog/manu_disable_get_blobs_v2.md diff --git a/beacon-chain/execution/BUILD.bazel b/beacon-chain/execution/BUILD.bazel index 93d7343027..4d4ce47689 100644 --- a/beacon-chain/execution/BUILD.bazel +++ b/beacon-chain/execution/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "//beacon-chain/state/state-native:go_default_library", "//beacon-chain/state/stategen:go_default_library", "//beacon-chain/verification:go_default_library", + "//cmd/beacon-chain/flags:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", diff --git a/beacon-chain/execution/engine_client.go b/beacon-chain/execution/engine_client.go index fa63871ad0..82eb5ff634 100644 --- a/beacon-chain/execution/engine_client.go +++ b/beacon-chain/execution/engine_client.go @@ -11,6 +11,7 @@ import ( "github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types" "github.com/OffchainLabs/prysm/v7/beacon-chain/verification" + "github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags" fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams" "github.com/OffchainLabs/prysm/v7/config/params" "github.com/OffchainLabs/prysm/v7/consensus-types/blocks" @@ -538,6 +539,10 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash) return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV2)) } + if flags.Get().DisableGetBlobsV2 { + return []*pb.BlobAndProofV2{}, nil + } + result := make([]*pb.BlobAndProofV2, len(versionedHashes)) err := s.rpcClient.CallContext(ctx, &result, GetBlobsV2, versionedHashes) diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 14798b2fc7..0938acb581 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -48,7 +48,14 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) return errors.Wrap(err, "new ro block with root") } - go s.processSidecarsFromExecutionFromBlock(ctx, roBlock) + go func() { + if err := s.processSidecarsFromExecutionFromBlock(ctx, roBlock); err != nil { + log.WithError(err).WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", root), + "slot": block.Slot(), + }).Error("Failed to process sidecars from execution from block") + } + }() if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil { if blockchain.IsInvalidBlock(err) { @@ -69,28 +76,37 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) } return err } + if err := s.processPendingAttsForBlock(ctx, root); err != nil { return errors.Wrap(err, "process pending atts for block") } + return nil } // processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client, // builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary. -func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) { +func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) error { if roBlock.Version() >= version.Fulu { if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil { - log.WithError(err).Error("Failed to process data column sidecars from execution") - return + // Do not log if the context was cancelled on purpose. + // (Still log other context errors such as deadlines exceeded). + if errors.Is(err, context.Canceled) { + return nil + } + + return errors.Wrap(err, "process data column sidecars from execution") } - return + return nil } if roBlock.Version() >= version.Deneb { s.processBlobSidecarsFromExecution(ctx, roBlock) - return + return nil } + + return nil } // processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client, @@ -168,7 +184,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so key := fmt.Sprintf("%#x", source.Root()) if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (any, error) { const delay = 250 * time.Millisecond - secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second commitments, err := source.Commitments() if err != nil { @@ -186,9 +201,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so return nil, errors.Wrap(err, "column indices to sample") } - ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot) - defer cancel() - log := log.WithFields(logrus.Fields{ "root": fmt.Sprintf("%#x", source.Root()), "slot": source.Slot(), @@ -209,6 +221,11 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so return nil, nil } + // Return if the context is done. + if ctx.Err() != nil { + return nil, ctx.Err() + } + if iteration == 0 { dataColumnsRecoveredFromELAttempts.Inc() } @@ -220,20 +237,10 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so } // No sidecars are retrieved from the EL, retry later - constructedSidecarCount = uint64(len(constructedSidecars)) - if constructedSidecarCount == 0 { - if ctx.Err() != nil { - return nil, ctx.Err() - } - - time.Sleep(delay) - continue - } - - dataColumnsRecoveredFromELTotal.Inc() + constructedCount := uint64(len(constructedSidecars)) // Boundary check. - if constructedSidecarCount != fieldparams.NumberOfColumns { + if constructedSidecarCount > 0 && constructedSidecarCount != fieldparams.NumberOfColumns { return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", constructedSidecarCount, fieldparams.NumberOfColumns) } @@ -242,14 +249,24 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars") } - log.WithFields(logrus.Fields{ - "count": len(unseenIndices), - "indices": helpers.SortedPrettySliceFromMap(unseenIndices), - }).Debug("Constructed data column sidecars from the execution client") + if constructedCount > 0 { + dataColumnsRecoveredFromELTotal.Inc() - dataColumnSidecarsObtainedViaELCount.Observe(float64(len(unseenIndices))) + log.WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", source.Root()), + "slot": source.Slot(), + "proposerIndex": source.ProposerIndex(), + "iteration": iteration, + "type": source.Type(), + "count": len(unseenIndices), + "indices": helpers.SortedPrettySliceFromMap(unseenIndices), + }).Debug("Constructed data column sidecars from the execution client") - return nil, nil + return nil, nil + } + + // Wait before retrying. + time.Sleep(delay) } }); err != nil { return err @@ -284,6 +301,11 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars( unseenIndices[sidecar.Index] = true } + // Exit early if there are no nothing to broadcast or receive. + if len(unseenSidecars) == 0 { + return nil, nil + } + // Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking). if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil { return nil, errors.Wrap(err, "broadcast data column sidecars") diff --git a/beacon-chain/sync/subscriber_beacon_blocks_test.go b/beacon-chain/sync/subscriber_beacon_blocks_test.go index f400f86226..ae79615b27 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks_test.go +++ b/beacon-chain/sync/subscriber_beacon_blocks_test.go @@ -194,7 +194,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) { }, seenBlobCache: lruwrpr.New(1), } - s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock) + err := s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock) + require.NoError(t, err) require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs)) }) } @@ -293,7 +294,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) { roBlock, err := blocks.NewROBlock(sb) require.NoError(t, err) - s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock) + err = s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock) + require.NoError(t, err) require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns)) }) } diff --git a/beacon-chain/sync/subscriber_data_column_sidecar.go b/beacon-chain/sync/subscriber_data_column_sidecar.go index 1cc10c0630..7e23ea3a1f 100644 --- a/beacon-chain/sync/subscriber_data_column_sidecar.go +++ b/beacon-chain/sync/subscriber_data_column_sidecar.go @@ -25,12 +25,12 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e } if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil { - return errors.Wrap(err, "receive data column sidecar") + return wrapDataColumnError(sidecar, "receive data column sidecar", err) } wg.Go(func() error { if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil { - return errors.Wrap(err, "process data column sidecars from reconstruction") + return wrapDataColumnError(sidecar, "process data column sidecars from reconstruction", err) } return nil @@ -38,7 +38,13 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e wg.Go(func() error { if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromSidecar(sidecar)); err != nil { - return errors.Wrap(err, "process data column sidecars from execution") + if errors.Is(err, context.Canceled) { + // Do not log if the context was cancelled on purpose. + // (Still log other context errors such as deadlines exceeded). + return nil + } + + return wrapDataColumnError(sidecar, "process data column sidecars from execution", err) } return nil @@ -110,3 +116,7 @@ func (s *Service) allDataColumnSubnets(_ primitives.Slot) map[uint64]bool { return allSubnets } + +func wrapDataColumnError(sidecar blocks.VerifiedRODataColumn, message string, err error) error { + return fmt.Errorf("%s - slot %d, root %s: %w", message, sidecar.SignedBlockHeader.Header.Slot, fmt.Sprintf("%#x", sidecar.BlockRoot()), err) +} diff --git a/changelog/manu_disable_get_blobs_v2.md b/changelog/manu_disable_get_blobs_v2.md new file mode 100644 index 0000000000..3c4d518a71 --- /dev/null +++ b/changelog/manu_disable_get_blobs_v2.md @@ -0,0 +1,2 @@ +### Added +- `--disable-get-blobs-v2` flag. diff --git a/cmd/beacon-chain/flags/base.go b/cmd/beacon-chain/flags/base.go index f12d0d6895..789200cda5 100644 --- a/cmd/beacon-chain/flags/base.go +++ b/cmd/beacon-chain/flags/base.go @@ -356,4 +356,9 @@ var ( Usage: "A comma-separated list of exponents (of 2) in decreasing order, defining the state diff hierarchy levels. The last exponent must be greater than or equal to 5.", Value: cli.NewIntSlice(21, 18, 16, 13, 11, 9, 5), } + // DisableGetBlobsV2 disables the engine_getBlobsV2 usage. + DisableGetBlobsV2 = &cli.BoolFlag{ + Name: "disable-get-blobs-v2", + Usage: "Disables the engine_getBlobsV2 usage.", + } ) diff --git a/cmd/beacon-chain/flags/config.go b/cmd/beacon-chain/flags/config.go index 1469da0b74..e03ecbd73f 100644 --- a/cmd/beacon-chain/flags/config.go +++ b/cmd/beacon-chain/flags/config.go @@ -17,6 +17,7 @@ type GlobalFlags struct { SubscribeToAllSubnets bool Supernode bool SemiSupernode bool + DisableGetBlobsV2 bool MinimumSyncPeers int MinimumPeersPerSubnet int MaxConcurrentDials int @@ -72,6 +73,11 @@ func ConfigureGlobalFlags(ctx *cli.Context) error { cfg.SemiSupernode = true } + if ctx.Bool(DisableGetBlobsV2.Name) { + log.Warning("Disabling `engine_getBlobsV2` API") + cfg.DisableGetBlobsV2 = true + } + // State-diff-exponents cfg.StateDiffExponents = ctx.IntSlice(StateDiffExponents.Name) if features.Get().EnableStateDiff { diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index d11489ed84..ee0373790c 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -148,6 +148,7 @@ var appFlags = []cli.Flag{ flags.SlasherDirFlag, flags.SlasherFlag, flags.JwtId, + flags.DisableGetBlobsV2, storage.BlobStoragePathFlag, storage.DataColumnStoragePathFlag, storage.BlobStorageLayout, diff --git a/cmd/beacon-chain/usage.go b/cmd/beacon-chain/usage.go index e13aa40ee1..303a47b433 100644 --- a/cmd/beacon-chain/usage.go +++ b/cmd/beacon-chain/usage.go @@ -169,6 +169,7 @@ var appHelpFlagGroups = []flagGroup{ flags.ExecutionJWTSecretFlag, flags.JwtId, flags.InteropMockEth1DataVotesFlag, + flags.DisableGetBlobsV2, }, }, { // Flags relevant to configuring beacon chain monitoring.