Compare commits

...

9 Commits

Author SHA1 Message Date
Manu NALEPA
cfe59706af saveDataColumnSidecarsExistingFile: Parallelize SSZ encoding of data column sidecars. 2025-12-28 23:21:52 +01:00
Manu NALEPA
b113d6bbde saveDataColumnSidecarsExistingFile: Rename sszEncodedDataColumnSidecars to sszEncodedDataColumnSidecarsBytes/ 2025-12-28 23:21:52 +01:00
Manu NALEPA
45577ef931 saveDataColumnSidecarsNewFile: Parallelize SSZ encoding of data column sidecars. 2025-12-28 23:21:52 +01:00
Manu NALEPA
92865adfe7 saveDataColumnSidecarsNewFile: Rename sszEncodedDataColumnSidecars to sszEncodedDataColumnSidecarsBytes/ 2025-12-28 23:21:52 +01:00
Manu NALEPA
a3863c118b processDataColumnSidecarsFromExecution: Return context canceled error only if the context has been cancelled while some sidecars are still missing. 2025-12-19 16:05:09 +01:00
Manu NALEPA
730e6500e3 beaconBlockSubscriber: Wait for the end of processSidecarsFromExecutionFromBlock before returning.
If returning before waiting, then the `ctx` used in `processSidecarsFromExecutionFromBlock` is canceled as well, stopping early the blobs retrieval from the EL.
2025-12-19 15:51:33 +01:00
Manu NALEPA
dac2c65004 dataColumnSubscriber: Improve error wrapping. 2025-12-19 15:51:28 +01:00
Manu NALEPA
dbfb987e1d processDataColumnSidecarsFromExecution: Use the context parent.
(There is not reason to stop retrying before the end of the context parent.)
2025-12-19 15:51:17 +01:00
Manu NALEPA
3596d00ff9 Add the --disable-get-blobs-v2 flag. 2025-12-19 15:48:29 +01:00
12 changed files with 132 additions and 51 deletions

View File

@@ -38,6 +38,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_spf13_afero//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

View File

@@ -25,6 +25,7 @@ import (
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
"github.com/spf13/afero"
"golang.org/x/sync/errgroup"
)
const (
@@ -625,7 +626,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
}
// Create the SSZ encoded data column sidecars.
var sszEncodedDataColumnSidecars []byte
var sszEncodedDataColumnSidecarsBytes []byte
// Initialize the count of the saved SSZ encoded data column sidecar.
storedCount := uint8(0)
@@ -636,7 +637,26 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
break
}
for _, dataColumnSidecar := range dataColumnSidecars {
var wg errgroup.Group
sszEncodedDataColumnSidecars := make([][]byte, len(dataColumnSidecars))
for i, dataColumnSidecar := range dataColumnSidecars {
wg.Go(func() error {
// SSZ encode the data column sidecar.
sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
if err != nil {
return errors.Wrap(err, "data column sidecar marshal SSZ")
}
sszEncodedDataColumnSidecars[i] = sszEncodedDataColumnSidecar
return nil
})
}
if err := wg.Wait(); err != nil {
return err
}
for i, dataColumnSidecar := range dataColumnSidecars {
// Extract the data columns index.
dataColumnIndex := dataColumnSidecar.Index
@@ -658,10 +678,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
}
// SSZ encode the data column sidecar.
sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
if err != nil {
return errors.Wrap(err, "data column sidecar marshal SSZ")
}
sszEncodedDataColumnSidecar := sszEncodedDataColumnSidecars[i]
// Compute the size of the SSZ encoded data column sidecar.
incomingSszEncodedDataColumnSidecarSize := uint32(len(sszEncodedDataColumnSidecar))
@@ -680,7 +697,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
storedCount++
// Append the SSZ encoded data column sidecar to the SSZ encoded data column sidecars.
sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...)
sszEncodedDataColumnSidecarsBytes = append(sszEncodedDataColumnSidecarsBytes, sszEncodedDataColumnSidecar...)
}
}
@@ -695,11 +712,11 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
}
// Append the SSZ encoded data column sidecars to the end of the file.
count, err = file.WriteAt(sszEncodedDataColumnSidecars, metadata.fileSize)
count, err = file.WriteAt(sszEncodedDataColumnSidecarsBytes, metadata.fileSize)
if err != nil {
return errors.Wrap(err, "write SSZ encoded data column sidecars")
}
if count != len(sszEncodedDataColumnSidecars) {
if count != len(sszEncodedDataColumnSidecarsBytes) {
return errWrongBytesWritten
}
@@ -721,7 +738,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
var (
sszEncodedDataColumnSidecarRefSize int
sszEncodedDataColumnSidecars []byte
sszEncodedDataColumnSidecarsBytes []byte
)
// Initialize the count of the saved SSZ encoded data column sidecar.
@@ -733,7 +750,26 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
break
}
for _, dataColumnSidecar := range dataColumnSidecars {
var wg errgroup.Group
sszEncodedDataColumnSidecars := make([][]byte, len(dataColumnSidecars))
for i, dataColumnSidecar := range dataColumnSidecars {
wg.Go(func() error {
// SSZ encode the first data column sidecar.
sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
if err != nil {
return errors.Wrap(err, "data column sidecar marshal SSZ")
}
sszEncodedDataColumnSidecars[i] = sszEncodedDataColumnSidecar
return nil
})
}
if err := wg.Wait(); err != nil {
return err
}
for i, dataColumnSidecar := range dataColumnSidecars {
// Extract the data column index.
dataColumnIndex := dataColumnSidecar.Index
@@ -756,10 +792,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
storedCount++
// SSZ encode the first data column sidecar.
sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
if err != nil {
return errors.Wrap(err, "data column sidecar marshal SSZ")
}
sszEncodedDataColumnSidecar := sszEncodedDataColumnSidecars[i]
// Check if the size of the SSZ encoded data column sidecar is correct.
if sszEncodedDataColumnSidecarRefSize != 0 && len(sszEncodedDataColumnSidecar) != sszEncodedDataColumnSidecarRefSize {
@@ -770,7 +803,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
sszEncodedDataColumnSidecarRefSize = len(sszEncodedDataColumnSidecar)
// Append the first SSZ encoded data column sidecar to the SSZ encoded data column sidecars.
sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...)
sszEncodedDataColumnSidecarsBytes = append(sszEncodedDataColumnSidecarsBytes, sszEncodedDataColumnSidecar...)
}
}
@@ -807,12 +840,12 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
rawIndices := indices.raw()
// Concatenate the version, the data column sidecar size, the data column indices and the SSZ encoded data column sidecar.
countToWrite := headerSize + len(sszEncodedDataColumnSidecars)
countToWrite := headerSize + len(sszEncodedDataColumnSidecarsBytes)
bytes := make([]byte, 0, countToWrite)
bytes = append(bytes, byte(version))
bytes = append(bytes, encodedSszEncodedDataColumnSidecarSize[:]...)
bytes = append(bytes, rawIndices[:]...)
bytes = append(bytes, sszEncodedDataColumnSidecars...)
bytes = append(bytes, sszEncodedDataColumnSidecarsBytes...)
countWritten, err := file.Write(bytes)
if err != nil {

View File

@@ -40,6 +40,7 @@ go_library(
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -538,6 +539,10 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash)
return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV2))
}
if flags.Get().DisableGetBlobsV2 {
return []*pb.BlobAndProofV2{}, nil
}
result := make([]*pb.BlobAndProofV2, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV2, versionedHashes)

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path"
"sync"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
@@ -48,7 +49,15 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return errors.Wrap(err, "new ro block with root")
}
go s.processSidecarsFromExecutionFromBlock(ctx, roBlock)
var wg sync.WaitGroup
wg.Go(func() {
if err := s.processSidecarsFromExecutionFromBlock(ctx, roBlock); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"slot": block.Slot(),
}).Error("Failed to process sidecars from execution from block")
}
})
if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil {
if blockchain.IsInvalidBlock(err) {
@@ -69,28 +78,33 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
}
return err
}
if err := s.processPendingAttsForBlock(ctx, root); err != nil {
return errors.Wrap(err, "process pending atts for block")
}
wg.Wait()
return nil
}
// processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client,
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) {
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) error {
if roBlock.Version() >= version.Fulu {
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil {
log.WithError(err).Error("Failed to process data column sidecars from execution")
return
return errors.Wrap(err, "process data column sidecars from execution")
}
return
return nil
}
if roBlock.Version() >= version.Deneb {
s.processBlobSidecarsFromExecution(ctx, roBlock)
return
return nil
}
return nil
}
// processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client,
@@ -168,7 +182,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
key := fmt.Sprintf("%#x", source.Root())
if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (any, error) {
const delay = 250 * time.Millisecond
secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
commitments, err := source.Commitments()
if err != nil {
@@ -186,9 +199,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, errors.Wrap(err, "column indices to sample")
}
ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
defer cancel()
log := log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
@@ -209,6 +219,11 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, nil
}
// Return if the context is done.
if ctx.Err() != nil {
return nil, ctx.Err()
}
if iteration == 0 {
dataColumnsRecoveredFromELAttempts.Inc()
}
@@ -220,20 +235,10 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
}
// No sidecars are retrieved from the EL, retry later
constructedSidecarCount = uint64(len(constructedSidecars))
if constructedSidecarCount == 0 {
if ctx.Err() != nil {
return nil, ctx.Err()
}
time.Sleep(delay)
continue
}
dataColumnsRecoveredFromELTotal.Inc()
constructedCount := uint64(len(constructedSidecars))
// Boundary check.
if constructedSidecarCount != fieldparams.NumberOfColumns {
if constructedSidecarCount > 0 && constructedSidecarCount != fieldparams.NumberOfColumns {
return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", constructedSidecarCount, fieldparams.NumberOfColumns)
}
@@ -242,14 +247,24 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars")
}
log.WithFields(logrus.Fields{
"count": len(unseenIndices),
"indices": helpers.SortedPrettySliceFromMap(unseenIndices),
}).Debug("Constructed data column sidecars from the execution client")
if constructedCount > 0 {
dataColumnsRecoveredFromELTotal.Inc()
dataColumnSidecarsObtainedViaELCount.Observe(float64(len(unseenIndices)))
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
"proposerIndex": source.ProposerIndex(),
"iteration": iteration,
"type": source.Type(),
"count": len(unseenIndices),
"indices": helpers.SortedPrettySliceFromMap(unseenIndices),
}).Debug("Constructed data column sidecars from the execution client")
return nil, nil
return nil, nil
}
// Wait before retrying.
time.Sleep(delay)
}
}); err != nil {
return err
@@ -284,6 +299,11 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
unseenIndices[sidecar.Index] = true
}
// Exit early if there are no nothing to broadcast or receive.
if len(unseenSidecars) == 0 {
return nil, nil
}
// Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil {
return nil, errors.Wrap(err, "broadcast data column sidecars")

View File

@@ -194,7 +194,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
},
seenBlobCache: lruwrpr.New(1),
}
s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
err := s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
require.NoError(t, err)
require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
})
}
@@ -293,7 +294,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
roBlock, err := blocks.NewROBlock(sb)
require.NoError(t, err)
s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
err = s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
require.NoError(t, err)
require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns))
})
}

View File

@@ -25,12 +25,12 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
}
if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
return errors.Wrap(err, "receive data column sidecar")
return wrapDataColumnError(sidecar, "receive data column sidecar", err)
}
wg.Go(func() error {
if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil {
return errors.Wrap(err, "process data column sidecars from reconstruction")
return wrapDataColumnError(sidecar, "process data column sidecars from reconstruction", err)
}
return nil
@@ -38,7 +38,7 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
wg.Go(func() error {
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromSidecar(sidecar)); err != nil {
return errors.Wrap(err, "process data column sidecars from execution")
return wrapDataColumnError(sidecar, "process data column sidecars from execution", err)
}
return nil
@@ -110,3 +110,7 @@ func (s *Service) allDataColumnSubnets(_ primitives.Slot) map[uint64]bool {
return allSubnets
}
func wrapDataColumnError(sidecar blocks.VerifiedRODataColumn, message string, err error) error {
return fmt.Errorf("%s - slot %d, root %s: %w", message, sidecar.SignedBlockHeader.Header.Slot, fmt.Sprintf("%#x", sidecar.BlockRoot()), err)
}

View File

@@ -0,0 +1,2 @@
### Added
- `--disable-get-blobs-v2` flag.

View File

@@ -356,4 +356,9 @@ var (
Usage: "A comma-separated list of exponents (of 2) in decreasing order, defining the state diff hierarchy levels. The last exponent must be greater than or equal to 5.",
Value: cli.NewIntSlice(21, 18, 16, 13, 11, 9, 5),
}
// DisableGetBlobsV2 disables the engine_getBlobsV2 usage.
DisableGetBlobsV2 = &cli.BoolFlag{
Name: "disable-get-blobs-v2",
Usage: "Disables the engine_getBlobsV2 usage.",
}
)

View File

@@ -17,6 +17,7 @@ type GlobalFlags struct {
SubscribeToAllSubnets bool
Supernode bool
SemiSupernode bool
DisableGetBlobsV2 bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
MaxConcurrentDials int
@@ -72,6 +73,11 @@ func ConfigureGlobalFlags(ctx *cli.Context) error {
cfg.SemiSupernode = true
}
if ctx.Bool(DisableGetBlobsV2.Name) {
log.Warning("Disabling `engine_getBlobsV2` API")
cfg.DisableGetBlobsV2 = true
}
// State-diff-exponents
cfg.StateDiffExponents = ctx.IntSlice(StateDiffExponents.Name)
if features.Get().EnableStateDiff {

View File

@@ -147,6 +147,7 @@ var appFlags = []cli.Flag{
flags.SlasherDirFlag,
flags.SlasherFlag,
flags.JwtId,
flags.DisableGetBlobsV2,
storage.BlobStoragePathFlag,
storage.DataColumnStoragePathFlag,
storage.BlobStorageLayout,

View File

@@ -169,6 +169,7 @@ var appHelpFlagGroups = []flagGroup{
flags.ExecutionJWTSecretFlag,
flags.JwtId,
flags.InteropMockEth1DataVotesFlag,
flags.DisableGetBlobsV2,
},
},
{ // Flags relevant to configuring beacon chain monitoring.