Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-31 08:08:18 -05:00)

Compare commits: 10 commits, gloas-ligh ... e2e-debugg
| Author | SHA1 | Date |
|---|---|---|
|  | 35720e3f71 |  |
|  | 6effcf5d53 |  |
|  | bd778dad3a |  |
|  | ceadf6e5c9 |  |
|  | 22769ed486 |  |
|  | b960e54e00 |  |
|  | 1c65c8866a |  |
|  | 14a4b97d57 |  |
|  | 0e537694c3 |  |
|  | 1b190e966e |  |
@@ -67,7 +67,6 @@ func getSubscriptionStatusFromDB(t *testing.T, db *Store) bool {
 	return subscribed
 }
 
-
 func TestUpdateCustodyInfo(t *testing.T) {
 	ctx := t.Context()
 
@@ -22,6 +22,10 @@ var ErrNotFoundFeeRecipient = errors.Wrap(ErrNotFound, "fee recipient")
 // ErrNotFoundMetadataSeqNum is a not found error specifically for the metadata sequence number getter
 var ErrNotFoundMetadataSeqNum = errors.Wrap(ErrNotFound, "metadata sequence number")
 
+// ErrStateDiffIncompatible is returned when state-diff feature is enabled
+// but the database was created without state-diff support.
+var ErrStateDiffIncompatible = errors.New("state-diff feature enabled but database was created without state-diff support")
+
 var errEmptyBlockSlice = errors.New("[]blocks.ROBlock is empty")
 var errIncorrectBlockParent = errors.New("unexpected missing or forked blocks in a []ROBlock")
 var errFinalizedChildNotFound = errors.New("unable to find finalized root descending from backfill batch")
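A side note on the sentinel above: assuming a pkg/errors version that exposes Unwrap (as recent releases do), callers can still match it after wrapping. A minimal sketch, not part of the diff:

	wrapped := errors.Wrap(ErrStateDiffIncompatible, "opening beacon db")
	if errors.Is(wrapped, ErrStateDiffIncompatible) {
		// feature/database mismatch; the caller may disable the flag and continue
	}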
@@ -42,6 +42,10 @@ func (s *Store) SaveGenesisData(ctx context.Context, genesisState state.BeaconSt
 	if err := s.SaveGenesisBlockRoot(ctx, genesisBlkRoot); err != nil {
 		return errors.Wrap(err, "could not save genesis block root")
 	}
+
+	if err := s.initializeStateDiff(0, genesisState); err != nil {
+		return errors.Wrap(err, "failed to initialize state diff for genesis")
+	}
 	return nil
 }
 
@@ -203,15 +203,45 @@ func NewKVStore(ctx context.Context, dirPath string, opts ...KVStoreOption) (*St
 		return nil, err
 	}
 
-	if features.Get().EnableStateDiff {
-		sdCache, err := newStateDiffCache(kv)
-		if err != nil {
-			return nil, err
-		}
-		kv.stateDiffCache = sdCache
-	}
-
+	if err := kv.startStateDiff(ctx); err != nil {
+		if errors.Is(err, ErrStateDiffIncompatible) {
+			return kv, err
+		}
+		return nil, err
+	}
 	return kv, nil
 }
 
+func (kv *Store) startStateDiff(ctx context.Context) error {
+	if !features.Get().EnableStateDiff {
+		return nil
+	}
+	// Check if offset already exists (existing state-diff database).
+	hasOffset, err := kv.hasStateDiffOffset()
+	if err != nil {
+		return err
+	}
+
+	if hasOffset {
+		// Existing state-diff database - restarts not yet supported.
+		return errors.New("restarting with existing state-diff database not yet supported")
+	}
+
+	// Check if this is a new database (no head block).
+	headBlock, err := kv.HeadBlock(ctx)
+	if err != nil {
+		return errors.Wrap(err, "could not get head block")
+	}
+
+	if headBlock == nil {
+		// New database - will be initialized later during checkpoint/genesis sync.
+		// stateDiffCache stays nil until SaveOrigin or SaveGenesisData initializes it.
+		log.Info("State-diff enabled: will be initialized during checkpoint or genesis sync")
+	} else {
+		// Existing database without state-diff - return store with error for caller to handle.
+		return ErrStateDiffIncompatible
+	}
+	return nil
+}
+
 // ClearDB removes the previously stored database in the data directory.
@@ -9,11 +9,13 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
|
||||
statenative "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
|
||||
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
|
||||
"github.com/OffchainLabs/prysm/v7/config/features"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/hdiff"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v7/math"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/runtime/version"
|
||||
pkgerrors "github.com/pkg/errors"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
@@ -122,6 +124,66 @@ func (s *Store) getOffset() uint64 {
 	return s.stateDiffCache.getOffset()
 }
 
+// hasStateDiffOffset checks if the state-diff offset has been set in the database.
+// This is used to detect if an existing database has state-diff enabled.
+func (s *Store) hasStateDiffOffset() (bool, error) {
+	var hasOffset bool
+	err := s.db.View(func(tx *bbolt.Tx) error {
+		bucket := tx.Bucket(stateDiffBucket)
+		if bucket == nil {
+			return nil
+		}
+		hasOffset = bucket.Get(offsetKey) != nil
+		return nil
+	})
+	return hasOffset, err
+}
+
+// initializeStateDiff sets up the state-diff schema for a new database.
+// This should be called during checkpoint sync or genesis sync.
+func (s *Store) initializeStateDiff(slot primitives.Slot, initialState state.ReadOnlyBeaconState) error {
+	// Return early if the feature is not set
+	if !features.Get().EnableStateDiff {
+		return nil
+	}
+	// Only reinitialize if the offset is different
+	if s.stateDiffCache != nil {
+		if s.stateDiffCache.getOffset() == uint64(slot) {
+			log.WithField("offset", slot).Warning("Ignoring state diff cache reinitialization")
+			return nil
+		}
+	}
+	// Write offset directly to the database (without using cache which doesn't exist yet).
+	err := s.db.Update(func(tx *bbolt.Tx) error {
+		bucket := tx.Bucket(stateDiffBucket)
+		if bucket == nil {
+			return bbolt.ErrBucketNotFound
+		}
+
+		offsetBytes := make([]byte, 8)
+		binary.LittleEndian.PutUint64(offsetBytes, uint64(slot))
+		return bucket.Put(offsetKey, offsetBytes)
+	})
+	if err != nil {
+		return pkgerrors.Wrap(err, "failed to set offset")
+	}
+
+	// Create the state diff cache (this will read the offset from the database).
+	sdCache, err := newStateDiffCache(s)
+	if err != nil {
+		return pkgerrors.Wrap(err, "failed to create state diff cache")
+	}
+	s.stateDiffCache = sdCache
+
+	// Save the initial state as a full snapshot.
+	if err := s.saveFullSnapshot(initialState); err != nil {
+		return pkgerrors.Wrap(err, "failed to save initial snapshot")
+	}
+
+	log.WithField("offset", slot).Info("Initialized state-diff cache")
+	return nil
+}
+
 func keyForSnapshot(v int) ([]byte, error) {
 	switch v {
 	case version.Fulu:
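The offset written by initializeStateDiff above is an 8-byte little-endian slot number stored under offsetKey. A read-side sketch of how it would be decoded with the same bucket and key; readStateDiffOffset is a hypothetical helper shown for illustration, not part of this change:

	func (s *Store) readStateDiffOffset() (uint64, bool, error) {
		var (
			offset uint64
			found  bool
		)
		err := s.db.View(func(tx *bbolt.Tx) error {
			bucket := tx.Bucket(stateDiffBucket)
			if bucket == nil {
				return nil // schema not created yet
			}
			raw := bucket.Get(offsetKey)
			if len(raw) != 8 {
				return nil // treat a missing or short value as "no offset"
			}
			offset = binary.LittleEndian.Uint64(raw)
			found = true
			return nil
		})
		return offset, found, err
	}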
@@ -110,6 +110,8 @@ func (s *Store) SaveOrigin(ctx context.Context, serState, serBlock []byte) error
|
||||
if err = s.SaveFinalizedCheckpoint(ctx, chkpt); err != nil {
|
||||
return errors.Wrap(err, "save finalized checkpoint")
|
||||
}
|
||||
|
||||
if err := s.initializeStateDiff(state.Slot(), state); err != nil {
|
||||
return errors.Wrap(err, "failed to initialize state diff")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -535,7 +535,12 @@ func openDB(ctx context.Context, dbPath string, clearer *dbClearer) (*kv.Store,
 	log.WithField("databasePath", dbPath).Info("Checking DB")
 
 	d, err := kv.NewKVStore(ctx, dbPath)
-	if err != nil {
+	if errors.Is(err, kv.ErrStateDiffIncompatible) {
+		log.WithError(err).Warn("Disabling state-diff feature")
+		cfg := features.Get()
+		cfg.EnableStateDiff = false
+		features.Init(cfg)
+	} else if err != nil {
 		return nil, errors.Wrapf(err, "could not create database at %s", dbPath)
 	}
 
@@ -575,7 +575,7 @@ func (s *Service) beaconEndpoints(
 			name: namespace + ".PublishBlockV2",
 			middleware: []middleware.Middleware{
 				middleware.ContentTypeHandler([]string{api.JsonMediaType, api.OctetStreamMediaType}),
-				middleware.AcceptHeaderHandler([]string{api.JsonMediaType}),
+				middleware.AcceptHeaderHandler([]string{api.JsonMediaType, api.OctetStreamMediaType}),
 				middleware.AcceptEncodingHeaderHandler(),
 			},
 			handler: server.PublishBlockV2,
@@ -586,7 +586,7 @@ func (s *Service) beaconEndpoints(
 			name: namespace + ".PublishBlindedBlockV2",
 			middleware: []middleware.Middleware{
 				middleware.ContentTypeHandler([]string{api.JsonMediaType, api.OctetStreamMediaType}),
-				middleware.AcceptHeaderHandler([]string{api.JsonMediaType}),
+				middleware.AcceptHeaderHandler([]string{api.JsonMediaType, api.OctetStreamMediaType}),
 				middleware.AcceptEncodingHeaderHandler(),
 			},
 			handler: server.PublishBlindedBlockV2,
@@ -26,8 +26,8 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
|
||||
p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
|
||||
mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
|
||||
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
|
||||
mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v7/crypto/bls"
|
||||
|
||||
@@ -83,6 +83,7 @@ func TestGetSpec(t *testing.T) {
|
||||
config.ElectraForkEpoch = 107
|
||||
config.FuluForkVersion = []byte("FuluForkVersion")
|
||||
config.FuluForkEpoch = 109
|
||||
config.GloasForkEpoch = 110
|
||||
config.BLSWithdrawalPrefixByte = byte('b')
|
||||
config.ETH1AddressWithdrawalPrefixByte = byte('c')
|
||||
config.GenesisDelay = 24
|
||||
@@ -134,6 +135,10 @@ func TestGetSpec(t *testing.T) {
|
||||
config.AttestationDueBPS = primitives.BP(122)
|
||||
config.AggregateDueBPS = primitives.BP(123)
|
||||
config.ContributionDueBPS = primitives.BP(124)
|
||||
config.AttestationDueBPSGloas = primitives.BP(126)
|
||||
config.AggregateDueBPSGloas = primitives.BP(127)
|
||||
config.SyncMessageDueBPSGloas = primitives.BP(128)
|
||||
config.ContributionDueBPSGloas = primitives.BP(129)
|
||||
config.TerminalBlockHash = common.HexToHash("TerminalBlockHash")
|
||||
config.TerminalBlockHashActivationEpoch = 72
|
||||
config.TerminalTotalDifficulty = "73"
|
||||
@@ -215,7 +220,7 @@ func TestGetSpec(t *testing.T) {
|
||||
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), &resp))
|
||||
data, ok := resp.Data.(map[string]any)
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, 187, len(data))
|
||||
assert.Equal(t, 192, len(data))
|
||||
for k, v := range data {
|
||||
t.Run(k, func(t *testing.T) {
|
||||
switch k {
|
||||
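For reference, 192 = 187 + the five new spec entries checked below: GLOAS_FORK_EPOCH plus the four *_DUE_BPS_GLOAS timing values.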
@@ -295,6 +300,8 @@ func TestGetSpec(t *testing.T) {
|
||||
assert.Equal(t, "0x"+hex.EncodeToString([]byte("FuluForkVersion")), v)
|
||||
case "FULU_FORK_EPOCH":
|
||||
assert.Equal(t, "109", v)
|
||||
case "GLOAS_FORK_EPOCH":
|
||||
assert.Equal(t, "110", v)
|
||||
case "MIN_ANCHOR_POW_BLOCK_DIFFICULTY":
|
||||
assert.Equal(t, "1000", v)
|
||||
case "BLS_WITHDRAWAL_PREFIX":
|
||||
@@ -479,6 +486,14 @@ func TestGetSpec(t *testing.T) {
|
||||
assert.Equal(t, "123", v)
|
||||
case "CONTRIBUTION_DUE_BPS":
|
||||
assert.Equal(t, "124", v)
|
||||
case "ATTESTATION_DUE_BPS_GLOAS":
|
||||
assert.Equal(t, "126", v)
|
||||
case "AGGREGATE_DUE_BPS_GLOAS":
|
||||
assert.Equal(t, "127", v)
|
||||
case "SYNC_MESSAGE_DUE_BPS_GLOAS":
|
||||
assert.Equal(t, "128", v)
|
||||
case "CONTRIBUTION_DUE_BPS_GLOAS":
|
||||
assert.Equal(t, "129", v)
|
||||
case "MAX_PER_EPOCH_ACTIVATION_CHURN_LIMIT":
|
||||
assert.Equal(t, "8", v)
|
||||
case "MAX_REQUEST_LIGHT_CLIENT_UPDATES":
|
||||
|
||||
@@ -137,7 +137,7 @@ func (s *State) migrateToColdHdiff(ctx context.Context, fRoot [32]byte) error {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
_, lvl, err := s.beaconDB.SlotInDiffTree(slot)
|
||||
offset, lvl, err := s.beaconDB.SlotInDiffTree(slot)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("could not determine if slot %d is in diff tree", slot)
|
||||
continue
|
||||
@@ -145,6 +145,9 @@ func (s *State) migrateToColdHdiff(ctx context.Context, fRoot [32]byte) error {
|
||||
if lvl == -1 {
|
||||
continue
|
||||
}
|
||||
if uint64(slot) == offset {
|
||||
continue
|
||||
}
|
||||
// The state needs to be saved.
|
||||
// Try the epoch boundary cache first.
|
||||
cached, exists, err := s.epochBoundaryStateCache.getBySlot(slot)
|
||||
|
||||
@@ -1027,10 +1027,10 @@ func TestGetVerifyingStateEdgeCases(t *testing.T) {
|
||||
sc: signatureCache,
|
||||
sr: &mockStateByRooter{sbr: sbrErrorIfCalled(t)}, // Should not be called
|
||||
hsp: &mockHeadStateProvider{
|
||||
headRoot: parentRoot[:], // Same as parent
|
||||
headSlot: 32, // Epoch 1
|
||||
headState: fuluState.Copy(), // HeadState (not ReadOnly) for ProcessSlots
|
||||
headStateReadOnly: nil, // Should not use ReadOnly path
|
||||
headRoot: parentRoot[:], // Same as parent
|
||||
headSlot: 32, // Epoch 1
|
||||
headState: fuluState.Copy(), // HeadState (not ReadOnly) for ProcessSlots
|
||||
headStateReadOnly: nil, // Should not use ReadOnly path
|
||||
},
|
||||
fc: &mockForkchoicer{
|
||||
// Return same root for both to simulate same chain
|
||||
@@ -1045,8 +1045,8 @@ func TestGetVerifyingStateEdgeCases(t *testing.T) {
|
||||
// Wrap to detect HeadState call
|
||||
originalHsp := initializer.shared.hsp.(*mockHeadStateProvider)
|
||||
wrappedHsp := &mockHeadStateProvider{
|
||||
headRoot: originalHsp.headRoot,
|
||||
headSlot: originalHsp.headSlot,
|
||||
headRoot: originalHsp.headRoot,
|
||||
headSlot: originalHsp.headSlot,
|
||||
headState: originalHsp.headState,
|
||||
}
|
||||
initializer.shared.hsp = &headStateCallTracker{
|
||||
|
||||
3
changelog/bastin_path-ephemeral-log.md
Normal file
3
changelog/bastin_path-ephemeral-log.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Ignored
|
||||
|
||||
- Added a field `path` for the ephemeral log file initialization log.
|
||||
3
changelog/farazdagi_fix-hashtree-darwin-amd64.md
Normal file
3
changelog/farazdagi_fix-hashtree-darwin-amd64.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Fixed
|
||||
|
||||
- Fix Bazel build failure on macOS x86_64 (darwin_amd64) (adds missing assembly stub to hashtree patch).
|
||||
3
changelog/potuz_hdiff_start_db.md
Normal file
3
changelog/potuz_hdiff_start_db.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Added
|
||||
|
||||
- Initialize db with state-diff feature flag.
|
||||
2
changelog/terencechain_gloas-duty-timings.md
Normal file
2
changelog/terencechain_gloas-duty-timings.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Added
|
||||
- Gloas-specific timing intervals for validator attestation, aggregation, and sync duties.
|
||||
@@ -160,6 +160,7 @@ var appFlags = []cli.Flag{
|
||||
dasFlags.BackfillOldestSlot,
|
||||
dasFlags.BlobRetentionEpochFlag,
|
||||
flags.BatchVerifierLimit,
|
||||
flags.StateDiffExponents,
|
||||
flags.DisableEphemeralLogFile,
|
||||
}
|
||||
|
||||
|
||||
@@ -74,6 +74,7 @@ var appHelpFlagGroups = []flagGroup{
|
||||
flags.RPCHost,
|
||||
flags.RPCPort,
|
||||
flags.BatchVerifierLimit,
|
||||
flags.StateDiffExponents,
|
||||
},
|
||||
},
|
||||
{
|
||||
|
||||
@@ -280,6 +280,7 @@ var BeaconChainFlags = combinedFlags([]cli.Flag{
|
||||
DisableQUIC,
|
||||
EnableDiscoveryReboot,
|
||||
enableExperimentalAttestationPool,
|
||||
EnableStateDiff,
|
||||
forceHeadFlag,
|
||||
blacklistRoots,
|
||||
enableHashtree,
|
||||
|
||||
@@ -91,6 +91,10 @@ type BeaconChainConfig struct {
|
||||
AggregateDueBPS primitives.BP `yaml:"AGGREGATE_DUE_BPS" spec:"true"` // AggregateDueBPS defines the aggregate due time in basis points of the slot.
|
||||
SyncMessageDueBPS primitives.BP `yaml:"SYNC_MESSAGE_DUE_BPS" spec:"true"` // SyncMessageDueBPS defines the sync message due time in basis points of the slot.
|
||||
ContributionDueBPS primitives.BP `yaml:"CONTRIBUTION_DUE_BPS" spec:"true"` // ContributionDueBPS defines the contribution due time in basis points of the slot.
|
||||
AttestationDueBPSGloas primitives.BP `yaml:"ATTESTATION_DUE_BPS_GLOAS" spec:"true"` // AttestationDueBPSGloas defines the attestation due time in basis points of the slot (Gloas).
|
||||
AggregateDueBPSGloas primitives.BP `yaml:"AGGREGATE_DUE_BPS_GLOAS" spec:"true"` // AggregateDueBPSGloas defines the aggregate due time in basis points of the slot (Gloas).
|
||||
SyncMessageDueBPSGloas primitives.BP `yaml:"SYNC_MESSAGE_DUE_BPS_GLOAS" spec:"true"` // SyncMessageDueBPSGloas defines the sync message due time in basis points of the slot (Gloas).
|
||||
ContributionDueBPSGloas primitives.BP `yaml:"CONTRIBUTION_DUE_BPS_GLOAS" spec:"true"` // ContributionDueBPSGloas defines the contribution due time in basis points of the slot (Gloas).
|
||||
|
||||
// Ethereum PoW parameters.
|
||||
DepositChainID uint64 `yaml:"DEPOSIT_CHAIN_ID" spec:"true"` // DepositChainID of the eth1 network. This used for replay protection.
|
||||
@@ -188,6 +192,7 @@ type BeaconChainConfig struct {
|
||||
ElectraForkEpoch primitives.Epoch `yaml:"ELECTRA_FORK_EPOCH" spec:"true"` // ElectraForkEpoch is used to represent the assigned fork epoch for electra.
|
||||
FuluForkVersion []byte `yaml:"FULU_FORK_VERSION" spec:"true"` // FuluForkVersion is used to represent the fork version for fulu.
|
||||
FuluForkEpoch primitives.Epoch `yaml:"FULU_FORK_EPOCH" spec:"true"` // FuluForkEpoch is used to represent the assigned fork epoch for fulu.
|
||||
GloasForkEpoch primitives.Epoch `yaml:"GLOAS_FORK_EPOCH" spec:"true"` // GloasForkEpoch is used to represent the assigned fork epoch for gloas.
|
||||
|
||||
ForkVersionSchedule map[[fieldparams.VersionLength]byte]primitives.Epoch // Schedule of fork epochs by version.
|
||||
ForkVersionNames map[[fieldparams.VersionLength]byte]string // Human-readable names of fork versions.
|
||||
@@ -343,6 +348,7 @@ func (b *BeaconChainConfig) VersionToForkEpochMap() map[int]primitives.Epoch {
|
||||
version.Deneb: b.DenebForkEpoch,
|
||||
version.Electra: b.ElectraForkEpoch,
|
||||
version.Fulu: b.FuluForkEpoch,
|
||||
version.Gloas: b.GloasForkEpoch,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -224,6 +224,7 @@ func ConfigToYaml(cfg *BeaconChainConfig) []byte {
|
||||
fmt.Sprintf("ELECTRA_FORK_VERSION: %#x", cfg.ElectraForkVersion),
|
||||
fmt.Sprintf("FULU_FORK_EPOCH: %d", cfg.FuluForkEpoch),
|
||||
fmt.Sprintf("FULU_FORK_VERSION: %#x", cfg.FuluForkVersion),
|
||||
fmt.Sprintf("GLOAS_FORK_EPOCH: %d", cfg.GloasForkEpoch),
|
||||
fmt.Sprintf("EPOCHS_PER_SUBNET_SUBSCRIPTION: %d", cfg.EpochsPerSubnetSubscription),
|
||||
fmt.Sprintf("ATTESTATION_SUBNET_EXTRA_BITS: %d", cfg.AttestationSubnetExtraBits),
|
||||
fmt.Sprintf("ATTESTATION_SUBNET_PREFIX_BITS: %d", cfg.AttestationSubnetPrefixBits),
|
||||
@@ -246,6 +247,10 @@ func ConfigToYaml(cfg *BeaconChainConfig) []byte {
|
||||
fmt.Sprintf("AGGREGATE_DUE_BPS: %d", cfg.AggregateDueBPS),
|
||||
fmt.Sprintf("SYNC_MESSAGE_DUE_BPS: %d", cfg.SyncMessageDueBPS),
|
||||
fmt.Sprintf("CONTRIBUTION_DUE_BPS: %d", cfg.ContributionDueBPS),
|
||||
fmt.Sprintf("ATTESTATION_DUE_BPS_GLOAS: %d", cfg.AttestationDueBPSGloas),
|
||||
fmt.Sprintf("AGGREGATE_DUE_BPS_GLOAS: %d", cfg.AggregateDueBPSGloas),
|
||||
fmt.Sprintf("SYNC_MESSAGE_DUE_BPS_GLOAS: %d", cfg.SyncMessageDueBPSGloas),
|
||||
fmt.Sprintf("CONTRIBUTION_DUE_BPS_GLOAS: %d", cfg.ContributionDueBPSGloas),
|
||||
}
|
||||
|
||||
if len(cfg.BlobSchedule) > 0 {
|
||||
|
||||
@@ -24,12 +24,9 @@ import (
|
||||
// These are variables that we don't use in Prysm. (i.e. future hardfork, light client... etc)
|
||||
// IMPORTANT: Use one field per line and sort these alphabetically to reduce conflicts.
|
||||
var placeholderFields = []string{
|
||||
"AGGREGATE_DUE_BPS_GLOAS",
|
||||
"ATTESTATION_DEADLINE",
|
||||
"ATTESTATION_DUE_BPS_GLOAS",
|
||||
"BLOB_SIDECAR_SUBNET_COUNT_FULU",
|
||||
"CELLS_PER_EXT_BLOB",
|
||||
"CONTRIBUTION_DUE_BPS_GLOAS",
|
||||
"EIP6110_FORK_EPOCH",
|
||||
"EIP6110_FORK_VERSION",
|
||||
"EIP7002_FORK_EPOCH",
|
||||
@@ -45,7 +42,6 @@ var placeholderFields = []string{
|
||||
"EPOCHS_PER_SHUFFLING_PHASE",
|
||||
"FIELD_ELEMENTS_PER_CELL", // Configured as a constant in config/fieldparams/mainnet.go
|
||||
"FIELD_ELEMENTS_PER_EXT_BLOB", // Configured in proto/ssz_proto_library.bzl
|
||||
"GLOAS_FORK_EPOCH",
|
||||
"GLOAS_FORK_VERSION",
|
||||
"INCLUSION_LIST_SUBMISSION_DEADLINE",
|
||||
"INCLUSION_LIST_SUBMISSION_DUE_BPS",
|
||||
@@ -60,7 +56,6 @@ var placeholderFields = []string{
|
||||
"PROPOSER_INCLUSION_LIST_CUTOFF",
|
||||
"PROPOSER_INCLUSION_LIST_CUTOFF_BPS",
|
||||
"PROPOSER_SELECTION_GAP",
|
||||
"SYNC_MESSAGE_DUE_BPS_GLOAS",
|
||||
"TARGET_NUMBER_OF_PEERS",
|
||||
"UPDATE_TIMEOUT",
|
||||
"VIEW_FREEZE_CUTOFF_BPS",
|
||||
|
||||
@@ -32,6 +32,8 @@ const (
|
||||
mainnetElectraForkEpoch = 364032 // May 7, 2025, 10:05:11 UTC
|
||||
// Fulu Fork Epoch for mainnet config
|
||||
mainnetFuluForkEpoch = 411392 // December 3, 2025, 09:49:11pm UTC
|
||||
// Gloas Fork Epoch for mainnet config
|
||||
mainnetGloasForkEpoch = math.MaxUint64
|
||||
)
|
||||
|
||||
var mainnetNetworkConfig = &NetworkConfig{
|
||||
@@ -121,11 +123,15 @@ var mainnetBeaconConfig = &BeaconChainConfig{
|
||||
IntervalsPerSlot: 3,
|
||||
|
||||
// Time-based protocol parameters.
|
||||
ProposerReorgCutoffBPS: primitives.BP(1667),
|
||||
AttestationDueBPS: primitives.BP(3333),
|
||||
AggregateDueBPS: primitives.BP(6667),
|
||||
SyncMessageDueBPS: primitives.BP(3333),
|
||||
ContributionDueBPS: primitives.BP(6667),
|
||||
ProposerReorgCutoffBPS: primitives.BP(1667),
|
||||
AttestationDueBPS: primitives.BP(3333),
|
||||
AggregateDueBPS: primitives.BP(6667),
|
||||
SyncMessageDueBPS: primitives.BP(3333),
|
||||
ContributionDueBPS: primitives.BP(6667),
|
||||
AttestationDueBPSGloas: primitives.BP(2500),
|
||||
AggregateDueBPSGloas: primitives.BP(5000),
|
||||
SyncMessageDueBPSGloas: primitives.BP(2500),
|
||||
ContributionDueBPSGloas: primitives.BP(5000),
|
||||
|
||||
// Ethereum PoW parameters.
|
||||
DepositChainID: 1, // Chain ID of eth1 mainnet.
|
||||
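Since these values are expressed in basis points of the slot (1 bps = 0.01%), they translate into wall-clock offsets as in the sketch below. dueOffset is an illustrative helper assuming mainnet's 12-second slots; it is not part of the configuration.

	// Illustrative only: convert a *_DUE_BPS value into a wall-clock offset.
	func dueOffset(slotDuration time.Duration, bps uint64) time.Duration {
		return slotDuration * time.Duration(bps) / 10000
	}

	// With 12s slots:
	//   dueOffset(12*time.Second, 2500) == 3s (AttestationDueBPSGloas, SyncMessageDueBPSGloas)
	//   dueOffset(12*time.Second, 5000) == 6s (AggregateDueBPSGloas, ContributionDueBPSGloas)
	//   dueOffset(12*time.Second, 3333) ≈ 4s (pre-Gloas AttestationDueBPS)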
@@ -235,6 +241,7 @@ var mainnetBeaconConfig = &BeaconChainConfig{
|
||||
ElectraForkEpoch: mainnetElectraForkEpoch,
|
||||
FuluForkVersion: []byte{6, 0, 0, 0},
|
||||
FuluForkEpoch: mainnetFuluForkEpoch,
|
||||
GloasForkEpoch: mainnetGloasForkEpoch,
|
||||
|
||||
// New values introduced in Altair hard fork 1.
|
||||
// Participation flag indices.
|
||||
|
||||
@@ -35,6 +35,10 @@ func MinimalSpecConfig() *BeaconChainConfig {
|
||||
// Time parameters
|
||||
minimalConfig.SecondsPerSlot = 6
|
||||
minimalConfig.SlotDurationMilliseconds = 6000
|
||||
minimalConfig.AttestationDueBPSGloas = 2500
|
||||
minimalConfig.AggregateDueBPSGloas = 5000
|
||||
minimalConfig.SyncMessageDueBPSGloas = 2500
|
||||
minimalConfig.ContributionDueBPSGloas = 5000
|
||||
minimalConfig.MinAttestationInclusionDelay = 1
|
||||
minimalConfig.SlotsPerEpoch = 8
|
||||
minimalConfig.SqrRootSlotsPerEpoch = 2
|
||||
@@ -98,6 +102,7 @@ func MinimalSpecConfig() *BeaconChainConfig {
|
||||
minimalConfig.ElectraForkEpoch = math.MaxUint64
|
||||
minimalConfig.FuluForkVersion = []byte{6, 0, 0, 1}
|
||||
minimalConfig.FuluForkEpoch = math.MaxUint64
|
||||
minimalConfig.GloasForkEpoch = minimalConfig.FarFutureEpoch
|
||||
|
||||
minimalConfig.SyncCommitteeSize = 32
|
||||
minimalConfig.InactivityScoreBias = 4
|
||||
|
||||
@@ -103,7 +103,7 @@ func ConfigureEphemeralLogFile(datadirPath string, app string) error {
|
||||
AllowedLevels: logrus.AllLevels[:ephemeralLogFileVerbosity+1],
|
||||
})
|
||||
|
||||
logrus.Debug("Ephemeral log file initialized")
|
||||
logrus.WithField("path", logFilePath).Debug("Ephemeral log file initialized")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -26,21 +26,21 @@ func TestLifecycle(t *testing.T) {
|
||||
port := 1000 + rand.Intn(1000)
|
||||
prometheusService := NewService(t.Context(), fmt.Sprintf(":%d", port), nil)
|
||||
prometheusService.Start()
|
||||
// Actively wait until the service responds on /metrics (faster and less flaky than a fixed sleep)
|
||||
deadline := time.Now().Add(3 * time.Second)
|
||||
for {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatalf("metrics endpoint not ready within timeout")
|
||||
}
|
||||
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
|
||||
if err == nil {
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
// Actively wait until the service responds on /metrics (faster and less flaky than a fixed sleep)
|
||||
deadline := time.Now().Add(3 * time.Second)
|
||||
for {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatalf("metrics endpoint not ready within timeout")
|
||||
}
|
||||
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
|
||||
if err == nil {
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Query the service to ensure it really started.
|
||||
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
|
||||
@@ -49,18 +49,18 @@ func TestLifecycle(t *testing.T) {
|
||||
|
||||
err = prometheusService.Stop()
|
||||
require.NoError(t, err)
|
||||
// Actively wait until the service stops responding on /metrics
|
||||
deadline = time.Now().Add(3 * time.Second)
|
||||
for {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatalf("metrics endpoint still reachable after timeout")
|
||||
}
|
||||
_, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
// Actively wait until the service stops responding on /metrics
|
||||
deadline = time.Now().Add(3 * time.Second)
|
||||
for {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatalf("metrics endpoint still reachable after timeout")
|
||||
}
|
||||
_, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Query the service to ensure it really stopped.
|
||||
_, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
|
||||
|
||||
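The readiness and shutdown waits above share the same poll-until-deadline shape. A generic sketch of that technique; waitForHTTPOK is a hypothetical helper, not part of the test:

	func waitForHTTPOK(url string, timeout, interval time.Duration) bool {
		deadline := time.Now().Add(timeout)
		for time.Now().Before(deadline) {
			resp, err := http.Get(url)
			if err == nil {
				_ = resp.Body.Close()
				if resp.StatusCode == http.StatusOK {
					return true
				}
			}
			time.Sleep(interval)
		}
		return false
	}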
@@ -97,6 +97,7 @@ go_test(
|
||||
"endtoend_setup_test.go",
|
||||
"endtoend_test.go",
|
||||
"minimal_e2e_test.go",
|
||||
"minimal_hdiff_e2e_test.go",
|
||||
"minimal_slashing_e2e_test.go",
|
||||
"slasher_simulator_e2e_test.go",
|
||||
],
|
||||
|
||||
@@ -47,6 +47,7 @@ go_library(
|
||||
"@in_gopkg_yaml_v2//:go_default_library",
|
||||
"@io_bazel_rules_go//go/tools/bazel:go_default_library",
|
||||
"@org_golang_x_sync//errgroup:go_default_library",
|
||||
"@org_golang_x_sys//unix:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
|
||||
cmdshared "github.com/OffchainLabs/prysm/v7/cmd"
|
||||
@@ -35,11 +36,12 @@ var _ e2etypes.BeaconNodeSet = (*BeaconNodeSet)(nil)
|
||||
// BeaconNodeSet represents set of beacon nodes.
|
||||
type BeaconNodeSet struct {
|
||||
e2etypes.ComponentRunner
|
||||
config *e2etypes.E2EConfig
|
||||
nodes []e2etypes.ComponentRunner
|
||||
enr string
|
||||
ids []string
|
||||
started chan struct{}
|
||||
config *e2etypes.E2EConfig
|
||||
nodes []e2etypes.ComponentRunner
|
||||
enr string
|
||||
ids []string
|
||||
multiAddrs []string
|
||||
started chan struct{}
|
||||
}
|
||||
|
||||
// SetENR assigns ENR to the set of beacon nodes.
|
||||
@@ -74,8 +76,10 @@ func (s *BeaconNodeSet) Start(ctx context.Context) error {
|
||||
if s.config.UseFixedPeerIDs {
|
||||
for i := range nodes {
|
||||
s.ids = append(s.ids, nodes[i].(*BeaconNode).peerID)
|
||||
s.multiAddrs = append(s.multiAddrs, nodes[i].(*BeaconNode).multiAddr)
|
||||
}
|
||||
s.config.PeerIDs = s.ids
|
||||
s.config.PeerMultiAddrs = s.multiAddrs
|
||||
}
|
||||
// All nodes started, close channel, so that all services waiting on a set, can proceed.
|
||||
close(s.started)
|
||||
@@ -141,6 +145,14 @@ func (s *BeaconNodeSet) StopAtIndex(i int) error {
|
||||
return s.nodes[i].Stop()
|
||||
}
|
||||
|
||||
// RestartAtIndex restarts the component at the desired index.
|
||||
func (s *BeaconNodeSet) RestartAtIndex(ctx context.Context, i int) error {
|
||||
if i >= len(s.nodes) {
|
||||
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
|
||||
}
|
||||
return s.nodes[i].(*BeaconNode).Restart(ctx)
|
||||
}
|
||||
|
||||
// ComponentAtIndex returns the component at the provided index.
|
||||
func (s *BeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
|
||||
if i >= len(s.nodes) {
|
||||
@@ -152,12 +164,14 @@ func (s *BeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error
|
||||
// BeaconNode represents beacon node.
|
||||
type BeaconNode struct {
|
||||
e2etypes.ComponentRunner
|
||||
config *e2etypes.E2EConfig
|
||||
started chan struct{}
|
||||
index int
|
||||
enr string
|
||||
peerID string
|
||||
cmd *exec.Cmd
|
||||
config *e2etypes.E2EConfig
|
||||
started chan struct{}
|
||||
index int
|
||||
enr string
|
||||
peerID string
|
||||
multiAddr string
|
||||
cmd *exec.Cmd
|
||||
args []string
|
||||
}
|
||||
|
||||
// NewBeaconNode creates and returns a beacon node.
|
||||
@@ -290,6 +304,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
|
||||
args = append(args, fmt.Sprintf("--%s=%s:%d", flags.MevRelayEndpoint.Name, "http://127.0.0.1", e2e.TestParams.Ports.Eth1ProxyPort+index))
|
||||
}
|
||||
args = append(args, config.BeaconFlags...)
|
||||
node.args = args
|
||||
|
||||
cmd := exec.CommandContext(ctx, binaryPath, args...) // #nosec G204 -- Safe
|
||||
// Write stderr to log files.
|
||||
@@ -318,6 +333,18 @@ func (node *BeaconNode) Start(ctx context.Context) error {
|
||||
return fmt.Errorf("could not find peer id: %w", err)
|
||||
}
|
||||
node.peerID = peerId
|
||||
|
||||
// Extract QUIC multiaddr for Lighthouse to connect to this node.
|
||||
// Prysm logs: msg="Node started p2p server" multiAddr="/ip4/192.168.0.14/udp/4250/quic-v1/p2p/16Uiu2..."
|
||||
// We prefer QUIC over TCP as it's more reliable in E2E tests.
|
||||
multiAddr, err := helpers.FindFollowingTextInFile(stdOutFile, "multiAddr=\"/ip4/192.168.0.14/udp/")
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not find QUIC multiaddr: %w", err)
|
||||
}
|
||||
// The extracted text will be like: 4250/quic-v1/p2p/16Uiu2..."
|
||||
// We need to prepend "/ip4/192.168.0.14/udp/" and strip the trailing quote
|
||||
multiAddr = strings.TrimSuffix(multiAddr, "\"")
|
||||
node.multiAddr = "/ip4/192.168.0.14/udp/" + multiAddr
|
||||
}
|
||||
|
||||
// Mark node as ready.
|
||||
@@ -347,6 +374,96 @@ func (node *BeaconNode) Stop() error {
|
||||
return node.cmd.Process.Kill()
|
||||
}
|
||||
|
||||
// Restart gracefully stops the beacon node and starts a new process.
|
||||
// This is useful for testing resilience as it allows the P2P layer to reinitialize
|
||||
// and discover peers again (unlike SIGSTOP/SIGCONT which breaks QUIC connections permanently).
|
||||
func (node *BeaconNode) Restart(ctx context.Context) error {
|
||||
binaryPath, found := bazel.FindBinary("cmd/beacon-chain", "beacon-chain")
|
||||
if !found {
|
||||
return errors.New("beacon chain binary not found")
|
||||
}
|
||||
|
||||
// First, continue the process if it's stopped (from PauseAtIndex).
|
||||
// A stopped process (SIGSTOP) cannot receive SIGTERM until continued.
|
||||
_ = node.cmd.Process.Signal(syscall.SIGCONT)
|
||||
|
||||
if err := node.cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||
return fmt.Errorf("failed to send SIGTERM: %w", err)
|
||||
}
|
||||
|
||||
// Wait for process to exit by polling. We can't call cmd.Wait() here because
|
||||
// the Start() method's goroutine is already waiting on the command, and calling
|
||||
// Wait() twice on the same process causes "waitid: no child processes" error.
|
||||
// Instead, poll using Signal(0) which returns an error when the process no longer exists.
|
||||
processExited := false
|
||||
for range 100 {
|
||||
if err := node.cmd.Process.Signal(syscall.Signal(0)); err != nil {
|
||||
processExited = true
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
if !processExited {
|
||||
log.Warnf("Beacon node %d did not exit within 10 seconds after SIGTERM, proceeding with restart anyway", node.index)
|
||||
}
|
||||
|
||||
restartArgs := make([]string, 0, len(node.args))
|
||||
for _, arg := range node.args {
|
||||
if !strings.Contains(arg, cmdshared.ForceClearDB.Name) {
|
||||
restartArgs = append(restartArgs, arg)
|
||||
}
|
||||
}
|
||||
|
||||
stdOutFile, err := os.OpenFile(
|
||||
path.Join(e2e.TestParams.LogPath, fmt.Sprintf(e2e.BeaconNodeLogFileName, node.index)),
|
||||
os.O_APPEND|os.O_WRONLY,
|
||||
0644,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open log file: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := stdOutFile.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close stdout file")
|
||||
}
|
||||
}()
|
||||
|
||||
cmd := exec.CommandContext(ctx, binaryPath, restartArgs...)
|
||||
stderr, err := os.OpenFile(
|
||||
path.Join(e2e.TestParams.LogPath, fmt.Sprintf("beacon_node_%d_stderr.log", node.index)),
|
||||
os.O_APPEND|os.O_WRONLY|os.O_CREATE,
|
||||
0644,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open stderr file: %w", err)
|
||||
}
|
||||
cmd.Stderr = stderr
|
||||
|
||||
log.Infof("Restarting beacon chain %d with flags: %s", node.index, strings.Join(restartArgs, " "))
|
||||
if err = cmd.Start(); err != nil {
|
||||
if closeErr := stderr.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).Error("Failed to close stderr file")
|
||||
}
|
||||
return fmt.Errorf("failed to restart beacon node: %w", err)
|
||||
}
|
||||
// Close the parent's file handle after Start(). The child process has its own
|
||||
// copy of the file descriptor via fork/exec, so this won't affect its ability to write.
|
||||
if err := stderr.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close stderr file")
|
||||
}
|
||||
|
||||
if err = helpers.WaitForTextInFile(stdOutFile, "Beacon chain gRPC server listening"); err != nil {
|
||||
return fmt.Errorf("beacon node %d failed to restart properly: %w", node.index, err)
|
||||
}
|
||||
|
||||
node.cmd = cmd
|
||||
go func() {
|
||||
_ = cmd.Wait()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
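The exit polling above can be read as the following standalone helper (hypothetical, shown only to isolate the technique): signalling 0 probes for process existence without reaping it, so it stays safe while the goroutine from Start() still owns cmd.Wait().

	func waitForExit(p *os.Process, timeout, interval time.Duration) bool {
		deadline := time.Now().Add(timeout)
		for time.Now().Before(deadline) {
			if err := p.Signal(syscall.Signal(0)); err != nil {
				return true // signal delivery failed: process is gone
			}
			time.Sleep(interval)
		}
		return false
	}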
|
||||
|
||||
func (node *BeaconNode) UnderlyingProcess() *os.Process {
|
||||
return node.cmd.Process
|
||||
}
|
||||
|
||||
@@ -108,6 +108,17 @@ func (s *BuilderSet) StopAtIndex(i int) error {
|
||||
return s.builders[i].Stop()
|
||||
}
|
||||
|
||||
// RestartAtIndex for builders just does pause/resume.
|
||||
func (s *BuilderSet) RestartAtIndex(_ context.Context, i int) error {
|
||||
if i >= len(s.builders) {
|
||||
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.builders))
|
||||
}
|
||||
if err := s.builders[i].Pause(); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.builders[i].Resume()
|
||||
}
|
||||
|
||||
// ComponentAtIndex returns the component at the provided index.
|
||||
func (s *BuilderSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
|
||||
if i >= len(s.builders) {
|
||||
|
||||
@@ -111,6 +111,17 @@ func (s *NodeSet) StopAtIndex(i int) error {
|
||||
return s.nodes[i].Stop()
|
||||
}
|
||||
|
||||
// RestartAtIndex for eth1 nodes just does pause/resume.
|
||||
func (s *NodeSet) RestartAtIndex(_ context.Context, i int) error {
|
||||
if i >= len(s.nodes) {
|
||||
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
|
||||
}
|
||||
if err := s.nodes[i].Pause(); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.nodes[i].Resume()
|
||||
}
|
||||
|
||||
// ComponentAtIndex returns the component at the provided index.
|
||||
func (s *NodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
|
||||
if i >= len(s.nodes) {
|
||||
|
||||
@@ -108,6 +108,17 @@ func (s *ProxySet) StopAtIndex(i int) error {
|
||||
return s.proxies[i].Stop()
|
||||
}
|
||||
|
||||
// RestartAtIndex for proxies just does pause/resume.
|
||||
func (s *ProxySet) RestartAtIndex(_ context.Context, i int) error {
|
||||
if i >= len(s.proxies) {
|
||||
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies))
|
||||
}
|
||||
if err := s.proxies[i].Pause(); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.proxies[i].Resume()
|
||||
}
|
||||
|
||||
// ComponentAtIndex returns the component at the provided index.
|
||||
func (s *ProxySet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
|
||||
if i >= len(s.proxies) {
|
||||
|
||||
@@ -127,6 +127,17 @@ func (s *LighthouseBeaconNodeSet) StopAtIndex(i int) error {
|
||||
return s.nodes[i].Stop()
|
||||
}
|
||||
|
||||
// RestartAtIndex for Lighthouse just does pause/resume.
|
||||
func (s *LighthouseBeaconNodeSet) RestartAtIndex(_ context.Context, i int) error {
|
||||
if i >= len(s.nodes) {
|
||||
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
|
||||
}
|
||||
if err := s.nodes[i].Pause(); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.nodes[i].Resume()
|
||||
}
|
||||
|
||||
// ComponentAtIndex returns the component at the provided index.
|
||||
func (s *LighthouseBeaconNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
|
||||
if i >= len(s.nodes) {
|
||||
@@ -194,9 +205,10 @@ func (node *LighthouseBeaconNode) Start(ctx context.Context) error {
|
||||
"--suggested-fee-recipient=0x878705ba3f8bc32fcf7f4caa1a35e72af65cf766",
|
||||
}
|
||||
if node.config.UseFixedPeerIDs {
|
||||
flagVal := strings.Join(node.config.PeerIDs, ",")
|
||||
args = append(args,
|
||||
fmt.Sprintf("--trusted-peers=%s", flagVal))
|
||||
// Use libp2p-addresses with full multiaddrs instead of trusted-peers with just peer IDs.
|
||||
// This allows Lighthouse to connect directly to Prysm nodes without relying on DHT discovery.
|
||||
flagVal := strings.Join(node.config.PeerMultiAddrs, ",")
|
||||
args = append(args, fmt.Sprintf("--libp2p-addresses=%s", flagVal))
|
||||
}
|
||||
if node.config.UseBuilder {
|
||||
args = append(args, fmt.Sprintf("--builder=%s:%d", "http://127.0.0.1", e2e.TestParams.Ports.Eth1ProxyPort+prysmNodeCount+index))
|
||||
|
||||
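For context, the assembled flag ends up looking roughly like this (ports and peer IDs are placeholders following the log format quoted earlier in this diff):

	--libp2p-addresses=/ip4/192.168.0.14/udp/4250/quic-v1/p2p/16Uiu2HAm...,/ip4/192.168.0.14/udp/4251/quic-v1/p2p/16Uiu2HAm...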
@@ -132,6 +132,17 @@ func (s *LighthouseValidatorNodeSet) StopAtIndex(i int) error {
|
||||
return s.nodes[i].Stop()
|
||||
}
|
||||
|
||||
// RestartAtIndex for Lighthouse validators just does pause/resume.
|
||||
func (s *LighthouseValidatorNodeSet) RestartAtIndex(_ context.Context, i int) error {
|
||||
if i >= len(s.nodes) {
|
||||
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
|
||||
}
|
||||
if err := s.nodes[i].Pause(); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.nodes[i].Resume()
|
||||
}
|
||||
|
||||
// ComponentAtIndex returns the component at the provided index.
|
||||
func (s *LighthouseValidatorNodeSet) ComponentAtIndex(i int) (types.ComponentRunner, error) {
|
||||
if i >= len(s.nodes) {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -15,6 +16,7 @@ import (
|
||||
e2e "github.com/OffchainLabs/prysm/v7/testing/endtoend/params"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/endtoend/types"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
var _ types.ComponentRunner = &TracingSink{}
|
||||
@@ -32,6 +34,7 @@ var _ types.ComponentRunner = &TracingSink{}
|
||||
type TracingSink struct {
|
||||
cancel context.CancelFunc
|
||||
started chan struct{}
|
||||
stopped chan struct{}
|
||||
endpoint string
|
||||
server *http.Server
|
||||
}
|
||||
@@ -40,6 +43,7 @@ type TracingSink struct {
|
||||
func NewTracingSink(endpoint string) *TracingSink {
|
||||
return &TracingSink{
|
||||
started: make(chan struct{}, 1),
|
||||
stopped: make(chan struct{}),
|
||||
endpoint: endpoint,
|
||||
}
|
||||
}
|
||||
@@ -73,62 +77,99 @@ func (ts *TracingSink) Resume() error {
|
||||
|
||||
// Stop stops the component and its underlying process.
|
||||
func (ts *TracingSink) Stop() error {
|
||||
ts.cancel()
|
||||
if ts.cancel != nil {
|
||||
ts.cancel()
|
||||
}
|
||||
// Wait for server to actually shut down before returning
|
||||
<-ts.stopped
|
||||
return nil
|
||||
}
|
||||
|
||||
// reusePortListener creates a TCP listener with SO_REUSEADDR set, allowing
|
||||
// the port to be reused immediately after the previous listener closes.
|
||||
// This is essential for sequential E2E tests that reuse the same port.
|
||||
func reusePortListener(addr string) (net.Listener, error) {
|
||||
lc := net.ListenConfig{
|
||||
Control: func(network, address string, c syscall.RawConn) error {
|
||||
var setSockOptErr error
|
||||
err := c.Control(func(fd uintptr) {
|
||||
setSockOptErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return setSockOptErr
|
||||
},
|
||||
}
|
||||
return lc.Listen(context.Background(), "tcp", addr)
|
||||
}
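A minimal usage sketch for the listener above (rebindExample is hypothetical): bind, close, and immediately rebind the same address, which is the back-to-back pattern sequential E2E runs rely on and which can fail without SO_REUSEADDR when earlier sockets are still lingering.

	func rebindExample(addr string) error {
		l1, err := reusePortListener(addr)
		if err != nil {
			return err
		}
		if err := l1.Close(); err != nil {
			return err
		}
		// Without SO_REUSEADDR this second bind can fail while the previous
		// socket's connections sit in TIME_WAIT.
		l2, err := reusePortListener(addr)
		if err != nil {
			return err
		}
		return l2.Close()
	}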
|
||||
|
||||
// Initialize an http handler that writes all requests to a file.
|
||||
func (ts *TracingSink) initializeSink(ctx context.Context) {
|
||||
defer close(ts.stopped)
|
||||
|
||||
mux := &http.ServeMux{}
|
||||
ts.server = &http.Server{
|
||||
Addr: ts.endpoint,
|
||||
Handler: mux,
|
||||
ReadHeaderTimeout: time.Second,
|
||||
}
|
||||
defer func() {
|
||||
if err := ts.server.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close http server")
|
||||
return
|
||||
}
|
||||
}()
|
||||
// Disable keep-alives to ensure connections close immediately
|
||||
ts.server.SetKeepAlivesEnabled(false)
|
||||
|
||||
// Create listener with SO_REUSEADDR to allow port reuse between tests
|
||||
listener, err := reusePortListener(ts.endpoint)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create listener")
|
||||
return
|
||||
}
|
||||
|
||||
stdOutFile, err := helpers.DeleteAndCreateFile(e2e.TestParams.LogPath, e2e.TracingRequestSinkFileName)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create stdout file")
|
||||
if closeErr := listener.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).Error("Failed to close listener after file creation error")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
cleanup := func() {
|
||||
if err := stdOutFile.Close(); err != nil {
|
||||
log.WithError(err).Error("Could not close stdout file")
|
||||
}
|
||||
if err := ts.server.Close(); err != nil {
|
||||
log.WithError(err).Error("Could not close http server")
|
||||
// Use Shutdown for graceful shutdown that releases the port
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := ts.server.Shutdown(shutdownCtx); err != nil {
|
||||
log.WithError(err).Error("Could not gracefully shutdown http server")
|
||||
// Force close if shutdown times out
|
||||
if err := ts.server.Close(); err != nil {
|
||||
log.WithError(err).Error("Could not close http server")
|
||||
}
|
||||
}
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
mux.HandleFunc("/", func(_ http.ResponseWriter, r *http.Request) {
|
||||
if err := captureRequest(stdOutFile, r); err != nil {
|
||||
log.WithError(err).Error("Failed to capture http request")
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
cleanup()
|
||||
return
|
||||
case <-sigs:
|
||||
cleanup()
|
||||
return
|
||||
default:
|
||||
// Sleep for 100ms and do nothing while waiting for
|
||||
// cancellation.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-sigs:
|
||||
return
|
||||
}
|
||||
}()
|
||||
if err := ts.server.ListenAndServe(); err != http.ErrServerClosed {
|
||||
|
||||
// Use Serve with our custom listener instead of ListenAndServe
|
||||
if err := ts.server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
||||
log.WithError(err).Error("Failed to serve http")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,6 +134,17 @@ func (s *ValidatorNodeSet) StopAtIndex(i int) error {
|
||||
return s.nodes[i].Stop()
|
||||
}
|
||||
|
||||
// RestartAtIndex for validators just does pause/resume since they don't have P2P issues.
|
||||
func (s *ValidatorNodeSet) RestartAtIndex(_ context.Context, i int) error {
|
||||
if i >= len(s.nodes) {
|
||||
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.nodes))
|
||||
}
|
||||
if err := s.nodes[i].Pause(); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.nodes[i].Resume()
|
||||
}
|
||||
|
||||
// ComponentAtIndex returns the component at the provided index.
|
||||
func (s *ValidatorNodeSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
|
||||
if i >= len(s.nodes) {
|
||||
|
||||
@@ -48,7 +48,6 @@ const (
|
||||
// allNodesStartTimeout defines period after which nodes are considered
|
||||
// stalled (safety measure for nodes stuck at startup, shouldn't normally happen).
|
||||
allNodesStartTimeout = 5 * time.Minute
|
||||
|
||||
// errGeneralCode is used to represent the string value for all general process errors.
|
||||
errGeneralCode = "exit status 1"
|
||||
)
|
||||
@@ -195,12 +194,20 @@ func (r *testRunner) runEvaluators(ec *e2etypes.EvaluationContext, conns []*grpc
|
||||
secondsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
ticker := helpers.NewEpochTicker(tickingStartTime, secondsPerEpoch)
|
||||
for currentEpoch := range ticker.C() {
|
||||
log.WithField("epoch", currentEpoch).Info("Processing epoch")
|
||||
if config.EvalInterceptor(ec, currentEpoch, conns) {
|
||||
log.WithField("epoch", currentEpoch).Info("Interceptor returned true, skipping evaluators")
|
||||
continue
|
||||
}
|
||||
r.executeProvidedEvaluators(ec, currentEpoch, conns, config.Evaluators)
|
||||
|
||||
if t.Failed() || currentEpoch >= config.EpochsToRun-1 {
|
||||
log.WithFields(log.Fields{
|
||||
"currentEpoch": currentEpoch,
|
||||
"EpochsToRun": config.EpochsToRun,
|
||||
"testFailed": t.Failed(),
|
||||
"epochLimitHit": currentEpoch >= config.EpochsToRun-1,
|
||||
}).Info("Stopping evaluator loop")
|
||||
ticker.Done()
|
||||
if t.Failed() {
|
||||
return errors.New("test failed")
|
||||
@@ -225,9 +232,9 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
|
||||
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{r.depositor}); err != nil {
|
||||
return errors.Wrap(err, "testDepositsAndTx unable to run, depositor did not Start")
|
||||
}
|
||||
go func() {
|
||||
if r.config.TestDeposits {
|
||||
log.Info("Running deposit tests")
|
||||
go func() {
|
||||
if r.config.TestDeposits {
|
||||
log.Info("Running deposit tests")
|
||||
// The validators with an index < minGenesisActiveCount all have deposits already from the chain start.
|
||||
// Skip all of those chain start validators by seeking to minGenesisActiveCount in the validator list
|
||||
// for further deposit testing.
|
||||
@@ -238,12 +245,13 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
|
||||
r.t.Error(errors.Wrap(err, "depositor.SendAndMine failed"))
|
||||
}
|
||||
}
|
||||
}
|
||||
// Only generate background transactions when relevant for the test.
|
||||
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
|
||||
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
|
||||
}
|
||||
}()
|
||||
}
|
||||
// Only generate background transactions when relevant for the test.
|
||||
// Checkpoint sync and REST API tests need EL blocks to advance, so include them.
|
||||
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder || r.config.TestCheckpointSync || r.config.UseBeaconRestApi {
|
||||
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
|
||||
}
|
||||
}()
|
||||
if r.config.TestDeposits {
|
||||
return depositCheckValidator.Start(ctx)
|
||||
}
|
||||
@@ -622,7 +630,7 @@ func (r *testRunner) scenarioRun() error {
|
||||
tickingStartTime := helpers.EpochTickerStartTime(genesis)
|
||||
|
||||
ec := e2etypes.NewEvaluationContext(r.depositor.History())
|
||||
// Run assigned evaluators.
|
||||
log.WithField("EpochsToRun", r.config.EpochsToRun).Info("Starting evaluators")
|
||||
return r.runEvaluators(ec, conns, tickingStartTime)
|
||||
}
|
||||
|
||||
@@ -668,9 +676,9 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep
|
||||
freezeStartEpoch := lastForkEpoch + 1
|
||||
freezeEndEpoch := lastForkEpoch + 2
|
||||
optimisticStartEpoch := lastForkEpoch + 6
|
||||
optimisticEndEpoch := lastForkEpoch + 7
|
||||
optimisticEndEpoch := lastForkEpoch + 8
|
||||
recoveryEpochStart, recoveryEpochEnd := lastForkEpoch+3, lastForkEpoch+4
|
||||
secondRecoveryEpochStart, secondRecoveryEpochEnd := lastForkEpoch+8, lastForkEpoch+9
|
||||
secondRecoveryEpochStart, secondRecoveryEpochMid, secondRecoveryEpochEnd := lastForkEpoch+9, lastForkEpoch+10, lastForkEpoch+11
|
||||
|
||||
newPayloadMethod := "engine_newPayloadV4"
|
||||
forkChoiceUpdatedMethod := "engine_forkchoiceUpdatedV3"
|
||||
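Laid out against the last fork epoch e, the schedule above is: node 0 frozen during e+1 to e+2 and restarted at e+2, epochs e+3 to e+4 for the chain to finalize again, the optimistic-sync scenario from e+6 to e+8 (with evaluators skipped for the epoch in between), and e+9 through e+11 reserved as the second recovery window.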
@@ -680,13 +688,18 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep
|
||||
forkChoiceUpdatedMethod = "engine_forkchoiceUpdatedV3"
|
||||
}
|
||||
|
||||
// Skip evaluators during optimistic sync window (between start and end, exclusive)
|
||||
if primitives.Epoch(epoch) > optimisticStartEpoch && primitives.Epoch(epoch) < optimisticEndEpoch {
|
||||
return true
|
||||
}
|
||||
|
||||
switch primitives.Epoch(epoch) {
|
||||
case freezeStartEpoch:
|
||||
require.NoError(r.t, r.comHandler.beaconNodes.PauseAtIndex(0))
|
||||
require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(0))
|
||||
return true
|
||||
case freezeEndEpoch:
|
||||
require.NoError(r.t, r.comHandler.beaconNodes.ResumeAtIndex(0))
|
||||
require.NoError(r.t, r.comHandler.beaconNodes.RestartAtIndex(r.comHandler.ctx, 0))
|
||||
require.NoError(r.t, r.comHandler.validatorNodes.ResumeAtIndex(0))
|
||||
return true
|
||||
case optimisticStartEpoch:
|
||||
@@ -701,6 +714,19 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep
|
||||
}, func() bool {
|
||||
return true
|
||||
})
|
||||
// Also intercept forkchoiceUpdated for prysm beacon node to prevent
|
||||
// SetOptimisticToValid from clearing the optimistic status.
|
||||
component.(e2etypes.EngineProxy).AddRequestInterceptor(forkChoiceUpdatedMethod, func() any {
|
||||
return &ForkchoiceUpdatedResponse{
|
||||
Status: &enginev1.PayloadStatus{
|
||||
Status: enginev1.PayloadStatus_SYNCING,
|
||||
LatestValidHash: nil,
|
||||
},
|
||||
PayloadId: nil,
|
||||
}
|
||||
}, func() bool {
|
||||
return true
|
||||
})
|
||||
// Set it for lighthouse beacon node.
|
||||
component, err = r.comHandler.eth1Proxy.ComponentAtIndex(2)
|
||||
require.NoError(r.t, err)
|
||||
@@ -734,6 +760,7 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep
|
||||
engineProxy, ok := component.(e2etypes.EngineProxy)
|
||||
require.Equal(r.t, true, ok)
|
||||
engineProxy.RemoveRequestInterceptor(newPayloadMethod)
|
||||
engineProxy.RemoveRequestInterceptor(forkChoiceUpdatedMethod)
|
||||
engineProxy.ReleaseBackedUpRequests(newPayloadMethod)
|
||||
|
||||
// Remove for lighthouse too
|
||||
@@ -747,8 +774,8 @@ func (r *testRunner) multiScenarioMulticlient(ec *e2etypes.EvaluationContext, ep
|
||||
|
||||
return true
|
||||
case recoveryEpochStart, recoveryEpochEnd,
|
||||
secondRecoveryEpochStart, secondRecoveryEpochEnd:
|
||||
// Allow 2 epochs for the network to finalize again.
|
||||
secondRecoveryEpochStart, secondRecoveryEpochMid, secondRecoveryEpochEnd:
|
||||
// Allow epochs for the network to finalize again after optimistic sync test.
|
||||
return true
|
||||
}
|
||||
return false
|
||||
@@ -782,31 +809,39 @@ func (r *testRunner) eeOffline(_ *e2etypes.EvaluationContext, epoch uint64, _ []
|
||||
// as expected.
|
||||
func (r *testRunner) multiScenario(ec *e2etypes.EvaluationContext, epoch uint64, conns []*grpc.ClientConn) bool {
|
||||
lastForkEpoch := params.LastForkEpoch()
|
||||
freezeStartEpoch := lastForkEpoch + 1
|
||||
freezeEndEpoch := lastForkEpoch + 2
|
||||
// Freeze/restart scenario is skipped in minimal test: With only 2 beacon nodes,
|
||||
// when one node restarts it enters initial sync mode. During initial sync, the
|
||||
// restarting node doesn't subscribe to gossip topics, leaving the other node with
|
||||
// 0 gossip peers. This causes a deadlock where the network can't produce blocks
|
||||
// consistently (no gossip mesh) and the restarting node can't complete initial sync
|
||||
// (no blocks being produced). This scenario works in multiclient test (4 nodes)
|
||||
// where 3 healthy nodes maintain the gossip mesh while 1 node syncs.
|
||||
valOfflineStartEpoch := lastForkEpoch + 6
|
||||
valOfflineEndEpoch := lastForkEpoch + 7
|
||||
optimisticStartEpoch := lastForkEpoch + 11
|
||||
optimisticEndEpoch := lastForkEpoch + 12
|
||||
optimisticEndEpoch := lastForkEpoch + 13
|
||||
|
||||
recoveryEpochStart, recoveryEpochEnd := lastForkEpoch+3, lastForkEpoch+4
|
||||
secondRecoveryEpochStart, secondRecoveryEpochEnd := lastForkEpoch+8, lastForkEpoch+9
|
||||
thirdRecoveryEpochStart, thirdRecoveryEpochEnd := lastForkEpoch+13, lastForkEpoch+14
|
||||
thirdRecoveryEpochStart, thirdRecoveryEpochEnd := lastForkEpoch+14, lastForkEpoch+15
|
||||
|
||||
type ForkchoiceUpdatedResponse struct {
|
||||
Status *enginev1.PayloadStatus `json:"payloadStatus"`
|
||||
PayloadId *enginev1.PayloadIDBytes `json:"payloadId"`
|
||||
}
|
||||
|
||||
newPayloadMethod := "engine_newPayloadV4"
|
||||
forkChoiceUpdatedMethod := "engine_forkchoiceUpdatedV3"
|
||||
// Fallback if Electra is not set.
|
||||
if params.BeaconConfig().ElectraForkEpoch == math.MaxUint64 {
|
||||
newPayloadMethod = "engine_newPayloadV3"
|
||||
}
|
||||
|
||||
// Skip evaluators during optimistic sync window (between start and end, exclusive)
|
||||
if primitives.Epoch(epoch) > optimisticStartEpoch && primitives.Epoch(epoch) < optimisticEndEpoch {
|
||||
return true
|
||||
}
|
||||
|
||||
switch primitives.Epoch(epoch) {
case freezeStartEpoch:
require.NoError(r.t, r.comHandler.beaconNodes.PauseAtIndex(0))
require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(0))
return true
case freezeEndEpoch:
require.NoError(r.t, r.comHandler.beaconNodes.ResumeAtIndex(0))
require.NoError(r.t, r.comHandler.validatorNodes.ResumeAtIndex(0))
return true
case valOfflineStartEpoch:
require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(0))
require.NoError(r.t, r.comHandler.validatorNodes.PauseAtIndex(1))
@@ -826,23 +861,36 @@ func (r *testRunner) multiScenario(ec *e2etypes.EvaluationContext, epoch uint64,
}, func() bool {
return true
})
// Also intercept forkchoiceUpdated to prevent SetOptimisticToValid from
// clearing the optimistic status when the beacon node receives VALID.
component.(e2etypes.EngineProxy).AddRequestInterceptor(forkChoiceUpdatedMethod, func() any {
return &ForkchoiceUpdatedResponse{
Status: &enginev1.PayloadStatus{
Status: enginev1.PayloadStatus_SYNCING,
LatestValidHash: nil,
},
PayloadId: nil,
}
}, func() bool {
return true
})
return true
case optimisticEndEpoch:
evs := []e2etypes.Evaluator{ev.OptimisticSyncEnabled}
r.executeProvidedEvaluators(ec, epoch, []*grpc.ClientConn{conns[0]}, evs)
// Disable Interceptor
// Disable Interceptors
component, err := r.comHandler.eth1Proxy.ComponentAtIndex(0)
require.NoError(r.t, err)
engineProxy, ok := component.(e2etypes.EngineProxy)
require.Equal(r.t, true, ok)
engineProxy.RemoveRequestInterceptor(newPayloadMethod)
engineProxy.ReleaseBackedUpRequests(newPayloadMethod)
engineProxy.RemoveRequestInterceptor(forkChoiceUpdatedMethod)
engineProxy.ReleaseBackedUpRequests(forkChoiceUpdatedMethod)

return true
case recoveryEpochStart, recoveryEpochEnd,
secondRecoveryEpochStart, secondRecoveryEpochEnd,
case secondRecoveryEpochStart, secondRecoveryEpochEnd,
thirdRecoveryEpochStart, thirdRecoveryEpochEnd:
// Allow 2 epochs for the network to finalize again.
return true
}
return false
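The hunk header above elides the first interceptor installed in this case; purely as a hedged illustration of the AddRequestInterceptor pattern used here (not the actual elided code), a newPayload interceptor built the same way might look like the sketch below. The response shape is an assumption.

// Hypothetical sketch only: mirrors the forkChoiceUpdatedMethod interceptor above.
// The response type for newPayloadMethod is assumed, not taken from this diff.
component.(e2etypes.EngineProxy).AddRequestInterceptor(newPayloadMethod, func() any {
	return &enginev1.PayloadStatus{
		Status:          enginev1.PayloadStatus_SYNCING,
		LatestValidHash: nil,
	}
}, func() bool {
	return true
})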

@@ -82,7 +82,7 @@ var metricComparisonTests = []comparisonTest{
name: "hot state cache",
topic1: "hot_state_cache_miss",
topic2: "hot_state_cache_hit",
expectedComparison: 0.01,
expectedComparison: 0.02,
},
}

@@ -168,20 +168,15 @@ func metricCheckLessThan(pageContent, topic string, value int) error {

func metricCheckComparison(pageContent, topic1, topic2 string, comparison float64) error {
topic2Value, err := valueOfTopic(pageContent, topic2)
// If we can't find the first topic (error metrics), then assume the test passes.
if topic2Value != -1 {
if err != nil || topic2Value == -1 {
// If we can't find the denominator (hits/received total), assume test passes
return nil
}
if err != nil {
return err
}
topic1Value, err := valueOfTopic(pageContent, topic1)
if topic1Value != -1 {
if err != nil || topic1Value == -1 {
// If we can't find the numerator (misses/failures), assume test passes (no errors)
return nil
}
if err != nil {
return err
}
topicComparison := float64(topic1Value) / float64(topic2Value)
if topicComparison >= comparison {
return fmt.Errorf(
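Read concretely, the check fails when numerator/denominator reaches the configured threshold. A small self-contained sketch of the arithmetic, using invented metric values rather than real e2e output:

package main

import "fmt"

func main() {
	// Illustrative only: metric values are made up to show how the threshold applies.
	misses, hits := 15.0, 1000.0 // e.g. hot_state_cache_miss vs hot_state_cache_hit
	ratio := misses / hits       // 0.015
	fmt.Println(ratio >= 0.02)   // false: passes against the new 0.02 threshold
	fmt.Println(ratio >= 0.01)   // true: would have failed the old 0.01 threshold
}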

@@ -101,16 +101,44 @@ func peersConnect(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) erro
return nil
}
ctx := context.Background()
expectedPeers := len(conns) - 1 + e2e.TestParams.LighthouseBeaconNodeCount

// Wait up to 60 seconds for all nodes to discover peers.
// Peer discovery via DHT can take time, especially for nodes that start later.
timeout := 60 * time.Second
pollInterval := 1 * time.Second

for _, conn := range conns {
nodeClient := eth.NewNodeClient(conn)
peersResp, err := nodeClient.ListPeers(ctx, &emptypb.Empty{})
deadline := time.Now().Add(timeout)

var peersResp *eth.Peers
var err error
for time.Now().Before(deadline) {
peersResp, err = nodeClient.ListPeers(ctx, &emptypb.Empty{})
if err != nil {
time.Sleep(pollInterval)
continue
}

if len(peersResp.Peers) >= expectedPeers {
break
}

time.Sleep(pollInterval)
}

if err != nil {
return err
return fmt.Errorf("failed to list peers after %v: %w", timeout, err)
}
expectedPeers := len(conns) - 1 + e2e.TestParams.LighthouseBeaconNodeCount
if expectedPeers != len(peersResp.Peers) {
return fmt.Errorf("unexpected amount of peers, expected %d, received %d", expectedPeers, len(peersResp.Peers))
peerIDs := make([]string, 0, len(peersResp.Peers))
for _, p := range peersResp.Peers {
peerIDs = append(peerIDs, p.Address[len(p.Address)-10:])
}
return fmt.Errorf("unexpected amount of peers after %v timeout, expected %d, received %d (connected to: %v)", timeout, expectedPeers, len(peersResp.Peers), peerIDs)
}

time.Sleep(connTimeDelay)
}
return nil
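The new retry loop is a standard poll-until-deadline pattern. A minimal, standalone sketch of that pattern, where the helper name, its signature, and the stand-in check are illustrative rather than anything in the repository:

package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil re-runs check every interval until it reports done or the timeout
// elapses, returning the last error observed. Illustrative helper only.
func pollUntil(timeout, interval time.Duration, check func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	var lastErr error
	for time.Now().Before(deadline) {
		done, err := check()
		if err == nil && done {
			return nil
		}
		lastErr = err
		time.Sleep(interval)
	}
	if lastErr != nil {
		return fmt.Errorf("condition not met after %v: %w", timeout, lastErr)
	}
	return errors.New("condition not met before deadline")
}

func main() {
	start := time.Now()
	err := pollUntil(3*time.Second, 500*time.Millisecond, func() (bool, error) {
		// Stand-in for ListPeers: pretend enough peers show up after ~1s.
		return time.Since(start) > time.Second, nil
	})
	fmt.Println(err) // <nil>
}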

@@ -38,8 +38,8 @@ func TestEndToEnd_MinimalConfig(t *testing.T) {
r := e2eMinimal(t, cfg,
types.WithCheckpointSync(),
types.WithEpochs(10),
types.WithExitEpoch(4), // Minimum due to ShardCommitteePeriod=4
types.WithLargeBlobs(), // Use large blob transactions for BPO testing
types.WithExitEpoch(4), // Minimum due to ShardCommitteePeriod=4
types.WithLargeBlobs(), // Use large blob transactions for BPO testing
)
r.run()
}
}

16
testing/endtoend/minimal_hdiff_e2e_test.go
Normal file
@@ -0,0 +1,16 @@
package endtoend

import (
"testing"

"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/endtoend/types"
)

func TestEndToEnd_MinimalConfig_WithStateDiff(t *testing.T) {
r := e2eMinimal(t, types.InitForkCfg(version.Bellatrix, version.Electra, params.E2ETestConfig()),
types.WithStateDiff(),
)
r.run()
}
@@ -76,6 +76,15 @@ func WithSSZOnly() E2EConfigOpt {
}
}

func WithStateDiff() E2EConfigOpt {
return func(cfg *E2EConfig) {
cfg.BeaconFlags = append(cfg.BeaconFlags,
"--enable-state-diff",
"--state-diff-exponents=6,5", // Small exponents for quick testing
)
}
}
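WithStateDiff has the same functional-options shape as the other E2EConfigOpt helpers: it returns a closure that mutates the config. A minimal standalone sketch of how such options compose, with stand-in types and an apply loop that are illustrative rather than the repository's actual wiring:

package main

import "fmt"

// Illustrative stand-ins for E2EConfig / E2EConfigOpt.
type config struct{ beaconFlags []string }
type opt func(*config)

func withStateDiff() opt {
	return func(c *config) {
		c.beaconFlags = append(c.beaconFlags, "--enable-state-diff", "--state-diff-exponents=6,5")
	}
}

func newConfig(opts ...opt) *config {
	c := &config{}
	for _, o := range opts {
		o(c) // each option mutates the config in turn
	}
	return c
}

func main() {
	fmt.Println(newConfig(withStateDiff()).beaconFlags)
}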

// WithExitEpoch sets a custom epoch for voluntary exit submission.
// This affects ProposeVoluntaryExit, ValidatorsHaveExited, SubmitWithdrawal, and ValidatorsHaveWithdrawn evaluators.
func WithExitEpoch(e primitives.Epoch) E2EConfigOpt {
@@ -108,6 +117,7 @@ type E2EConfig struct {
BeaconFlags []string
ValidatorFlags []string
PeerIDs []string
PeerMultiAddrs []string
ExtraEpochs uint64
}

@@ -213,6 +223,8 @@ type MultipleComponentRunners interface {
ResumeAtIndex(i int) error
// StopAtIndex stops the grouped component element at the desired index.
StopAtIndex(i int) error
// RestartAtIndex restarts the grouped component element at the desired index.
RestartAtIndex(ctx context.Context, i int) error
}

type EngineProxy interface {

@@ -1,7 +1,7 @@
diff -urN a/BUILD.bazel b/BUILD.bazel
--- a/BUILD.bazel 1969-12-31 18:00:00.000000000 -0600
+++ b/BUILD.bazel 2025-01-05 12:00:00.000000000 -0600
@@ -0,0 +1,89 @@
@@ -0,0 +1,90 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
@@ -32,6 +32,7 @@ diff -urN a/BUILD.bazel b/BUILD.bazel
+ ],
+ "@io_bazel_rules_go//go/platform:darwin_amd64": [
+ "bindings_darwin_amd64.go",
+ "wrapper_darwin_amd64.s",
+ ],
+ "//conditions:default": [],
+ }),

@@ -66,7 +66,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives
// As specified in spec, an aggregator should wait until two thirds of the way through slot
// to broadcast the best aggregate to the global aggregate channel.
// https://github.com/ethereum/consensus-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#broadcast-aggregate
v.waitToSlotTwoThirds(ctx, slot)
v.waitUntilAggregateDue(ctx, slot)

// In a DV setup, selection proofs need to be agreed upon by the DV.
// Checking for selection proofs at slot 0 of the epoch will result in an error, as the call to the DV executes slower than the start of this function.
@@ -203,11 +203,18 @@ func (v *validator) signSlotWithSelectionProof(ctx context.Context, pubKey [fiel
return sig.Marshal(), nil
}

// waitToSlotTwoThirds waits until two third through the current slot period
// such that any attestations from this slot have time to reach the beacon node
// before creating the aggregated attestation.
func (v *validator) waitToSlotTwoThirds(ctx context.Context, slot primitives.Slot) {
v.waitUntilSlotComponent(ctx, slot, params.BeaconConfig().AggregateDueBPS)
// waitUntilAggregateDue waits until the configured aggregation due time within the current slot
// such that any attestations from this slot have time to reach the beacon node before creating
// the aggregated attestation.
//
// Note: Historically this was ~2/3 of the slot, but may differ across forks (e.g. Gloas).
func (v *validator) waitUntilAggregateDue(ctx context.Context, slot primitives.Slot) {
cfg := params.BeaconConfig()
component := cfg.AggregateDueBPS
if slots.ToEpoch(slot) >= cfg.GloasForkEpoch {
component = cfg.AggregateDueBPSGloas
}
v.waitUntilSlotComponent(ctx, slot, component)
}
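The *BPS values are basis points of the slot duration, so the due time is slotDuration * bps / 10000. A small self-contained sketch of that arithmetic, where the 12-second slot and the 6667 BPS figure are illustrative assumptions rather than the repository's configured values:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative numbers only: assume a 12s slot and an aggregate-due point of 6667 BPS.
	slotDuration := 12 * time.Second
	aggregateDueBPS := 6667 // basis points, i.e. 6667/10000 of the slot

	due := slotDuration * time.Duration(aggregateDueBPS) / 10000
	fmt.Println(due) // ~8s, roughly the historical two-thirds mark
}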

// This returns the signature of validator signing over aggregate and

@@ -260,7 +260,7 @@ func TestWaitForSlotTwoThird_WaitCorrectly(t *testing.T) {
timeToSleep := params.BeaconConfig().SlotComponentDuration(params.BeaconConfig().AggregateDueBPS)

twoThirdTime := currentTime.Add(timeToSleep)
validator.waitToSlotTwoThirds(t.Context(), numOfSlots)
validator.waitUntilAggregateDue(t.Context(), numOfSlots)
currentTime = time.Now()
assert.Equal(t, twoThirdTime.Unix(), currentTime.Unix())
})
@@ -280,7 +280,7 @@ func TestWaitForSlotTwoThird_DoneContext_ReturnsImmediately(t *testing.T) {
expectedTime := time.Now()
ctx, cancel := context.WithCancel(t.Context())
cancel()
validator.waitToSlotTwoThirds(ctx, numOfSlots)
validator.waitUntilAggregateDue(ctx, numOfSlots)
currentTime = time.Now()
assert.Equal(t, expectedTime.Unix(), currentTime.Unix())
})

@@ -37,7 +37,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
defer span.End()
span.SetAttributes(trace.StringAttribute("validator", fmt.Sprintf("%#x", pubKey)))

v.waitOneThirdOrValidBlock(ctx, slot)
v.waitUntilAttestationDueOrValidBlock(ctx, slot)

var b strings.Builder
if err := b.WriteByte(byte(iface.RoleAttester)); err != nil {
@@ -259,12 +259,12 @@ func (v *validator) setHighestSlot(slot primitives.Slot) {
}
}

// waitOneThirdOrValidBlock waits until (a) or (b) whichever comes first:
// waitUntilAttestationDueOrValidBlock waits until (a) or (b) whichever comes first:
//
// (a) the validator has received a valid block that is the same slot as input slot
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot)
func (v *validator) waitOneThirdOrValidBlock(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitOneThirdOrValidBlock")
// (b) the configured attestation due time has transpired (as basis points of the slot duration)
func (v *validator) waitUntilAttestationDueOrValidBlock(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitUntilAttestationDueOrValidBlock")
defer span.End()

// Don't need to wait if requested slot is the same as highest valid slot.
@@ -272,7 +272,12 @@ func (v *validator) waitOneThirdOrValidBlock(ctx context.Context, slot primitive
return
}

finalTime, err := v.slotComponentDeadline(slot, params.BeaconConfig().AttestationDueBPS)
cfg := params.BeaconConfig()
component := cfg.AttestationDueBPS
if slots.ToEpoch(slot) >= cfg.GloasForkEpoch {
component = cfg.AttestationDueBPSGloas
}
finalTime, err := v.slotComponentDeadline(slot, component)
if err != nil {
log.WithError(err).WithField("slot", slot).Error("Slot overflows, unable to wait for attestation deadline")
return

@@ -706,7 +706,7 @@ func TestServer_WaitToSlotOneThird_CanWait(t *testing.T) {

timeToSleep := params.BeaconConfig().SecondsPerSlot / 3
oneThird := currentTime.Add(time.Duration(timeToSleep) * time.Second)
v.waitOneThirdOrValidBlock(t.Context(), currentSlot)
v.waitUntilAttestationDueOrValidBlock(t.Context(), currentSlot)

if oneThird.Sub(time.Now()) > 10*time.Millisecond { // Allow for small diff due to execution time.
t.Errorf("Wanted %s time for slot one third but got %s", oneThird, currentTime)
@@ -724,7 +724,7 @@ func TestServer_WaitToSlotOneThird_SameReqSlot(t *testing.T) {
highestValidSlot: currentSlot,
}

v.waitOneThirdOrValidBlock(t.Context(), currentSlot)
v.waitUntilAttestationDueOrValidBlock(t.Context(), currentSlot)

if currentTime.Sub(time.Now()) > 10*time.Millisecond { // Allow for small diff due to execution time.
t.Errorf("Wanted %s time for slot one third but got %s", time.Now(), currentTime)
@@ -750,7 +750,7 @@ func TestServer_WaitToSlotOneThird_ReceiveBlockSlot(t *testing.T) {
v.slotFeed.Send(currentSlot)
})

v.waitOneThirdOrValidBlock(t.Context(), currentSlot)
v.waitUntilAttestationDueOrValidBlock(t.Context(), currentSlot)

if currentTime.Sub(time.Now()) > 10*time.Millisecond { // Allow for small diff due to execution time.
t.Errorf("Wanted %s time for slot one third but got %s", time.Now(), currentTime)

@@ -29,7 +29,7 @@ func (v *validator) SubmitSyncCommitteeMessage(ctx context.Context, slot primiti
defer span.End()
span.SetAttributes(trace.StringAttribute("validator", fmt.Sprintf("%#x", pubKey)))

v.waitOneThirdOrValidBlock(ctx, slot)
v.waitUntilAttestationDueOrValidBlock(ctx, slot)

res, err := v.validatorClient.SyncMessageBlockRoot(ctx, &emptypb.Empty{})
if err != nil {
@@ -127,7 +127,12 @@ func (v *validator) SubmitSignedContributionAndProof(ctx context.Context, slot p
return
}

v.waitUntilSlotComponent(ctx, slot, params.BeaconConfig().ContributionDueBPS)
cfg := params.BeaconConfig()
component := cfg.ContributionDueBPS
if slots.ToEpoch(slot) >= cfg.GloasForkEpoch {
component = cfg.ContributionDueBPSGloas
}
v.waitUntilSlotComponent(ctx, slot, component)

coveredSubnets := make(map[uint64]bool)
for i, comIdx := range indexRes.Indices {
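The same Gloas-gated choice between a pre-fork and post-fork due-time value now appears at several call sites. A minimal, self-contained sketch of that selection pattern, with an invented fork epoch and invented basis-point values standing in for the real configuration:

package main

import "fmt"

type epoch uint64

// dueFor picks the pre- or post-fork value depending on the slot's epoch.
// Illustrative only: the real code reads these from params.BeaconConfig().
func dueFor(e, forkEpoch epoch, pre, post int) int {
	if e >= forkEpoch {
		return post
	}
	return pre
}

func main() {
	const gloasForkEpoch epoch = 100 // illustrative fork epoch
	fmt.Println(dueFor(99, gloasForkEpoch, 6667, 5000))  // pre-fork value
	fmt.Println(dueFor(100, gloasForkEpoch, 6667, 5000)) // post-fork value
}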

@@ -51,12 +51,20 @@ func (v *validator) slotComponentSpanName(component primitives.BP) string {
switch component {
case cfg.AttestationDueBPS:
return "validator.waitAttestationWindow"
case cfg.AttestationDueBPSGloas:
return "validator.waitAttestationWindow"
case cfg.AggregateDueBPS:
return "validator.waitAggregateWindow"
case cfg.AggregateDueBPSGloas:
return "validator.waitAggregateWindow"
case cfg.SyncMessageDueBPS:
return "validator.waitSyncMessageWindow"
case cfg.SyncMessageDueBPSGloas:
return "validator.waitSyncMessageWindow"
case cfg.ContributionDueBPS:
return "validator.waitContributionWindow"
case cfg.ContributionDueBPSGloas:
return "validator.waitContributionWindow"
case cfg.ProposerReorgCutoffBPS:
return "validator.waitProposerReorgWindow"
default: