Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-30 23:58:23 -05:00)

Compare commits: debug-stat...paralleliz (9 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | cfe59706af |  |
|  | b113d6bbde |  |
|  | 45577ef931 |  |
|  | 92865adfe7 |  |
|  | a3863c118b |  |
|  | 730e6500e3 |  |
|  | dac2c65004 |  |
|  | dbfb987e1d |  |
|  | 3596d00ff9 |  |
```diff
@@ -38,6 +38,7 @@ go_library(
         "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
         "@com_github_sirupsen_logrus//:go_default_library",
         "@com_github_spf13_afero//:go_default_library",
+        "@org_golang_x_sync//errgroup:go_default_library",
     ],
 )
 
```
```diff
@@ -25,6 +25,7 @@ import (
     "github.com/OffchainLabs/prysm/v7/time/slots"
     "github.com/pkg/errors"
     "github.com/spf13/afero"
+    "golang.org/x/sync/errgroup"
 )
 
 const (
@@ -625,7 +626,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
     }
 
     // Create the SSZ encoded data column sidecars.
-    var sszEncodedDataColumnSidecars []byte
+    var sszEncodedDataColumnSidecarsBytes []byte
 
     // Initialize the count of the saved SSZ encoded data column sidecar.
     storedCount := uint8(0)
```
```diff
@@ -636,7 +637,26 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
             break
         }
 
-        for _, dataColumnSidecar := range dataColumnSidecars {
+        var wg errgroup.Group
+        sszEncodedDataColumnSidecars := make([][]byte, len(dataColumnSidecars))
+        for i, dataColumnSidecar := range dataColumnSidecars {
+            wg.Go(func() error {
+                // SSZ encode the data column sidecar.
+                sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
+                if err != nil {
+                    return errors.Wrap(err, "data column sidecar marshal SSZ")
+                }
+
+                sszEncodedDataColumnSidecars[i] = sszEncodedDataColumnSidecar
+                return nil
+            })
+        }
+
+        if err := wg.Wait(); err != nil {
+            return err
+        }
+
+        for i, dataColumnSidecar := range dataColumnSidecars {
             // Extract the data columns index.
             dataColumnIndex := dataColumnSidecar.Index
 
```
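The same parallelization shape appears in both storage hunks: fan the `MarshalSSZ` calls out through an `errgroup.Group` into a slice addressed by index, then run the order-sensitive bookkeeping (size checks, appends, metadata) in a second, sequential pass. Below is a minimal self-contained sketch of that shape; the `Encodable` interface, `fakeSidecar` type, and `encodeAll` helper are illustrative stand-ins, not Prysm APIs, and capturing `i` and `item` inside the closure assumes Go 1.22+ per-iteration loop variables.

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Encodable stands in for the data column sidecar type.
type Encodable interface {
	MarshalSSZ() ([]byte, error)
}

type fakeSidecar struct{ payload []byte }

func (f fakeSidecar) MarshalSSZ() ([]byte, error) { return f.payload, nil }

// encodeAll encodes every item concurrently, then concatenates the results
// in the original order so the resulting byte layout stays deterministic.
func encodeAll(items []Encodable) ([]byte, error) {
	var wg errgroup.Group
	encoded := make([][]byte, len(items)) // each goroutine writes only its own index

	for i, item := range items {
		wg.Go(func() error {
			b, err := item.MarshalSSZ()
			if err != nil {
				return fmt.Errorf("marshal SSZ: %w", err)
			}
			encoded[i] = b
			return nil
		})
	}
	if err := wg.Wait(); err != nil {
		return nil, err
	}

	// Sequential second pass: order-dependent work stays single-threaded.
	var out []byte
	for _, b := range encoded {
		out = append(out, b...)
	}
	return out, nil
}

func main() {
	out, err := encodeAll([]Encodable{fakeSidecar{[]byte{1, 2}}, fakeSidecar{[]byte{3}}})
	fmt.Println(out, err) // [1 2 3] <nil>
}
```

Writing to distinct indices of a pre-sized slice is race-free, which is why the second loop in the hunks can read the pre-encoded bytes without any further synchronization.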
```diff
@@ -658,10 +678,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
             }
 
             // SSZ encode the data column sidecar.
-            sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
-            if err != nil {
-                return errors.Wrap(err, "data column sidecar marshal SSZ")
-            }
+            sszEncodedDataColumnSidecar := sszEncodedDataColumnSidecars[i]
 
             // Compute the size of the SSZ encoded data column sidecar.
             incomingSszEncodedDataColumnSidecarSize := uint32(len(sszEncodedDataColumnSidecar))
@@ -680,7 +697,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
             storedCount++
 
             // Append the SSZ encoded data column sidecar to the SSZ encoded data column sidecars.
-            sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...)
+            sszEncodedDataColumnSidecarsBytes = append(sszEncodedDataColumnSidecarsBytes, sszEncodedDataColumnSidecar...)
         }
     }
 
@@ -695,11 +712,11 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string
     }
 
     // Append the SSZ encoded data column sidecars to the end of the file.
-    count, err = file.WriteAt(sszEncodedDataColumnSidecars, metadata.fileSize)
+    count, err = file.WriteAt(sszEncodedDataColumnSidecarsBytes, metadata.fileSize)
     if err != nil {
         return errors.Wrap(err, "write SSZ encoded data column sidecars")
     }
-    if count != len(sszEncodedDataColumnSidecars) {
+    if count != len(sszEncodedDataColumnSidecarsBytes) {
         return errWrongBytesWritten
     }
 
@@ -721,7 +738,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
 
     var (
         sszEncodedDataColumnSidecarRefSize int
-        sszEncodedDataColumnSidecars       []byte
+        sszEncodedDataColumnSidecarsBytes  []byte
     )
 
     // Initialize the count of the saved SSZ encoded data column sidecar.
```
```diff
@@ -733,7 +750,26 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
             break
         }
 
-        for _, dataColumnSidecar := range dataColumnSidecars {
+        var wg errgroup.Group
+        sszEncodedDataColumnSidecars := make([][]byte, len(dataColumnSidecars))
+        for i, dataColumnSidecar := range dataColumnSidecars {
+            wg.Go(func() error {
+                // SSZ encode the first data column sidecar.
+                sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
+                if err != nil {
+                    return errors.Wrap(err, "data column sidecar marshal SSZ")
+                }
+
+                sszEncodedDataColumnSidecars[i] = sszEncodedDataColumnSidecar
+                return nil
+            })
+        }
+
+        if err := wg.Wait(); err != nil {
+            return err
+        }
+
+        for i, dataColumnSidecar := range dataColumnSidecars {
             // Extract the data column index.
             dataColumnIndex := dataColumnSidecar.Index
 
@@ -756,10 +792,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
             storedCount++
 
             // SSZ encode the first data column sidecar.
-            sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
-            if err != nil {
-                return errors.Wrap(err, "data column sidecar marshal SSZ")
-            }
+            sszEncodedDataColumnSidecar := sszEncodedDataColumnSidecars[i]
 
             // Check if the size of the SSZ encoded data column sidecar is correct.
             if sszEncodedDataColumnSidecarRefSize != 0 && len(sszEncodedDataColumnSidecar) != sszEncodedDataColumnSidecarRefSize {
@@ -770,7 +803,7 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
             sszEncodedDataColumnSidecarRefSize = len(sszEncodedDataColumnSidecar)
 
             // Append the first SSZ encoded data column sidecar to the SSZ encoded data column sidecars.
-            sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...)
+            sszEncodedDataColumnSidecarsBytes = append(sszEncodedDataColumnSidecarsBytes, sszEncodedDataColumnSidecar...)
         }
     }
 
@@ -807,12 +840,12 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp
     rawIndices := indices.raw()
 
     // Concatenate the version, the data column sidecar size, the data column indices and the SSZ encoded data column sidecar.
-    countToWrite := headerSize + len(sszEncodedDataColumnSidecars)
+    countToWrite := headerSize + len(sszEncodedDataColumnSidecarsBytes)
     bytes := make([]byte, 0, countToWrite)
     bytes = append(bytes, byte(version))
     bytes = append(bytes, encodedSszEncodedDataColumnSidecarSize[:]...)
     bytes = append(bytes, rawIndices[:]...)
-    bytes = append(bytes, sszEncodedDataColumnSidecars...)
+    bytes = append(bytes, sszEncodedDataColumnSidecarsBytes...)
 
     countWritten, err := file.Write(bytes)
     if err != nil {
```
```diff
@@ -40,6 +40,7 @@ go_library(
         "//beacon-chain/state/state-native:go_default_library",
         "//beacon-chain/state/stategen:go_default_library",
         "//beacon-chain/verification:go_default_library",
+        "//cmd/beacon-chain/flags:go_default_library",
         "//config/fieldparams:go_default_library",
         "//config/params:go_default_library",
         "//consensus-types/blocks:go_default_library",
```
```diff
@@ -11,6 +11,7 @@ import (
     "github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
     "github.com/OffchainLabs/prysm/v7/beacon-chain/execution/types"
     "github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
+    "github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
     fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
     "github.com/OffchainLabs/prysm/v7/config/params"
     "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
@@ -538,6 +539,10 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash)
         return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV2))
     }
 
+    if flags.Get().DisableGetBlobsV2 {
+        return []*pb.BlobAndProofV2{}, nil
+    }
+
     result := make([]*pb.BlobAndProofV2, len(versionedHashes))
     err := s.rpcClient.CallContext(ctx, &result, GetBlobsV2, versionedHashes)
 
```
```diff
@@ -5,6 +5,7 @@ import (
     "fmt"
     "os"
     "path"
+    "sync"
     "time"
 
     "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
```
```diff
@@ -48,7 +49,15 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
         return errors.Wrap(err, "new ro block with root")
     }
 
-    go s.processSidecarsFromExecutionFromBlock(ctx, roBlock)
+    var wg sync.WaitGroup
+    wg.Go(func() {
+        if err := s.processSidecarsFromExecutionFromBlock(ctx, roBlock); err != nil {
+            log.WithError(err).WithFields(logrus.Fields{
+                "root": fmt.Sprintf("%#x", root),
+                "slot": block.Slot(),
+            }).Error("Failed to process sidecars from execution from block")
+        }
+    })
 
     if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil {
         if blockchain.IsInvalidBlock(err) {
```
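This hunk swaps a fire-and-forget `go` statement for a `sync.WaitGroup` whose `Go` method (added in Go 1.25) handles the `Add`/`Done` bookkeeping itself; combined with the `wg.Wait()` added further down, the subscriber no longer returns before sidecar processing has finished, and the error is logged instead of silently dropped. A minimal sketch of the pattern, with `processSidecars` standing in for `s.processSidecarsFromExecutionFromBlock`:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

func processSidecars() error { return errors.New("execution client unavailable") }

func handleBlock() error {
	var wg sync.WaitGroup

	// Runs concurrently with the rest of the handler. Since the closure
	// returns nothing, failures are logged here rather than returned.
	wg.Go(func() {
		if err := processSidecars(); err != nil {
			fmt.Println("failed to process sidecars from execution from block:", err)
		}
	})

	// ... receive the block, process pending attestations, etc. ...

	wg.Wait() // do not return before the sidecar goroutine finishes
	return nil
}

func main() {
	_ = handleBlock()
}
```

On toolchains older than Go 1.25 the equivalent spelling is `wg.Add(1)` plus `go func() { defer wg.Done(); ... }()`.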
```diff
@@ -69,28 +78,33 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
         }
         return err
     }
 
     if err := s.processPendingAttsForBlock(ctx, root); err != nil {
         return errors.Wrap(err, "process pending atts for block")
     }
 
+    wg.Wait()
+
     return nil
 }
 
 // processSidecarsFromExecutionFromBlock retrieves (if available) sidecars data from the execution client,
 // builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
-func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) {
+func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) error {
     if roBlock.Version() >= version.Fulu {
         if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil {
-            log.WithError(err).Error("Failed to process data column sidecars from execution")
-            return
+            return errors.Wrap(err, "process data column sidecars from execution")
         }
 
-        return
+        return nil
     }
 
     if roBlock.Version() >= version.Deneb {
         s.processBlobSidecarsFromExecution(ctx, roBlock)
-        return
+        return nil
     }
 
+    return nil
 }
 
 // processBlobSidecarsFromExecution retrieves (if available) blob sidecars data from the execution client,
```
```diff
@@ -168,7 +182,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
     key := fmt.Sprintf("%#x", source.Root())
     if _, err, _ := s.columnSidecarsExecSingleFlight.Do(key, func() (any, error) {
         const delay = 250 * time.Millisecond
-        secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
 
         commitments, err := source.Commitments()
         if err != nil {
@@ -186,9 +199,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
             return nil, errors.Wrap(err, "column indices to sample")
         }
 
-        ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
-        defer cancel()
-
         log := log.WithFields(logrus.Fields{
             "root": fmt.Sprintf("%#x", source.Root()),
             "slot": source.Slot(),
@@ -209,6 +219,11 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
                 return nil, nil
             }
 
+            // Return if the context is done.
+            if ctx.Err() != nil {
+                return nil, ctx.Err()
+            }
+
             if iteration == 0 {
                 dataColumnsRecoveredFromELAttempts.Inc()
             }
```
```diff
@@ -220,20 +235,10 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
             }
 
             // No sidecars are retrieved from the EL, retry later
-            constructedSidecarCount = uint64(len(constructedSidecars))
-            if constructedSidecarCount == 0 {
-                if ctx.Err() != nil {
-                    return nil, ctx.Err()
-                }
-
-                time.Sleep(delay)
-                continue
-            }
-
-            dataColumnsRecoveredFromELTotal.Inc()
+            constructedCount := uint64(len(constructedSidecars))
 
             // Boundary check.
-            if constructedSidecarCount != fieldparams.NumberOfColumns {
+            if constructedSidecarCount > 0 && constructedSidecarCount != fieldparams.NumberOfColumns {
                 return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", constructedSidecarCount, fieldparams.NumberOfColumns)
             }
 
@@ -242,14 +247,24 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
                 return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars")
             }
 
-            log.WithFields(logrus.Fields{
-                "count":   len(unseenIndices),
-                "indices": helpers.SortedPrettySliceFromMap(unseenIndices),
-            }).Debug("Constructed data column sidecars from the execution client")
+            if constructedCount > 0 {
+                dataColumnsRecoveredFromELTotal.Inc()
 
-            dataColumnSidecarsObtainedViaELCount.Observe(float64(len(unseenIndices)))
+                log.WithFields(logrus.Fields{
+                    "root":          fmt.Sprintf("%#x", source.Root()),
+                    "slot":          source.Slot(),
+                    "proposerIndex": source.ProposerIndex(),
+                    "iteration":     iteration,
+                    "type":          source.Type(),
+                    "count":         len(unseenIndices),
+                    "indices":       helpers.SortedPrettySliceFromMap(unseenIndices),
+                }).Debug("Constructed data column sidecars from the execution client")
 
-            return nil, nil
+                return nil, nil
+            }
+
+            // Wait before retrying.
+            time.Sleep(delay)
         }
     }); err != nil {
         return err
```
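Taken together, these hunks reshape the retry loop inside the singleflight callback: the caller's context is now checked at the top of each iteration instead of a loop-local half-slot timeout, the success-path metric and debug log only fire when at least one sidecar was actually constructed, and the sleep moves to the bottom of the loop. A stripped-down sketch of that control flow; `fetchFromEL` and `publish` are hypothetical placeholders, not Prysm functions:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func fetchFromEL() int  { return 0 } // pretend the EL has nothing available yet
func publish(count int) {}

// retrieveWithRetry keeps polling until something is constructed or the
// caller's context is cancelled; there is no loop-local timeout anymore.
func retrieveWithRetry(ctx context.Context) error {
	const delay = 250 * time.Millisecond

	for iteration := 0; ; iteration++ {
		// Return as soon as the caller's context is done.
		if err := ctx.Err(); err != nil {
			return err
		}

		constructed := fetchFromEL()
		publish(constructed)

		if constructed > 0 {
			fmt.Println("constructed sidecars on iteration", iteration)
			return nil
		}

		// Nothing retrieved from the EL yet; wait before retrying.
		time.Sleep(delay)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(retrieveWithRetry(ctx)) // context deadline exceeded
}
```

In the real code the deadline now comes from whatever context the caller passes in, since the `context.WithTimeout(ctx, secondsPerHalfSlot)` wrapper was removed.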
```diff
@@ -284,6 +299,11 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
         unseenIndices[sidecar.Index] = true
     }
 
+    // Exit early if there are no nothing to broadcast or receive.
+    if len(unseenSidecars) == 0 {
+        return nil, nil
+    }
+
     // Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
     if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil {
         return nil, errors.Wrap(err, "broadcast data column sidecars")
```
```diff
@@ -194,7 +194,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
                 },
                 seenBlobCache: lruwrpr.New(1),
             }
-            s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
+            err := s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
+            require.NoError(t, err)
             require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
         })
     }
@@ -293,7 +294,8 @@ func TestProcessSidecarsFromExecutionFromBlock(t *testing.T) {
             roBlock, err := blocks.NewROBlock(sb)
             require.NoError(t, err)
 
-            s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
+            err = s.processSidecarsFromExecutionFromBlock(t.Context(), roBlock)
+            require.NoError(t, err)
             require.Equal(t, tt.expectedDataColumnCount, len(chainService.DataColumns))
         })
     }
```
```diff
@@ -25,12 +25,12 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
     }
 
     if err := s.receiveDataColumnSidecar(ctx, sidecar); err != nil {
-        return errors.Wrap(err, "receive data column sidecar")
+        return wrapDataColumnError(sidecar, "receive data column sidecar", err)
     }
 
     wg.Go(func() error {
         if err := s.processDataColumnSidecarsFromReconstruction(ctx, sidecar); err != nil {
-            return errors.Wrap(err, "process data column sidecars from reconstruction")
+            return wrapDataColumnError(sidecar, "process data column sidecars from reconstruction", err)
         }
 
         return nil
@@ -38,7 +38,7 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
 
     wg.Go(func() error {
         if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromSidecar(sidecar)); err != nil {
-            return errors.Wrap(err, "process data column sidecars from execution")
+            return wrapDataColumnError(sidecar, "process data column sidecars from execution", err)
         }
 
         return nil
@@ -110,3 +110,7 @@ func (s *Service) allDataColumnSubnets(_ primitives.Slot) map[uint64]bool {
 
     return allSubnets
 }
+
+func wrapDataColumnError(sidecar blocks.VerifiedRODataColumn, message string, err error) error {
+    return fmt.Errorf("%s - slot %d, root %s: %w", message, sidecar.SignedBlockHeader.Header.Slot, fmt.Sprintf("%#x", sidecar.BlockRoot()), err)
+}
```
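The new `wrapDataColumnError` helper builds on `fmt.Errorf` with `%w`: the slot and root are prepended for log readability while the original error stays in the chain and remains matchable with `errors.Is`/`errors.As`. A small illustrative sketch of that property, using placeholder slot and root values instead of a real `VerifiedRODataColumn`:

```go
package main

import (
	"errors"
	"fmt"
)

var errAlreadySeen = errors.New("sidecar already seen")

// wrapSidecarError mirrors the shape of wrapDataColumnError above.
func wrapSidecarError(message string, slot uint64, root [32]byte, err error) error {
	return fmt.Errorf("%s - slot %d, root %s: %w", message, slot, fmt.Sprintf("%#x", root), err)
}

func main() {
	err := wrapSidecarError("receive data column sidecar", 123, [32]byte{0xab}, errAlreadySeen)
	fmt.Println(err)                            // the message now carries the slot and root
	fmt.Println(errors.Is(err, errAlreadySeen)) // true: %w keeps the original error in the chain
}
```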
changelog/manu_disable_get_blobs_v2.md (new file, +2)

```diff
@@ -0,0 +1,2 @@
+### Added
+- `--disable-get-blobs-v2` flag.
```
```diff
@@ -356,4 +356,9 @@
         Usage: "A comma-separated list of exponents (of 2) in decreasing order, defining the state diff hierarchy levels. The last exponent must be greater than or equal to 5.",
         Value: cli.NewIntSlice(21, 18, 16, 13, 11, 9, 5),
     }
+    // DisableGetBlobsV2 disables the engine_getBlobsV2 usage.
+    DisableGetBlobsV2 = &cli.BoolFlag{
+        Name:  "disable-get-blobs-v2",
+        Usage: "Disables the engine_getBlobsV2 usage.",
+    }
 )
```
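The flag follows urfave/cli v2-style wiring (assumed from the `&cli.BoolFlag`, `cli.NewIntSlice`, and `ctx.Bool` usage in these hunks): a package-level `*cli.BoolFlag`, registered in the app's flag list, read once in `ConfigureGlobalFlags`, and exposed through the global config that the engine client checks via `flags.Get().DisableGetBlobsV2`. A self-contained sketch of that wiring; the `globalFlags` struct and the tiny app around it are illustrative, and only the flag name and usage string come from the diff:

```go
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

type globalFlags struct {
	DisableGetBlobsV2 bool
}

var disableGetBlobsV2 = &cli.BoolFlag{
	Name:  "disable-get-blobs-v2",
	Usage: "Disables the engine_getBlobsV2 usage.",
}

func main() {
	cfg := &globalFlags{}
	app := &cli.App{
		Name:  "beacon-node-sketch",
		Flags: []cli.Flag{disableGetBlobsV2},
		Action: func(ctx *cli.Context) error {
			// Same shape as ConfigureGlobalFlags: copy the CLI value into config.
			if ctx.Bool(disableGetBlobsV2.Name) {
				cfg.DisableGetBlobsV2 = true
			}
			fmt.Println("engine_getBlobsV2 disabled:", cfg.DisableGetBlobsV2)
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```

Run the sketch with `--disable-get-blobs-v2` and the action prints `true`; in Prysm the same boolean then short-circuits `GetBlobsV2` to an empty result instead of calling the execution client.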
```diff
@@ -17,6 +17,7 @@ type GlobalFlags struct {
     SubscribeToAllSubnets bool
     Supernode             bool
     SemiSupernode         bool
+    DisableGetBlobsV2     bool
     MinimumSyncPeers      int
     MinimumPeersPerSubnet int
     MaxConcurrentDials    int
@@ -72,6 +73,11 @@ func ConfigureGlobalFlags(ctx *cli.Context) error {
         cfg.SemiSupernode = true
     }
 
+    if ctx.Bool(DisableGetBlobsV2.Name) {
+        log.Warning("Disabling `engine_getBlobsV2` API")
+        cfg.DisableGetBlobsV2 = true
+    }
+
     // State-diff-exponents
     cfg.StateDiffExponents = ctx.IntSlice(StateDiffExponents.Name)
     if features.Get().EnableStateDiff {
```
```diff
@@ -147,6 +147,7 @@ var appFlags = []cli.Flag{
     flags.SlasherDirFlag,
     flags.SlasherFlag,
     flags.JwtId,
+    flags.DisableGetBlobsV2,
    storage.BlobStoragePathFlag,
     storage.DataColumnStoragePathFlag,
     storage.BlobStorageLayout,
@@ -169,6 +169,7 @@ var appHelpFlagGroups = []flagGroup{
             flags.ExecutionJWTSecretFlag,
             flags.JwtId,
             flags.InteropMockEth1DataVotesFlag,
+            flags.DisableGetBlobsV2,
         },
     },
     { // Flags relevant to configuring beacon chain monitoring.
```