Compare commits

...

1 Commits

Author SHA1 Message Date
terence tsao
03ad817d7e Get blobs from el based in cli flag blob-to-address 2025-10-16 12:30:50 -07:00
9 changed files with 69 additions and 0 deletions

View File

@@ -122,6 +122,7 @@ type Reconstructor interface {
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error)
ConstructDataColumnSidecars(ctx context.Context, populator peerdas.ConstructionPopulator) ([]blocks.VerifiedRODataColumn, error)
GetBlobsV2(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProofV2, error)
}
// EngineCaller defines a client that can interact with an Ethereum

View File

@@ -828,6 +828,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithSlasherEnabled(b.slasherEnabled),
regularsync.WithLightClientStore(b.lcStore),
regularsync.WithBatchVerifierLimit(b.cliCtx.Int(flags.BatchVerifierLimit.Name)),
regularsync.WithBlobToAddress(b.cliCtx.String(flags.BlobToAddressFlag.Name)),
)
return b.services.RegisterService(rs)
}

View File

@@ -135,6 +135,7 @@ go_library(
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",

View File

@@ -192,6 +192,21 @@ var (
},
)
// Metrics for blob retrieval from execution layer based on transaction filtering
blobRetrievalAttemptsTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "blob_retrieval_attempts_total",
Help: "Count the number of times blob retrieval from EL was attempted for filtered transactions.",
},
)
blobRetrievalSuccessTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "blob_retrieval_success_total",
Help: "Count the number of times blob retrieval from EL was successful for filtered transactions.",
},
)
// Data column sidecar validation, beacon metrics specs
dataColumnSidecarVerificationRequestsCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "beacon_data_column_sidecar_processing_requests_total",

View File

@@ -238,3 +238,11 @@ func WithReconstructionRandGen(rg *rand.Rand) Option {
return nil
}
}
// WithBlobToAddress sets the address to filter blob transactions for.
func WithBlobToAddress(address string) Option {
return func(s *Service) error {
s.cfg.blobToAddress = address
return nil
}
}

View File

@@ -107,6 +107,7 @@ type config struct {
blobStorage *filesystem.BlobStorage
dataColumnStorage *filesystem.DataColumnStorage
batchVerifierLimit int
blobToAddress string
}
// This defines the interface for interacting with block chain service

View File

@@ -19,6 +19,7 @@ import (
"github.com/OffchainLabs/prysm/v6/io/file"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
@@ -78,6 +79,41 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
// builds corresponding sidecars, save them to the storage, and broadcasts them over P2P if necessary.
func (s *Service) processSidecarsFromExecutionFromBlock(ctx context.Context, roBlock blocks.ROBlock) {
if roBlock.Version() >= version.Fulu {
payload, err := roBlock.Block().Body().Execution()
if err != nil {
log.WithError(err).Error("Failed to extract execution payload")
return
}
txs, err := payload.Transactions()
if err != nil {
log.WithError(err).Error("Failed to extract transactions from execution payload")
return
}
for i, txBytes := range txs {
var tx types.Transaction
if err := tx.UnmarshalBinary(txBytes); err != nil {
log.WithError(err).WithField("txIndex", i).Error("Failed to unmarshal transaction")
continue
}
if tx.Type() == 3 && s.cfg.blobToAddress != "" && tx.To() != nil && tx.To().Hex() == s.cfg.blobToAddress {
blobHashes := tx.BlobHashes()
blobRetrievalAttemptsTotal.Inc()
_, err := s.cfg.executionReconstructor.GetBlobsV2(ctx, blobHashes)
if err != nil {
log.WithError(err).WithField("blobHashes", blobHashes).Error("Failed to reconstruct blob")
continue
}
blobRetrievalSuccessTotal.Inc()
log.WithFields(logrus.Fields{
"txIndex": i,
"txHash": tx.Hash().Hex(),
"slot": roBlock.Block().Slot(),
"blobHashes": blobHashes,
}).Info("Retrieved blob")
}
}
if err := s.processDataColumnSidecarsFromExecution(ctx, peerdas.PopulateFromBlock(roBlock)); err != nil {
log.WithError(err).Error("Failed to process data column sidecars from execution")
return

View File

@@ -344,4 +344,9 @@ var (
Usage: "Maximum number of signatures to batch verify at once for beacon attestation p2p gossip.",
Value: 1000,
}
BlobToAddressFlag = &cli.StringFlag{
Name: "blob-to-address",
Usage: "Ethereum address to filter type 3 (blob) transactions for processing. When set, only blob transactions sent to this address will be used to try to retrieve blobs from the execution layer and save them.",
Value: "",
}
)

View File

@@ -87,6 +87,7 @@ var appFlags = []cli.Flag{
flags.BeaconDBPruning,
flags.PrunerRetentionEpochs,
flags.EnableBuilderSSZ,
flags.BlobToAddressFlag,
cmd.MinimalConfigFlag,
cmd.E2EConfigFlag,
cmd.RPCMaxPageSizeFlag,