Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-31 08:08:18 -05:00)

Compare commits: develop...e2e-debugg (6 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 35720e3f71 | |
| | 6effcf5d53 | |
| | bd778dad3a | |
| | ceadf6e5c9 | |
| | 22769ed486 | |
| | b960e54e00 | |
@@ -67,7 +67,6 @@ func getSubscriptionStatusFromDB(t *testing.T, db *Store) bool {
     return subscribed
 }

-
 func TestUpdateCustodyInfo(t *testing.T) {
     ctx := t.Context()

@@ -575,7 +575,7 @@ func (s *Service) beaconEndpoints(
             name: namespace + ".PublishBlockV2",
             middleware: []middleware.Middleware{
                 middleware.ContentTypeHandler([]string{api.JsonMediaType, api.OctetStreamMediaType}),
-                middleware.AcceptHeaderHandler([]string{api.JsonMediaType}),
+                middleware.AcceptHeaderHandler([]string{api.JsonMediaType, api.OctetStreamMediaType}),
                 middleware.AcceptEncodingHeaderHandler(),
             },
             handler: server.PublishBlockV2,
@@ -586,7 +586,7 @@ func (s *Service) beaconEndpoints(
             name: namespace + ".PublishBlindedBlockV2",
             middleware: []middleware.Middleware{
                 middleware.ContentTypeHandler([]string{api.JsonMediaType, api.OctetStreamMediaType}),
-                middleware.AcceptHeaderHandler([]string{api.JsonMediaType}),
+                middleware.AcceptHeaderHandler([]string{api.JsonMediaType, api.OctetStreamMediaType}),
                 middleware.AcceptEncodingHeaderHandler(),
             },
             handler: server.PublishBlindedBlockV2,
@@ -26,8 +26,8 @@ import (
     "github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
     p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
     "github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
-    mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
     state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
+    mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
     "github.com/OffchainLabs/prysm/v7/config/params"
     "github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
     "github.com/OffchainLabs/prysm/v7/crypto/bls"
@@ -1027,10 +1027,10 @@ func TestGetVerifyingStateEdgeCases(t *testing.T) {
             sc: signatureCache,
             sr: &mockStateByRooter{sbr: sbrErrorIfCalled(t)}, // Should not be called
             hsp: &mockHeadStateProvider{
                 headRoot: parentRoot[:], // Same as parent
                 headSlot: 32, // Epoch 1
                 headState: fuluState.Copy(), // HeadState (not ReadOnly) for ProcessSlots
                 headStateReadOnly: nil, // Should not use ReadOnly path
             },
             fc: &mockForkchoicer{
                 // Return same root for both to simulate same chain
@@ -1045,8 +1045,8 @@ func TestGetVerifyingStateEdgeCases(t *testing.T) {
     // Wrap to detect HeadState call
     originalHsp := initializer.shared.hsp.(*mockHeadStateProvider)
     wrappedHsp := &mockHeadStateProvider{
         headRoot: originalHsp.headRoot,
         headSlot: originalHsp.headSlot,
         headState: originalHsp.headState,
     }
     initializer.shared.hsp = &headStateCallTracker{
@@ -26,21 +26,21 @@ func TestLifecycle(t *testing.T) {
     port := 1000 + rand.Intn(1000)
     prometheusService := NewService(t.Context(), fmt.Sprintf(":%d", port), nil)
     prometheusService.Start()
     // Actively wait until the service responds on /metrics (faster and less flaky than a fixed sleep)
     deadline := time.Now().Add(3 * time.Second)
     for {
         if time.Now().After(deadline) {
             t.Fatalf("metrics endpoint not ready within timeout")
         }
         resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
         if err == nil {
             _ = resp.Body.Close()
             if resp.StatusCode == http.StatusOK {
                 break
             }
         }
         time.Sleep(50 * time.Millisecond)
     }

     // Query the service to ensure it really started.
     resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
@@ -49,18 +49,18 @@ func TestLifecycle(t *testing.T) {

     err = prometheusService.Stop()
     require.NoError(t, err)
     // Actively wait until the service stops responding on /metrics
     deadline = time.Now().Add(3 * time.Second)
     for {
         if time.Now().After(deadline) {
             t.Fatalf("metrics endpoint still reachable after timeout")
         }
         _, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
         if err != nil {
             break
         }
         time.Sleep(50 * time.Millisecond)
     }

     // Query the service to ensure it really stopped.
     _, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
@@ -47,6 +47,7 @@ go_library(
         "@in_gopkg_yaml_v2//:go_default_library",
         "@io_bazel_rules_go//go/tools/bazel:go_default_library",
         "@org_golang_x_sync//errgroup:go_default_library",
+        "@org_golang_x_sys//unix:go_default_library",
     ],
 )

@@ -11,6 +11,7 @@ import (
     "strconv"
     "strings"
     "syscall"
+    "time"

     "github.com/OffchainLabs/prysm/v7/beacon-chain/state"
     cmdshared "github.com/OffchainLabs/prysm/v7/cmd"
@@ -35,11 +36,12 @@ var _ e2etypes.BeaconNodeSet = (*BeaconNodeSet)(nil)
 // BeaconNodeSet represents set of beacon nodes.
 type BeaconNodeSet struct {
     e2etypes.ComponentRunner
     config *e2etypes.E2EConfig
     nodes []e2etypes.ComponentRunner
     enr string
     ids []string
-    started chan struct{}
+    multiAddrs []string
+    started chan struct{}
 }

 // SetENR assigns ENR to the set of beacon nodes.
@@ -74,8 +76,10 @@ func (s *BeaconNodeSet) Start(ctx context.Context) error {
     if s.config.UseFixedPeerIDs {
         for i := range nodes {
             s.ids = append(s.ids, nodes[i].(*BeaconNode).peerID)
+            s.multiAddrs = append(s.multiAddrs, nodes[i].(*BeaconNode).multiAddr)
         }
         s.config.PeerIDs = s.ids
+        s.config.PeerMultiAddrs = s.multiAddrs
     }
     // All nodes started, close channel, so that all services waiting on a set, can proceed.
     close(s.started)
@@ -141,6 +145,14 @@ func (s *BeaconNodeSet) StopAtIndex(i int) error {
     return s.nodes[i].Stop()
 }

+// RestartAtIndex restarts the component at the desired index.
+func (s *BeaconNodeSet) RestartAtIndex(ctx context.Context, i int) error {
+    if i >= len(s.nodes) {
+        return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
+    }
+    return s.nodes[i].(*BeaconNode).Restart(ctx)
+}
+
 // ComponentAtIndex returns the component at the provided index.
 func (s *BeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
     if i >= len(s.nodes) {
@@ -152,12 +164,14 @@ func (s *BeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error
 // BeaconNode represents beacon node.
 type BeaconNode struct {
     e2etypes.ComponentRunner
     config *e2etypes.E2EConfig
     started chan struct{}
     index int
     enr string
     peerID string
-    cmd *exec.Cmd
+    multiAddr string
+    cmd *exec.Cmd
+    args []string
 }

 // NewBeaconNode creates and returns a beacon node.
@@ -290,6 +304,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
         args = append(args, fmt.Sprintf("--%s=%s:%d", flags.MevRelayEndpoint.Name, "http://127.0.0.1", e2e.TestParams.Ports.Eth1ProxyPort+index))
     }
     args = append(args, config.BeaconFlags...)
+    node.args = args

     cmd := exec.CommandContext(ctx, binaryPath, args...) // #nosec G204 -- Safe
     // Write stderr to log files.
@@ -318,6 +333,18 @@ func (node *BeaconNode) Start(ctx context.Context) error {
             return fmt.Errorf("could not find peer id: %w", err)
         }
         node.peerID = peerId
+
+        // Extract QUIC multiaddr for Lighthouse to connect to this node.
+        // Prysm logs: msg="Node started p2p server" multiAddr="/ip4/192.168.0.14/udp/4250/quic-v1/p2p/16Uiu2..."
+        // We prefer QUIC over TCP as it's more reliable in E2E tests.
+        multiAddr, err := helpers.FindFollowingTextInFile(stdOutFile, "multiAddr=\"/ip4/192.168.0.14/udp/")
+        if err != nil {
+            return fmt.Errorf("could not find QUIC multiaddr: %w", err)
+        }
+        // The extracted text will be like: 4250/quic-v1/p2p/16Uiu2..."
+        // We need to prepend "/ip4/192.168.0.14/udp/" and strip the trailing quote
+        multiAddr = strings.TrimSuffix(multiAddr, "\"")
+        node.multiAddr = "/ip4/192.168.0.14/udp/" + multiAddr
     }

     // Mark node as ready.
@@ -347,6 +374,96 @@ func (node *BeaconNode) Stop() error {
     return node.cmd.Process.Kill()
 }

+// Restart gracefully stops the beacon node and starts a new process.
+// This is useful for testing resilience as it allows the P2P layer to reinitialize
+// and discover peers again (unlike SIGSTOP/SIGCONT which breaks QUIC connections permanently).
+func (node *BeaconNode) Restart(ctx context.Context) error {
+    binaryPath, found := bazel.FindBinary("cmd/beacon-chain", "beacon-chain")
+    if !found {
+        return errors.New("beacon chain binary not found")
+    }
+
+    // First, continue the process if it's stopped (from PauseAtIndex).
+    // A stopped process (SIGSTOP) cannot receive SIGTERM until continued.
+    _ = node.cmd.Process.Signal(syscall.SIGCONT)
+
+    if err := node.cmd.Process.Signal(syscall.SIGTERM); err != nil {
+        return fmt.Errorf("failed to send SIGTERM: %w", err)
+    }
+
+    // Wait for process to exit by polling. We can't call cmd.Wait() here because
+    // the Start() method's goroutine is already waiting on the command, and calling
+    // Wait() twice on the same process causes "waitid: no child processes" error.
+    // Instead, poll using Signal(0) which returns an error when the process no longer exists.
+    processExited := false
+    for range 100 {
+        if err := node.cmd.Process.Signal(syscall.Signal(0)); err != nil {
+            processExited = true
+            break
+        }
+        time.Sleep(100 * time.Millisecond)
+    }
+    if !processExited {
+        log.Warnf("Beacon node %d did not exit within 10 seconds after SIGTERM, proceeding with restart anyway", node.index)
+    }
+
+    restartArgs := make([]string, 0, len(node.args))
+    for _, arg := range node.args {
+        if !strings.Contains(arg, cmdshared.ForceClearDB.Name) {
+            restartArgs = append(restartArgs, arg)
+        }
+    }
+
+    stdOutFile, err := os.OpenFile(
+        path.Join(e2e.TestParams.LogPath, fmt.Sprintf(e2e.BeaconNodeLogFileName, node.index)),
+        os.O_APPEND|os.O_WRONLY,
+        0644,
+    )
+    if err != nil {
+        return fmt.Errorf("failed to open log file: %w", err)
+    }
+    defer func() {
+        if err := stdOutFile.Close(); err != nil {
+            log.WithError(err).Error("Failed to close stdout file")
+        }
+    }()
+
+    cmd := exec.CommandContext(ctx, binaryPath, restartArgs...)
+    stderr, err := os.OpenFile(
+        path.Join(e2e.TestParams.LogPath, fmt.Sprintf("beacon_node_%d_stderr.log", node.index)),
+        os.O_APPEND|os.O_WRONLY|os.O_CREATE,
+        0644,
+    )
+    if err != nil {
+        return fmt.Errorf("failed to open stderr file: %w", err)
+    }
+    cmd.Stderr = stderr
+
+    log.Infof("Restarting beacon chain %d with flags: %s", node.index, strings.Join(restartArgs, " "))
+    if err = cmd.Start(); err != nil {
+        if closeErr := stderr.Close(); closeErr != nil {
+            log.WithError(closeErr).Error("Failed to close stderr file")
+        }
+        return fmt.Errorf("failed to restart beacon node: %w", err)
+    }
+    // Close the parent's file handle after Start(). The child process has its own
+    // copy of the file descriptor via fork/exec, so this won't affect its ability to write.
+    if err := stderr.Close(); err != nil {
+        log.WithError(err).Error("Failed to close stderr file")
+    }
+
+    if err = helpers.WaitForTextInFile(stdOutFile, "Beacon chain gRPC server listening"); err != nil {
+        return fmt.Errorf("beacon node %d failed to restart properly: %w", node.index, err)
+    }
+
+    node.cmd = cmd
+    go func() {
+        _ = cmd.Wait()
+    }()
+
+    return nil
+}
+
 func (node *BeaconNode) UnderlyingProcess() *os.Process {
     return node.cmd.Process
 }
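The Restart flow above waits for the old process by polling with signal 0 rather than calling cmd.Wait() a second time, since Start()'s goroutine already owns Wait(). Below is a minimal, self-contained sketch of that pattern (illustrative only and Unix-specific; the waitForExit helper is invented here and is not part of the diff):

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"syscall"
	"time"
)

// waitForExit reports whether the process disappeared within the attempt budget.
// Signal 0 delivers nothing; once the goroutine owning Wait() has reaped the
// child, Process.Signal starts returning an error and the loop can stop.
func waitForExit(p *os.Process, attempts int, interval time.Duration) bool {
	for i := 0; i < attempts; i++ {
		if err := p.Signal(syscall.Signal(0)); err != nil {
			return true // process no longer exists
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	cmd := exec.Command("sleep", "10")
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	// Another goroutine owns Wait(), mirroring how the e2e component's Start() is structured.
	go func() { _ = cmd.Wait() }()

	_ = cmd.Process.Signal(syscall.SIGTERM)
	fmt.Println("exited:", waitForExit(cmd.Process, 100, 100*time.Millisecond))
}
```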
@@ -108,6 +108,17 @@ func (s *BuilderSet) StopAtIndex(i int) error {
     return s.builders[i].Stop()
 }

+// RestartAtIndex for builders just does pause/resume.
+func (s *BuilderSet) RestartAtIndex(_ context.Context, i int) error {
+    if i >= len(s.builders) {
+        return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.builders))
+    }
+    if err := s.builders[i].Pause(); err != nil {
+        return err
+    }
+    return s.builders[i].Resume()
+}
+
 // ComponentAtIndex returns the component at the provided index.
 func (s *BuilderSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
     if i >= len(s.builders) {
@@ -111,6 +111,17 @@ func (s *NodeSet) StopAtIndex(i int) error {
     return s.nodes[i].Stop()
 }

+// RestartAtIndex for eth1 nodes just does pause/resume.
+func (s *NodeSet) RestartAtIndex(_ context.Context, i int) error {
+    if i >= len(s.nodes) {
+        return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
+    }
+    if err := s.nodes[i].Pause(); err != nil {
+        return err
+    }
+    return s.nodes[i].Resume()
+}
+
 // ComponentAtIndex returns the component at the provided index.
 func (s *NodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
     if i >= len(s.nodes) {
@@ -108,6 +108,17 @@ func (s *ProxySet) StopAtIndex(i int) error {
     return s.proxies[i].Stop()
 }

+// RestartAtIndex for proxies just does pause/resume.
+func (s *ProxySet) RestartAtIndex(_ context.Context, i int) error {
+    if i >= len(s.proxies) {
+        return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies))
+    }
+    if err := s.proxies[i].Pause(); err != nil {
+        return err
+    }
+    return s.proxies[i].Resume()
+}
+
 // ComponentAtIndex returns the component at the provided index.
 func (s *ProxySet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
     if i >= len(s.proxies) {
@@ -127,6 +127,17 @@ func (s *LighthouseBeaconNodeSet) StopAtIndex(i int) error {
     return s.nodes[i].Stop()
 }

+// RestartAtIndex for Lighthouse just does pause/resume.
+func (s *LighthouseBeaconNodeSet) RestartAtIndex(_ context.Context, i int) error {
+    if i >= len(s.nodes) {
+        return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
+    }
+    if err := s.nodes[i].Pause(); err != nil {
+        return err
+    }
+    return s.nodes[i].Resume()
+}
+
 // ComponentAtIndex returns the component at the provided index.
 func (s *LighthouseBeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
     if i >= len(s.nodes) {
@@ -194,9 +205,10 @@ func (node *LighthouseBeaconNode) Start(ctx context.Context) error {
         "--suggested-fee-recipient=0x878705ba3f8bc32fcf7f4caa1a35e72af65cf766",
     }
     if node.config.UseFixedPeerIDs {
-        flagVal := strings.Join(node.config.PeerIDs, ",")
-        args = append(args,
-            fmt.Sprintf("--trusted-peers=%s", flagVal))
+        // Use libp2p-addresses with full multiaddrs instead of trusted-peers with just peer IDs.
+        // This allows Lighthouse to connect directly to Prysm nodes without relying on DHT discovery.
+        flagVal := strings.Join(node.config.PeerMultiAddrs, ",")
+        args = append(args, fmt.Sprintf("--libp2p-addresses=%s", flagVal))
     }
     if node.config.UseBuilder {
         args = append(args, fmt.Sprintf("--builder=%s:%d", "http://127.0.0.1", e2e.TestParams.Ports.Eth1ProxyPort+prysmNodeCount+index))
@@ -132,6 +132,17 @@ func (s *LighthouseValidatorNodeSet) StopAtIndex(i int) error {
     return s.nodes[i].Stop()
 }

+// RestartAtIndex for Lighthouse validators just does pause/resume.
+func (s *LighthouseValidatorNodeSet) RestartAtIndex(_ context.Context, i int) error {
+    if i >= len(s.nodes) {
+        return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
+    }
+    if err := s.nodes[i].Pause(); err != nil {
+        return err
+    }
+    return s.nodes[i].Resume()
+}
+
 // ComponentAtIndex returns the component at the provided index.
 func (s *LighthouseValidatorNodeSet) ComponentAtIndex(i int) (types.ComponentRunner, error) {
     if i >= len(s.nodes) {
@@ -5,6 +5,7 @@ import (
     "context"
     "encoding/base64"
     "io"
+    "net"
     "net/http"
     "os"
     "os/signal"
@@ -15,6 +16,7 @@ import (
     e2e "github.com/OffchainLabs/prysm/v7/testing/endtoend/params"
     "github.com/OffchainLabs/prysm/v7/testing/endtoend/types"
     "github.com/pkg/errors"
+    "golang.org/x/sys/unix"
 )

 var _ types.ComponentRunner = &TracingSink{}
@@ -32,6 +34,7 @@ var _ types.ComponentRunner = &TracingSink{}
 type TracingSink struct {
     cancel context.CancelFunc
     started chan struct{}
+    stopped chan struct{}
     endpoint string
     server *http.Server
 }
@@ -40,6 +43,7 @@ type TracingSink struct {
 func NewTracingSink(endpoint string) *TracingSink {
     return &TracingSink{
         started: make(chan struct{}, 1),
+        stopped: make(chan struct{}),
         endpoint: endpoint,
     }
 }
@@ -73,62 +77,99 @@ func (ts *TracingSink) Resume() error {
|
|||||||
|
|
||||||
// Stop stops the component and its underlying process.
|
// Stop stops the component and its underlying process.
|
||||||
func (ts *TracingSink) Stop() error {
|
func (ts *TracingSink) Stop() error {
|
||||||
ts.cancel()
|
if ts.cancel != nil {
|
||||||
|
ts.cancel()
|
||||||
|
}
|
||||||
|
// Wait for server to actually shut down before returning
|
||||||
|
<-ts.stopped
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reusePortListener creates a TCP listener with SO_REUSEADDR set, allowing
|
||||||
|
// the port to be reused immediately after the previous listener closes.
|
||||||
|
// This is essential for sequential E2E tests that reuse the same port.
|
||||||
|
func reusePortListener(addr string) (net.Listener, error) {
|
||||||
|
lc := net.ListenConfig{
|
||||||
|
Control: func(network, address string, c syscall.RawConn) error {
|
||||||
|
var setSockOptErr error
|
||||||
|
err := c.Control(func(fd uintptr) {
|
||||||
|
setSockOptErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return setSockOptErr
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return lc.Listen(context.Background(), "tcp", addr)
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize an http handler that writes all requests to a file.
|
// Initialize an http handler that writes all requests to a file.
|
||||||
func (ts *TracingSink) initializeSink(ctx context.Context) {
|
func (ts *TracingSink) initializeSink(ctx context.Context) {
|
||||||
|
defer close(ts.stopped)
|
||||||
|
|
||||||
mux := &http.ServeMux{}
|
mux := &http.ServeMux{}
|
||||||
ts.server = &http.Server{
|
ts.server = &http.Server{
|
||||||
Addr: ts.endpoint,
|
Addr: ts.endpoint,
|
||||||
Handler: mux,
|
Handler: mux,
|
||||||
ReadHeaderTimeout: time.Second,
|
ReadHeaderTimeout: time.Second,
|
||||||
}
|
}
|
||||||
defer func() {
|
// Disable keep-alives to ensure connections close immediately
|
||||||
if err := ts.server.Close(); err != nil {
|
ts.server.SetKeepAlivesEnabled(false)
|
||||||
log.WithError(err).Error("Failed to close http server")
|
|
||||||
return
|
// Create listener with SO_REUSEADDR to allow port reuse between tests
|
||||||
}
|
listener, err := reusePortListener(ts.endpoint)
|
||||||
}()
|
if err != nil {
|
||||||
|
log.WithError(err).Error("Failed to create listener")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
stdOutFile, err := helpers.DeleteAndCreateFile(e2e.TestParams.LogPath, e2e.TracingRequestSinkFileName)
|
stdOutFile, err := helpers.DeleteAndCreateFile(e2e.TestParams.LogPath, e2e.TracingRequestSinkFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Failed to create stdout file")
|
log.WithError(err).Error("Failed to create stdout file")
|
||||||
|
if closeErr := listener.Close(); closeErr != nil {
|
||||||
|
log.WithError(closeErr).Error("Failed to close listener after file creation error")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup := func() {
|
cleanup := func() {
|
||||||
if err := stdOutFile.Close(); err != nil {
|
if err := stdOutFile.Close(); err != nil {
|
||||||
log.WithError(err).Error("Could not close stdout file")
|
log.WithError(err).Error("Could not close stdout file")
|
||||||
}
|
}
|
||||||
if err := ts.server.Close(); err != nil {
|
// Use Shutdown for graceful shutdown that releases the port
|
||||||
log.WithError(err).Error("Could not close http server")
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := ts.server.Shutdown(shutdownCtx); err != nil {
|
||||||
|
log.WithError(err).Error("Could not gracefully shutdown http server")
|
||||||
|
// Force close if shutdown times out
|
||||||
|
if err := ts.server.Close(); err != nil {
|
||||||
|
log.WithError(err).Error("Could not close http server")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
mux.HandleFunc("/", func(_ http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/", func(_ http.ResponseWriter, r *http.Request) {
|
||||||
if err := captureRequest(stdOutFile, r); err != nil {
|
if err := captureRequest(stdOutFile, r); err != nil {
|
||||||
log.WithError(err).Error("Failed to capture http request")
|
log.WithError(err).Error("Failed to capture http request")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
select {
|
||||||
select {
|
case <-ctx.Done():
|
||||||
case <-ctx.Done():
|
return
|
||||||
cleanup()
|
case <-sigs:
|
||||||
return
|
return
|
||||||
case <-sigs:
|
|
||||||
cleanup()
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
// Sleep for 100ms and do nothing while waiting for
|
|
||||||
// cancellation.
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if err := ts.server.ListenAndServe(); err != http.ErrServerClosed {
|
|
||||||
|
// Use Serve with our custom listener instead of ListenAndServe
|
||||||
|
if err := ts.server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
||||||
log.WithError(err).Error("Failed to serve http")
|
log.WithError(err).Error("Failed to serve http")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
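The reusePortListener helper introduced above relies on net.ListenConfig's Control hook, which runs on the raw socket before bind, the only point where an option like SO_REUSEADDR can still affect the bind. Here is a self-contained sketch of the same idea, assuming only the standard library and golang.org/x/sys/unix (illustrative, not the repository's code):

```go
package main

import (
	"context"
	"fmt"
	"net"
	"syscall"

	"golang.org/x/sys/unix"
)

// listenReuseAddr binds a TCP listener with SO_REUSEADDR so the address can be
// reused immediately after a previous listener on the same port closes.
func listenReuseAddr(addr string) (net.Listener, error) {
	lc := net.ListenConfig{
		Control: func(network, address string, c syscall.RawConn) error {
			var sockErr error
			if err := c.Control(func(fd uintptr) {
				sockErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
			}); err != nil {
				return err
			}
			return sockErr
		},
	}
	return lc.Listen(context.Background(), "tcp", addr)
}

func main() {
	ln, err := listenReuseAddr("127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	fmt.Println("listening on", ln.Addr())
}
```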
@@ -134,6 +134,17 @@ func (s *ValidatorNodeSet) StopAtIndex(i int) error {
     return s.nodes[i].Stop()
 }

+// RestartAtIndex for validators just does pause/resume since they don't have P2P issues.
+func (s *ValidatorNodeSet) RestartAtIndex(_ context.Context, i int) error {
+    if i >= len(s.nodes) {
+        return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
+    }
+    if err := s.nodes[i].Pause(); err != nil {
+        return err
+    }
+    return s.nodes[i].Resume()
+}
+
 // ComponentAtIndex returns the component at the provided index.
 func (s *ValidatorNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
     if i >= len(s.nodes) {
@@ -48,7 +48,6 @@ const (
     // allNodesStartTimeout defines period after which nodes are considered
     // stalled (safety measure for nodes stuck at startup, shouldn't normally happen).
     allNodesStartTimeout = 5 * time.Minute
-
     // errGeneralCode is used to represent the string value for all general process errors.
     errGeneralCode = "exit status 1"
 )
@@ -195,12 +194,20 @@ func (r *testRunner) runEvaluators(ec *e2etypes.EvaluationContext, conns []*grpc
     secondsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
     ticker := helpers.NewEpochTicker(tickingStartTime, secondsPerEpoch)
     for currentEpoch := range ticker.C() {
+        log.WithField("epoch", currentEpoch).Info("Processing epoch")
         if config.EvalInterceptor(ec, currentEpoch, conns) {
+            log.WithField("epoch", currentEpoch).Info("Interceptor returned true, skipping evaluators")
             continue
         }
         r.executeProvidedEvaluators(ec, currentEpoch, conns, config.Evaluators)

         if t.Failed() || currentEpoch >= config.EpochsToRun-1 {
+            log.WithFields(log.Fields{
+                "currentEpoch": currentEpoch,
+                "EpochsToRun": config.EpochsToRun,
+                "testFailed": t.Failed(),
+                "epochLimitHit": currentEpoch >= config.EpochsToRun-1,
+            }).Info("Stopping evaluator loop")
             ticker.Done()
             if t.Failed() {
                 return errors.New("test failed")
@@ -225,9 +232,9 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
     if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{r.depositor}); err != nil {
         return errors.Wrap(err, "testDepositsAndTx unable to run, depositor did not Start")
     }
     go func() {
         if r.config.TestDeposits {
             log.Info("Running deposit tests")
             // The validators with an index < minGenesisActiveCount all have deposits already from the chain start.
             // Skip all of those chain start validators by seeking to minGenesisActiveCount in the validator list
             // for further deposit testing.
@@ -238,12 +245,13 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
                     r.t.Error(errors.Wrap(err, "depositor.SendAndMine failed"))
                 }
             }
         }
         // Only generate background transactions when relevant for the test.
-        if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
+        // Checkpoint sync and REST API tests need EL blocks to advance, so include them.
+        if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder || r.config.TestCheckpointSync || r.config.UseBeaconRestApi {
             r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
         }
     }()
     if r.config.TestDeposits {
         return depositCheckValidator.Start(ctx)
     }
@@ -622,7 +630,7 @@ func (r *testRunner) scenarioRun() error {
     tickingStartTime := helpers.EpochTickerStartTime(genesis)

     ec := e2etypes.NewEvaluationContext(r.depositor.History())
-    // Run assigned evaluators.
+    log.WithField("EpochsToRun", r.config.EpochsToRun).Info("Starting evaluators")
     return r.runEvaluators(ec, conns, tickingStartTime)
 }

@@ -668,9 +676,9 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep
     freezeStartEpoch := lastForkEpoch + 1
     freezeEndEpoch := lastForkEpoch + 2
     optimisticStartEpoch := lastForkEpoch + 6
-    optimisticEndEpoch := lastForkEpoch + 7
+    optimisticEndEpoch := lastForkEpoch + 8
     recoveryEpochStart, recoveryEpochEnd := lastForkEpoch+3, lastForkEpoch+4
-    secondRecoveryEpochStart, secondRecoveryEpochEnd := lastForkEpoch+8, lastForkEpoch+9
+    secondRecoveryEpochStart, secondRecoveryEpochMid, secondRecoveryEpochEnd := lastForkEpoch+9, lastForkEpoch+10, lastForkEpoch+11

     newPayloadMethod := "engine_newPayloadV4"
     forkChoiceUpdatedMethod := "engine_forkchoiceUpdatedV3"
@@ -680,13 +688,18 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep
         forkChoiceUpdatedMethod = "engine_forkchoiceUpdatedV3"
     }

+    // Skip evaluators during optimistic sync window (between start and end, exclusive)
+    if primitives.Epoch(epoch) > optimisticStartEpoch && primitives.Epoch(epoch) < optimisticEndEpoch {
+        return true
+    }
+
     switch primitives.Epoch(epoch) {
     case freezeStartEpoch:
         require.NoError(r.t, r.comHandler.beaconNodes.PauseAtIndex(0))
         require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(0))
         return true
     case freezeEndEpoch:
-        require.NoError(r.t, r.comHandler.beaconNodes.ResumeAtIndex(0))
+        require.NoError(r.t, r.comHandler.beaconNodes.RestartAtIndex(r.comHandler.ctx, 0))
         require.NoError(r.t, r.comHandler.validatorNodes.ResumeAtIndex(0))
         return true
     case optimisticStartEpoch:
@@ -701,6 +714,19 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep
         }, func() bool {
             return true
         })
+        // Also intercept forkchoiceUpdated for prysm beacon node to prevent
+        // SetOptimisticToValid from clearing the optimistic status.
+        component.(e2etypes.EngineProxy).AddRequestInterceptor(forkChoiceUpdatedMethod, func() any {
+            return &ForkchoiceUpdatedResponse{
+                Status: &enginev1.PayloadStatus{
+                    Status: enginev1.PayloadStatus_SYNCING,
+                    LatestValidHash: nil,
+                },
+                PayloadId: nil,
+            }
+        }, func() bool {
+            return true
+        })
         // Set it for lighthouse beacon node.
         component, err = r.comHandler.eth1Proxy.ComponentAtIndex(2)
         require.NoError(r.t, err)
@@ -734,6 +760,7 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep
         engineProxy, ok := component.(e2etypes.EngineProxy)
         require.Equal(r.t, true, ok)
         engineProxy.RemoveRequestInterceptor(newPayloadMethod)
+        engineProxy.RemoveRequestInterceptor(forkChoiceUpdatedMethod)
         engineProxy.ReleaseBackedUpRequests(newPayloadMethod)

         // Remove for lighthouse too
@@ -747,8 +774,8 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep

         return true
     case recoveryEpochStart, recoveryEpochEnd,
-        secondRecoveryEpochStart, secondRecoveryEpochEnd:
-        // Allow 2 epochs for the network to finalize again.
+        secondRecoveryEpochStart, secondRecoveryEpochMid, secondRecoveryEpochEnd:
+        // Allow epochs for the network to finalize again after optimistic sync test.
         return true
     }
     return false
@@ -782,31 +809,39 @@ func (r *testRunner) eeOffline(_ *e2etypes.EvaluationContext, epoch uint64, _ []
 // as expected.
 func (r *testRunner) multiScenario(ec *e2etypes.EvaluationContext, epoch uint64, conns []*grpc.ClientConn) bool {
     lastForkEpoch := params.LastForkEpoch()
-    freezeStartEpoch := lastForkEpoch + 1
-    freezeEndEpoch := lastForkEpoch + 2
+    // Freeze/restart scenario is skipped in minimal test: With only 2 beacon nodes,
+    // when one node restarts it enters initial sync mode. During initial sync, the
+    // restarting node doesn't subscribe to gossip topics, leaving the other node with
+    // 0 gossip peers. This causes a deadlock where the network can't produce blocks
+    // consistently (no gossip mesh) and the restarting node can't complete initial sync
+    // (no blocks being produced). This scenario works in multiclient test (4 nodes)
+    // where 3 healthy nodes maintain the gossip mesh while 1 node syncs.
     valOfflineStartEpoch := lastForkEpoch + 6
     valOfflineEndEpoch := lastForkEpoch + 7
     optimisticStartEpoch := lastForkEpoch + 11
-    optimisticEndEpoch := lastForkEpoch + 12
+    optimisticEndEpoch := lastForkEpoch + 13

-    recoveryEpochStart, recoveryEpochEnd := lastForkEpoch+3, lastForkEpoch+4
     secondRecoveryEpochStart, secondRecoveryEpochEnd := lastForkEpoch+8, lastForkEpoch+9
-    thirdRecoveryEpochStart, thirdRecoveryEpochEnd := lastForkEpoch+13, lastForkEpoch+14
+    thirdRecoveryEpochStart, thirdRecoveryEpochEnd := lastForkEpoch+14, lastForkEpoch+15
+
+    type ForkchoiceUpdatedResponse struct {
+        Status *enginev1.PayloadStatus `json:"payloadStatus"`
+        PayloadId *enginev1.PayloadIDBytes `json:"payloadId"`
+    }

     newPayloadMethod := "engine_newPayloadV4"
+    forkChoiceUpdatedMethod := "engine_forkchoiceUpdatedV3"
     // Fallback if Electra is not set.
     if params.BeaconConfig().ElectraForkEpoch == math.MaxUint64 {
         newPayloadMethod = "engine_newPayloadV3"
     }
+
+    // Skip evaluators during optimistic sync window (between start and end, exclusive)
+    if primitives.Epoch(epoch) > optimisticStartEpoch && primitives.Epoch(epoch) < optimisticEndEpoch {
+        return true
+    }
+
     switch primitives.Epoch(epoch) {
-    case freezeStartEpoch:
-        require.NoError(r.t, r.comHandler.beaconNodes.PauseAtIndex(0))
-        require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(0))
-        return true
-    case freezeEndEpoch:
-        require.NoError(r.t, r.comHandler.beaconNodes.ResumeAtIndex(0))
-        require.NoError(r.t, r.comHandler.validatorNodes.ResumeAtIndex(0))
-        return true
     case valOfflineStartEpoch:
         require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(0))
         require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(1))
@@ -826,23 +861,36 @@ func (r *testRunner) multiScenario(ec *e2etypes.EvaluationContext, epoch uint64,
         }, func() bool {
             return true
         })
+        // Also intercept forkchoiceUpdated to prevent SetOptimisticToValid from
+        // clearing the optimistic status when the beacon node receives VALID.
+        component.(e2etypes.EngineProxy).AddRequestInterceptor(forkChoiceUpdatedMethod, func() any {
+            return &ForkchoiceUpdatedResponse{
+                Status: &enginev1.PayloadStatus{
+                    Status: enginev1.PayloadStatus_SYNCING,
+                    LatestValidHash: nil,
+                },
+                PayloadId: nil,
+            }
+        }, func() bool {
+            return true
+        })
         return true
     case optimisticEndEpoch:
         evs := []e2etypes.Evaluator{ev.OptimisticSyncEnabled}
         r.executeProvidedEvaluators(ec, epoch, []*grpc.ClientConn{conns[0]}, evs)
-        // Disable Interceptor
+        // Disable Interceptors
         component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0)
         require.NoError(r.t, err)
         engineProxy, ok := component.(e2etypes.EngineProxy)
         require.Equal(r.t, true, ok)
         engineProxy.RemoveRequestInterceptor(newPayloadMethod)
         engineProxy.ReleaseBackedUpRequests(newPayloadMethod)
+        engineProxy.RemoveRequestInterceptor(forkChoiceUpdatedMethod)
+        engineProxy.ReleaseBackedUpRequests(forkChoiceUpdatedMethod)
+
         return true
-    case recoveryEpochStart, recoveryEpochEnd,
-        secondRecoveryEpochStart, secondRecoveryEpochEnd,
+    case secondRecoveryEpochStart, secondRecoveryEpochEnd,
         thirdRecoveryEpochStart, thirdRecoveryEpochEnd:
-        // Allow 2 epochs for the network to finalize again.
         return true
     }
     return false
@@ -82,7 +82,7 @@ var metricComparisonTests = []comparisonTest{
         name: "hot state cache",
         topic1: "hot_state_cache_miss",
         topic2: "hot_state_cache_hit",
-        expectedComparison: 0.01,
+        expectedComparison: 0.02,
     },
 }

@@ -168,20 +168,15 @@ func metricCheckLessThan(pageContent, topic string, value int) error {

 func metricCheckComparison(pageContent, topic1, topic2 string, comparison float64) error {
     topic2Value, err := valueOfTopic(pageContent, topic2)
-    // If we can't find the first topic (error metrics), then assume the test passes.
-    if topic2Value != -1 {
+    if err != nil || topic2Value == -1 {
+        // If we can't find the denominator (hits/received total), assume test passes
         return nil
     }
-    if err != nil {
-        return err
-    }
     topic1Value, err := valueOfTopic(pageContent, topic1)
-    if topic1Value != -1 {
+    if err != nil || topic1Value == -1 {
+        // If we can't find the numerator (misses/failures), assume test passes (no errors)
         return nil
     }
-    if err != nil {
-        return err
-    }
     topicComparison := float64(topic1Value) / float64(topic2Value)
     if topicComparison >= comparison {
         return fmt.Errorf(
@@ -101,16 +101,44 @@ func peersConnect(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) erro
         return nil
     }
     ctx := context.Background()
+    expectedPeers := len(conns) - 1 + e2e.TestParams.LighthouseBeaconNodeCount
+
+    // Wait up to 60 seconds for all nodes to discover peers.
+    // Peer discovery via DHT can take time, especially for nodes that start later.
+    timeout := 60 * time.Second
+    pollInterval := 1 * time.Second
+
     for _, conn := range conns {
         nodeClient := eth.NewNodeClient(conn)
-        peersResp, err := nodeClient.ListPeers(ctx, &emptypb.Empty{})
+        deadline := time.Now().Add(timeout)
+
+        var peersResp *eth.Peers
+        var err error
+        for time.Now().Before(deadline) {
+            peersResp, err = nodeClient.ListPeers(ctx, &emptypb.Empty{})
+            if err != nil {
+                time.Sleep(pollInterval)
+                continue
+            }
+
+            if len(peersResp.Peers) >= expectedPeers {
+                break
+            }
+
+            time.Sleep(pollInterval)
+        }
+
         if err != nil {
-            return err
+            return fmt.Errorf("failed to list peers after %v: %w", timeout, err)
         }
-        expectedPeers := len(conns) - 1 + e2e.TestParams.LighthouseBeaconNodeCount
         if expectedPeers != len(peersResp.Peers) {
-            return fmt.Errorf("unexpected amount of peers, expected %d, received %d", expectedPeers, len(peersResp.Peers))
+            peerIDs := make([]string, 0, len(peersResp.Peers))
+            for _, p := range peersResp.Peers {
+                peerIDs = append(peerIDs, p.Address[len(p.Address)-10:])
+            }
+            return fmt.Errorf("unexpected amount of peers after %v timeout, expected %d, received %d (connected to: %v)", timeout, expectedPeers, len(peersResp.Peers), peerIDs)
         }

         time.Sleep(connTimeDelay)
     }
     return nil
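The reworked peersConnect evaluator above is a poll-until-deadline loop around ListPeers. A generic sketch of that shape, with the pollUntil helper invented here for illustration (it is not part of the diff):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil repeatedly calls check until it reports success or the timeout elapses.
// The last error from check is wrapped into the returned error for easier debugging.
func pollUntil(timeout, interval time.Duration, check func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	var lastErr error
	for time.Now().Before(deadline) {
		ok, err := check()
		if err == nil && ok {
			return nil
		}
		lastErr = err
		time.Sleep(interval)
	}
	if lastErr != nil {
		return fmt.Errorf("condition not met within %v: %w", timeout, lastErr)
	}
	return errors.New("condition not met within timeout")
}

func main() {
	attempts := 0
	err := pollUntil(2*time.Second, 100*time.Millisecond, func() (bool, error) {
		attempts++
		return attempts >= 5, nil // stand-in for "enough peers connected"
	})
	fmt.Println("err:", err, "attempts:", attempts)
}
```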
@@ -38,8 +38,8 @@ func TestEndToEnd_MinimalConfig(t *testing.T) {
     r := e2eMinimal(t, cfg,
         types.WithCheckpointSync(),
         types.WithEpochs(10),
         types.WithExitEpoch(4), // Minimum due to ShardCommitteePeriod=4
         types.WithLargeBlobs(), // Use large blob transactions for BPO testing
     )
     r.run()
 }
@@ -117,6 +117,7 @@ type E2EConfig struct {
     BeaconFlags []string
     ValidatorFlags []string
     PeerIDs []string
+    PeerMultiAddrs []string
     ExtraEpochs uint64
 }

@@ -222,6 +223,8 @@ type MultipleComponentRunners interface {
     ResumeAtIndex(i int) error
     // StopAtIndex stops the grouped component element at the desired index.
     StopAtIndex(i int) error
+    // RestartAtIndex restarts the grouped component element at the desired index.
+    RestartAtIndex(ctx context.Context, i int) error
 }

 type EngineProxy interface {