Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-04-19 03:01:06 -04:00)

Compare commits: proposer-p...testing-e2 (10 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 32ef35c6ac | |
| | bc211b9b33 | |
| | 9aa109d65a | |
| | 87debbc35e | |
| | 967d0cca4b | |
| | 474de75942 | |
| | 8c7f35dd8b | |
| | a955f762eb | |
| | 4a0b986581 | |
| | 89909b18a9 | |
changelog/satushh_e2e.md (new file, 3 lines)
@@ -0,0 +1,3 @@
+### Changed
+
+- Improved the e2e to make it less flaky
@@ -344,7 +344,7 @@ func (node *BeaconNode) Resume() error {

 // Stop stops the component and its underlying process.
 func (node *BeaconNode) Stop() error {
-	return node.cmd.Process.Kill()
+	return helpers.GracefulStop(node.cmd.Process)
 }

 func (node *BeaconNode) UnderlyingProcess() *os.Process {
@@ -94,7 +94,7 @@ func (node *BootNode) Resume() error {

 // Stop stops the component and its underlying process.
 func (node *BootNode) Stop() error {
-	return node.cmd.Process.Kill()
+	return helpers.GracefulStop(node.cmd.Process)
 }

 func enrFromLogFile(name string) (string, error) {
@@ -263,7 +263,7 @@ func (m *Miner) Resume() error {

 // Stop kills the component and its underlying process.
 func (m *Miner) Stop() error {
-	return m.cmd.Process.Kill()
+	return helpers.GracefulStop(m.cmd.Process)
 }

 func enodeFromLogFile(name string) (string, error) {
@@ -171,7 +171,7 @@ func (node *Node) Resume() error {

 // Stop kills the component and its underlying process.
 func (node *Node) Stop() error {
-	return node.cmd.Process.Kill()
+	return helpers.GracefulStop(node.cmd.Process)
 }

 func (node *Node) UnderlyingProcess() *os.Process {
@@ -8,6 +8,7 @@ import (
 	"math/big"
 	mathRand "math/rand"
 	"os"
+	"sync/atomic"
 	"time"

 	"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
@@ -38,7 +39,7 @@ type TransactionGenerator struct {
 	seed    int64
 	started chan struct{}
 	cancel  context.CancelFunc
-	paused  bool
+	paused  atomic.Bool
 	useLargeBlobs bool // Use large blob transactions (6 blobs per tx) for BPO testing
 }
@@ -110,7 +111,7 @@ func (t *TransactionGenerator) Start(ctx context.Context) error {
 		case <-ctx.Done():
 			return nil
 		case <-ticker.C:
-			if t.paused {
+			if t.paused.Load() {
 				continue
 			}
 			backend := ethclient.NewClient(client)
@@ -267,13 +268,13 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, gasPrice *big.In

 // Pause pauses the component and its underlying process.
 func (t *TransactionGenerator) Pause() error {
-	t.paused = true
+	t.paused.Store(true)
 	return nil
 }

 // Resume resumes the component and its underlying process.
 func (t *TransactionGenerator) Resume() error {
-	t.paused = false
+	t.paused.Store(false)
 	return nil
 }
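The three transaction-generator hunks above hang together: Pause and Resume are called from the test harness while Start's ticker goroutine reads the flag, so the old plain bool was a data race that `go test -race` can flag; switching the field to atomic.Bool (and importing sync/atomic) removes it without a mutex. A minimal, self-contained sketch of the same pattern with hypothetical names, not the actual generator code:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// generator mimics the pause flag used by the e2e transaction generator.
type generator struct {
	paused atomic.Bool // safe for concurrent Store/Load without a mutex
}

func (g *generator) pause()  { g.paused.Store(true) }
func (g *generator) resume() { g.paused.Store(false) }

// tick reports whether work would be done on this tick.
func (g *generator) tick() bool { return !g.paused.Load() }

func main() {
	g := &generator{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); g.pause(); g.resume() }() // control goroutine
	go func() { defer wg.Done(); _ = g.tick() }()          // ticker goroutine
	wg.Wait()
	fmt.Println("no race reported under -race")
}
```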
@@ -246,7 +246,7 @@ func (node *LighthouseBeaconNode) Resume() error {

 // Stop stops the component and its underlying process.
 func (node *LighthouseBeaconNode) Stop() error {
-	return node.cmd.Process.Kill()
+	return helpers.GracefulStop(node.cmd.Process)
 }

 func (node *LighthouseBeaconNode) UnderlyingProcess() *os.Process {
@@ -237,7 +237,7 @@ func (v *LighthouseValidatorNode) Resume() error {

 // Stop stops the component and its underlying process.
 func (v *LighthouseValidatorNode) Stop() error {
-	return v.cmd.Process.Kill()
+	return helpers.GracefulStop(v.cmd.Process)
 }

 func (v *LighthouseValidatorNode) UnderlyingProcess() *os.Process {
@@ -113,19 +113,11 @@ func (ts *TracingSink) initializeSink(ctx context.Context) {
 	sigs := make(chan os.Signal, 1)
 	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
 	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				cleanup()
-				return
-			case <-sigs:
-				cleanup()
-				return
-			default:
-				// Sleep for 100ms and do nothing while waiting for
-				// cancellation.
-				time.Sleep(100 * time.Millisecond)
-			}
-		}
+		select {
+		case <-ctx.Done():
+			cleanup()
+		case <-sigs:
+			cleanup()
+		}
 	}()
 	if err := ts.server.ListenAndServe(); err != http.ErrServerClosed {
@@ -337,7 +337,7 @@ func (v *ValidatorNode) Resume() error {

 // Stop stops the component and its underlying process.
 func (v *ValidatorNode) Stop() error {
-	return v.cmd.Process.Kill()
+	return helpers.GracefulStop(v.cmd.Process)
 }

 func (v *ValidatorNode) UnderlyingProcess() *os.Process {
@@ -20,6 +20,7 @@ import (
 	"github.com/OffchainLabs/prysm/v7/crypto/bls"
 	"github.com/OffchainLabs/prysm/v7/io/file"
 	"github.com/OffchainLabs/prysm/v7/runtime/interop"
+	"github.com/OffchainLabs/prysm/v7/testing/endtoend/helpers"
 	e2e "github.com/OffchainLabs/prysm/v7/testing/endtoend/params"
 	e2etypes "github.com/OffchainLabs/prysm/v7/testing/endtoend/types"
 	"github.com/bazelbuild/rules_go/go/tools/bazel"
@@ -136,7 +137,7 @@ func (w *Web3RemoteSigner) Resume() error {

 // Stop stops the component and its underlying process.
 func (w *Web3RemoteSigner) Stop() error {
-	return w.cmd.Process.Kill()
+	return helpers.GracefulStop(w.cmd.Process)
 }

 // monitorStart by polling server until it returns a 200 at /upcheck.
@@ -3,8 +3,6 @@ package evaluators

 import (
 	"context"
 	"fmt"
-	"io"
-	"net/http"
 	"regexp"
 	"strconv"
 	"strings"
@@ -91,19 +89,13 @@ func metricsTest(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
 	currentEpoch := slots.ToEpoch(currentSlot)
 	forkDigest := params.ForkDigest(currentEpoch)
 	for i := range conns {
-		response, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", e2e.TestParams.Ports.PrysmBeaconNodeMetricsPort+i))
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		dataInBytes, err := getURLBodyWithRetries(ctx, fmt.Sprintf("http://localhost:%d/metrics", e2e.TestParams.Ports.PrysmBeaconNodeMetricsPort+i), httpCheckAttempts, httpCheckRetryDelay)
+		cancel()
 		if err != nil {
-			// Continue if the connection fails, regular flake.
-			continue
-		}
-		dataInBytes, err := io.ReadAll(response.Body)
-		if err != nil {
-			return err
+			return fmt.Errorf("metrics check failed for beacon node %d: %w", i, err)
 		}
 		pageContent := string(dataInBytes)
-		if err = response.Body.Close(); err != nil {
-			return err
-		}
-		time.Sleep(connTimeDelay)

 		beaconClient := eth.NewBeaconChainClient(conns[i])
@@ -8,10 +8,12 @@ import (
 	"fmt"
 	"io"
 	"net/http"
 	"strings"
 	"time"

+	log "github.com/sirupsen/logrus"
+
 	"github.com/OffchainLabs/prysm/v7/config/params"
 	"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
 	eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
 	e2e "github.com/OffchainLabs/prysm/v7/testing/endtoend/params"
 	"github.com/OffchainLabs/prysm/v7/testing/endtoend/policies"
@@ -25,6 +27,14 @@ import (
 // Allow a very short delay after disconnecting to prevent connection refused issues.
 var connTimeDelay = 50 * time.Millisecond

+const (
+	httpCheckAttempts              = 5
+	httpCheckRetryDelay            = 200 * time.Millisecond
+	headComparePollDelay           = time.Second
+	headCompareRequiredConsecutive = 2
+	minPeersForHeadCheck           = 1
+)
+
 // PeersConnect checks all beacon nodes and returns whether they are connected to each other as peers.
 var PeersConnect = e2etypes.Evaluator{
 	Name: "peers_connect_epoch_%d",
@@ -46,8 +56,12 @@ var FinishedSyncing = e2etypes.Evaluator{
 	Evaluation: finishedSyncing,
 }

-// AllNodesHaveSameHead ensures all nodes have the same head epoch. Checks finality and justification as well.
-// Not checking head block root as it may change irregularly for the validator connected nodes.
+// AllNodesHaveSameHead ensures all nodes converge on the same canonical head:
+// epoch, head block root, justified root, previous justified root, and finalized root.
+// We intentionally check head block root (unlike older behavior) because only comparing
+// epochs can hide real divergence where nodes are on different blocks in the same slot/epoch.
+// To avoid reintroducing flake, the evaluator now waits for readiness and requires
+// convergence across consecutive samples before passing.
 var AllNodesHaveSameHead = e2etypes.Evaluator{
 	Name:   "all_nodes_have_same_head_%d",
 	Policy: policies.AllEpochs,
@@ -57,45 +71,66 @@ var AllNodesHaveSameHead = e2etypes.Evaluator{
 func healthzCheck(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
 	count := len(conns)
 	for i := range count {
-		resp, err := http.Get(fmt.Sprintf("http://localhost:%d/healthz", e2e.TestParams.Ports.PrysmBeaconNodeMetricsPort+i))
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		_, err := getURLBodyWithRetries(ctx, fmt.Sprintf("http://localhost:%d/healthz", e2e.TestParams.Ports.PrysmBeaconNodeMetricsPort+i), httpCheckAttempts, httpCheckRetryDelay)
+		cancel()
 		if err != nil {
-			// Continue if the connection fails, regular flake.
-			continue
-		}
-		if resp.StatusCode != http.StatusOK {
-			body, err := io.ReadAll(resp.Body)
-			if err != nil {
-				return err
-			}
-			return fmt.Errorf("expected status code OK for beacon node %d, received %v with body %s", i, resp.StatusCode, body)
-		}
-		if err = resp.Body.Close(); err != nil {
-			return err
+			return fmt.Errorf("healthz check failed for beacon node %d: %w", i, err)
 		}
 		time.Sleep(connTimeDelay)
 	}

 	for i := range count {
-		resp, err := http.Get(fmt.Sprintf("http://localhost:%d/healthz", e2e.TestParams.Ports.ValidatorMetricsPort+i))
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		_, err := getURLBodyWithRetries(ctx, fmt.Sprintf("http://localhost:%d/healthz", e2e.TestParams.Ports.ValidatorMetricsPort+i), httpCheckAttempts, httpCheckRetryDelay)
+		cancel()
 		if err != nil {
-			// Continue if the connection fails, regular flake.
-			continue
-		}
-		if resp.StatusCode != http.StatusOK {
-			body, err := io.ReadAll(resp.Body)
-			if err != nil {
-				return err
-			}
-			return fmt.Errorf("expected status code OK for validator client %d, received %v with body %s", i, resp.StatusCode, body)
-		}
-		if err = resp.Body.Close(); err != nil {
-			return err
+			return fmt.Errorf("healthz check failed for validator client %d: %w", i, err)
 		}
 		time.Sleep(connTimeDelay)
 	}
 	return nil
 }

+func getURLBodyWithRetries(ctx context.Context, url string, attempts int, retryDelay time.Duration) ([]byte, error) {
+	var lastErr error
+	for attempt := range attempts {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+
+		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+		if err != nil {
+			return nil, err
+		}
+
+		resp, err := http.DefaultClient.Do(req)
+		if err != nil {
+			lastErr = err
+		} else {
+			body, readErr := io.ReadAll(resp.Body)
+			closeErr := resp.Body.Close()
+			if readErr != nil {
+				lastErr = readErr
+			} else if closeErr != nil {
+				lastErr = closeErr
+			} else if resp.StatusCode != http.StatusOK {
+				lastErr = fmt.Errorf("status code=%d body=%s", resp.StatusCode, strings.TrimSpace(string(body)))
+			} else {
+				return body, nil
+			}
+		}
+
+		if attempt < attempts-1 {
+			err := sleepWithContext(ctx, retryDelay)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+	return nil, fmt.Errorf("request to %s failed after %d attempts: %w", url, attempts, lastErr)
+}
+
 func peersConnect(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
 	if len(conns) == 1 {
 		return nil
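Both getURLBodyWithRetries above and the polling loop in the next hunk call a sleepWithContext helper that this capture never shows (note also that `for attempt := range attempts` relies on Go 1.22's range-over-int). A plausible sketch of what such a helper typically looks like, assuming it simply honors cancellation while sleeping; the real implementation in the commit may differ:

```go
package evaluators // placement inferred from usage; sketch only

import (
	"context"
	"time"
)

// sleepWithContext waits for d, returning early with the context's error
// if ctx is cancelled first. Hypothetical reconstruction of the unshown helper.
func sleepWithContext(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}
```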
@@ -172,81 +207,216 @@ func allNodesHaveSameHead(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientCo
 		return errors.Wrap(err, "failed waiting for mid-epoch")
 	}

-	headEpochs := make([]primitives.Epoch, len(conns))
-	headBlockRoots := make([][]byte, len(conns))
-	justifiedRoots := make([][]byte, len(conns))
-	prevJustifiedRoots := make([][]byte, len(conns))
-	finalizedRoots := make([][]byte, len(conns))
-	chainHeads := make([]*eth.ChainHead, len(conns))
-	g, _ := errgroup.WithContext(context.Background())
+	consecutiveSuccesses := 0
+	var lastErr error
+	var lastHeads []*eth.ChainHead
+	var lastPeers []int
+	attempt := 0
+	for {
+		attempt++
+		if ctx.Err() != nil {
+			break
+		}
+
+		chainHeads, err := fetchAllChainHeads(ctx, conns)
+		if err != nil {
+			lastErr = errors.Wrap(err, "fetch chain heads")
+			consecutiveSuccesses = 0
+			if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
+				break
+			}
+			continue
+		}
+		lastHeads = chainHeads
+
+		peerCounts, err := fetchPeerCounts(ctx, conns)
+		if err != nil {
+			lastErr = errors.Wrap(err, "fetch peer counts")
+			consecutiveSuccesses = 0
+			if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
+				break
+			}
+			continue
+		}
+		lastPeers = peerCounts
+
+		if !allPeerCountsAtLeast(peerCounts, minPeersForHeadCheck) {
+			lastErr = fmt.Errorf("nodes not ready: insufficient peers (min=%d)\n%s", minPeersForHeadCheck, summarizeHeads(chainHeads, peerCounts))
+			consecutiveSuccesses = 0
+			log.WithField("attempt", attempt).Warn("Insufficient peers for stable head comparison, retrying")
+			if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
+				break
+			}
+			continue
+		}
+
+		if anyOptimistic(chainHeads) {
+			log.WithField("attempt", attempt).Warn("Optimistic head(s) observed, proceeding with head comparison anyway")
+		}
+
+		if err := compareChainHeads(chainHeads); err != nil {
+			lastErr = fmt.Errorf("%w\n%s", err, summarizeHeads(chainHeads, peerCounts))
+			consecutiveSuccesses = 0
+			log.WithField("attempt", attempt).Warn("Chain head mismatch, retrying")
+			if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
+				break
+			}
+			continue
+		}
+
+		consecutiveSuccesses++
+		if consecutiveSuccesses >= headCompareRequiredConsecutive {
+			return nil
+		}
+		if err := sleepWithContext(ctx, headComparePollDelay); err != nil {
+			break
+		}
+	}
+
+	if lastErr == nil {
+		lastErr = errors.New("head comparison did not converge before timeout")
+	}
+	if len(lastHeads) > 0 {
+		return fmt.Errorf("head comparison failed: %w\n%s", lastErr, summarizeHeads(lastHeads, lastPeers))
+	}
+	return fmt.Errorf("head comparison failed: %w", lastErr)
+}
+
+// fetchAllChainHeads queries all beacon nodes for their chain head in parallel.
+func fetchAllChainHeads(ctx context.Context, conns []*grpc.ClientConn) ([]*eth.ChainHead, error) {
+	chainHeads := make([]*eth.ChainHead, len(conns))
+	g, gctx := errgroup.WithContext(ctx)
 	for i, conn := range conns {
 		conIdx := i
 		currConn := conn
 		g.Go(func() error {
 			beaconClient := eth.NewBeaconChainClient(currConn)
-			chainHead, err := beaconClient.GetChainHead(context.Background(), &emptypb.Empty{})
+			chainHead, err := beaconClient.GetChainHead(gctx, &emptypb.Empty{})
 			if err != nil {
 				return errors.Wrapf(err, "connection number=%d", conIdx)
 			}
-			headEpochs[conIdx] = chainHead.HeadEpoch
-			headBlockRoots[conIdx] = chainHead.HeadBlockRoot
-			justifiedRoots[conIdx] = chainHead.JustifiedBlockRoot
-			prevJustifiedRoots[conIdx] = chainHead.PreviousJustifiedBlockRoot
-			finalizedRoots[conIdx] = chainHead.FinalizedBlockRoot
 			chainHeads[conIdx] = chainHead
 			return nil
 		})
 	}
 	if err := g.Wait(); err != nil {
-		return err
+		return nil, err
 	}
+	return chainHeads, nil
+}

-	for i := range conns {
-		if headEpochs[0] != headEpochs[i] {
+func fetchPeerCounts(ctx context.Context, conns []*grpc.ClientConn) ([]int, error) {
+	peerCounts := make([]int, len(conns))
+	g, gctx := errgroup.WithContext(ctx)
+	for i, conn := range conns {
+		conIdx := i
+		currConn := conn
+		g.Go(func() error {
+			nodeClient := eth.NewNodeClient(currConn)
+			peersResp, err := nodeClient.ListPeers(gctx, &emptypb.Empty{})
+			if err != nil {
+				return errors.Wrapf(err, "failed to list peers for connection number=%d", conIdx)
+			}
+			peerCounts[conIdx] = len(peersResp.Peers)
+			return nil
+		})
+	}
+	if err := g.Wait(); err != nil {
+		return nil, err
+	}
+	return peerCounts, nil
+}
+
+func allPeerCountsAtLeast(peerCounts []int, minPeers int) bool {
+	for _, count := range peerCounts {
+		if count < minPeers {
+			return false
+		}
+	}
+	return true
+}
+
+func anyOptimistic(chainHeads []*eth.ChainHead) bool {
+	for _, head := range chainHeads {
+		if head.GetOptimisticStatus() {
+			return true
+		}
+	}
+	return false
+}
+
+func summarizeHeads(chainHeads []*eth.ChainHead, peerCounts []int) string {
+	var b strings.Builder
+	for i, head := range chainHeads {
+		peers := -1
+		if i < len(peerCounts) {
+			peers = peerCounts[i]
+		}
+		_, _ = fmt.Fprintf(
+			&b,
+			"node=%d slot=%d epoch=%d optimistic=%t peers=%d head=%#x justified=%#x finalized=%#x\n",
+			i,
+			head.HeadSlot,
+			head.HeadEpoch,
+			head.GetOptimisticStatus(),
+			peers,
+			head.HeadBlockRoot,
+			head.JustifiedBlockRoot,
+			head.FinalizedBlockRoot,
+		)
+	}
+	return strings.TrimSpace(b.String())
+}
+
+// compareChainHeads checks that all chain heads agree on epoch, head root,
+// justified root, previous justified root, and finalized root.
+func compareChainHeads(chainHeads []*eth.ChainHead) error {
+	for i := 1; i < len(chainHeads); i++ {
+		if chainHeads[0].HeadEpoch != chainHeads[i].HeadEpoch {
 			return fmt.Errorf(
 				"received conflicting head epochs on node %d, expected %d, received %d",
 				i,
-				headEpochs[0],
-				headEpochs[i],
+				chainHeads[0].HeadEpoch,
+				chainHeads[i].HeadEpoch,
 			)
 		}
-		if !bytes.Equal(headBlockRoots[0], headBlockRoots[i]) {
+		if !bytes.Equal(chainHeads[0].HeadBlockRoot, chainHeads[i].HeadBlockRoot) {
 			return fmt.Errorf(
-				"received conflicting head block roots on node %d, expected %#x, received %#x",
+				"received conflicting head block roots on node %d (slot %d vs %d), expected %#x, received %#x",
 				i,
-				headBlockRoots[0],
-				headBlockRoots[i],
+				chainHeads[0].HeadSlot,
+				chainHeads[i].HeadSlot,
+				chainHeads[0].HeadBlockRoot,
+				chainHeads[i].HeadBlockRoot,
 			)
 		}
-		if !bytes.Equal(justifiedRoots[0], justifiedRoots[i]) {
+		if !bytes.Equal(chainHeads[0].JustifiedBlockRoot, chainHeads[i].JustifiedBlockRoot) {
 			return fmt.Errorf(
 				"received conflicting justified block roots on node %d, expected %#x, received %#x: %s and %s",
 				i,
-				justifiedRoots[0],
-				justifiedRoots[i],
+				chainHeads[0].JustifiedBlockRoot,
+				chainHeads[i].JustifiedBlockRoot,
 				chainHeads[0].String(),
 				chainHeads[i].String(),
 			)
 		}
-		if !bytes.Equal(prevJustifiedRoots[0], prevJustifiedRoots[i]) {
+		if !bytes.Equal(chainHeads[0].PreviousJustifiedBlockRoot, chainHeads[i].PreviousJustifiedBlockRoot) {
 			return fmt.Errorf(
 				"received conflicting previous justified block roots on node %d, expected %#x, received %#x",
 				i,
-				prevJustifiedRoots[0],
-				prevJustifiedRoots[i],
+				chainHeads[0].PreviousJustifiedBlockRoot,
+				chainHeads[i].PreviousJustifiedBlockRoot,
 			)
 		}
-		if !bytes.Equal(finalizedRoots[0], finalizedRoots[i]) {
+		if !bytes.Equal(chainHeads[0].FinalizedBlockRoot, chainHeads[i].FinalizedBlockRoot) {
 			return fmt.Errorf(
 				"received conflicting finalized epoch roots on node %d, expected %#x, received %#x",
 				i,
-				finalizedRoots[0],
-				finalizedRoots[i],
+				chainHeads[0].FinalizedBlockRoot,
+				chainHeads[i].FinalizedBlockRoot,
 			)
 		}
 	}

 	return nil
 }
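The convergence rule in the hunk above (only pass after headCompareRequiredConsecutive matching samples, spaced headComparePollDelay apart, and reset the streak on any failure) is a generic way to de-flake a check that can transiently fail right at a slot boundary. A stripped-down illustration of the same pattern, with hypothetical names and independent of the Prysm types:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForStable polls check until it succeeds `required` times in a row,
// or the context expires. Mirrors the consecutive-success logic of the
// head-comparison evaluator (illustrative only).
func waitForStable(ctx context.Context, required int, delay time.Duration, check func() error) error {
	streak := 0
	var lastErr error
	for {
		if ctx.Err() != nil {
			if lastErr == nil {
				lastErr = errors.New("did not converge before timeout")
			}
			return lastErr
		}
		if err := check(); err != nil {
			lastErr = err
			streak = 0 // any failure restarts the streak
		} else {
			streak++
			if streak >= required {
				return nil
			}
		}
		select {
		case <-ctx.Done():
		case <-time.After(delay):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	calls := 0
	err := waitForStable(ctx, 2, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("not ready yet (call %d)", calls)
		}
		return nil
	})
	fmt.Println(err) // <nil> once two consecutive checks pass
}
```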
@@ -623,19 +623,24 @@ func validatorsVoteWithTheMajority(ec *e2etypes.EvaluationContext, conns ...*grp
 		// We treat epoch 1 differently from other epoch for two reasons:
 		// - this evaluator is not executed for epoch 0 so we have to calculate the first slot differently
 		// - for some reason the vote for the first slot in epoch 1 is 0x000... so we skip this slot
-		var isFirstSlotInVotingPeriod bool
 		if chainHead.HeadEpoch == 1 && slot%params.BeaconConfig().SlotsPerEpoch == 0 {
 			continue
 		}
-		// We skipped the first slot so we treat the second slot as the starting slot of epoch 1.
+
+		// Detect voting period boundary. Use period number instead of exact slot
+		// divisibility so that missed blocks at period boundaries don't prevent
+		// the expected vote from being reset.
+		currentPeriod := uint64(slot) / uint64(slotsPerVotingPeriod)
+		// For epoch 1, treat the second slot in the epoch as the period start
+		// (since we skip the first slot above).
+		isNewPeriod := currentPeriod != ec.ExpectedEth1DataVotePeriod || ec.ExpectedEth1DataVote == nil
 		if chainHead.HeadEpoch == 1 {
-			isFirstSlotInVotingPeriod = slot%params.BeaconConfig().SlotsPerEpoch == 1
-		} else {
-			isFirstSlotInVotingPeriod = slot%slotsPerVotingPeriod == 0
+			isNewPeriod = (slot%params.BeaconConfig().SlotsPerEpoch == 1) || ec.ExpectedEth1DataVote == nil
 		}
-		if isFirstSlotInVotingPeriod {
+		if isNewPeriod {
 			ec.ExpectedEth1DataVote = vote
-			ec.Eth1DataMismatchCount = 0 // Reset for new voting period
+			ec.ExpectedEth1DataVotePeriod = currentPeriod
+			ec.Eth1DataMismatchCount = 0
 			return nil
 		}
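The reasoning behind switching from "slot is exactly the first slot of a period" to "the period number changed" can be seen with small, purely hypothetical numbers (not the actual e2e config): if a voting period were 32 slots and the block at the boundary slot 64 were missed, the old exact-slot test would never fire for that period, while the period-number comparison still detects the change on the next block it sees.

```go
package main

import "fmt"

func main() {
	// Hypothetical figure for illustration only: 32 slots per eth1 voting period.
	const slotsPerVotingPeriod = 32

	// Suppose the block at the period boundary (slot 64) was missed and the
	// next block the evaluator sees is at slot 65.
	prevObservedSlot := uint64(63)
	nextObservedSlot := uint64(65)

	prevPeriod := prevObservedSlot / slotsPerVotingPeriod // 1
	nextPeriod := nextObservedSlot / slotsPerVotingPeriod // 2

	// Old check: reset only when slot%slotsPerVotingPeriod == 0 -> never true here.
	fmt.Println("exact-slot boundary seen:", nextObservedSlot%slotsPerVotingPeriod == 0) // false
	// New check: the period number changed, so the expected vote is reset anyway.
	fmt.Println("period changed:", nextPeriod != prevPeriod) // true
}
```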
@@ -3,6 +3,7 @@ package evaluators

 import (
 	"context"
 	"fmt"
+	"sync"

 	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/signing"
 	fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -58,16 +59,24 @@ var SlashedValidatorsLoseBalanceAfterEpoch = func(n primitives.Epoch) e2eTypes.E
 	}
 }

-var slashedIndices []uint64
+var (
+	slashedMu      sync.Mutex
+	slashedIndices []uint64
+)

 func validatorsSlashed(_ *e2eTypes.EvaluationContext, conns ...*grpc.ClientConn) error {
 	conn := conns[0]
 	ctx := context.Background()
 	client := eth.NewBeaconChainClient(conn)
+
+	slashedMu.Lock()
+	indices := make([]uint64, len(slashedIndices))
+	copy(indices, slashedIndices)
+	slashedMu.Unlock()
+
 	actualSlashedIndices := 0

-	for _, slashedIndex := range slashedIndices {
+	for _, slashedIndex := range indices {
 		req := &eth.GetValidatorRequest{
 			QueryFilter: &eth.GetValidatorRequest_Index{
 				Index: primitives.ValidatorIndex(slashedIndex),
@@ -83,8 +92,8 @@ func validatorsSlashed(_ *e2eTypes.EvaluationContext, conns ...*grpc.ClientConn)
 		}
 	}

-	if actualSlashedIndices != len(slashedIndices) {
-		return fmt.Errorf("expected %d indices to be slashed, received %d", len(slashedIndices), actualSlashedIndices)
+	if actualSlashedIndices != len(indices) {
+		return fmt.Errorf("expected %d indices to be slashed, received %d", len(indices), actualSlashedIndices)
 	}
 	return nil
 }
@@ -94,7 +103,12 @@ func validatorsLoseBalance(_ *e2eTypes.EvaluationContext, conns ...*grpc.ClientC
 	ctx := context.Background()
 	client := eth.NewBeaconChainClient(conn)

-	for i, slashedIndex := range slashedIndices {
+	slashedMu.Lock()
+	indices := make([]uint64, len(slashedIndices))
+	copy(indices, slashedIndices)
+	slashedMu.Unlock()
+
+	for i, slashedIndex := range indices {
 		req := &eth.GetValidatorRequest{
 			QueryFilter: &eth.GetValidatorRequest_Index{
 				Index: primitives.ValidatorIndex(slashedIndex),
@@ -139,7 +153,10 @@ func insertDoubleAttestationIntoPool(_ *e2eTypes.EvaluationContext, conns ...*gr
 	for i := uint64(0); i < valsToSlash; i++ {
 		valIdx := h.validatorIndexAtCommitteeIndex(i)

-		if len(slice.IntersectionUint64(slashedIndices, []uint64{uint64(valIdx)})) > 0 {
+		slashedMu.Lock()
+		alreadySlashed := len(slice.IntersectionUint64(slashedIndices, []uint64{uint64(valIdx)})) > 0
+		slashedMu.Unlock()
+		if alreadySlashed {
 			valsToSlash++
 			continue
 		}
@@ -164,7 +181,9 @@ func insertDoubleAttestationIntoPool(_ *e2eTypes.EvaluationContext, conns ...*gr
 			return errors.Wrap(err, "could not propose attestation")
 		}

+		slashedMu.Lock()
 		slashedIndices = append(slashedIndices, uint64(valIdx))
+		slashedMu.Unlock()
 	}
 	return nil
 }
@@ -232,7 +251,9 @@ func proposeDoubleBlock(_ *e2eTypes.EvaluationContext, conns ...*grpc.ClientConn
 		return errors.New("expected block to fail processing")
 	}

+	slashedMu.Lock()
 	slashedIndices = append(slashedIndices, uint64(proposerIndex))
+	slashedMu.Unlock()
 	return nil
 }
@@ -243,6 +243,12 @@ func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.Clie
 	}
 	currSlot := slots.CurrentSlot(genesis.GenesisTime.AsTime())
 	currEpoch := slots.ToEpoch(currSlot)
+
+	// Track blocks with zero sync committee participation across both loops.
+	// A small number is tolerated (transient p2p issues), but sustained
+	// zero participation indicates a real problem.
+	zeroSyncCount := 0
+	const maxZeroSyncBlocks = 2
 	lowestBound := primitives.Epoch(0)
 	if currEpoch >= 1 {
 		lowestBound = currEpoch - 1
@@ -264,31 +270,52 @@ func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.Clie
 		if b == nil || b.IsNil() {
 			return errors.New("nil block provided")
 		}
-		forkStartSlot, err := slots.EpochStart(params.BeaconConfig().AltairForkEpoch)
-		if err != nil {
-			return err
-		}
-		if forkStartSlot == b.Block().Slot() {
-			// Skip fork slot.
-			continue
-		}
+		// Skip slots 1-2 at genesis - validators need time to ramp up after chain start
+		// due to doppelganger protection. This is a startup timing issue, not a fork transition issue.
+		if b.Block().Slot() < 3 {
+			continue
+		}
-		expectedParticipation := expectedSyncParticipation
-		switch slots.ToEpoch(b.Block().Slot()) {
-		case params.BeaconConfig().AltairForkEpoch:
-			// Drop expected sync participation figure.
-			expectedParticipation = 0.90
-		default:
-			// no-op
+		// Skip the first three slots of each fork epoch to allow
+		// gossipsub mesh reformation and sync committee ramp-up.
+		forkEpochs := []primitives.Epoch{
+			params.BeaconConfig().AltairForkEpoch,
+			params.BeaconConfig().BellatrixForkEpoch,
+			params.BeaconConfig().CapellaForkEpoch,
+			params.BeaconConfig().DenebForkEpoch,
+			params.BeaconConfig().ElectraForkEpoch,
+			params.BeaconConfig().FuluForkEpoch,
 		}
+		skipSlot := false
+		for _, forkEpoch := range forkEpochs {
+			if forkEpoch == params.BeaconConfig().FarFutureEpoch {
+				continue
+			}
+			forkSlot, err := slots.EpochStart(forkEpoch)
+			if err != nil {
+				return err
+			}
+			if b.Block().Slot() >= forkSlot && b.Block().Slot() <= forkSlot+2 {
+				skipSlot = true
+				break
+			}
+		}
+		if skipSlot {
+			continue
+		}
+		expectedParticipation := expectedSyncParticipation
 		syncAgg, err := b.Block().Body().SyncAggregate()
 		if err != nil {
 			return err
 		}
+		// Tolerate a small number of blocks with zero sync committee messages
+		// (transient p2p issues), but fail if it happens too often.
+		if syncAgg.SyncCommitteeBits.Count() == 0 {
+			zeroSyncCount++
+			if zeroSyncCount > maxZeroSyncBlocks {
+				return errors.Errorf("too many blocks (%d) with zero sync participation, last at slot %d", zeroSyncCount, b.Block().Slot())
+			}
+			continue
+		}
 		threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedParticipation)
 		if syncAgg.SyncCommitteeBits.Count() < threshold {
 			return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
@@ -330,8 +357,9 @@ func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.Clie
 		if err != nil {
 			return err
 		}
-		// Skip the first two slots of each fork epoch.
-		if b.Block().Slot() == forkSlot || b.Block().Slot() == forkSlot+1 {
+		// Skip the first three slots of each fork epoch to allow
+		// gossipsub mesh reformation and sync committee ramp-up.
+		if b.Block().Slot() >= forkSlot && b.Block().Slot() <= forkSlot+2 {
 			skipSlot = true
 			break
 		}
@@ -343,6 +371,15 @@ func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.Clie
 		if err != nil {
 			return err
 		}
+		// Tolerate a small number of blocks with zero sync committee messages
+		// (transient p2p issues), but fail if it happens too often.
+		if syncAgg.SyncCommitteeBits.Count() == 0 {
+			zeroSyncCount++
+			if zeroSyncCount > maxZeroSyncBlocks {
+				return errors.Errorf("too many blocks (%d) with zero sync participation, last at slot %d", zeroSyncCount, b.Block().Slot())
+			}
+			continue
+		}
 		threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedSyncParticipation)
 		if syncAgg.SyncCommitteeBits.Count() < threshold {
 			return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
@@ -25,16 +25,17 @@ func (s *EpochTicker) C() <-chan uint64 {

 // Done should be called to clean up the ticker.
 func (s *EpochTicker) Done() {
-	go func() {
-		s.done <- struct{}{}
-	}()
+	select {
+	case s.done <- struct{}{}:
+	default:
+	}
 }

 // NewEpochTicker starts the EpochTicker.
 func NewEpochTicker(genesisTime time.Time, secondsPerEpoch uint64) *EpochTicker {
 	ticker := &EpochTicker{
 		c:    make(chan uint64),
-		done: make(chan struct{}),
+		done: make(chan struct{}, 1),
 	}
 	ticker.start(genesisTime, secondsPerEpoch, prysmTime.Since, prysmTime.Until, time.After)
 	return ticker
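The combination in the hunk above, a buffered done channel plus a non-blocking send, is what lets Done() return immediately without spawning a goroutine that could leak if nothing ever reads the channel, while still being safe to call more than once. A tiny self-contained illustration of the same idiom (hypothetical names):

```go
package main

import "fmt"

func main() {
	done := make(chan struct{}, 1) // capacity 1: one pending stop signal is enough

	signalDone := func() {
		select {
		case done <- struct{}{}: // first call queues the signal
		default: // later calls are no-ops instead of blocking
		}
	}

	signalDone()
	signalDone() // safe to call repeatedly

	<-done // the ticker loop would receive this and exit
	fmt.Println("stopped")
}
```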
@@ -14,6 +14,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"syscall"
 	"testing"
 	"time"

@@ -31,7 +32,7 @@ import (
 )

 const (
-	maxPollingWaitTime  = 60 * time.Second // A minute so timing out doesn't take very long.
+	maxPollingWaitTime  = 120 * time.Second // Two minutes to accommodate loaded CI machines.
 	filePollingInterval = 500 * time.Millisecond
 	memoryHeapFileName  = "node_heap_%d.pb.gz"
 	cpuProfileFileName  = "node_cpu_profile_%d.pb.gz"
@@ -84,17 +85,18 @@ func WaitForTextInFile(src *os.File, match string) error {
 	}
 	defer func() {
 		if ferr := f.Close(); ferr != nil {
-			if !errors.Is(err, os.ErrClosed) {
+			if !errors.Is(ferr, os.ErrClosed) {
 				log.WithError(ferr).Errorf("error calling .Close on the file handle for %s", f.Name())
 			}
 		}
 	}()

 	// spawn a goroutine to scan
-	errChan := make(chan error)
+	errChan := make(chan error, 1)
 	foundChan := make(chan struct{})
 	go func() {
 		t := time.NewTicker(filePollingInterval)
 		defer t.Stop()
 		// This needs to happen in a loop because, even though the other process is still appending to the log file,
 		// scanner will see EOF once it hits the end of what's been written so far.
 		for {
@@ -104,9 +106,8 @@ func WaitForTextInFile(src *os.File, match string) error {
 			case <-t.C:
 				// This is a paranoid check because I'm not sure if the underlying fd handle can be stuck mid-line
 				// when Scanner sees a partially written line at EOF. It's probably safest to just keep this.
-				_, err = f.Seek(0, io.SeekStart)
-				if err != nil {
-					errChan <- err
+				if _, serr := f.Seek(0, io.SeekStart); serr != nil {
+					errChan <- serr
 					return
 				}
 				lineScanner := bufio.NewScanner(f)
@@ -122,9 +123,10 @@ func WaitForTextInFile(src *os.File, match string) error {
 					}
 				}
 				// If Scan returned false for an error (except EOF), Err will return it.
-				if err = lineScanner.Err(); err != nil {
+				if serr := lineScanner.Err(); serr != nil {
 					// Bubble the error back up to the parent goroutine.
-					errChan <- err
+					errChan <- serr
 					return
 				}
 			}
 		}
@@ -135,8 +137,8 @@ func WaitForTextInFile(src *os.File, match string) error {
 		return fmt.Errorf("could not find requested text \"%s\" in %s before deadline", match, f.Name())
 	case <-foundChan:
 		return nil
-	case err = <-errChan:
-		return errors.Wrapf(err, "received error while scanning %s for %s", f.Name(), match)
+	case scanErr := <-errChan:
+		return errors.Wrapf(scanErr, "received error while scanning %s for %s", f.Name(), match)
 	}
 }
@@ -381,6 +383,29 @@ func WaitOnNodes(ctx context.Context, nodes []e2etypes.ComponentRunner, nodesSta
 	return g.Wait()
 }

+// GracefulStop sends SIGTERM to a process and gives it 5 seconds to exit before
+// sending SIGKILL. It does not call p.Wait() since the caller (cmd.Wait in Start)
+// is expected to handle process reaping.
+func GracefulStop(p *os.Process) error {
+	if p == nil {
+		return nil
+	}
+	if err := p.Signal(syscall.SIGTERM); err != nil {
+		// Process may have already exited; try kill as last resort.
+		return p.Kill()
+	}
+	// Give the process time to handle SIGTERM and exit cleanly.
+	// The parent goroutine's cmd.Wait() will detect the exit.
+	// If still alive after the grace period, force kill.
+	time.AfterFunc(5*time.Second, func() {
+		// Signal(0) checks if the process is still alive without sending a signal.
+		if err := p.Signal(syscall.Signal(0)); err == nil {
+			_ = p.Kill()
+		}
+	})
+	return nil
+}
+
 func MinerRPCClient() (*ethclient.Client, error) {
 	client, err := rpc.DialHTTP(e2e.TestParams.Eth1RPCURL(e2e.MinerComponentOffset).String())
 	if err != nil {
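For context on how the new helper interacts with process reaping: the component Stop methods changed earlier in this comparison now call helpers.GracefulStop, while whatever goroutine originally launched the process keeps calling cmd.Wait and so observes the exit (from SIGTERM, or from the delayed SIGKILL). A minimal standalone Unix-only sketch of that division of labor, not the harness code itself:

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"syscall"
	"time"
)

// gracefulStop mirrors the helper added in this diff: SIGTERM first,
// then SIGKILL if the process is still alive after a grace period.
func gracefulStop(p *os.Process) error {
	if p == nil {
		return nil
	}
	if err := p.Signal(syscall.SIGTERM); err != nil {
		return p.Kill() // process may already be gone
	}
	time.AfterFunc(5*time.Second, func() {
		if err := p.Signal(syscall.Signal(0)); err == nil { // still alive?
			_ = p.Kill()
		}
	})
	return nil
}

func main() {
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		fmt.Println("start:", err)
		return
	}
	_ = gracefulStop(cmd.Process)
	// The launcher still owns reaping: Wait returns once the child exits.
	err := cmd.Wait()
	fmt.Println("process exited:", err)
}
```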
@@ -179,9 +179,10 @@ type EvaluationContext struct {
 	DepositBalancer
 	// ExitedVals maps validator pubkey to the epoch when their exit was submitted.
 	// The actual exit takes effect at: submission_epoch + 1 + MaxSeedLookahead
-	ExitedVals           map[[48]byte]primitives.Epoch
-	SeenVotes            map[primitives.Slot][]byte
-	ExpectedEth1DataVote []byte
+	ExitedVals                 map[[48]byte]primitives.Epoch
+	SeenVotes                  map[primitives.Slot][]byte
+	ExpectedEth1DataVote       []byte
+	ExpectedEth1DataVotePeriod uint64
 	// Eth1DataMismatchCount tracks how many eth1data vote mismatches have been seen
 	// in the current voting period. Some tolerance is allowed for timing differences.
 	Eth1DataMismatchCount int