mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-30 23:58:23 -05:00
Compare commits
1 Commits
deflake-ev
...
vals-hash-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c64a1fbe0 |
@@ -46,6 +46,7 @@ go_library(
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@org_golang_x_sync//errgroup:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -78,6 +79,7 @@ go_test(
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
"@org_golang_x_sync//errgroup:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@@ -2,16 +2,16 @@ package stateutil
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v7/crypto/hash/htr"
|
||||
"github.com/OffchainLabs/prysm/v7/encoding/ssz"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -54,17 +54,23 @@ func validatorRegistryRoot(validators []*ethpb.Validator) ([32]byte, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func hashValidatorHelper(validators []*ethpb.Validator, roots [][32]byte, j int, groupSize int, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
for i := range groupSize {
|
||||
fRoots, err := ValidatorFieldRoots(validators[j*groupSize+i])
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Could not get validator field roots")
|
||||
return
|
||||
}
|
||||
for k, root := range fRoots {
|
||||
roots[(j*groupSize+i)*validatorFieldRoots+k] = root
|
||||
func hashValidatorHelper(ctx context.Context, validators []*ethpb.Validator, roots [][32]byte, j int, groupSize int) func() error {
|
||||
return func() error {
|
||||
for i := range groupSize {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
fRoots, err := ValidatorFieldRoots(validators[j*groupSize+i])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get validator field roots")
|
||||
}
|
||||
for k, root := range fRoots {
|
||||
roots[(j*groupSize+i)*validatorFieldRoots+k] = root
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,14 +81,13 @@ func OptimizedValidatorRoots(validators []*ethpb.Validator) ([][32]byte, error)
|
||||
if len(validators) == 0 {
|
||||
return [][32]byte{}, nil
|
||||
}
|
||||
wg := sync.WaitGroup{}
|
||||
g, ctx := errgroup.WithContext(context.Background())
|
||||
n := runtime.GOMAXPROCS(0)
|
||||
rootsSize := len(validators) * validatorFieldRoots
|
||||
groupSize := len(validators) / n
|
||||
roots := make([][32]byte, rootsSize)
|
||||
wg.Add(n - 1)
|
||||
for j := 0; j < n-1; j++ {
|
||||
go hashValidatorHelper(validators, roots, j, groupSize, &wg)
|
||||
g.Go(hashValidatorHelper(ctx, validators, roots, j, groupSize))
|
||||
}
|
||||
for i := (n - 1) * groupSize; i < len(validators); i++ {
|
||||
fRoots, err := ValidatorFieldRoots(validators[i])
|
||||
@@ -93,7 +98,9 @@ func OptimizedValidatorRoots(validators []*ethpb.Validator) ([][32]byte, error)
|
||||
roots[i*validatorFieldRoots+k] = root
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
if err := g.Wait(); err != nil {
|
||||
return [][32]byte{}, err
|
||||
}
|
||||
|
||||
// A validator's tree can represented with a depth of 3. As log2(8) = 3
|
||||
// Using this property we can lay out all the individual fields of a
|
||||
|
||||
@@ -3,13 +3,13 @@ package stateutil
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
mathutil "github.com/OffchainLabs/prysm/v7/math"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/assert"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func TestValidatorConstants(t *testing.T) {
|
||||
@@ -34,15 +34,15 @@ func TestValidatorConstants(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHashValidatorHelper(t *testing.T) {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
g, ctx := errgroup.WithContext(t.Context())
|
||||
v := ðpb.Validator{}
|
||||
valList := make([]*ethpb.Validator, 10*validatorFieldRoots)
|
||||
for i := range valList {
|
||||
valList[i] = v
|
||||
}
|
||||
roots := make([][32]byte, len(valList))
|
||||
hashValidatorHelper(valList, roots, 2, 2, &wg)
|
||||
g.Go(hashValidatorHelper(ctx, valList, roots, 2, 2))
|
||||
require.NoError(t, g.Wait())
|
||||
for i := range 4 * validatorFieldRoots {
|
||||
require.Equal(t, [32]byte{}, roots[i])
|
||||
}
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
### Ignored
|
||||
|
||||
- adding some short retries for some end to end evaluators in an attempt to deflake tests.
|
||||
3
changelog/radek_vals-hash-errgroup.md
Normal file
3
changelog/radek_vals-hash-errgroup.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Changed
|
||||
|
||||
- Use `errgroup` in `OptimizedValidatorRoots`.
|
||||
@@ -40,7 +40,6 @@ type TransactionGenerator struct {
|
||||
cancel context.CancelFunc
|
||||
paused bool
|
||||
useLargeBlobs bool // Use large blob transactions (6 blobs per tx) for BPO testing
|
||||
blobTxCount int // Number of blob transactions per slot (0 means default of 5)
|
||||
}
|
||||
|
||||
func (t *TransactionGenerator) UnderlyingProcess() *os.Process {
|
||||
@@ -49,8 +48,8 @@ func (t *TransactionGenerator) UnderlyingProcess() *os.Process {
|
||||
return &os.Process{}
|
||||
}
|
||||
|
||||
func NewTransactionGenerator(keystore string, seed int64, useLargeBlobs bool, blobTxCount int) *TransactionGenerator {
|
||||
return &TransactionGenerator{keystore: keystore, seed: seed, useLargeBlobs: useLargeBlobs, blobTxCount: blobTxCount}
|
||||
func NewTransactionGenerator(keystore string, seed int64, useLargeBlobs bool) *TransactionGenerator {
|
||||
return &TransactionGenerator{keystore: keystore, seed: seed, useLargeBlobs: useLargeBlobs}
|
||||
}
|
||||
|
||||
func (t *TransactionGenerator) Start(ctx context.Context) error {
|
||||
@@ -115,7 +114,7 @@ func (t *TransactionGenerator) Start(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
backend := ethclient.NewClient(client)
|
||||
err = SendTransaction(client, mineKey.PrivateKey, gasPrice, mineKey.Address.String(), txCount, backend, false, t.useLargeBlobs, t.blobTxCount)
|
||||
err = SendTransaction(client, mineKey.PrivateKey, gasPrice, mineKey.Address.String(), txCount, backend, false, t.useLargeBlobs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -129,7 +128,7 @@ func (s *TransactionGenerator) Started() <-chan struct{} {
|
||||
return s.started
|
||||
}
|
||||
|
||||
func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.Int, addr string, txCount uint64, backend *ethclient.Client, al bool, useLargeBlobs bool, blobTxCount int) error {
|
||||
func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.Int, addr string, txCount uint64, backend *ethclient.Client, al bool, useLargeBlobs bool) error {
|
||||
sender := common.HexToAddress(addr)
|
||||
nonce, err := backend.PendingNonceAt(context.Background(), fundedAccount.Address)
|
||||
if err != nil {
|
||||
@@ -151,19 +150,14 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In
|
||||
clock := startup.NewClock(e2e.TestParams.CLGenesisTime, [32]byte{})
|
||||
isPostFulu := clock.CurrentEpoch() >= params.BeaconConfig().FuluForkEpoch
|
||||
|
||||
// Default to 5 blob transactions per slot if not configured.
|
||||
numBlobTxs := blobTxCount
|
||||
if numBlobTxs <= 0 {
|
||||
numBlobTxs = 5
|
||||
}
|
||||
|
||||
g, _ := errgroup.WithContext(context.Background())
|
||||
txs := make([]*types.Transaction, numBlobTxs)
|
||||
txs := make([]*types.Transaction, 10)
|
||||
|
||||
// Send blob transactions - use different versions pre/post Fulu
|
||||
if isPostFulu {
|
||||
logrus.Info("Sending blob transactions with cell proofs")
|
||||
for index := range uint64(numBlobTxs) {
|
||||
// Reduced from 10 to 5 to reduce load and prevent builder/EL timeouts
|
||||
for index := range uint64(5) {
|
||||
|
||||
g.Go(func() error {
|
||||
tx, err := RandomBlobCellTx(client, fundedAccount.Address, nonce+index, gasPrice, chainid, al, useLargeBlobs)
|
||||
@@ -182,7 +176,8 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In
|
||||
}
|
||||
} else {
|
||||
logrus.Info("Sending blob transactions with sidecars")
|
||||
for index := range uint64(numBlobTxs) {
|
||||
// Reduced from 10 to 5 to reduce load and prevent builder/EL timeouts
|
||||
for index := range uint64(5) {
|
||||
|
||||
g.Go(func() error {
|
||||
tx, err := RandomBlobTx(client, fundedAccount.Address, nonce+index, gasPrice, chainid, al, useLargeBlobs)
|
||||
|
||||
@@ -225,9 +225,9 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
|
||||
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{r.depositor}); err != nil {
|
||||
return errors.Wrap(err, "testDepositsAndTx unable to run, depositor did not Start")
|
||||
}
|
||||
go func() {
|
||||
if r.config.TestDeposits {
|
||||
log.Info("Running deposit tests")
|
||||
go func() {
|
||||
if r.config.TestDeposits {
|
||||
log.Info("Running deposit tests")
|
||||
// The validators with an index < minGenesisActiveCount all have deposits already from the chain start.
|
||||
// Skip all of those chain start validators by seeking to minGenesisActiveCount in the validator list
|
||||
// for further deposit testing.
|
||||
@@ -238,12 +238,12 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
|
||||
r.t.Error(errors.Wrap(err, "depositor.SendAndMine failed"))
|
||||
}
|
||||
}
|
||||
}
|
||||
// Only generate background transactions when relevant for the test.
|
||||
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
|
||||
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
|
||||
}
|
||||
}()
|
||||
}
|
||||
// Only generate background transactions when relevant for the test.
|
||||
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
|
||||
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
|
||||
}
|
||||
}()
|
||||
if r.config.TestDeposits {
|
||||
return depositCheckValidator.Start(ctx)
|
||||
}
|
||||
@@ -252,7 +252,7 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
|
||||
}
|
||||
|
||||
func (r *testRunner) testTxGeneration(ctx context.Context, g *errgroup.Group, keystorePath string, requiredNodes []e2etypes.ComponentRunner) {
|
||||
txGenerator := eth1.NewTransactionGenerator(keystorePath, r.config.Seed, r.config.UseLargeBlobs, r.config.BlobTxCount)
|
||||
txGenerator := eth1.NewTransactionGenerator(keystorePath, r.config.Seed, r.config.UseLargeBlobs)
|
||||
r.comHandler.txGen = txGenerator
|
||||
g.Go(func() error {
|
||||
if err := helpers.ComponentsStarted(ctx, requiredNodes); err != nil {
|
||||
|
||||
@@ -156,9 +156,19 @@ func waitForMidEpoch(conn *grpc.ClientConn) error {
|
||||
}
|
||||
}
|
||||
|
||||
// getHeadEpochs fetches the head epoch from all beacon nodes concurrently.
|
||||
func getHeadEpochs(conns []*grpc.ClientConn) ([]primitives.Epoch, error) {
|
||||
epochs := make([]primitives.Epoch, len(conns))
|
||||
func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
|
||||
// Wait until we're at least halfway into the epoch to avoid race conditions
|
||||
// at epoch boundaries where nodes may report different epochs.
|
||||
if err := waitForMidEpoch(conns[0]); err != nil {
|
||||
return errors.Wrap(err, "failed waiting for mid-epoch")
|
||||
}
|
||||
|
||||
headEpochs := make([]primitives.Epoch, len(conns))
|
||||
headBlockRoots := make([][]byte, len(conns))
|
||||
justifiedRoots := make([][]byte, len(conns))
|
||||
prevJustifiedRoots := make([][]byte, len(conns))
|
||||
finalizedRoots := make([][]byte, len(conns))
|
||||
chainHeads := make([]*eth.ChainHead, len(conns))
|
||||
g, _ := errgroup.WithContext(context.Background())
|
||||
|
||||
for i, conn := range conns {
|
||||
@@ -170,145 +180,63 @@ func getHeadEpochs(conns []*grpc.ClientConn) ([]primitives.Epoch, error) {
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "connection number=%d", conIdx)
|
||||
}
|
||||
epochs[conIdx] = chainHead.HeadEpoch
|
||||
headEpochs[conIdx] = chainHead.HeadEpoch
|
||||
headBlockRoots[conIdx] = chainHead.HeadBlockRoot
|
||||
justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
|
||||
prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
|
||||
finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
|
||||
chainHeads[conIdx] = chainHead
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
return epochs, nil
|
||||
}
|
||||
|
||||
func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
|
||||
// Wait until we're at least halfway into the epoch to avoid race conditions
|
||||
// at epoch boundaries where nodes may report different epochs.
|
||||
if err := waitForMidEpoch(conns[0]); err != nil {
|
||||
return errors.Wrap(err, "failed waiting for mid-epoch")
|
||||
}
|
||||
|
||||
// First, wait for all nodes to reach the same epoch. Sync nodes may be
|
||||
// behind and need time to catch up. We poll every 2 seconds with a
|
||||
// 60 second timeout - this adapts to actual sync progress rather than
|
||||
// using fixed delays.
|
||||
const epochTimeout = 60 * time.Second
|
||||
const epochPollInterval = 2 * time.Second
|
||||
epochDeadline := time.Now().Add(epochTimeout)
|
||||
|
||||
for time.Now().Before(epochDeadline) {
|
||||
epochs, err := getHeadEpochs(conns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
allSame := true
|
||||
for i := 1; i < len(epochs); i++ {
|
||||
if epochs[0] != epochs[i] {
|
||||
allSame = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if allSame {
|
||||
break
|
||||
}
|
||||
time.Sleep(epochPollInterval)
|
||||
}
|
||||
|
||||
// Now that epochs match (or timeout reached), do detailed head comparison
|
||||
// with a few retries to handle block propagation delays.
|
||||
const maxRetries = 5
|
||||
const retryDelay = 1 * time.Second
|
||||
var lastErr error
|
||||
|
||||
for attempt := range maxRetries {
|
||||
if attempt > 0 {
|
||||
time.Sleep(retryDelay)
|
||||
}
|
||||
|
||||
headEpochs := make([]primitives.Epoch, len(conns))
|
||||
headBlockRoots := make([][]byte, len(conns))
|
||||
justifiedRoots := make([][]byte, len(conns))
|
||||
prevJustifiedRoots := make([][]byte, len(conns))
|
||||
finalizedRoots := make([][]byte, len(conns))
|
||||
chainHeads := make([]*eth.ChainHead, len(conns))
|
||||
g, _ := errgroup.WithContext(context.Background())
|
||||
|
||||
for i, conn := range conns {
|
||||
conIdx := i
|
||||
currConn := conn
|
||||
g.Go(func() error {
|
||||
beaconClient := eth.NewBeaconChainClient(currConn)
|
||||
chainHead, err := beaconClient.GetChainHead(context.Background(), &emptypb.Empty{})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "connection number=%d", conIdx)
|
||||
}
|
||||
headEpochs[conIdx] = chainHead.HeadEpoch
|
||||
headBlockRoots[conIdx] = chainHead.HeadBlockRoot
|
||||
justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
|
||||
prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
|
||||
finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
|
||||
chainHeads[conIdx] = chainHead
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lastErr = nil
|
||||
for i := range conns {
|
||||
if headEpochs[0] != headEpochs[i] {
|
||||
lastErr = fmt.Errorf(
|
||||
"received conflicting head epochs on node %d, expected %d, received %d",
|
||||
i,
|
||||
headEpochs[0],
|
||||
headEpochs[i],
|
||||
)
|
||||
break
|
||||
}
|
||||
if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
|
||||
lastErr = fmt.Errorf(
|
||||
"received conflicting head block roots on node %d, expected %#x, received %#x",
|
||||
i,
|
||||
headBlockRoots[0],
|
||||
headBlockRoots[i],
|
||||
)
|
||||
break
|
||||
}
|
||||
if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
|
||||
lastErr = fmt.Errorf(
|
||||
"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
|
||||
i,
|
||||
justifiedRoots[0],
|
||||
justifiedRoots[i],
|
||||
chainHeads[0].String(),
|
||||
chainHeads[i].String(),
|
||||
)
|
||||
break
|
||||
}
|
||||
if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
|
||||
lastErr = fmt.Errorf(
|
||||
"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
|
||||
i,
|
||||
prevJustifiedRoots[0],
|
||||
prevJustifiedRoots[i],
|
||||
)
|
||||
break
|
||||
}
|
||||
if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
|
||||
lastErr = fmt.Errorf(
|
||||
"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
|
||||
i,
|
||||
finalizedRoots[0],
|
||||
finalizedRoots[i],
|
||||
)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if lastErr == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return lastErr
|
||||
|
||||
for i := range conns {
|
||||
if headEpochs[0] != headEpochs[i] {
|
||||
return fmt.Errorf(
|
||||
"received conflicting head epochs on node %d, expected %d, received %d",
|
||||
i,
|
||||
headEpochs[0],
|
||||
headEpochs[i],
|
||||
)
|
||||
}
|
||||
if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
|
||||
return fmt.Errorf(
|
||||
"received conflicting head block roots on node %d, expected %#x, received %#x",
|
||||
i,
|
||||
headBlockRoots[0],
|
||||
headBlockRoots[i],
|
||||
)
|
||||
}
|
||||
if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
|
||||
return fmt.Errorf(
|
||||
"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
|
||||
i,
|
||||
justifiedRoots[0],
|
||||
justifiedRoots[i],
|
||||
chainHeads[0].String(),
|
||||
chainHeads[i].String(),
|
||||
)
|
||||
}
|
||||
if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
|
||||
return fmt.Errorf(
|
||||
"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
|
||||
i,
|
||||
prevJustifiedRoots[0],
|
||||
prevJustifiedRoots[i],
|
||||
)
|
||||
}
|
||||
if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
|
||||
return fmt.Errorf(
|
||||
"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
|
||||
i,
|
||||
finalizedRoots[0],
|
||||
finalizedRoots[i],
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/api/server/structs"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/altair"
|
||||
@@ -124,25 +123,6 @@ func validatorsAreActive(ec *types.EvaluationContext, conns ...*grpc.ClientConn)
|
||||
|
||||
// validatorsParticipating ensures the validators have an acceptable participation rate.
|
||||
func validatorsParticipating(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
|
||||
// Retry up to 3 times with 2 second delays to handle timing flakes where
|
||||
// attestations haven't been fully processed yet due to block propagation delays.
|
||||
const maxRetries = 3
|
||||
const retryDelay = 2 * time.Second
|
||||
var lastErr error
|
||||
|
||||
for attempt := range maxRetries {
|
||||
if attempt > 0 {
|
||||
time.Sleep(retryDelay)
|
||||
}
|
||||
lastErr = checkValidatorsParticipating(conns)
|
||||
if lastErr == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func checkValidatorsParticipating(conns []*grpc.ClientConn) error {
|
||||
conn := conns[0]
|
||||
client := ethpb.NewBeaconChainClient(conn)
|
||||
validatorRequest := ðpb.GetValidatorParticipationRequest{}
|
||||
@@ -254,25 +234,6 @@ func checkValidatorsParticipating(conns []*grpc.ClientConn) error {
|
||||
// validatorsSyncParticipation ensures the validators have an acceptable participation rate for
|
||||
// sync committee assignments.
|
||||
func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
|
||||
// Retry up to 3 times with 2 second delays to handle timing flakes where
|
||||
// sync committee messages haven't fully propagated yet.
|
||||
const maxRetries = 3
|
||||
const retryDelay = 2 * time.Second
|
||||
var lastErr error
|
||||
|
||||
for attempt := range maxRetries {
|
||||
if attempt > 0 {
|
||||
time.Sleep(retryDelay)
|
||||
}
|
||||
lastErr = checkSyncParticipation(conns)
|
||||
if lastErr == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func checkSyncParticipation(conns []*grpc.ClientConn) error {
|
||||
conn := conns[0]
|
||||
client := ethpb.NewNodeClient(conn)
|
||||
altairClient := ethpb.NewBeaconChainClient(conn)
|
||||
@@ -311,9 +272,9 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
|
||||
// Skip fork slot.
|
||||
continue
|
||||
}
|
||||
// Skip early slots at genesis - validators need time to ramp up after chain start
|
||||
// Skip slots 1-2 at genesis - validators need time to ramp up after chain start
|
||||
// due to doppelganger protection. This is a startup timing issue, not a fork transition issue.
|
||||
if b.Block().Slot() < 5 {
|
||||
if b.Block().Slot() < 3 {
|
||||
continue
|
||||
}
|
||||
expectedParticipation := expectedSyncParticipation
|
||||
@@ -328,11 +289,6 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Skip blocks with zero sync bits - these are typically empty/anomalous blocks
|
||||
// where the proposer didn't receive sync committee contributions in time.
|
||||
if syncAgg.SyncCommitteeBits.Count() == 0 {
|
||||
continue
|
||||
}
|
||||
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedParticipation)
|
||||
if syncAgg.SyncCommitteeBits.Count() < threshold {
|
||||
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
|
||||
@@ -387,11 +343,6 @@ func checkSyncParticipation(conns []*grpc.ClientConn) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Skip blocks with zero sync bits - these are typically empty/anomalous blocks
|
||||
// where the proposer didn't receive sync committee contributions in time.
|
||||
if syncAgg.SyncCommitteeBits.Count() == 0 {
|
||||
continue
|
||||
}
|
||||
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedSyncParticipation)
|
||||
if syncAgg.SyncCommitteeBits.Count() < threshold {
|
||||
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
|
||||
|
||||
@@ -9,11 +9,11 @@ import (
|
||||
)
|
||||
|
||||
func TestEndToEnd_MinimalConfig_WithBuilder(t *testing.T) {
|
||||
r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithBlobTxCount(2))
|
||||
r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder())
|
||||
r.run()
|
||||
}
|
||||
|
||||
func TestEndToEnd_MinimalConfig_WithBuilder_ValidatorRESTApi(t *testing.T) {
|
||||
r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithValidatorRESTApi(), types.WithBlobTxCount(2))
|
||||
r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()), types.WithCheckpointSync(), types.WithBuilder(), types.WithValidatorRESTApi())
|
||||
r.run()
|
||||
}
|
||||
|
||||
@@ -68,14 +68,6 @@ func WithLargeBlobs() E2EConfigOpt {
|
||||
}
|
||||
}
|
||||
|
||||
// WithBlobTxCount sets the number of blob transactions sent per slot.
|
||||
// Default is 5 when not specified.
|
||||
func WithBlobTxCount(n int) E2EConfigOpt {
|
||||
return func(cfg *E2EConfig) {
|
||||
cfg.BlobTxCount = n
|
||||
}
|
||||
}
|
||||
|
||||
func WithSSZOnly() E2EConfigOpt {
|
||||
return func(cfg *E2EConfig) {
|
||||
if err := os.Setenv(params.EnvNameOverrideAccept, api.OctetStreamMediaType); err != nil {
|
||||
@@ -116,7 +108,6 @@ type E2EConfig struct {
|
||||
UseBeaconRestApi bool
|
||||
UseBuilder bool
|
||||
UseLargeBlobs bool // Use large blob transactions (6 blobs per tx) for BPO testing
|
||||
BlobTxCount int // Number of blob transactions per slot (0 means default of 5)
|
||||
EpochsToRun uint64
|
||||
ExitEpoch primitives.Epoch // Custom epoch for voluntary exit submission (0 means use default)
|
||||
Seed int64
|
||||
|
||||
Reference in New Issue
Block a user