Compare commits


2 Commits

Author        SHA1        Message                            Date
Aarsh Shah    970524a5ad  add changelog                      2026-02-03 13:40:49 +04:00
Aarsh Shah    55d5071db8  set options after reading config   2026-02-03 13:37:49 +04:00
12 changed files with 100 additions and 234 deletions

View File

@@ -610,6 +610,7 @@ func (dcs *DataColumnStorage) Clear() error {
// prune clean the cache, the filesystem and mutexes.
func (dcs *DataColumnStorage) prune() {
+log.WithField("highestStoredEpoch", dcs.cache.HighestEpoch()).WithField("retentionEpochs", dcs.retentionEpochs).Debug("Pruning data column storage")
startTime := time.Now()
defer func() {
dataColumnPruneLatency.Observe(float64(time.Since(startTime).Milliseconds()))
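
The deferred Observe call above is the standard client_golang latency idiom: capture a start time, then record the elapsed milliseconds on every return path. A minimal sketch, assuming a promauto-registered histogram (the metric name and options here are illustrative, not the actual Prysm definitions):

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Illustrative histogram; Prysm's dataColumnPruneLatency is defined elsewhere.
var pruneLatency = promauto.NewHistogram(prometheus.HistogramOpts{
	Name: "data_column_prune_latency_milliseconds",
	Help: "Latency of data column storage pruning in milliseconds.",
})

func prune() {
	startTime := time.Now()
	// Deferring the observation records the latency on every return path.
	defer func() {
		pruneLatency.Observe(float64(time.Since(startTime).Milliseconds()))
	}()
	// ... pruning work ...
}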

View File

@@ -134,10 +134,19 @@ type BeaconNode struct {
// New creates a new node instance, sets up configuration options, and registers
// every required service to the node.
-func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*BeaconNode, error) {
+func New(cliCtx *cli.Context, cancel context.CancelFunc, optFuncs []func(*cli.Context) ([]Option, error), opts ...Option) (*BeaconNode, error) {
if err := configureBeacon(cliCtx); err != nil {
return nil, errors.Wrap(err, "could not set beacon configuration options")
}
+for _, of := range optFuncs {
+ofo, err := of(cliCtx)
+if err != nil {
+return nil, err
+}
+if ofo != nil {
+opts = append(opts, ofo...)
+}
+}
ctx := cliCtx.Context
beacon := &BeaconNode{
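
With this signature change, flag-derived options are expanded inside New, after configureBeacon has read the config file, so they observe the final configuration. A minimal sketch of the new call shape, assuming the urfave/cli v2 context Prysm uses and the beacon-chain/node import path; the flag, option func, and start wiring below are hypothetical:

package main

import (
	"context"

	"github.com/OffchainLabs/prysm/v7/beacon-chain/node"
	"github.com/urfave/cli/v2"
)

// myFeatureOptions matches the optFuncs element type,
// func(*cli.Context) ([]node.Option, error). Returning (nil, nil) is valid:
// New skips nil option slices, as the loop above shows.
func myFeatureOptions(cliCtx *cli.Context) ([]node.Option, error) {
	if !cliCtx.Bool("my-feature") { // hypothetical flag
		return nil, nil
	}
	return []node.Option{ /* feature-gated options */ }, nil
}

func start(cliCtx *cli.Context, cancel context.CancelFunc) error {
	optFuncs := []func(*cli.Context) ([]node.Option, error){myFeatureOptions}
	// Static options can still be passed variadically after optFuncs.
	beacon, err := node.New(cliCtx, cancel, optFuncs)
	if err != nil {
		return err
	}
	beacon.Start()
	return nil
}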

View File

@@ -59,7 +59,7 @@ func TestNodeClose_OK(t *testing.T) {
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
-node, err := New(ctx, cancel, options...)
+node, err := New(ctx, cancel, nil, options...)
require.NoError(t, err)
node.Close()
@@ -87,7 +87,7 @@ func TestNodeStart_Ok(t *testing.T) {
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
-node, err := New(ctx, cancel, options...)
+node, err := New(ctx, cancel, nil, options...)
require.NoError(t, err)
require.NotNil(t, node.lcStore)
node.services = &runtime.ServiceRegistry{}
@@ -116,7 +116,7 @@ func TestNodeStart_SyncChecker(t *testing.T) {
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
-node, err := New(ctx, cancel, options...)
+node, err := New(ctx, cancel, nil, options...)
require.NoError(t, err)
go func() {
node.Start()
@@ -151,7 +151,7 @@ func TestClearDB(t *testing.T) {
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
-_, err = New(context, cancel, options...)
+_, err = New(context, cancel, nil, options...)
require.NoError(t, err)
require.LogsContain(t, hook, "Removing database")
}

View File

@@ -0,0 +1,3 @@
+### Added
+- Set beacon node options after reading the config file.

View File

@@ -1,3 +0,0 @@
-### Ignored
-- adding some short retries for some end to end evaluators in an attempt to deflake tests.

View File

@@ -367,17 +367,8 @@ func startNode(ctx *cli.Context, cancel context.CancelFunc) error {
backfill.BeaconNodeOptions,
das.BeaconNodeOptions,
}
-for _, of := range optFuncs {
-ofo, err := of(ctx)
-if err != nil {
-return err
-}
-if ofo != nil {
-opts = append(opts, ofo...)
-}
-}
-beacon, err := node.New(ctx, cancel, opts...)
+beacon, err := node.New(ctx, cancel, optFuncs, opts...)
if err != nil {
return fmt.Errorf("unable to start beacon node: %w", err)
}

View File

@@ -40,7 +40,6 @@ type TransactionGenerator struct {
cancel context.CancelFunc
paused bool
useLargeBlobs bool // Use large blob transactions (6 blobs per tx) for BPO testing
-blobTxCount int // Number of blob transactions per slot (0 means default of 5)
}
func (t *TransactionGenerator) UnderlyingProcess() *os.Process {
@@ -49,8 +48,8 @@ func (t *TransactionGenerator) UnderlyingProcess() *os.Process {
return &os.Process{}
}
-func NewTransactionGenerator(keystore string, seed int64, useLargeBlobs bool, blobTxCount int) *TransactionGenerator {
-return &TransactionGenerator{keystore: keystore, seed: seed, useLargeBlobs: useLargeBlobs, blobTxCount: blobTxCount}
+func NewTransactionGenerator(keystore string, seed int64, useLargeBlobs bool) *TransactionGenerator {
+return &TransactionGenerator{keystore: keystore, seed: seed, useLargeBlobs: useLargeBlobs}
}
func (t *TransactionGenerator) Start(ctx context.Context) error {
@@ -115,7 +114,7 @@ func (t *TransactionGenerator) Start(ctx context.Context) error {
continue
}
backend := ethclient.NewClient(client)
-err = SendTransaction(client, mineKey.PrivateKey, gasPrice, mineKey.Address.String(), txCount, backend, false, t.useLargeBlobs, t.blobTxCount)
+err = SendTransaction(client, mineKey.PrivateKey, gasPrice, mineKey.Address.String(), txCount, backend, false, t.useLargeBlobs)
if err != nil {
return err
}
@@ -129,7 +128,7 @@ func (s *TransactionGenerator) Started() <-chan struct{} {
return s.started
}
-func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.Int, addr string, txCount uint64, backend *ethclient.Client, al bool, useLargeBlobs bool, blobTxCount int) error {
+func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.Int, addr string, txCount uint64, backend *ethclient.Client, al bool, useLargeBlobs bool) error {
sender := common.HexToAddress(addr)
nonce, err := backend.PendingNonceAt(context.Background(), fundedAccount.Address)
if err != nil {
@@ -151,19 +150,14 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In
clock := startup.NewClock(e2e.TestParams.CLGenesisTime, [32]byte{})
isPostFulu := clock.CurrentEpoch() >= params.BeaconConfig().FuluForkEpoch
-// Default to 5 blob transactions per slot if not configured.
-numBlobTxs := blobTxCount
-if numBlobTxs <= 0 {
-numBlobTxs = 5
-}
g, _ := errgroup.WithContext(context.Background())
-txs := make([]*types.Transaction, numBlobTxs)
+txs := make([]*types.Transaction, 10)
// Send blob transactions - use different versions pre/post Fulu
if isPostFulu {
logrus.Info("Sending blob transactions with cell proofs")
-for index := range uint64(numBlobTxs) {
+// Reduced from 10 to 5 to reduce load and prevent builder/EL timeouts
+for index := range uint64(5) {
g.Go(func() error {
tx, err := RandomBlobCellTx(client, fundedAccount.Address, nonce+index, gasPrice, chainid, al, useLargeBlobs)
@@ -182,7 +176,8 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In
}
} else {
logrus.Info("Sending blob transactions with sidecars")
-for index := range uint64(numBlobTxs) {
+// Reduced from 10 to 5 to reduce load and prevent builder/EL timeouts
+for index := range uint64(5) {
g.Go(func() error {
tx, err := RandomBlobTx(client, fundedAccount.Address, nonce+index, gasPrice, chainid, al, useLargeBlobs)

View File

@@ -252,7 +252,7 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
}
func (r *testRunner) testTxGeneration(ctx context.Context, g *errgroup.Group, keystorePath string, requiredNodes []e2etypes.ComponentRunner) {
-txGenerator := eth1.NewTransactionGenerator(keystorePath, r.config.Seed, r.config.UseLargeBlobs, r.config.BlobTxCount)
+txGenerator := eth1.NewTransactionGenerator(keystorePath, r.config.Seed, r.config.UseLargeBlobs)
r.comHandler.txGen = txGenerator
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, requiredNodes); err != nil {

View File

@@ -156,9 +156,19 @@ func waitForMidEpoch(conn *grpc.ClientConn) error {
}
}
-// getHeadEpochs fetches the head epoch from all beacon nodes concurrently.
-func getHeadEpochs(conns []*grpc.ClientConn) ([]primitives.Epoch, error) {
-epochs := make([]primitives.Epoch, len(conns))
+func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
+// Wait until we're at least halfway into the epoch to avoid race conditions
+// at epoch boundaries where nodes may report different epochs.
+if err := waitForMidEpoch(conns[0]); err != nil {
+return errors.Wrap(err, "failed waiting for mid-epoch")
+}
+headEpochs := make([]primitives.Epoch, len(conns))
+headBlockRoots := make([][]byte, len(conns))
+justifiedRoots := make([][]byte, len(conns))
+prevJustifiedRoots := make([][]byte, len(conns))
+finalizedRoots := make([][]byte, len(conns))
+chainHeads := make([]*eth.ChainHead, len(conns))
g, _ := errgroup.WithContext(context.Background())
for i, conn := range conns {
@@ -170,145 +180,63 @@ func getHeadEpochs(conns []*grpc.ClientConn) ([]primitives.Epoch, error) {
if err != nil {
return errors.Wrapf(err, "connection number=%d", conIdx)
}
-epochs[conIdx] = chainHead.HeadEpoch
+headEpochs[conIdx] = chainHead.HeadEpoch
+headBlockRoots[conIdx] = chainHead.HeadBlockRoot
+justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
+prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
+finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
+chainHeads[conIdx] = chainHead
return nil
})
}
if err := g.Wait(); err != nil {
-return nil, err
+return err
}
-return epochs, nil
-}
-func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
-// Wait until we're at least halfway into the epoch to avoid race conditions
-// at epoch boundaries where nodes may report different epochs.
-if err := waitForMidEpoch(conns[0]); err != nil {
-return errors.Wrap(err, "failed waiting for mid-epoch")
-}
-// First, wait for all nodes to reach the same epoch. Sync nodes may be
-// behind and need time to catch up. We poll every 2 seconds with a
-// 60 second timeout - this adapts to actual sync progress rather than
-// using fixed delays.
-const epochTimeout = 60 * time.Second
-const epochPollInterval = 2 * time.Second
-epochDeadline := time.Now().Add(epochTimeout)
-for time.Now().Before(epochDeadline) {
-epochs, err := getHeadEpochs(conns)
-if err != nil {
-return err
-}
-allSame := true
-for i := 1; i < len(epochs); i++ {
-if epochs[0] != epochs[i] {
-allSame = false
-break
-}
-}
-if allSame {
-break
-}
-time.Sleep(epochPollInterval)
-}
-// Now that epochs match (or timeout reached), do detailed head comparison
-// with a few retries to handle block propagation delays.
-const maxRetries = 5
-const retryDelay = 1 * time.Second
-var lastErr error
-for attempt := range maxRetries {
-if attempt > 0 {
-time.Sleep(retryDelay)
-}
-headEpochs := make([]primitives.Epoch, len(conns))
-headBlockRoots := make([][]byte, len(conns))
-justifiedRoots := make([][]byte, len(conns))
-prevJustifiedRoots := make([][]byte, len(conns))
-finalizedRoots := make([][]byte, len(conns))
-chainHeads := make([]*eth.ChainHead, len(conns))
-g, _ := errgroup.WithContext(context.Background())
-for i, conn := range conns {
-conIdx := i
-currConn := conn
-g.Go(func() error {
-beaconClient := eth.NewBeaconChainClient(currConn)
-chainHead, err := beaconClient.GetChainHead(context.Background(), &emptypb.Empty{})
-if err != nil {
-return errors.Wrapf(err, "connection number=%d", conIdx)
-}
-headEpochs[conIdx] = chainHead.HeadEpoch
-headBlockRoots[conIdx] = chainHead.HeadBlockRoot
-justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
-prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
-finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
-chainHeads[conIdx] = chainHead
-return nil
-})
-}
-if err := g.Wait(); err != nil {
-return err
-}
-lastErr = nil
-for i := range conns {
-if headEpochs[0] != headEpochs[i] {
-lastErr = fmt.Errorf(
-"received conflicting head epochs on node %d, expected %d, received %d",
-i,
-headEpochs[0],
-headEpochs[i],
-)
-break
-}
-if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
-lastErr = fmt.Errorf(
-"received conflicting head block roots on node %d, expected %#x, received %#x",
-i,
-headBlockRoots[0],
-headBlockRoots[i],
-)
-break
-}
-if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
-lastErr = fmt.Errorf(
-"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
-i,
-justifiedRoots[0],
-justifiedRoots[i],
-chainHeads[0].String(),
-chainHeads[i].String(),
-)
-break
-}
-if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
-lastErr = fmt.Errorf(
-"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
-i,
-prevJustifiedRoots[0],
-prevJustifiedRoots[i],
-)
-break
-}
-if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
-lastErr = fmt.Errorf(
-"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
-i,
-finalizedRoots[0],
-finalizedRoots[i],
-)
-break
-}
-}
-if lastErr == nil {
-return nil
-}
-}
-return lastErr
+for i := range conns {
+if headEpochs[0] != headEpochs[i] {
+return fmt.Errorf(
+"received conflicting head epochs on node %d, expected %d, received %d",
+i,
+headEpochs[0],
+headEpochs[i],
+)
+}
+if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
+return fmt.Errorf(
+"received conflicting head block roots on node %d, expected %#x, received %#x",
+i,
+headBlockRoots[0],
+headBlockRoots[i],
+)
+}
+if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
+return fmt.Errorf(
+"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
+i,
+justifiedRoots[0],
+justifiedRoots[i],
+chainHeads[0].String(),
+chainHeads[i].String(),
+)
+}
+if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
+return fmt.Errorf(
+"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
+i,
+prevJustifiedRoots[0],
+prevJustifiedRoots[i],
+)
+}
+if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
+return fmt.Errorf(
+"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
+i,
+finalizedRoots[0],
+finalizedRoots[i],
+)
+}
+}
+return nil
}
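
waitForMidEpoch guards the head comparison against epoch-boundary races; its implementation sits outside this diff. A hypothetical sketch of the condition it waits for, namely the slot index within the current epoch reaching at least half the epoch length:

package main

import "fmt"

// isMidEpoch reports whether slot is at least halfway into its epoch.
// Pure arithmetic only; the real evaluator polls the node for its head slot.
func isMidEpoch(slot, slotsPerEpoch uint64) bool {
	return slot%slotsPerEpoch >= slotsPerEpoch/2
}

func main() {
	// With the mainnet preset's 32-slot epochs, slot 80 is slot 16 of
	// epoch 2, exactly halfway in; slot 65 is slot 1 of epoch 2.
	fmt.Println(isMidEpoch(80, 32)) // true
	fmt.Println(isMidEpoch(65, 32)) // false
}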

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"strconv"
-"time"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/altair"
@@ -124,25 +123,6 @@ func validatorsAreActive(ec *types.EvaluationContext, conns ...*grpc.ClientConn)
// validatorsParticipating ensures the validators have an acceptable participation rate.
func validatorsParticipating(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
-// Retry up to 3 times with 2 second delays to handle timing flakes where
-// attestations haven't been fully processed yet due to block propagation delays.
-const maxRetries = 3
-const retryDelay = 2 * time.Second
-var lastErr error
-for attempt := range maxRetries {
-if attempt > 0 {
-time.Sleep(retryDelay)
-}
-lastErr = checkValidatorsParticipating(conns)
-if lastErr == nil {
-return nil
-}
-}
-return lastErr
-}
-func checkValidatorsParticipating(conns []*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewBeaconChainClient(conn)
validatorRequest := &ethpb.GetValidatorParticipationRequest{}
@@ -254,25 +234,6 @@ func checkValidatorsParticipating(conns []*grpc.ClientConn) error {
// validatorsSyncParticipation ensures the validators have an acceptable participation rate for
// sync committee assignments.
func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
-// Retry up to 3 times with 2 second delays to handle timing flakes where
-// sync committee messages haven't fully propagated yet.
-const maxRetries = 3
-const retryDelay = 2 * time.Second
-var lastErr error
-for attempt := range maxRetries {
-if attempt > 0 {
-time.Sleep(retryDelay)
-}
-lastErr = checkSyncParticipation(conns)
-if lastErr == nil {
-return nil
-}
-}
-return lastErr
-}
-func checkSyncParticipation(conns []*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewNodeClient(conn)
altairClient := ethpb.NewBeaconChainClient(conn)
@@ -311,9 +272,9 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
// Skip fork slot.
continue
}
-// Skip early slots at genesis - validators need time to ramp up after chain start
+// Skip slots 1-2 at genesis - validators need time to ramp up after chain start
// due to doppelganger protection. This is a startup timing issue, not a fork transition issue.
-if b.Block().Slot() < 5 {
+if b.Block().Slot() < 3 {
continue
}
expectedParticipation := expectedSyncParticipation
@@ -328,11 +289,6 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
if err != nil {
return err
}
-// Skip blocks with zero sync bits - these are typically empty/anomalous blocks
-// where the proposer didn't receive sync committee contributions in time.
-if syncAgg.SyncCommitteeBits.Count() == 0 {
-continue
-}
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
@@ -387,11 +343,6 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
if err != nil {
return err
}
-// Skip blocks with zero sync bits - these are typically empty/anomalous blocks
-// where the proposer didn't receive sync committee contributions in time.
-if syncAgg.SyncCommitteeBits.Count() == 0 {
-continue
-}
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedSyncParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
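
The participation check above derives its threshold as a fixed fraction of the sync committee bitvector length, then fails the block if its set-bit count falls below it. A worked sketch of that arithmetic with illustrative numbers (mainnet's 512-member committee; the minimal-preset committees used in e2e are smaller):

package main

import "fmt"

func main() {
	// Illustrative values, not taken from the e2e config.
	bitLen := 512.0               // sync committee bitvector length
	expectedParticipation := 0.95 // acceptable participation rate
	threshold := uint64(bitLen * expectedParticipation)
	fmt.Println(threshold) // 486: the block passes when Count() >= 486
}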

View File

@@ -9,11 +9,11 @@ import (
)
func TestEndToEnd_MinimalConfig_WithBuilder(t *testing.T) {
-r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithBlobTxCount(2))
+r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder())
r.run()
}
func TestEndToEnd_MinimalConfig_WithBuilder_ValidatorRESTApi(t *testing.T) {
-r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithValidatorRESTApi(), types.WithBlobTxCount(2))
+r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithValidatorRESTApi())
r.run()
}

View File

@@ -68,14 +68,6 @@ func WithLargeBlobs() E2EConfigOpt {
}
}
-// WithBlobTxCount sets the number of blob transactions sent per slot.
-// Default is 5 when not specified.
-func WithBlobTxCount(n int) E2EConfigOpt {
-return func(cfg *E2EConfig) {
-cfg.BlobTxCount = n
-}
-}
func WithSSZOnly() E2EConfigOpt {
return func(cfg *E2EConfig) {
if err := os.Setenv(params.EnvNameOverrideAccept, api.OctetStreamMediaType); err != nil {
@@ -116,7 +108,6 @@ type E2EConfig struct {
UseBeaconRestApi bool
UseBuilder bool
UseLargeBlobs bool // Use large blob transactions (6 blobs per tx) for BPO testing
-BlobTxCount int // Number of blob transactions per slot (0 means default of 5)
EpochsToRun uint64
ExitEpoch primitives.Epoch // Custom epoch for voluntary exit submission (0 means use default)
Seed int64