Compare commits

...

10 Commits

Author   SHA1        Message                                                                  Date
satushh  32ef35c6ac  bound zero sync participation tolerance in validatorsSyncParticipation  2026-03-06 16:39:11 +05:30
satushh  bc211b9b33  lint and changelog                                                       2026-03-06 16:15:19 +05:30
satushh  9aa109d65a  test commit 2                                                            2026-03-06 11:27:52 +05:30
satushh  87debbc35e  test commit                                                              2026-03-06 10:44:38 +05:30
satushh  967d0cca4b  deflake sync participation evaluator around fork transitions            2026-03-05 22:06:36 +05:30
satushh  474de75942  reset expected vote if required                                          2026-03-05 18:34:19 +05:30
satushh  8c7f35dd8b  log only optimistic status check                                         2026-03-05 17:18:21 +05:30
satushh  a955f762eb  updated comment                                                          2026-03-05 14:32:09 +05:30
satushh  4a0b986581  harden evaluator + bug fix                                               2026-03-05 13:27:28 +05:30
satushh  89909b18a9  potential improvements to e2e                                            2026-03-05 10:09:44 +05:30
19 changed files with 394 additions and 145 deletions

changelog/satushh_e2e.md (new file, 3 additions)
View File

@@ -0,0 +1,3 @@
### Changed
- Improved the end-to-end tests to make them less flaky

View File

@@ -344,7 +344,7 @@ func (node *BeaconNode) Resume() error {
// Stop stops the component and its underlying process.
func (node *BeaconNode) Stop() error {
return node.cmd.Process.Kill()
return helpers.GracefulStop(node.cmd.Process)
}
func (node *BeaconNode) UnderlyingProcess() *os.Process {

View File

@@ -94,7 +94,7 @@ func (node *BootNode) Resume() error {
// Stop stops the component and its underlying process.
func (node *BootNode) Stop() error {
return node.cmd.Process.Kill()
return helpers.GracefulStop(node.cmd.Process)
}
func enrFromLogFile(name string) (string, error) {

View File

@@ -263,7 +263,7 @@ func (m *Miner) Resume() error {
// Stop kills the component and its underlying process.
func (m *Miner) Stop() error {
return m.cmd.Process.Kill()
return helpers.GracefulStop(m.cmd.Process)
}
func enodeFromLogFile(name string) (string, error) {

View File

@@ -171,7 +171,7 @@ func (node *Node) Resume() error {
// Stop kills the component and its underlying process.
func (node *Node) Stop() error {
return node.cmd.Process.Kill()
return helpers.GracefulStop(node.cmd.Process)
}
func (node *Node) UnderlyingProcess() *os.Process {

View File

@@ -8,6 +8,7 @@ import (
"math/big"
mathRand "math/rand"
"os"
"sync/atomic"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
@@ -38,7 +39,7 @@ type TransactionGenerator struct {
seed int64
started chan struct{}
cancel context.CancelFunc
paused bool
paused atomic.Bool
useLargeBlobs bool // Use large blob transactions (6 blobs per tx) for BPO testing
}
@@ -110,7 +111,7 @@ func (t *TransactionGenerator) Start(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-ticker.C:
if t.paused {
if t.paused.Load() {
continue
}
backend := ethclient.NewClient(client)
@@ -267,13 +268,13 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In
// Pause pauses the component and its underlying process.
func (t *TransactionGenerator) Pause() error {
t.paused = true
t.paused.Store(true)
return nil
}
// Resume resumes the component and its underlying process.
func (t *TransactionGenerator) Resume() error {
t.paused = false
t.paused.Store(false)
return nil
}
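
The paused flag is written by Pause/Resume from the test harness while the Start loop reads it from its own ticker goroutine, so the move from a plain bool to sync/atomic's atomic.Bool closes a data race. A minimal sketch of the pattern, with a stand-in generator type rather than the real TransactionGenerator:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type generator struct {
	paused atomic.Bool // safe for concurrent Load/Store across goroutines
}

func (g *generator) run(stop <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			if g.paused.Load() { // with a plain bool this read races with Pause/Resume
				continue
			}
			// ... generate and send a transaction ...
		}
	}
}

func main() {
	g := &generator{}
	stop := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); g.run(stop) }()

	g.paused.Store(true)  // Pause
	g.paused.Store(false) // Resume
	close(stop)
	wg.Wait()
	fmt.Println("done; `go run -race` stays quiet")
}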

View File

@@ -246,7 +246,7 @@ func (node *LighthouseBeaconNode) Resume() error {
// Stop stops the component and its underlying process.
func (node *LighthouseBeaconNode) Stop() error {
return node.cmd.Process.Kill()
return helpers.GracefulStop(node.cmd.Process)
}
func (node *LighthouseBeaconNode) UnderlyingProcess() *os.Process {

View File

@@ -237,7 +237,7 @@ func (v *LighthouseValidatorNode) Resume() error {
// Stop stops the component and its underlying process.
func (v *LighthouseValidatorNode) Stop() error {
return v.cmd.Process.Kill()
return helpers.GracefulStop(v.cmd.Process)
}
func (v *LighthouseValidatorNode) UnderlyingProcess() *os.Process {

View File

@@ -113,19 +113,11 @@ func (ts *TracingSink) initializeSink(ctx context.Context) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
for {
select {
case <-ctx.Done():
cleanup()
return
case <-sigs:
cleanup()
return
default:
// Sleep for 100ms and do nothing while waiting for
// cancellation.
time.Sleep(100 * time.Millisecond)
}
select {
case <-ctx.Done():
cleanup()
case <-sigs:
cleanup()
}
}()
if err := ts.server.ListenAndServe(); err != http.ErrServerClosed {
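
The TracingSink rewrite above swaps a 100ms sleep-poll loop for one blocking select, so the goroutine consumes no CPU while idle and runs cleanup exactly once on either trigger. The pattern in isolation, with cleanup as a placeholder for the sink's real teardown:

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// waitAndCleanup blocks in a goroutine until the context is cancelled or an
// interrupt/terminate signal arrives, then runs cleanup once. No for loop,
// no default branch, no time.Sleep.
func waitAndCleanup(ctx context.Context, cleanup func()) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		select {
		case <-ctx.Done():
		case <-sigs:
		}
		cleanup()
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	waitAndCleanup(ctx, func() { println("cleaned up") })
	cancel()                          // trigger the ctx.Done() branch
	time.Sleep(50 * time.Millisecond) // toy-example wait for the goroutine
}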

View File

@@ -337,7 +337,7 @@ func (v *ValidatorNode) Resume() error {
// Stop stops the component and its underlying process.
func (v *ValidatorNode) Stop() error {
return v.cmd.Process.Kill()
return helpers.GracefulStop(v.cmd.Process)
}
func (v *ValidatorNode) UnderlyingProcess() *os.Process {

View File

@@ -20,6 +20,7 @@ import (
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/io/file"
"github.com/OffchainLabs/prysm/v7/runtime/interop"
"github.com/OffchainLabs/prysm/v7/testing/endtoend/helpers"
e2e "github.com/OffchainLabs/prysm/v7/testing/endtoend/params"
e2etypes "github.com/OffchainLabs/prysm/v7/testing/endtoend/types"
"github.com/bazelbuild/rules_go/go/tools/bazel"
@@ -136,7 +137,7 @@ func (w *Web3RemoteSigner) Resume() error {
// Stop stops the component and its underlying process.
func (w *Web3RemoteSigner) Stop() error {
return w.cmd.Process.Kill()
return helpers.GracefulStop(w.cmd.Process)
}
// monitorStart monitors startup by polling the server until it returns a 200 at /upcheck.

View File

@@ -3,8 +3,6 @@ package evaluators
import (
"context"
"fmt"
"io"
"net/http"
"regexp"
"strconv"
"strings"
@@ -91,19 +89,13 @@ func metricsTest(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
currentEpoch := slots.ToEpoch(currentSlot)
forkDigest := params.ForkDigest(currentEpoch)
for i := range conns {
response, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", e2e.TestParams.Ports.PrysmBeaconNodeMetricsPort+i))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
dataInBytes, err := getURLBodyWithRetries(ctx, fmt.Sprintf("http://localhost:%d/metrics", e2e.TestParams.Ports.PrysmBeaconNodeMetricsPort+i), httpCheckAttempts, httpCheckRetryDelay)
cancel()
if err != nil {
// Continue if the connection fails, regular flake.
continue
}
dataInBytes, err := io.ReadAll(response.Body)
if err != nil {
return err
return fmt.Errorf("metrics check failed for beacon node %d: %w", i, err)
}
pageContent := string(dataInBytes)
if err = response.Body.Close(); err != nil {
return err
}
time.Sleep(connTimeDelay)
beaconClient := eth.NewBeaconChainClient(conns[i])

View File

@@ -8,10 +8,12 @@ import (
"fmt"
"io"
"net/http"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
e2e "github.com/OffchainLabs/prysm/v7/testing/endtoend/params"
"github.com/OffchainLabs/prysm/v7/testing/endtoend/policies"
@@ -25,6 +27,14 @@ import (
// Allow a very short delay after disconnecting to prevent connection refused issues.
var connTimeDelay = 50 * time.Millisecond
const (
httpCheckAttempts = 5
httpCheckRetryDelay = 200 * time.Millisecond
headComparePollDelay = time.Second
headCompareRequiredConsecutive = 2
minPeersForHeadCheck = 1
)
// PeersConnect checks all beacon nodes and returns whether they are connected to each other as peers.
var PeersConnect = e2etypes.Evaluator{
Name: "peers_connect_epoch_%d",
@@ -46,8 +56,12 @@ var FinishedSyncing = e2etypes.Evaluator{
Evaluation: finishedSyncing,
}
// AllNodesHaveSameHead ensures all nodes have the same head epoch. Checks finality and justification as well.
// Not checking head block root as it may change irregularly for the validator connected nodes.
// AllNodesHaveSameHead ensures all nodes converge on the same canonical head:
// epoch, head block root, justified root, previous justified root, and finalized root.
// We intentionally check head block root (unlike older behavior) because only comparing
// epochs can hide real divergence where nodes are on different blocks in the same slot/epoch.
// To avoid reintroducing flake, the evaluator now waits for readiness and requires
// convergence across consecutive samples before passing.
var AllNodesHaveSameHead = e2etypes.Evaluator{
Name: "all_nodes_have_same_head_%d",
Policy: policies.AllEpochs,
@@ -57,45 +71,66 @@ var AllNodesHaveSameHead = e2etypes.Evaluator{
func healthzCheck(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
count := len(conns)
for i := range count {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/healthz", e2e.TestParams.Ports.PrysmBeaconNodeMetricsPort+i))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
_, err := getURLBodyWithRetries(ctx, fmt.Sprintf("http://localhost:%d/healthz", e2e.TestParams.Ports.PrysmBeaconNodeMetricsPort+i), httpCheckAttempts, httpCheckRetryDelay)
cancel()
if err != nil {
// Continue if the connection fails, regular flake.
continue
}
if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("expected status code OK for beacon node %d, received %v with body %s", i, resp.StatusCode, body)
}
if err = resp.Body.Close(); err != nil {
return err
return fmt.Errorf("healthz check failed for beacon node %d: %w", i, err)
}
time.Sleep(connTimeDelay)
}
for i := range count {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/healthz", e2e.TestParams.Ports.ValidatorMetricsPort+i))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
_, err := getURLBodyWithRetries(ctx, fmt.Sprintf("http://localhost:%d/healthz", e2e.TestParams.Ports.ValidatorMetricsPort+i), httpCheckAttempts, httpCheckRetryDelay)
cancel()
if err != nil {
// Continue if the connection fails, regular flake.
continue
}
if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("expected status code OK for validator client %d, received %v with body %s", i, resp.StatusCode, body)
}
if err = resp.Body.Close(); err != nil {
return err
return fmt.Errorf("healthz check failed for validator client %d: %w", i, err)
}
time.Sleep(connTimeDelay)
}
return nil
}
func getURLBodyWithRetries(ctx context.Context, url string, attempts int, retryDelay time.Duration) ([]byte, error) {
var lastErr error
for attempt := range attempts {
if ctx.Err() != nil {
return nil, ctx.Err()
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
lastErr = err
} else {
body, readErr := io.ReadAll(resp.Body)
closeErr := resp.Body.Close()
if readErr != nil {
lastErr = readErr
} else if closeErr != nil {
lastErr = closeErr
} else if resp.StatusCode != http.StatusOK {
lastErr = fmt.Errorf("status code=%d body=%s", resp.StatusCode, strings.TrimSpace(string(body)))
} else {
return body, nil
}
}
if attempt < attempts-1 {
err := sleepWithContext(ctx, retryDelay)
if err != nil {
return nil, err
}
}
}
return nil, fmt.Errorf("request to %s failed after %d attempts: %w", url, attempts, lastErr)
}
func peersConnect(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
if len(conns) == 1 {
return nil
@@ -172,81 +207,216 @@ func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientCo
return errors.Wrap(err, "failed waiting for mid-epoch")
}
headEpochs := make([]primitives.Epoch, len(conns))
headBlockRoots := make([][]byte, len(conns))
justifiedRoots := make([][]byte, len(conns))
prevJustifiedRoots := make([][]byte, len(conns))
finalizedRoots := make([][]byte, len(conns))
chainHeads := make([]*eth.ChainHead, len(conns))
g, _ := errgroup.WithContext(context.Background())
consecutiveSuccesses := 0
var lastErr error
var lastHeads []*eth.ChainHead
var lastPeers []int
attempt := 0
for {
attempt++
if ctx.Err() != nil {
break
}
chainHeads, err := fetchAllChainHeads(ctx, conns)
if err != nil {
lastErr = errors.Wrap(err, "fetch chain heads")
consecutiveSuccesses = 0
if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
break
}
continue
}
lastHeads = chainHeads
peerCounts, err := fetchPeerCounts(ctx, conns)
if err != nil {
lastErr = errors.Wrap(err, "fetch peer counts")
consecutiveSuccesses = 0
if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
break
}
continue
}
lastPeers = peerCounts
if !allPeerCountsAtLeast(peerCounts, minPeersForHeadCheck) {
lastErr = fmt.Errorf("nodes not ready: insufficient peers (min=%d)\n%s", minPeersForHeadCheck, summarizeHeads(chainHeads, peerCounts))
consecutiveSuccesses = 0
log.WithField("attempt", attempt).Warn("Insufficient peers for stable head comparison, retrying")
if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
break
}
continue
}
if anyOptimistic(chainHeads) {
log.WithField("attempt", attempt).Warn("Optimistic head(s) observed, proceeding with head comparison anyway")
}
if err := compareChainHeads(chainHeads); err != nil {
lastErr = fmt.Errorf("%w\n%s", err, summarizeHeads(chainHeads, peerCounts))
consecutiveSuccesses = 0
log.WithField("attempt", attempt).Warn("Chain head mismatch, retrying")
if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
break
}
continue
}
consecutiveSuccesses++
if consecutiveSuccesses >= headCompareRequiredConsecutive {
return nil
}
if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
break
}
}
if lastErr == nil {
lastErr = errors.New("head comparison did not converge before timeout")
}
if len(lastHeads) > 0 {
return fmt.Errorf("head comparison failed: %w\n%s", lastErr, summarizeHeads(lastHeads, lastPeers))
}
return fmt.Errorf("head comparison failed: %w", lastErr)
}
// fetchAllChainHeads queries all beacon nodes for their chain head in parallel.
func fetchAllChainHeads(ctx context.Context, conns []*grpc.ClientConn) ([]*eth.ChainHead, error) {
chainHeads := make([]*eth.ChainHead, len(conns))
g, gctx := errgroup.WithContext(ctx)
for i, conn := range conns {
conIdx := i
currConn := conn
g.Go(func() error {
beaconClient := eth.NewBeaconChainClient(currConn)
chainHead, err := beaconClient.GetChainHead(context.Background(), &emptypb.Empty{})
chainHead, err := beaconClient.GetChainHead(gctx, &emptypb.Empty{})
if err != nil {
return errors.Wrapf(err, "connection number=%d", conIdx)
}
headEpochs[conIdx] = chainHead.HeadEpoch
headBlockRoots[conIdx] = chainHead.HeadBlockRoot
justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
chainHeads[conIdx] = chainHead
return nil
})
}
if err := g.Wait(); err != nil {
return err
return nil, err
}
return chainHeads, nil
}
for i := range conns {
if headEpochs[0] != headEpochs[i] {
func fetchPeerCounts(ctx context.Context, conns []*grpc.ClientConn) ([]int, error) {
peerCounts := make([]int, len(conns))
g, gctx := errgroup.WithContext(ctx)
for i, conn := range conns {
conIdx := i
currConn := conn
g.Go(func() error {
nodeClient := eth.NewNodeClient(currConn)
peersResp, err := nodeClient.ListPeers(gctx, &emptypb.Empty{})
if err != nil {
return errors.Wrapf(err, "failed to list peers for connection number=%d", conIdx)
}
peerCounts[conIdx] = len(peersResp.Peers)
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return peerCounts, nil
}
func allPeerCountsAtLeast(peerCounts []int, minPeers int) bool {
for _, count := range peerCounts {
if count < minPeers {
return false
}
}
return true
}
func anyOptimistic(chainHeads []*eth.ChainHead) bool {
for _, head := range chainHeads {
if head.GetOptimisticStatus() {
return true
}
}
return false
}
func summarizeHeads(chainHeads []*eth.ChainHead, peerCounts []int) string {
var b strings.Builder
for i, head := range chainHeads {
peers := -1
if i < len(peerCounts) {
peers = peerCounts[i]
}
_, _ = fmt.Fprintf(
&b,
"node=%d slot=%d epoch=%d optimistic=%t peers=%d head=%#x justified=%#x finalized=%#x\n",
i,
head.HeadSlot,
head.HeadEpoch,
head.GetOptimisticStatus(),
peers,
head.HeadBlockRoot,
head.JustifiedBlockRoot,
head.FinalizedBlockRoot,
)
}
return strings.TrimSpace(b.String())
}
// compareChainHeads checks that all chain heads agree on epoch, head root,
// justified root, previous justified root, and finalized root.
func compareChainHeads(chainHeads []*eth.ChainHead) error {
for i := 1; i < len(chainHeads); i++ {
if chainHeads[0].HeadEpoch != chainHeads[i].HeadEpoch {
return fmt.Errorf(
"received conflicting head epochs on node %d, expected %d, received %d",
i,
headEpochs[0],
headEpochs[i],
chainHeads[0].HeadEpoch,
chainHeads[i].HeadEpoch,
)
}
if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
if !bytes.Equal(chainHeads[0].HeadBlockRoot, chainHeads[i].HeadBlockRoot) {
return fmt.Errorf(
"received conflicting head block roots on node %d, expected %#x, received %#x",
"received conflicting head block roots on node %d (slot %d vs %d), expected %#x, received %#x",
i,
headBlockRoots[0],
headBlockRoots[i],
chainHeads[0].HeadSlot,
chainHeads[i].HeadSlot,
chainHeads[0].HeadBlockRoot,
chainHeads[i].HeadBlockRoot,
)
}
if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
if !bytes.Equal(chainHeads[0].JustifiedBlockRoot, chainHeads[i].JustifiedBlockRoot) {
return fmt.Errorf(
"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
i,
justifiedRoots[0],
justifiedRoots[i],
chainHeads[0].JustifiedBlockRoot,
chainHeads[i].JustifiedBlockRoot,
chainHeads[0].String(),
chainHeads[i].String(),
)
}
if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
if !bytes.Equal(chainHeads[0].PreviousJustifiedBlockRoot, chainHeads[i].PreviousJustifiedBlockRoot) {
return fmt.Errorf(
"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
i,
prevJustifiedRoots[0],
prevJustifiedRoots[i],
chainHeads[0].PreviousJustifiedBlockRoot,
chainHeads[i].PreviousJustifiedBlockRoot,
)
}
if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
if !bytes.Equal(chainHeads[0].FinalizedBlockRoot, chainHeads[i].FinalizedBlockRoot) {
return fmt.Errorf(
"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
i,
finalizedRoots[0],
finalizedRoots[i],
chainHeads[0].FinalizedBlockRoot,
chainHeads[i].FinalizedBlockRoot,
)
}
}
return nil
}
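
Stripped of the gRPC plumbing, the deflaking core of the new allNodesHaveSameHead is: sample, reset the streak on any failure, and only pass once the check holds on consecutive samples. A self-contained sketch of that loop (pollUntilStable and its arguments are illustrative, not the evaluator's real signature):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollUntilStable re-runs check until it passes on `required` consecutive
// samples, sleeping `delay` between attempts and giving up when ctx expires.
// Any failure resets the streak, mirroring consecutiveSuccesses above.
func pollUntilStable(ctx context.Context, required int, delay time.Duration, check func() error) error {
	consecutive := 0
	var lastErr error
	for {
		if ctx.Err() != nil {
			if lastErr == nil {
				lastErr = errors.New("did not converge before timeout")
			}
			return fmt.Errorf("poll failed: %w", lastErr)
		}
		if err := check(); err != nil {
			lastErr = err
			consecutive = 0 // a single flaky read no longer fails the evaluator,
			// but it does restart the convergence requirement
		} else {
			consecutive++
			if consecutive >= required {
				return nil
			}
		}
		select {
		case <-ctx.Done():
		case <-time.After(delay):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	sample := 0
	err := pollUntilStable(ctx, 2, 50*time.Millisecond, func() error {
		sample++
		if sample < 3 {
			return fmt.Errorf("heads diverge on sample %d", sample)
		}
		return nil // samples 3+ agree
	})
	fmt.Println("converged:", err == nil) // true: two consecutive agreeing samples
}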

View File

@@ -623,19 +623,24 @@ func validatorsVoteWithTheMajority(ec *e2etypes.EvaluationContext, conns ...*grp
// We treat epoch 1 differently from other epochs for two reasons:
// - this evaluator is not executed for epoch 0 so we have to calculate the first slot differently
// - for some reason the vote for the first slot in epoch 1 is 0x000... so we skip this slot
var isFirstSlotInVotingPeriod bool
if chainHead.HeadEpoch == 1 && slot%params.BeaconConfig().SlotsPerEpoch == 0 {
continue
}
// We skipped the first slot so we treat the second slot as the starting slot of epoch 1.
// Detect voting period boundary. Use period number instead of exact slot
// divisibility so that missed blocks at period boundaries don't prevent
// the expected vote from being reset.
currentPeriod := uint64(slot) / uint64(slotsPerVotingPeriod)
// For epoch 1, treat the second slot in the epoch as the period start
// (since we skip the first slot above).
isNewPeriod := currentPeriod != ec.ExpectedEth1DataVotePeriod || ec.ExpectedEth1DataVote == nil
if chainHead.HeadEpoch == 1 {
isFirstSlotInVotingPeriod = slot%params.BeaconConfig().SlotsPerEpoch == 1
} else {
isFirstSlotInVotingPeriod = slot%slotsPerVotingPeriod == 0
isNewPeriod = (slot%params.BeaconConfig().SlotsPerEpoch == 1) || ec.ExpectedEth1DataVote == nil
}
if isFirstSlotInVotingPeriod {
if isNewPeriod {
ec.ExpectedEth1DataVote = vote
ec.Eth1DataMismatchCount = 0 // Reset for new voting period
ec.ExpectedEth1DataVotePeriod = currentPeriod
ec.Eth1DataMismatchCount = 0
return nil
}
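
The switch from exact slot divisibility to a period-number comparison (tracked in the new ExpectedEth1DataVotePeriod field) is easiest to see with numbers. Assuming illustrative values of 32 slots per epoch and 2 epochs per eth1 voting period, so 64 slots per period, a missed block at boundary slot 64 permanently defeats the old slot%64 == 0 check, while slot/64 still changes at slot 65:

package main

import "fmt"

const (
	slotsPerEpoch        = 32 // illustrative, not the real config lookup
	epochsPerVotePeriod  = 2
	slotsPerVotingPeriod = slotsPerEpoch * epochsPerVotePeriod // 64
)

func main() {
	expectedPeriod := uint64(0) // period in which the vote was last reset

	for _, slot := range []uint64{63, 64, 65} {
		oldRule := slot%slotsPerVotingPeriod == 0 // fires only exactly at 64
		currentPeriod := slot / slotsPerVotingPeriod
		newRule := currentPeriod != expectedPeriod // fires at 64 and, crucially, 65

		if slot == 64 {
			fmt.Printf("slot %d: boundary block missed, no vote to inspect\n", slot)
			continue
		}
		fmt.Printf("slot %d: old=%v new=%v (period=%d)\n", slot, oldRule, newRule, currentPeriod)
	}
	// Output: slot 63 old=false new=false; slot 65 old=false new=true,
	// so the expected vote is still reset for the new period.
}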

View File

@@ -3,6 +3,7 @@ package evaluators
import (
"context"
"fmt"
"sync"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/signing"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -58,16 +59,24 @@ var SlashedValidatorsLoseBalanceAfterEpoch = func(n primitives.Epoch) e2eTypes.E
}
}
var slashedIndices []uint64
var (
slashedMu sync.Mutex
slashedIndices []uint64
)
func validatorsSlashed(_ *e2eTypes.EvaluationContext, conns ...*grpc.ClientConn) error {
conn := conns[0]
ctx := context.Background()
client := eth.NewBeaconChainClient(conn)
slashedMu.Lock()
indices := make([]uint64, len(slashedIndices))
copy(indices, slashedIndices)
slashedMu.Unlock()
actualSlashedIndices := 0
for _, slashedIndex := range slashedIndices {
for _, slashedIndex := range indices {
req := &eth.GetValidatorRequest{
QueryFilter: &eth.GetValidatorRequest_Index{
Index: primitives.ValidatorIndex(slashedIndex),
@@ -83,8 +92,8 @@ func validatorsSlashed(_ *e2eTypes.EvaluationContext, conns ...*grpc.ClientConn)
}
}
if actualSlashedIndices != len(slashedIndices) {
return fmt.Errorf("expected %d indices to be slashed, received %d", len(slashedIndices), actualSlashedIndices)
if actualSlashedIndices != len(indices) {
return fmt.Errorf("expected %d indices to be slashed, received %d", len(indices), actualSlashedIndices)
}
return nil
}
@@ -94,7 +103,12 @@ func validatorsLoseBalance(_ *e2eTypes.EvaluationContext, conns ...*grpc.ClientC
ctx := context.Background()
client := eth.NewBeaconChainClient(conn)
for i, slashedIndex := range slashedIndices {
slashedMu.Lock()
indices := make([]uint64, len(slashedIndices))
copy(indices, slashedIndices)
slashedMu.Unlock()
for i, slashedIndex := range indices {
req := &eth.GetValidatorRequest{
QueryFilter: &eth.GetValidatorRequest_Index{
Index: primitives.ValidatorIndex(slashedIndex),
@@ -139,7 +153,10 @@ func insertDoubleAttestationIntoPool(_ *e2eTypes.EvaluationContext, conns ...*gr
for i := uint64(0); i < valsToSlash; i++ {
valIdx := h.validatorIndexAtCommitteeIndex(i)
if len(slice.IntersectionUint64(slashedIndices, []uint64{uint64(valIdx)})) > 0 {
slashedMu.Lock()
alreadySlashed := len(slice.IntersectionUint64(slashedIndices, []uint64{uint64(valIdx)})) > 0
slashedMu.Unlock()
if alreadySlashed {
valsToSlash++
continue
}
@@ -164,7 +181,9 @@ func insertDoubleAttestationIntoPool(_ *e2eTypes.EvaluationContext, conns ...*gr
return errors.Wrap(err, "could not propose attestation")
}
slashedMu.Lock()
slashedIndices = append(slashedIndices, uint64(valIdx))
slashedMu.Unlock()
}
return nil
}
@@ -232,7 +251,9 @@ func proposeDoubleBlock(_ *e2eTypes.EvaluationContext, conns ...*grpc.ClientConn
return errors.New("expected block to fail processing")
}
slashedMu.Lock()
slashedIndices = append(slashedIndices, uint64(proposerIndex))
slashedMu.Unlock()
return nil
}
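
slashedIndices is package-level state appended to by the slashing evaluators and iterated by the balance checks, so the diff serializes all access through slashedMu and has readers take a snapshot copy rather than holding the lock across RPC calls. The pattern in isolation, with illustrative helper names:

package main

import (
	"fmt"
	"sync"
)

var (
	slashedMu      sync.Mutex
	slashedIndices []uint64
)

// recordSlashed appends under the lock, mirroring the evaluators' writes.
func recordSlashed(idx uint64) {
	slashedMu.Lock()
	defer slashedMu.Unlock()
	slashedIndices = append(slashedIndices, idx)
}

// snapshotSlashed copies the slice under the lock so callers can iterate
// freely afterward, as validatorsSlashed and validatorsLoseBalance now do.
func snapshotSlashed() []uint64 {
	slashedMu.Lock()
	defer slashedMu.Unlock()
	out := make([]uint64, len(slashedIndices))
	copy(out, slashedIndices)
	return out
}

func main() {
	var wg sync.WaitGroup
	for i := range 4 {
		wg.Add(1)
		go func() { defer wg.Done(); recordSlashed(uint64(i)) }()
	}
	wg.Wait()
	fmt.Println("slashed:", snapshotSlashed())
}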

View File

@@ -243,6 +243,12 @@ func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.Clie
}
currSlot := slots.CurrentSlot(genesis.GenesisTime.AsTime())
currEpoch := slots.ToEpoch(currSlot)
// Track blocks with zero sync committee participation across both loops.
// A small number is tolerated (transient p2p issues), but sustained
// zero participation indicates a real problem.
zeroSyncCount := 0
const maxZeroSyncBlocks = 2
lowestBound := primitives.Epoch(0)
if currEpoch >= 1 {
lowestBound = currEpoch - 1
@@ -264,31 +270,52 @@ func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.Clie
if b == nil || b.IsNil() {
return errors.New("nil block provided")
}
forkStartSlot, err := slots.EpochStart(params.BeaconConfig().AltairForkEpoch)
if err != nil {
return err
}
if forkStartSlot == b.Block().Slot() {
// Skip fork slot.
continue
}
// Skip slots 0-2 at genesis: validators need time to ramp up after chain start
// due to doppelganger protection. This is a startup timing issue, not a fork transition issue.
if b.Block().Slot() < 3 {
continue
}
expectedParticipation := expectedSyncParticipation
switch slots.ToEpoch(b.Block().Slot()) {
case params.BeaconConfig().AltairForkEpoch:
// Drop expected sync participation figure.
expectedParticipation = 0.90
default:
// no-op
// Skip the first three slots of each fork epoch to allow
// gossipsub mesh reformation and sync committee ramp-up.
forkEpochs := []primitives.Epoch{
params.BeaconConfig().AltairForkEpoch,
params.BeaconConfig().BellatrixForkEpoch,
params.BeaconConfig().CapellaForkEpoch,
params.BeaconConfig().DenebForkEpoch,
params.BeaconConfig().ElectraForkEpoch,
params.BeaconConfig().FuluForkEpoch,
}
skipSlot := false
for _, forkEpoch := range forkEpochs {
if forkEpoch == params.BeaconConfig().FarFutureEpoch {
continue
}
forkSlot, err := slots.EpochStart(forkEpoch)
if err != nil {
return err
}
if b.Block().Slot() >= forkSlot && b.Block().Slot() <= forkSlot+2 {
skipSlot = true
break
}
}
if skipSlot {
continue
}
expectedParticipation := expectedSyncParticipation
syncAgg, err := b.Block().Body().SyncAggregate()
if err != nil {
return err
}
// Tolerate a small number of blocks with zero sync committee messages
// (transient p2p issues), but fail if it happens too often.
if syncAgg.SyncCommitteeBits.Count() == 0 {
zeroSyncCount++
if zeroSyncCount > maxZeroSyncBlocks {
return errors.Errorf("too many blocks (%d) with zero sync participation, last at slot %d", zeroSyncCount, b.Block().Slot())
}
continue
}
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
@@ -330,8 +357,9 @@ func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.Clie
if err != nil {
return err
}
// Skip the first two slots of each fork epoch.
if b.Block().Slot() == forkSlot || b.Block().Slot() == forkSlot+1 {
// Skip the first three slots of each fork epoch to allow
// gossipsub mesh reformation and sync committee ramp-up.
if b.Block().Slot() >= forkSlot && b.Block().Slot() <= forkSlot+2 {
skipSlot = true
break
}
@@ -343,6 +371,15 @@ func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.Clie
if err != nil {
return err
}
// Tolerate a small number of blocks with zero sync committee messages
// (transient p2p issues), but fail if it happens too often.
if syncAgg.SyncCommitteeBits.Count() == 0 {
zeroSyncCount++
if zeroSyncCount > maxZeroSyncBlocks {
return errors.Errorf("too many blocks (%d) with zero sync participation, last at slot %d", zeroSyncCount, b.Block().Slot())
}
continue
}
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedSyncParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())

View File

@@ -25,16 +25,17 @@ func (s *EpochTicker) C() <-chan uint64 {
// Done should be called to clean up the ticker.
func (s *EpochTicker) Done() {
go func() {
s.done <- struct{}{}
}()
select {
case s.done <- struct{}{}:
default:
}
}
// NewEpochTicker starts the EpochTicker.
func NewEpochTicker(genesisTime time.Time, secondsPerEpoch uint64) *EpochTicker {
ticker := &EpochTicker{
c: make(chan uint64),
done: make(chan struct{}),
done: make(chan struct{}, 1),
}
ticker.start(genesisTime, secondsPerEpoch, prysmTime.Since, prysmTime.Until, time.After)
return ticker
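
This is the standard recipe for an idempotent, leak-free Done: give the channel capacity 1 and send with a non-blocking select, so no goroutine is ever spawned and a second Done call is a no-op instead of a blocked sender. In isolation:

package main

import "fmt"

type ticker struct {
	done chan struct{}
}

func newTicker() *ticker {
	// Capacity 1 lets exactly one Done signal park in the channel even if
	// the ticker loop has not read it yet.
	return &ticker{done: make(chan struct{}, 1)}
}

// Done is safe to call multiple times: the first call queues the signal,
// later calls hit the default branch and return immediately. Nothing leaks
// if the ticker loop has already exited.
func (t *ticker) Done() {
	select {
	case t.done <- struct{}{}:
	default:
	}
}

func main() {
	t := newTicker()
	t.Done()
	t.Done() // second call is a no-op, not a blocked goroutine
	<-t.done
	fmt.Println("ticker stopped")
}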

View File

@@ -14,6 +14,7 @@ import (
"path/filepath"
"strconv"
"strings"
"syscall"
"testing"
"time"
@@ -31,7 +32,7 @@ import (
)
const (
maxPollingWaitTime = 60 * time.Second // A minute so timing out doesn't take very long.
maxPollingWaitTime = 120 * time.Second // Two minutes to accommodate loaded CI machines.
filePollingInterval = 500 * time.Millisecond
memoryHeapFileName = "node_heap_%d.pb.gz"
cpuProfileFileName = "node_cpu_profile_%d.pb.gz"
@@ -84,17 +85,18 @@ func WaitForTextInFile(src *os.File, match string) error {
}
defer func() {
if ferr := f.Close(); ferr != nil {
if !errors.Is(err, os.ErrClosed) {
if !errors.Is(ferr, os.ErrClosed) {
log.WithError(ferr).Errorf("error calling .Close on the file handle for %s", f.Name())
}
}
}()
// spawn a goroutine to scan
errChan := make(chan error)
errChan := make(chan error, 1)
foundChan := make(chan struct{})
go func() {
t := time.NewTicker(filePollingInterval)
defer t.Stop()
// This needs to happen in a loop because, even though the other process is still appending to the log file,
// scanner will see EOF once it hits the end of what's been written so far.
for {
@@ -104,9 +106,8 @@ func WaitForTextInFile(src *os.File, match string) error {
case <-t.C:
// This is a paranoid check because I'm not sure if the underlying fd handle can be stuck mid-line
// when Scanner sees a partially written line at EOF. It's probably safest to just keep this.
_, err = f.Seek(0, io.SeekStart)
if err != nil {
errChan <- err
if _, serr := f.Seek(0, io.SeekStart); serr != nil {
errChan <- serr
return
}
lineScanner := bufio.NewScanner(f)
@@ -122,9 +123,10 @@ func WaitForTextInFile(src *os.File, match string) error {
}
}
// If Scan returned false for an error (except EOF), Err will return it.
if err = lineScanner.Err(); err != nil {
if serr := lineScanner.Err(); serr != nil {
// Bubble the error back up to the parent goroutine.
errChan <- err
errChan <- serr
return
}
}
}
@@ -135,8 +137,8 @@ func WaitForTextInFile(src *os.File, match string) error {
return fmt.Errorf("could not find requested text \"%s\" in %s before deadline", match, f.Name())
case <-foundChan:
return nil
case err = <-errChan:
return errors.Wrapf(err, "received error while scanning %s for %s", f.Name(), match)
case scanErr := <-errChan:
return errors.Wrapf(scanErr, "received error while scanning %s for %s", f.Name(), match)
}
}
@@ -381,6 +383,29 @@ func WaitOnNodes(ctx context.Context, nodes []e2etypes.ComponentRunner, nodesSta
return g.Wait()
}
// GracefulStop sends SIGTERM to a process and gives it 5 seconds to exit before
// sending SIGKILL. It does not call p.Wait() since the caller (cmd.Wait in Start)
// is expected to handle process reaping.
func GracefulStop(p *os.Process) error {
if p == nil {
return nil
}
if err := p.Signal(syscall.SIGTERM); err != nil {
// Process may have already exited; try kill as last resort.
return p.Kill()
}
// Give the process time to handle SIGTERM and exit cleanly.
// The parent goroutine's cmd.Wait() will detect the exit.
// If still alive after the grace period, force kill.
time.AfterFunc(5*time.Second, func() {
// Signal(0) checks if the process is still alive without sending a signal.
if err := p.Signal(syscall.Signal(0)); err == nil {
_ = p.Kill()
}
})
return nil
}
func MinerRPCClient() (*ethclient.Client, error) {
client, err := rpc.DialHTTP(e2e.TestParams.Eth1RPCURL(e2e.MinerComponentOffset).String())
if err != nil {
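
A hedged, unix-only demo of how the new GracefulStop behaves for a well-behaved child; the sleep command and the five-second grace period are illustrative, and the real helper is in the hunk above:

package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

func main() {
	// A child that dies cleanly on SIGTERM: the happy path for GracefulStop.
	cmd := exec.Command("sleep", "30")
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	p := cmd.Process

	// Mirror GracefulStop: SIGTERM first, SIGKILL only if still alive later.
	if err := p.Signal(syscall.SIGTERM); err != nil {
		_ = p.Kill() // already exited or unsignalable; kill as last resort
	}
	time.AfterFunc(5*time.Second, func() {
		if p.Signal(syscall.Signal(0)) == nil { // signal 0 probes liveness only
			_ = p.Kill()
		}
	})

	// The caller reaps the child, just as cmd.Wait does in the e2e Start paths.
	err := cmd.Wait()
	fmt.Println("child exited:", err) // prints "signal: terminated" on this path
}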

View File

@@ -179,9 +179,10 @@ type EvaluationContext struct {
DepositBalancer
// ExitedVals maps validator pubkey to the epoch when their exit was submitted.
// The actual exit takes effect at: submission_epoch + 1 + MaxSeedLookahead
ExitedVals map[[48]byte]primitives.Epoch
SeenVotes map[primitives.Slot][]byte
ExpectedEth1DataVote []byte
ExitedVals map[[48]byte]primitives.Epoch
SeenVotes map[primitives.Slot][]byte
ExpectedEth1DataVote []byte
ExpectedEth1DataVotePeriod uint64
// Eth1DataMismatchCount tracks how many eth1data vote mismatches have been seen
// in the current voting period. Some tolerance is allowed for timing differences.
Eth1DataMismatchCount int