Compare commits

...

9 Commits

Author SHA1 Message Date
nisdas
ebcc2aeaf7 Eliminate per-validator heap allocations in epoch processing
Replace ReadOnlyValidator interface boxing with direct CompactValidator field
access throughout epoch processing, removing ~23% of all heap allocations
(12.95 GB cumulative per profiling session). Key changes:

- Add ForEachValidator and ApplyToEveryCompactValidator for zero-alloc
  validator iteration (read-only and read-write paths respectively)
- Add EffectiveBalanceAtIndex for direct effective balance reads without
  interface boxing
- Rewrite ProcessEffectiveBalanceUpdates (pre-Electra and Electra+) to
  use ApplyToEveryCompactValidator with two-pass collect-then-mutate
- Rewrite BaseRewardWithTotalBalance to use EffectiveBalanceAtIndex
- Convert InitializePrecomputeValidators, ProcessSlashingsPrecompute,
  ProcessRegistryUpdates and related helpers to use ForEachValidator

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-18 13:32:08 +01:00
Manu NALEPA
39b027c6c2 CoW merkleLayers via ref-counted sharedMerkleLayers
Replace deep-copy triple-nested loop in Copy() with ref-counted
sharedMerkleLayers. The layers are materialized only when
recomputeDirtyFields actually mutates, saving significant allocation
overhead during state copies.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-18 13:26:45 +01:00
Manu NALEPA
76d0b5706e Add changelog 2026-03-17 22:52:17 +01:00
Manu NALEPA
7df70bdffe getVerifyingState: Check in cache before replaying state. 2026-03-17 22:48:33 +01:00
Manu NALEPA
33143139d4 getVerifyingState: optimize state replay for concurrent data column verification. 2026-03-17 22:23:28 +01:00
Manu NALEPA
14d801a4da Add useful logs 2026-03-17 22:23:28 +01:00
Manu NALEPA
be4850ebba Add changelog 2026-03-17 17:24:55 +01:00
Manu NALEPA
cf75e1bc6b StateByRootIfCachedNoCopy: Check also in the epoch boundary state cache. 2026-03-17 17:24:55 +01:00
Manu NALEPA
47d4c915d3 Use state.ReadOnlyBeaconState instead of state.BeaconState when possible.
It avoids useless state copies.
2026-03-17 17:24:52 +01:00
59 changed files with 691 additions and 180 deletions

View File

@@ -2,6 +2,7 @@ package blockchain
import (
"context"
"fmt"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
@@ -57,7 +58,7 @@ func (s *Service) OnAttestation(ctx context.Context, a ethpb.Att, disparity time
// save it to the cache.
baseState, err := s.getAttPreState(ctx, tgt)
if err != nil {
return err
return fmt.Errorf("get att pre state for att root %s: %w", fmt.Sprintf("%#x", a.GetData().BeaconBlockRoot), err)
}
// Verify attestation target is from current epoch or previous epoch.

View File

@@ -49,7 +49,13 @@ func (s *Service) AttestationTargetState(ctx context.Context, target *ethpb.Chec
// We acquire the lock here instead than on gettAttPreState because that function gets called from UpdateHead that holds a write lock
s.cfg.ForkChoiceStore.RLock()
defer s.cfg.ForkChoiceStore.RUnlock()
return s.getAttPreState(ctx, target)
preState, err := s.getAttPreState(ctx, target)
if err != nil {
return nil, fmt.Errorf("get att pre state: %w", err)
}
return preState, nil
}
// VerifyLmdFfgConsistency verifies that attestation's LMD and FFG votes are consistency to each other.

View File

@@ -172,7 +172,7 @@ func (s *SyncCommitteeCache) idxPositionInCommittee(
// UpdatePositionsInCommittee updates caching of validators position in sync committee in respect to
// current epoch and next epoch. This should be called when `current_sync_committee` and `next_sync_committee`
// change and that happens every `EPOCHS_PER_SYNC_COMMITTEE_PERIOD`.
func (s *SyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoot [32]byte, st state.BeaconState) error {
func (s *SyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoot [32]byte, st state.ReadOnlyBeaconState) error {
// since we call UpdatePositionsInCommittee asynchronously, keep track of the cache value
// seen at the beginning of the routine and compare at the end before updating. If the underlying value has been
// cycled (new address), don't update it.

View File

@@ -32,7 +32,7 @@ func (s *FakeSyncCommitteeCache) NextPeriodIndexPosition(root [32]byte, valIdx p
}
// UpdatePositionsInCommittee -- fake.
func (s *FakeSyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoot [32]byte, state state.BeaconState) error {
func (s *FakeSyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoot [32]byte, state state.ReadOnlyBeaconState) error {
return nil
}

View File

@@ -27,6 +27,7 @@ go_library(
"//beacon-chain/p2p/types:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/math"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -41,26 +42,26 @@ func InitializePrecomputeValidators(ctx context.Context, beaconState state.Beaco
if beaconState.NumValidators() != len(inactivityScores) {
return nil, nil, errors.New("num of validators is different than num of inactivity scores")
}
if err := beaconState.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
if err := beaconState.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
// Set validator's balance, inactivity score and slashed/withdrawable status.
v := &precompute.Validator{
CurrentEpochEffectiveBalance: val.EffectiveBalance(),
CurrentEpochEffectiveBalance: val.EffectiveBalance,
InactivityScore: inactivityScores[idx],
IsSlashed: val.Slashed(),
IsWithdrawableCurrentEpoch: currentEpoch >= val.WithdrawableEpoch(),
IsSlashed: val.Slashed,
IsWithdrawableCurrentEpoch: currentEpoch >= val.WithdrawableEpoch,
}
// Set validator's active status for current epoch.
if helpers.IsActiveValidatorUsingTrie(val, currentEpoch) {
if helpers.IsActiveCompactValidator(val, currentEpoch) {
v.IsActiveCurrentEpoch = true
bal.ActiveCurrentEpoch, err = math.Add64(bal.ActiveCurrentEpoch, val.EffectiveBalance())
bal.ActiveCurrentEpoch, err = math.Add64(bal.ActiveCurrentEpoch, val.EffectiveBalance)
if err != nil {
return err
}
}
// Set validator's active status for previous epoch.
if helpers.IsActiveValidatorUsingTrie(val, prevEpoch) {
if helpers.IsActiveCompactValidator(val, prevEpoch) {
v.IsActivePrevEpoch = true
bal.ActivePrevEpoch, err = math.Add64(bal.ActivePrevEpoch, val.EffectiveBalance())
bal.ActivePrevEpoch, err = math.Add64(bal.ActivePrevEpoch, val.EffectiveBalance)
if err != nil {
return err
}

View File

@@ -34,12 +34,12 @@ func BaseReward(s state.ReadOnlyBeaconState, index primitives.ValidatorIndex) (u
// BaseRewardWithTotalBalance calculates the base reward with the provided total balance.
func BaseRewardWithTotalBalance(s state.ReadOnlyBeaconState, index primitives.ValidatorIndex, totalBalance uint64) (uint64, error) {
val, err := s.ValidatorAtIndexReadOnly(index)
effBal, err := s.EffectiveBalanceAtIndex(index)
if err != nil {
return 0, err
}
cfg := params.BeaconConfig()
increments := val.EffectiveBalance() / cfg.EffectiveBalanceIncrement
increments := effBal / cfg.EffectiveBalanceIncrement
baseRewardPerInc, err := BaseRewardPerIncrement(totalBalance)
if err != nil {
return 0, err

View File

@@ -28,6 +28,7 @@ go_library(
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/state-native/custom-types:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",

View File

@@ -4,8 +4,8 @@ import (
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
)
// ProcessEffectiveBalanceUpdates processes effective balance updates during epoch processing.
@@ -30,32 +30,34 @@ import (
// ):
// validator.effective_balance = min(balance - balance % EFFECTIVE_BALANCE_INCREMENT, EFFECTIVE_BALANCE_LIMIT)
func ProcessEffectiveBalanceUpdates(st state.BeaconState) error {
effBalanceInc := params.BeaconConfig().EffectiveBalanceIncrement
hysteresisInc := effBalanceInc / params.BeaconConfig().HysteresisQuotient
downwardThreshold := hysteresisInc * params.BeaconConfig().HysteresisDownwardMultiplier
upwardThreshold := hysteresisInc * params.BeaconConfig().HysteresisUpwardMultiplier
cfg := params.BeaconConfig()
effBalanceInc := cfg.EffectiveBalanceIncrement
hysteresisInc := effBalanceInc / cfg.HysteresisQuotient
downwardThreshold := hysteresisInc * cfg.HysteresisDownwardMultiplier
upwardThreshold := hysteresisInc * cfg.HysteresisUpwardMultiplier
minActivationBalance := cfg.MinActivationBalance
maxEffBalanceElectra := cfg.MaxEffectiveBalanceElectra
compoundingPrefix := cfg.CompoundingWithdrawalPrefixByte
bals := st.Balances()
// Update effective balances with hysteresis.
validatorFunc := func(idx int, val state.ReadOnlyValidator) (newVal *ethpb.Validator, err error) {
return st.ApplyToEveryCompactValidator(func(idx int, val *stateutil.CompactValidator) (stateutil.CompactValidator, bool, error) {
if idx >= len(bals) {
return nil, fmt.Errorf("validator index exceeds validator length in state %d >= %d", idx, len(st.Balances()))
return stateutil.CompactValidator{}, false, fmt.Errorf("validator index exceeds validator length in state %d >= %d", idx, len(bals))
}
balance := bals[idx]
effectiveBalanceLimit := params.BeaconConfig().MinActivationBalance
if val.HasCompoundingWithdrawalCredentials() {
effectiveBalanceLimit = params.BeaconConfig().MaxEffectiveBalanceElectra
effectiveBalanceLimit := minActivationBalance
if val.WithdrawalCredentials[0] == compoundingPrefix {
effectiveBalanceLimit = maxEffBalanceElectra
}
if balance+downwardThreshold < val.EffectiveBalance() || val.EffectiveBalance()+upwardThreshold < balance {
if balance+downwardThreshold < val.EffectiveBalance || val.EffectiveBalance+upwardThreshold < balance {
effectiveBal := min(balance-balance%effBalanceInc, effectiveBalanceLimit)
newVal = val.Copy()
newVal.EffectiveBalance = effectiveBal
updated := *val
updated.EffectiveBalance = effectiveBal
return updated, true, nil
}
return newVal, nil
}
return st.ApplyToEveryValidator(validatorFunc)
return stateutil.CompactValidator{}, false, nil
})
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/validators"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
)
@@ -48,19 +49,20 @@ func ProcessRegistryUpdates(ctx context.Context, st state.BeaconState) error {
eligibleForEjection := make([]primitives.ValidatorIndex, 0)
eligibleForActivation := make([]primitives.ValidatorIndex, 0)
if err := st.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
finalizedEpoch := st.FinalizedCheckpointEpoch()
if err := st.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
// Collect validators eligible to enter the activation queue.
if helpers.IsEligibleForActivationQueue(val, currentEpoch) {
if helpers.IsEligibleForActivationQueueCompact(val, currentEpoch) {
eligibleForActivationQ = append(eligibleForActivationQ, primitives.ValidatorIndex(idx))
}
// Collect validators to eject.
if val.EffectiveBalance() <= ejectionBal && helpers.IsActiveValidatorUsingTrie(val, currentEpoch) {
if val.EffectiveBalance <= ejectionBal && helpers.IsActiveCompactValidator(val, currentEpoch) {
eligibleForEjection = append(eligibleForEjection, primitives.ValidatorIndex(idx))
}
// Collect validators eligible for activation and not yet dequeued for activation.
if helpers.IsEligibleForActivationUsingROVal(st, val) {
if helpers.IsEligibleForActivationCompact(val, finalizedEpoch) {
eligibleForActivation = append(eligibleForActivation, primitives.ValidatorIndex(idx))
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
@@ -258,14 +259,14 @@ func UpgradeToElectra(beaconState state.BeaconState) (state.BeaconState, error)
earliestExitEpoch := helpers.ActivationExitEpoch(time.CurrentEpoch(beaconState))
preActivationIndices := make([]primitives.ValidatorIndex, 0)
compoundWithdrawalIndices := make([]primitives.ValidatorIndex, 0)
if err = beaconState.ReadFromEveryValidator(func(index int, val state.ReadOnlyValidator) error {
if val.ExitEpoch() != params.BeaconConfig().FarFutureEpoch && val.ExitEpoch() > earliestExitEpoch {
earliestExitEpoch = val.ExitEpoch()
if err = beaconState.ForEachValidator(func(index int, val *stateutil.CompactValidator) error {
if val.ExitEpoch != params.BeaconConfig().FarFutureEpoch && val.ExitEpoch > earliestExitEpoch {
earliestExitEpoch = val.ExitEpoch
}
if val.ActivationEpoch() == params.BeaconConfig().FarFutureEpoch {
if val.ActivationEpoch == params.BeaconConfig().FarFutureEpoch {
preActivationIndices = append(preActivationIndices, primitives.ValidatorIndex(index))
}
if val.HasCompoundingWithdrawalCredentials() {
if val.WithdrawalCredentials[0] == params.BeaconConfig().CompoundingWithdrawalPrefixByte {
compoundWithdrawalIndices = append(compoundWithdrawalIndices, primitives.ValidatorIndex(index))
}
return nil

View File

@@ -59,21 +59,22 @@ func ProcessRegistryUpdates(ctx context.Context, st state.BeaconState) (state.Be
eligibleForActivation := make([]primitives.ValidatorIndex, 0)
eligibleForEjection := make([]primitives.ValidatorIndex, 0)
if err := st.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
finalizedEpoch := st.FinalizedCheckpointEpoch()
if err := st.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
// Collect validators eligible to enter the activation queue.
if helpers.IsEligibleForActivationQueue(val, currentEpoch) {
if helpers.IsEligibleForActivationQueueCompact(val, currentEpoch) {
eligibleForActivationQ = append(eligibleForActivationQ, primitives.ValidatorIndex(idx))
}
// Collect validators to eject.
isActive := helpers.IsActiveValidatorUsingTrie(val, currentEpoch)
belowEjectionBalance := val.EffectiveBalance() <= ejectionBal
isActive := helpers.IsActiveCompactValidator(val, currentEpoch)
belowEjectionBalance := val.EffectiveBalance <= ejectionBal
if isActive && belowEjectionBalance {
eligibleForEjection = append(eligibleForEjection, primitives.ValidatorIndex(idx))
}
// Collect validators eligible for activation and not yet dequeued for activation.
if helpers.IsEligibleForActivationUsingROVal(st, val) {
if helpers.IsEligibleForActivationCompact(val, finalizedEpoch) {
eligibleForActivation = append(eligibleForActivation, primitives.ValidatorIndex(idx))
}
@@ -243,15 +244,15 @@ func ProcessSlashings(st state.BeaconState) error {
bals := st.Balances()
changed := false
err = st.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
correctEpoch := (currentEpoch + exitLength/2) == val.WithdrawableEpoch()
if val.Slashed() && correctEpoch {
err = st.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
correctEpoch := (currentEpoch + exitLength/2) == val.WithdrawableEpoch
if val.Slashed && correctEpoch {
var penalty uint64
if st.Version() >= version.Electra {
effectiveBalanceIncrements := val.EffectiveBalance() / increment
effectiveBalanceIncrements := val.EffectiveBalance / increment
penalty = penaltyPerEffectiveBalanceIncrement * effectiveBalanceIncrements
} else {
penaltyNumerator := val.EffectiveBalance() / increment * minSlashing
penaltyNumerator := val.EffectiveBalance / increment * minSlashing
penalty = penaltyNumerator / totalBalance * increment
}
bals[idx] = helpers.DecreaseBalanceWithVal(bals[idx], penalty)
@@ -310,35 +311,31 @@ func ProcessEth1DataReset(state state.BeaconState) (state.BeaconState, error) {
// ):
// validator.effective_balance = min(balance - balance % EFFECTIVE_BALANCE_INCREMENT, MAX_EFFECTIVE_BALANCE)
func ProcessEffectiveBalanceUpdates(st state.BeaconState) (state.BeaconState, error) {
effBalanceInc := params.BeaconConfig().EffectiveBalanceIncrement
maxEffBalance := params.BeaconConfig().MaxEffectiveBalance
hysteresisInc := effBalanceInc / params.BeaconConfig().HysteresisQuotient
downwardThreshold := hysteresisInc * params.BeaconConfig().HysteresisDownwardMultiplier
upwardThreshold := hysteresisInc * params.BeaconConfig().HysteresisUpwardMultiplier
cfg := params.BeaconConfig()
effBalanceInc := cfg.EffectiveBalanceIncrement
maxEffBalance := cfg.MaxEffectiveBalance
hysteresisInc := effBalanceInc / cfg.HysteresisQuotient
downwardThreshold := hysteresisInc * cfg.HysteresisDownwardMultiplier
upwardThreshold := hysteresisInc * cfg.HysteresisUpwardMultiplier
bals := st.Balances()
// Update effective balances with hysteresis.
validatorFunc := func(idx int, val state.ReadOnlyValidator) (newVal *ethpb.Validator, err error) {
if val == nil {
return nil, fmt.Errorf("validator %d is nil in state", idx)
}
if err := st.ApplyToEveryCompactValidator(func(idx int, val *stateutil.CompactValidator) (stateutil.CompactValidator, bool, error) {
if idx >= len(bals) {
return nil, fmt.Errorf("validator index exceeds validator length in state %d >= %d", idx, len(st.Balances()))
return stateutil.CompactValidator{}, false, fmt.Errorf("validator index exceeds validator length in state %d >= %d", idx, len(bals))
}
balance := bals[idx]
if balance+downwardThreshold < val.EffectiveBalance() || val.EffectiveBalance()+upwardThreshold < balance {
if balance+downwardThreshold < val.EffectiveBalance || val.EffectiveBalance+upwardThreshold < balance {
effectiveBal := min(maxEffBalance, balance-balance%effBalanceInc)
if effectiveBal != val.EffectiveBalance() {
newVal = val.Copy()
newVal.EffectiveBalance = effectiveBal
if effectiveBal != val.EffectiveBalance {
updated := *val
updated.EffectiveBalance = effectiveBal
return updated, true, nil
}
}
return
}
if err := st.ApplyToEveryValidator(validatorFunc); err != nil {
return stateutil.CompactValidator{}, false, nil
}); err != nil {
return nil, err
}

View File

@@ -20,6 +20,7 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//math:go_default_library",

View File

@@ -9,6 +9,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
"github.com/pkg/errors"
@@ -27,23 +28,23 @@ func New(ctx context.Context, s state.BeaconState) ([]*Validator, *Balance, erro
currentEpoch := time.CurrentEpoch(s)
prevEpoch := time.PrevEpoch(s)
if err := s.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
if err := s.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
// Was validator withdrawable or slashed
withdrawable := prevEpoch+1 >= val.WithdrawableEpoch()
withdrawable := prevEpoch+1 >= val.WithdrawableEpoch
pVal := &Validator{
IsSlashed: val.Slashed(),
IsSlashed: val.Slashed,
IsWithdrawableCurrentEpoch: withdrawable,
CurrentEpochEffectiveBalance: val.EffectiveBalance(),
CurrentEpochEffectiveBalance: val.EffectiveBalance,
}
// Was validator active current epoch
if helpers.IsActiveValidatorUsingTrie(val, currentEpoch) {
if helpers.IsActiveCompactValidator(val, currentEpoch) {
pVal.IsActiveCurrentEpoch = true
pBal.ActiveCurrentEpoch += val.EffectiveBalance()
pBal.ActiveCurrentEpoch += val.EffectiveBalance
}
// Was validator active previous epoch
if helpers.IsActiveValidatorUsingTrie(val, prevEpoch) {
if helpers.IsActiveCompactValidator(val, prevEpoch) {
pVal.IsActivePrevEpoch = true
pBal.ActivePrevEpoch += val.EffectiveBalance()
pBal.ActivePrevEpoch += val.EffectiveBalance
}
// Set inclusion slot and inclusion distance to be max, they will be compared and replaced
// with the lower values

View File

@@ -4,6 +4,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
)
@@ -25,9 +26,9 @@ func ProcessSlashingsPrecompute(s state.BeaconState, pBal *Balance) error {
var hasSlashing bool
// Iterate through validator list in state, stop until a validator satisfies slashing condition of current epoch.
err := s.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
correctEpoch := epochToWithdraw == val.WithdrawableEpoch()
if val.Slashed() && correctEpoch {
err := s.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
correctEpoch := epochToWithdraw == val.WithdrawableEpoch
if val.Slashed && correctEpoch {
hasSlashing = true
}
return nil
@@ -42,16 +43,16 @@ func ProcessSlashingsPrecompute(s state.BeaconState, pBal *Balance) error {
increment := params.BeaconConfig().EffectiveBalanceIncrement
bals := s.Balances()
validatorFunc := func(idx int, val state.ReadOnlyValidator) error {
correctEpoch := epochToWithdraw == val.WithdrawableEpoch()
if val.Slashed() && correctEpoch {
penaltyNumerator := val.EffectiveBalance() / increment * minSlashing
validatorFunc := func(idx int, val *stateutil.CompactValidator) error {
correctEpoch := epochToWithdraw == val.WithdrawableEpoch
if val.Slashed && correctEpoch {
penaltyNumerator := val.EffectiveBalance / increment * minSlashing
penalty := penaltyNumerator / pBal.ActiveCurrentEpoch * increment
bals[idx] = helpers.DecreaseBalanceWithVal(bals[idx], penalty)
}
return nil
}
if err := s.ReadFromEveryValidator(validatorFunc); err != nil {
if err := s.ForEachValidator(validatorFunc); err != nil {
return err
}
return s.SetBalances(bals)

View File

@@ -29,6 +29,7 @@ go_library(
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/forkchoice/types:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",

View File

@@ -12,6 +12,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
forkchoicetypes "github.com/OffchainLabs/prysm/v7/beacon-chain/forkchoice/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -462,8 +463,8 @@ func ShuffledIndices(s state.ReadOnlyBeaconState, epoch primitives.Epoch) ([]pri
}
indices := make([]primitives.ValidatorIndex, 0, s.NumValidators())
if err := s.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
if IsActiveValidatorUsingTrie(val, epoch) {
if err := s.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
if IsActiveCompactValidator(val, epoch) {
indices = append(indices, primitives.ValidatorIndex(idx))
}
return nil

View File

@@ -5,6 +5,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
mathutil "github.com/OffchainLabs/prysm/v7/math"
@@ -69,9 +70,9 @@ func TotalActiveBalance(s state.ReadOnlyBeaconState) (uint64, error) {
total := uint64(0)
epoch := slots.ToEpoch(s.Slot())
if err := s.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
if IsActiveValidatorUsingTrie(val, epoch) {
total += val.EffectiveBalance()
if err := s.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
if IsActiveCompactValidator(val, epoch) {
total += val.EffectiveBalance
}
return nil
}); err != nil {

View File

@@ -121,7 +121,7 @@ func IsNextPeriodSyncCommittee(
// CurrentPeriodSyncSubcommitteeIndices returns the subcommittee indices of the
// current period sync committee for input validator.
func CurrentPeriodSyncSubcommitteeIndices(
st state.BeaconState, valIdx primitives.ValidatorIndex,
st state.ReadOnlyBeaconState, valIdx primitives.ValidatorIndex,
) ([]primitives.CommitteeIndex, error) {
root, err := SyncPeriodBoundaryRoot(st)
if err != nil {

View File

@@ -9,6 +9,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
forkchoicetypes "github.com/OffchainLabs/prysm/v7/beacon-chain/forkchoice/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -137,8 +138,8 @@ func ActiveValidatorIndices(ctx context.Context, s state.ReadOnlyBeaconState, ep
}()
var indices []primitives.ValidatorIndex
if err := s.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
if IsActiveValidatorUsingTrie(val, epoch) {
if err := s.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
if IsActiveCompactValidator(val, epoch) {
indices = append(indices, primitives.ValidatorIndex(idx))
}
return nil
@@ -190,8 +191,8 @@ func ActiveValidatorCount(ctx context.Context, s state.ReadOnlyBeaconState, epoc
}()
count := uint64(0)
if err := s.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
if IsActiveValidatorUsingTrie(val, epoch) {
if err := s.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
if IsActiveCompactValidator(val, epoch) {
count++
}
return nil
@@ -523,6 +524,32 @@ func isEligibleForActivation(activationEligibilityEpoch, activationEpoch, finali
activationEpoch == params.BeaconConfig().FarFutureEpoch
}
// --- CompactValidator direct-access helpers (zero-alloc) ---
// IsActiveCompactValidator checks if a compact validator is active at the given epoch.
func IsActiveCompactValidator(cv *stateutil.CompactValidator, epoch primitives.Epoch) bool {
return checkValidatorActiveStatus(cv.ActivationEpoch, cv.ExitEpoch, epoch)
}
// IsEligibleForActivationQueueCompact checks if a compact validator is eligible
// to be placed into the activation queue.
func IsEligibleForActivationQueueCompact(cv *stateutil.CompactValidator, currentEpoch primitives.Epoch) bool {
if currentEpoch >= params.BeaconConfig().ElectraForkEpoch {
return isEligibleForActivationQueueElectra(cv.ActivationEligibilityEpoch, cv.EffectiveBalance)
}
return isEligibleForActivationQueue(cv.ActivationEligibilityEpoch, cv.EffectiveBalance)
}
// IsEligibleForActivationCompact checks if a compact validator is eligible for activation.
func IsEligibleForActivationCompact(cv *stateutil.CompactValidator, finalizedEpoch primitives.Epoch) bool {
return isEligibleForActivation(cv.ActivationEligibilityEpoch, cv.ActivationEpoch, finalizedEpoch)
}
// IsActiveNonSlashedCompactValidator checks if a compact validator is active and not slashed.
func IsActiveNonSlashedCompactValidator(cv *stateutil.CompactValidator, epoch primitives.Epoch) bool {
return checkValidatorActiveStatus(cv.ActivationEpoch, cv.ExitEpoch, epoch) && !cv.Slashed
}
// LastActivatedValidatorIndex provides the last activated validator given a state
func LastActivatedValidatorIndex(ctx context.Context, st state.ReadOnlyBeaconState) (primitives.ValidatorIndex, error) {
_, span := trace.StartSpan(ctx, "helpers.LastActivatedValidatorIndex")

View File

@@ -16,6 +16,7 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//math:go_default_library",

View File

@@ -10,6 +10,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/math"
@@ -38,8 +39,8 @@ func ExitInformation(s state.BeaconState) *ExitInfo {
currentEpoch := slots.ToEpoch(s.Slot())
totalActiveBalance := uint64(0)
err := s.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
e := val.ExitEpoch()
err := s.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
e := val.ExitEpoch
if e != farFutureEpoch {
if e > exitInfo.HighestExitEpoch {
exitInfo.HighestExitEpoch = e
@@ -50,8 +51,8 @@ func ExitInformation(s state.BeaconState) *ExitInfo {
}
// Calculate total active balance in the same loop
if helpers.IsActiveValidatorUsingTrie(val, currentEpoch) {
totalActiveBalance += val.EffectiveBalance()
if helpers.IsActiveCompactValidator(val, currentEpoch) {
totalActiveBalance += val.EffectiveBalance
}
return nil

View File

@@ -33,7 +33,7 @@ func (s *Service) canUpdateAttestedValidator(idx primitives.ValidatorIndex, slot
}
// attestingIndices returns the indices of validators that participated in the given aggregated attestation.
func attestingIndices(ctx context.Context, state state.BeaconState, att ethpb.Att) ([]uint64, error) {
func attestingIndices(ctx context.Context, state state.ReadOnlyBeaconState, att ethpb.Att) ([]uint64, error) {
committeeBits := att.CommitteeBitsVal().BitIndices()
committees := make([][]primitives.ValidatorIndex, len(committeeBits))
var err error
@@ -59,7 +59,7 @@ func logMessageTimelyFlagsForIndex(idx primitives.ValidatorIndex, data *ethpb.At
}
// processAttestations logs the event for the tracked validators' attestations inclusion in block
func (s *Service) processAttestations(ctx context.Context, state state.BeaconState, blk interfaces.ReadOnlyBeaconBlock) {
func (s *Service) processAttestations(ctx context.Context, state state.ReadOnlyBeaconState, blk interfaces.ReadOnlyBeaconBlock) {
if blk == nil || blk.Body() == nil {
return
}
@@ -69,7 +69,7 @@ func (s *Service) processAttestations(ctx context.Context, state state.BeaconSta
}
// processIncludedAttestation logs in the event for the tracked validators' and their latest attestation gets processed.
func (s *Service) processIncludedAttestation(ctx context.Context, state state.BeaconState, att ethpb.Att) {
func (s *Service) processIncludedAttestation(ctx context.Context, state state.ReadOnlyBeaconState, att ethpb.Att) {
attestingIndices, err := attestingIndices(ctx, state, att)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")

View File

@@ -64,7 +64,7 @@ func (s *Service) processBlock(ctx context.Context, b interfaces.ReadOnlySignedB
}
// processProposedBlock logs when the beacon node observes a beacon block from a tracked validator.
func (s *Service) processProposedBlock(state state.BeaconState, root [32]byte, blk interfaces.ReadOnlyBeaconBlock) {
func (s *Service) processProposedBlock(state state.ReadOnlyBeaconState, root [32]byte, blk interfaces.ReadOnlyBeaconBlock) {
s.Lock()
defer s.Unlock()
if s.trackedIndex(blk.ProposerIndex()) {

View File

@@ -26,7 +26,7 @@ func (s *Service) processSyncCommitteeContribution(contribution *ethpb.SignedCon
}
// processSyncAggregate logs the event when tracked validators is a sync-committee member and its contribution has been included
func (s *Service) processSyncAggregate(state state.BeaconState, blk interfaces.ReadOnlyBeaconBlock) {
func (s *Service) processSyncAggregate(state state.ReadOnlyBeaconState, blk interfaces.ReadOnlyBeaconBlock) {
if blk == nil || blk.Body() == nil {
return
}

View File

@@ -277,7 +277,7 @@ func (s *Service) trackedIndex(idx primitives.ValidatorIndex) bool {
// updateSyncCommitteeTrackedVals updates the sync committee assignments of our
// tracked validators. It gets called when we sync a block after the Sync Period changes.
func (s *Service) updateSyncCommitteeTrackedVals(state state.BeaconState) {
func (s *Service) updateSyncCommitteeTrackedVals(state state.ReadOnlyBeaconState) {
s.Lock()
defer s.Unlock()
for idx := range s.TrackedValidators {

View File

@@ -166,7 +166,7 @@ func (s *Service) retrieveActiveValidators() (uint64, error) {
if err != nil {
return 0, err
}
bState, err := s.cfg.StateGen.StateByRoot(s.ctx, [32]byte(finalizedCheckpoint.Root))
bState, err := s.cfg.StateGen.StateByRootNoCopy(s.ctx, [32]byte(finalizedCheckpoint.Root))
if err != nil {
return 0, err
}

View File

@@ -742,38 +742,45 @@ func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventDat
return ev, errors.New("head root is empty")
}
var err error
var st state.BeaconState
var rost state.ReadOnlyBeaconState
// If head is in the same block as the proposal slot, we can use the "read only" state cache.
pse := slots.ToEpoch(ev.ProposalSlot)
if slots.ToEpoch(ev.HeadBlock.Block().Slot()) == pse {
st = s.StateGen.StateByRootIfCachedNoCopy(ev.HeadRoot)
rost = s.StateGen.StateByRootIfCachedNoCopy(ev.HeadRoot)
}
// If st is nil, we couldn't get the state from the cache, or it isn't in the same epoch.
if st == nil || st.IsNil() {
st, err = s.StateGen.StateByRoot(ctx, ev.HeadRoot)
// If rost is nil, we couldn't get the state from the cache, or it isn't in the same epoch.
if rost == nil || rost.IsNil() {
st, err := s.StateGen.StateByRoot(ctx, ev.HeadRoot)
if err != nil {
return ev, errors.Wrap(err, "could not get head state")
}
// double check that we need to process_slots, just in case we got here via a hot state cache miss.
// Double check that we need to process_slots, just in case we got here via a hot state cache miss.
if slots.ToEpoch(st.Slot()) < pse {
start, err := slots.EpochStart(pse)
if err != nil {
return ev, errors.Wrap(err, "invalid state slot; could not compute epoch start")
}
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, ev.HeadRoot[:], start)
if err != nil {
return ev, errors.Wrap(err, "could not run process blocks on head state into the proposal slot epoch")
}
}
rost = st
}
ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot)
proposerIndex, err := helpers.BeaconProposerIndexAtSlot(ctx, rost, ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "failed to compute proposer index")
}
randao, err := helpers.RandaoMix(st, pse)
ev.ProposerIndex = proposerIndex
randao, err := helpers.RandaoMix(rost, pse)
if err != nil {
return ev, errors.Wrap(err, "could not get head state randado")
}
@@ -785,11 +792,12 @@ func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventDat
ev.ParentBlockHash = payload.BlockHash()
ev.ParentBlockNumber = payload.BlockNumber()
t, err := slots.StartTime(st.GenesisTime(), ev.ProposalSlot)
t, err := slots.StartTime(rost.GenesisTime(), ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "could not get head state slot time")
}
ev.Attributer, err = s.computePayloadAttributes(ctx, st, ev.HeadRoot, ev.ProposerIndex, uint64(t.Unix()), randao)
ev.Attributer, err = s.computePayloadAttributes(ctx, rost, ev.HeadRoot, ev.ProposerIndex, uint64(t.Unix()), randao)
return ev, err
}

View File

@@ -433,7 +433,7 @@ func blockIndexedAttestations[T ethpb.IndexedAtt](
mappedAttestations := mapAttestationsByTargetRoot(attsArray)
indexed := make([]T, 0, numAttestations)
for targetRoot, atts := range mappedAttestations {
attState, err := stateGen.StateByRoot(ctx, targetRoot)
attState, err := stateGen.StateByRootNoCopy(ctx, targetRoot)
if err != nil && strings.Contains(err.Error(), "unknown state summary") {
// We shouldn't stop the request if we encounter an attestation we don't have the state for.
log.Debugf("Could not get state for attestation target root %#x", targetRoot)

View File

@@ -108,7 +108,7 @@ func (bs *Server) retrieveCommitteesForRoot(
ctx context.Context,
root []byte,
) (SlotToCommiteesMap, []primitives.ValidatorIndex, error) {
requestedState, err := bs.StateGen.StateByRoot(ctx, bytesutil.ToBytes32(root))
requestedState, err := bs.StateGen.StateByRootNoCopy(ctx, bytesutil.ToBytes32(root))
if err != nil {
return nil, nil, status.Error(codes.Internal, fmt.Sprintf("Could not get state: %v", err))
}

View File

@@ -73,7 +73,7 @@ func (ds *Server) GetInclusionSlot(ctx context.Context, req *pbrpc.InclusionSlot
tr := bytesutil.ToBytes32(a.GetData().Target.Root)
s, ok := targetStates[tr]
if !ok {
s, err = ds.StateGen.StateByRoot(ctx, tr)
s, err = ds.StateGen.StateByRootNoCopy(ctx, tr)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not retrieve state: %v", err)
}

View File

@@ -44,7 +44,7 @@ func (ds *Server) GetBeaconState(
Encoded: encoded,
}, nil
case *pbrpc.BeaconStateRequest_BlockRoot:
st, err := ds.StateGen.StateByRoot(ctx, bytesutil.ToBytes32(q.BlockRoot))
st, err := ds.StateGen.StateByRootNoCopy(ctx, bytesutil.ToBytes32(q.BlockRoot))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not compute state by block root: %v", err)
}

View File

@@ -13,6 +13,7 @@ go_library(
deps = [
"//beacon-chain/state/state-native/custom-types:go_default_library",
"//beacon-chain/state/state-native/types:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//config/fieldparams:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/OffchainLabs/go-bitfield"
customtypes "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native/custom-types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -144,6 +145,8 @@ type ReadOnlyValidators interface {
AggregateKeyFromIndices(idxs []uint64) (bls.PublicKey, error)
NumValidators() int
ReadFromEveryValidator(f func(idx int, val ReadOnlyValidator) error) error
ForEachValidator(f func(idx int, val *stateutil.CompactValidator) error) error
EffectiveBalanceAtIndex(idx primitives.ValidatorIndex) (uint64, error)
}
// ReadOnlyBalances defines a struct which only has read access to balances methods.
@@ -279,6 +282,7 @@ type WriteOnlyEth1Data interface {
type WriteOnlyValidators interface {
SetValidators(val []*ethpb.Validator) error
ApplyToEveryValidator(f func(idx int, val ReadOnlyValidator) (*ethpb.Validator, error)) error
ApplyToEveryCompactValidator(f func(idx int, val *stateutil.CompactValidator) (stateutil.CompactValidator, bool, error)) error
UpdateValidatorAtIndex(idx primitives.ValidatorIndex, val *ethpb.Validator) error
AppendValidator(val *ethpb.Validator) error
}

View File

@@ -27,6 +27,7 @@ go_library(
"gloas.go",
"hasher.go",
"log.go",
"merkle_layers.go",
"metrics_gloas.go",
"multi_value_slices.go",
"proofs.go",

View File

@@ -87,7 +87,7 @@ type BeaconState struct {
stateFieldLeaves map[types.FieldIndex]*fieldtrie.FieldTrie
rebuildTrie map[types.FieldIndex]bool
valMapHandler *stateutil.ValidatorMapHandler
merkleLayers [][][]byte
merkleLayers *sharedMerkleLayers
sharedFieldReferences map[types.FieldIndex]*stateutil.Reference
}

View File

@@ -223,6 +223,44 @@ func (b *BeaconState) ReadFromEveryValidator(f func(idx int, val state.ReadOnlyV
return nil
}
// ForEachValidator iterates over every validator, passing a pointer to the
// underlying CompactValidator directly. This avoids interface boxing and heap
// allocation per validator that ReadFromEveryValidator incurs.
// The pointer is only valid for the duration of the callback; callers must not
// retain it.
//
// Internally this uses MultiValueSlice.ForEach which iterates in-place over
// the shared storage, resolving the sparse individual overrides without
// materializing a full copy of the validator slice. For ~1M validators this
// eliminates ~128 MB of allocation per call.
func (b *BeaconState) ForEachValidator(f func(idx int, val *stateutil.CompactValidator) error) error {
b.lock.RLock()
defer b.lock.RUnlock()
if b.validatorsMultiValue == nil {
return state.ErrNilValidatorsInState
}
return b.validatorsMultiValue.ForEach(b, func(idx int, val *stateutil.CompactValidator) error {
return f(idx, val)
})
}
// EffectiveBalanceAtIndex returns the effective balance of the validator at the
// given index without allocating a full validator object or boxing into an interface.
func (b *BeaconState) EffectiveBalanceAtIndex(idx primitives.ValidatorIndex) (uint64, error) {
b.lock.RLock()
defer b.lock.RUnlock()
if b.validatorsMultiValue == nil {
return 0, state.ErrNilValidatorsInState
}
cv, err := b.validatorsMultiValue.At(b, uint64(idx))
if err != nil {
return 0, err
}
return cv.EffectiveBalance, nil
}
// Balances of validators participating in consensus on the beacon chain.
func (b *BeaconState) Balances() []uint64 {
b.lock.RLock()

View File

@@ -0,0 +1,57 @@
package state_native
import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
)
// sharedMerkleLayers wraps the beacon state's top-level Merkle tree layers with
// reference counting so that Copy() can share them instead of deep-copying.
// All access is protected by the owning BeaconState's lock; this struct does
// not carry its own mutex.
type sharedMerkleLayers struct {
layers [][][]byte
ref *stateutil.Reference
}
// newSharedMerkleLayers wraps existing layers in a ref-counted container.
func newSharedMerkleLayers(layers [][][]byte) *sharedMerkleLayers {
return &sharedMerkleLayers{
layers: layers,
ref: stateutil.NewRef(1),
}
}
// copy increments the reference count and returns the same pointer, making
// BeaconState.Copy() O(1) for this field. The caller must call ensureUnique()
// before mutating the layers.
func (s *sharedMerkleLayers) copy() *sharedMerkleLayers {
s.ref.AddRef()
return s
}
// ensureUnique deep-copies the layers if this instance is shared (refs > 1)
// and returns the (possibly new) sharedMerkleLayers to use. The caller must
// replace its field with the returned value:
//
// b.merkleLayers = b.merkleLayers.ensureUnique()
func (s *sharedMerkleLayers) ensureUnique() *sharedMerkleLayers {
if s.ref.Refs() == 1 {
return s
}
// Shared — deep-copy and detach.
s.ref.MinusRef()
newLayers := make([][][]byte, len(s.layers))
for i, layer := range s.layers {
newLayers[i] = make([][]byte, len(layer))
for j, content := range layer {
newLayers[i][j] = make([]byte, len(content))
copy(newLayers[i][j], content)
}
}
return newSharedMerkleLayers(newLayers)
}
// release decrements the reference count. Called during finalizer cleanup.
func (s *sharedMerkleLayers) release() {
s.ref.MinusRef()
}

View File

@@ -93,7 +93,7 @@ func (b *BeaconState) proofByFieldIndex(ctx context.Context, f types.FieldIndex)
if err := b.recomputeDirtyFields(ctx); err != nil {
return nil, err
}
return trie.ProofFromMerkleLayers(b.merkleLayers, f.RealPosition()), nil
return trie.ProofFromMerkleLayers(b.merkleLayers.layers, f.RealPosition()), nil
}
func (b *BeaconState) validateFieldIndex(f types.FieldIndex) error {

View File

@@ -160,13 +160,13 @@ func (b *BeaconState) AppendHistoricalSummaries(summary *ethpb.HistoricalSummary
// hold the lock before calling this method.
func (b *BeaconState) recomputeRoot(idx int) {
hashFunc := hash.CustomSHA256Hasher()
layers := b.merkleLayers
layers := b.merkleLayers.layers
// The merkle tree structure looks as follows:
// [[r1, r2, r3, r4], [parent1, parent2], [root]]
// Using information about the index which changed, idx, we recompute
// only its branch up the tree.
currentIndex := idx
root := b.merkleLayers[0][idx]
root := layers[0][idx]
for i := 0; i < len(layers)-1; i++ {
isLeft := currentIndex%2 == 0
neighborIdx := currentIndex ^ 1
@@ -187,7 +187,6 @@ func (b *BeaconState) recomputeRoot(idx int) {
layers[i+1][parentIdx] = root
currentIndex = parentIdx
}
b.merkleLayers = layers
}
func (b *BeaconState) markFieldAsDirty(field types.FieldIndex) {

View File

@@ -62,6 +62,49 @@ func (b *BeaconState) ApplyToEveryValidator(f func(idx int, val state.ReadOnlyVa
return nil
}
// ApplyToEveryCompactValidator iterates all validators using zero-alloc ForEach,
// calling the provided function with a read pointer to each CompactValidator.
// If the function returns (newVal, true, nil), the validator at that index is
// updated to newVal. This avoids the proto round-trip and interface boxing that
// ApplyToEveryValidator incurs.
func (b *BeaconState) ApplyToEveryCompactValidator(f func(idx int, val *stateutil.CompactValidator) (stateutil.CompactValidator, bool, error)) error {
type mutation struct {
idx uint64
val stateutil.CompactValidator
}
var mutations []mutation
err := b.validatorsMultiValue.ForEach(b, func(idx int, val *stateutil.CompactValidator) error {
newVal, changed, fErr := f(idx, val)
if fErr != nil {
return fErr
}
if changed {
mutations = append(mutations, mutation{idx: uint64(idx), val: newVal})
}
return nil
})
if err != nil {
return err
}
if len(mutations) > 0 {
changedVals := make([]uint64, len(mutations))
for i, m := range mutations {
if err := b.validatorsMultiValue.UpdateAt(b, m.idx, m.val); err != nil {
return errors.Wrapf(err, "could not update validator at index %d", m.idx)
}
changedVals[i] = m.idx
}
b.lock.Lock()
defer b.lock.Unlock()
b.markFieldAsDirty(types.Validators)
b.addDirtyIndices(types.Validators, changedVals)
}
return nil
}
// UpdateValidatorAtIndex for the beacon state. Updates the validator
// at a specific index to a new value.
func (b *BeaconState) UpdateValidatorAtIndex(idx primitives.ValidatorIndex, val *ethpb.Validator) error {

View File

@@ -307,7 +307,7 @@ func TestBeaconState_AppendBalanceWithTrie(t *testing.T) {
}
_, err = st.HashTreeRoot(t.Context())
assert.NoError(t, err)
newRt := bytesutil.ToBytes32(st.merkleLayers[0][types.Balances])
newRt := bytesutil.ToBytes32(st.merkleLayers.layers[0][types.Balances])
wantedRt, err := stateutil.Uint64ListRootWithRegistryLimit(st.Balances())
assert.NoError(t, err)
assert.Equal(t, wantedRt, newRt, "state roots are unequal")

View File

@@ -1033,14 +1033,7 @@ func (b *BeaconState) Copy() state.BeaconState {
}
if b.merkleLayers != nil {
dst.merkleLayers = make([][][]byte, len(b.merkleLayers))
for i, layer := range b.merkleLayers {
dst.merkleLayers[i] = make([][]byte, len(layer))
for j, content := range layer {
dst.merkleLayers[i][j] = make([]byte, len(content))
copy(dst.merkleLayers[i][j], content)
}
}
dst.merkleLayers = b.merkleLayers.copy()
}
state.Count.Inc()
@@ -1063,14 +1056,14 @@ func (b *BeaconState) HashTreeRoot(ctx context.Context) ([32]byte, error) {
if err := b.recomputeDirtyFields(ctx); err != nil {
return [32]byte{}, err
}
return bytesutil.ToBytes32(b.merkleLayers[len(b.merkleLayers)-1][0]), nil
return bytesutil.ToBytes32(b.merkleLayers.layers[len(b.merkleLayers.layers)-1][0]), nil
}
// Initializes the Merkle layers for the beacon state if they are empty.
//
// WARNING: Caller must acquire the mutex before using.
func (b *BeaconState) initializeMerkleLayers(ctx context.Context) error {
if len(b.merkleLayers) > 0 {
if b.merkleLayers != nil && len(b.merkleLayers.layers) > 0 {
return nil
}
fieldRoots, err := ComputeFieldRootsWithHasher(ctx, b)
@@ -1078,7 +1071,7 @@ func (b *BeaconState) initializeMerkleLayers(ctx context.Context) error {
return err
}
layers := stateutil.Merkleize(fieldRoots)
b.merkleLayers = layers
b.merkleLayers = newSharedMerkleLayers(layers)
switch b.version {
case version.Phase0:
b.dirtyFields = make(map[types.FieldIndex]bool, params.BeaconConfig().BeaconStateFieldCount)
@@ -1107,13 +1100,16 @@ func (b *BeaconState) initializeMerkleLayers(ctx context.Context) error {
//
// WARNING: Caller must acquire the mutex before using.
func (b *BeaconState) recomputeDirtyFields(ctx context.Context) error {
if len(b.dirtyFields) > 0 {
b.merkleLayers = b.merkleLayers.ensureUnique()
}
for field := range b.dirtyFields {
root, err := b.rootSelector(ctx, field)
if err != nil {
return err
}
idx := field.RealPosition()
b.merkleLayers[0][idx] = root[:]
b.merkleLayers.layers[0][idx] = root[:]
b.recomputeRoot(idx)
delete(b.dirtyFields, field)
}
@@ -1534,6 +1530,9 @@ func finalizerCleanup(b *BeaconState) {
if b.validatorsMultiValue != nil {
b.validatorsMultiValue.Detach(b)
}
if b.merkleLayers != nil {
b.merkleLayers.release()
}
state.Count.Sub(1)
}

View File

@@ -28,6 +28,7 @@ go_library(
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/forkchoice:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//beacon-chain/sync/backfill/coverage:go_default_library",
"//cache/lru:go_default_library",
"//config/features:go_default_library",

View File

@@ -128,7 +128,7 @@ func (e *epochBoundaryState) getBySlot(s primitives.Slot) (*rootStateInfo, bool,
// put adds a state to the epoch boundary state cache. This method also trims the
// least recently added state info if the cache size has reached the max cache
// size limit.
func (e *epochBoundaryState) put(blockRoot [32]byte, s state.BeaconState) error {
func (e *epochBoundaryState) put(blockRoot [32]byte, s state.ReadOnlyBeaconState) error {
e.lock.Lock()
defer e.lock.Unlock()
@@ -151,6 +151,24 @@ func (e *epochBoundaryState) put(blockRoot [32]byte, s state.BeaconState) error
return nil
}
// getByBlockRootNoCopy returns the state for the given block root without copying.
func (e *epochBoundaryState) getByBlockRootNoCopy(r [32]byte) state.ReadOnlyBeaconState {
e.lock.RLock()
defer e.lock.RUnlock()
obj, exists, err := e.rootStateCache.GetByKey(string(r[:]))
if err != nil || !exists {
return nil
}
s, ok := obj.(*rootStateInfo)
if !ok {
return nil
}
return s.state
}
// delete the state from the epoch boundary state cache.
func (e *epochBoundaryState) delete(blockRoot [32]byte) error {
e.lock.Lock()

View File

@@ -3,10 +3,12 @@ package stategen
import (
"context"
stderrors "errors"
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -43,11 +45,12 @@ func (s *State) hasStateInCache(_ context.Context, blockRoot [32]byte) (bool, er
}
// StateByRootIfCachedNoCopy retrieves a state using the input block root only if the state is already in the cache.
func (s *State) StateByRootIfCachedNoCopy(blockRoot [32]byte) state.BeaconState {
if !s.hotStateCache.has(blockRoot) {
return nil
func (s *State) StateByRootIfCachedNoCopy(blockRoot [32]byte) state.ReadOnlyBeaconState {
if state := s.hotStateCache.getWithoutCopy(blockRoot); state != nil {
return state
}
return s.hotStateCache.getWithoutCopy(blockRoot)
return s.epochBoundaryStateCache.getByBlockRootNoCopy(blockRoot)
}
// StateByRoot retrieves the state using input block root.
@@ -63,13 +66,41 @@ func (s *State) StateByRoot(ctx context.Context, blockRoot [32]byte) (state.Beac
}
blockRoot = root
}
return s.loadStateByRoot(ctx, blockRoot)
state, err := s.loadStateByRoot(ctx, blockRoot)
if err != nil {
return nil, fmt.Errorf("load state by root: %w", err)
}
return state, nil
}
// StateByRootNoCopy retrieves the state using input block root without copying from caches.
func (s *State) StateByRootNoCopy(ctx context.Context, blockRoot [32]byte) (state.ReadOnlyBeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.StateByRootNoCopy")
defer span.End()
if blockRoot == params.BeaconConfig().ZeroHash {
root, err := s.beaconDB.GenesisBlockRoot(ctx)
if err != nil {
return nil, stderrors.Join(ErrNoGenesisBlock, err)
}
blockRoot = root
}
state, err := s.loadStateByRootNoCopy(ctx, blockRoot)
if err != nil {
return nil, fmt.Errorf("load state by root no copy: %w", err)
}
return state, nil
}
// ActiveNonSlashedBalancesByRoot retrieves the effective balances of all active and non-slashed validators at the
// state with a given root
func (s *State) ActiveNonSlashedBalancesByRoot(ctx context.Context, blockRoot [32]byte) ([]uint64, error) {
st, err := s.StateByRoot(ctx, blockRoot)
st, err := s.StateByRootNoCopy(ctx, blockRoot)
if err != nil {
return nil, err
}
@@ -79,15 +110,15 @@ func (s *State) ActiveNonSlashedBalancesByRoot(ctx context.Context, blockRoot [3
epoch := time.CurrentEpoch(st)
balances := make([]uint64, st.NumValidators())
var balanceAccretor = func(idx int, val state.ReadOnlyValidator) error {
if helpers.IsActiveNonSlashedValidatorUsingTrie(val, epoch) {
balances[idx] = val.EffectiveBalance()
var balanceAccretor = func(idx int, val *stateutil.CompactValidator) error {
if helpers.IsActiveNonSlashedCompactValidator(val, epoch) {
balances[idx] = val.EffectiveBalance
} else {
balances[idx] = 0
}
return nil
}
if err := st.ReadFromEveryValidator(balanceAccretor); err != nil {
if err := st.ForEachValidator(balanceAccretor); err != nil {
return nil, err
}
return balances, nil
@@ -205,12 +236,40 @@ func (s *State) loadStateByRoot(ctx context.Context, blockRoot [32]byte) (state.
// Second, it checks if the state exists in epoch boundary state cache.
cachedInfo, ok, err := s.epochBoundaryStateCache.getByBlockRoot(blockRoot)
if err != nil {
return nil, err
return nil, fmt.Errorf("get by block root: %w", err)
}
if ok {
if ok && cachedInfo != nil {
return cachedInfo.state, nil
}
return s.loadStateByRootFromDBOrReplay(ctx, blockRoot)
}
// loadStateByRootNoCopy is like loadStateByRoot but returns cached states without copying.
// States from DB or replay are already owned by the caller.
func (s *State) loadStateByRootNoCopy(ctx context.Context, blockRoot [32]byte) (state.ReadOnlyBeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.loadStateByRootNoCopy")
defer span.End()
// First, check hot state cache without copy.
cachedState := s.hotStateCache.getWithoutCopy(blockRoot)
if cachedState != nil && !cachedState.IsNil() {
return cachedState, nil
}
// Second, check epoch boundary state cache without copy.
cachedEpochState := s.epochBoundaryStateCache.getByBlockRootNoCopy(blockRoot)
if cachedEpochState != nil && !cachedEpochState.IsNil() {
return cachedEpochState, nil
}
return s.loadStateByRootFromDBOrReplay(ctx, blockRoot)
}
// loadStateByRootFromDBOrReplay loads a state from the DB or by replaying blocks from the latest ancestor.
// This is the shared tail of loadStateByRoot and loadStateByRootNoCopy.
func (s *State) loadStateByRootFromDBOrReplay(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
// Short circuit if the state is already in the DB.
if s.beaconDB.HasState(ctx, blockRoot) {
return s.beaconDB.State(ctx, blockRoot)

View File

@@ -26,7 +26,7 @@ func NewService() *StateManager {
}
// StateByRootIfCachedNoCopy --
func (m *StateManager) StateByRootIfCachedNoCopy(root [32]byte) state.BeaconState {
func (m *StateManager) StateByRootIfCachedNoCopy(root [32]byte) state.ReadOnlyBeaconState {
return m.StatesByRoot[root]
}
@@ -55,6 +55,11 @@ func (m *StateManager) StateByRoot(_ context.Context, blockRoot [32]byte) (state
return m.StatesByRoot[blockRoot], nil
}
// StateByRootNoCopy --
func (m *StateManager) StateByRootNoCopy(_ context.Context, blockRoot [32]byte) (state.ReadOnlyBeaconState, error) {
return m.StatesByRoot[blockRoot], nil
}
// ActiveNonSlashedBalancesByRoot --
func (*StateManager) ActiveNonSlashedBalancesByRoot(_ context.Context, _ [32]byte) ([]uint64, error) {
return []uint64{}, nil

View File

@@ -13,6 +13,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/db"
"github.com/OffchainLabs/prysm/v7/beacon-chain/forkchoice"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/beacon-chain/sync/backfill/coverage"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -47,8 +48,9 @@ type StateManager interface {
SaveFinalizedState(fSlot primitives.Slot, fRoot [32]byte, fState state.BeaconState)
MigrateToCold(ctx context.Context, fRoot [32]byte) error
StateByRoot(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
StateByRootNoCopy(ctx context.Context, blockRoot [32]byte) (state.ReadOnlyBeaconState, error)
ActiveNonSlashedBalancesByRoot(context.Context, [32]byte) ([]uint64, error)
StateByRootIfCachedNoCopy(blockRoot [32]byte) state.BeaconState
StateByRootIfCachedNoCopy(blockRoot [32]byte) state.ReadOnlyBeaconState
StateByRootInitialSync(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
FinalizedReadOnlyBalances() NilCheckableReadOnlyBalances
}
@@ -162,21 +164,20 @@ func (s *State) Resume(ctx context.Context, fState state.BeaconState) (state.Bea
return st, nil
}
func populatePubkeyCache(ctx context.Context, st state.BeaconState) {
func populatePubkeyCache(ctx context.Context, st state.ReadOnlyBeaconState) {
epoch := slots.ToEpoch(st.Slot())
go populatePubkeyCacheOnce.Do(func() {
log.Debug("Populating pubkey cache")
start := time.Now()
if err := st.ReadFromEveryValidator(func(_ int, val state.ReadOnlyValidator) error {
if err := st.ForEachValidator(func(_ int, val *stateutil.CompactValidator) error {
if ctx.Err() != nil {
return ctx.Err()
}
// Do not cache for non-active validators.
if !helpers.IsActiveValidatorUsingTrie(val, epoch) {
if !helpers.IsActiveCompactValidator(val, epoch) {
return nil
}
pub := val.PublicKey()
_, err := bls.PublicKeyFromBytes(pub[:])
_, err := bls.PublicKeyFromBytes(val.PublicKey[:])
return err
}); err != nil {
log.WithError(err).Error("Failed to populate pubkey cache")

View File

@@ -140,7 +140,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(
preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore, err
return pubsub.ValidationIgnore, wrapAttestationError(err, att)
}
validationRes, err := s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic)

View File

@@ -84,6 +84,7 @@ go_test(
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -12,6 +12,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -624,8 +625,16 @@ var _ signatureCache = &mockSignatureCache{}
type sbrfunc func(context.Context, [32]byte) (state.BeaconState, error)
type mockStateByRooter struct {
sbr sbrfunc
calledForRoot map[[32]byte]bool
sbr sbrfunc
calledForRoot map[[32]byte]bool
cachedNoCopyFunc func(root [32]byte) state.ReadOnlyBeaconState
}
func (sbr *mockStateByRooter) StateByRootIfCachedNoCopy(root [32]byte) state.ReadOnlyBeaconState {
if sbr.cachedNoCopyFunc != nil {
return sbr.cachedNoCopyFunc(root)
}
return nil
}
func (sbr *mockStateByRooter) StateByRoot(ctx context.Context, root [32]byte) (state.BeaconState, error) {
@@ -856,6 +865,17 @@ func (v *validxStateOverride) ReadFromEveryValidator(f func(idx int, val state.R
return nil
}
func (v *validxStateOverride) ForEachValidator(f func(idx int, val *stateutil.CompactValidator) error) error {
validators := v.Validators()
for i, val := range validators {
cv := stateutil.CompactValidatorFromProto(val)
if err := f(i, &cv); err != nil {
return err
}
}
return nil
}
type mockProposerCache struct {
ComputeProposerCB func(ctx context.Context, root [32]byte, slot primitives.Slot, pst state.BeaconState) (primitives.ValidatorIndex, error)
ProposerCB func(c *forkchoicetypes.Checkpoint, slot primitives.Slot) (primitives.ValidatorIndex, bool)

View File

@@ -3,6 +3,7 @@ package verification
import (
"context"
"crypto/sha256"
"encoding/binary"
"fmt"
"strings"
"time"
@@ -14,6 +15,7 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/runtime/logging"
"github.com/OffchainLabs/prysm/v7/time/slots"
@@ -326,24 +328,78 @@ func (dv *RODataColumnsVerifier) getVerifyingState(ctx context.Context, dataColu
}
}
logrus.WithFields(logrus.Fields{
"slot": dataColumnSlot,
"parentRoot": fmt.Sprintf("%#x", parentRoot),
"headRoot": fmt.Sprintf("%#x", headRoot),
}).Debug("Replying state for data column verification")
// Check caches before entering the expensive singleflight state replay.
cachedState, err := dv.cachedVerifyingState(parentRoot, dataColumnEpoch)
if err != nil {
return nil, fmt.Errorf("cached verifying state: %w", err)
}
if cachedState != nil {
return cachedState, nil
}
// Deduplicate the expensive state replay across concurrent data columns for the same block.
var slotBytes [8]byte
binary.BigEndian.PutUint64(slotBytes[:], uint64(dataColumnSlot))
key := "vs:" + string(parentRoot[:]) + string(slotBytes[:])
v, err, _ := dv.sg.Do(key, func() (any, error) {
logrus.WithFields(logrus.Fields{
"slot": dataColumnSlot,
"parentRoot": fmt.Sprintf("%#x", parentRoot),
"headRoot": fmt.Sprintf("%#x", headRoot),
"index": dataColumn.Index,
}).Debug("Replying state for data column verification")
targetRoot, err := dv.fc.TargetRootForEpoch(parentRoot, dataColumnEpoch)
if err != nil {
return nil, fmt.Errorf("target root for epoch: %w", err)
}
targetState, err := dv.sr.StateByRoot(ctx, targetRoot)
if err != nil {
return nil, fmt.Errorf("state by root: %w", err)
}
targetEpoch := slots.ToEpoch(targetState.Slot())
if targetEpoch == dataColumnEpoch || targetEpoch == dataColumnEpoch-1 {
return targetState, nil
}
st, err := dv.sr.StateByRoot(ctx, parentRoot)
if err != nil {
return nil, fmt.Errorf("state by root: %w", err)
}
return st, nil
})
if err != nil {
return nil, err
}
return v.(state.ReadOnlyBeaconState), nil
}
// cachedVerifyingState attempts to find a suitable verifying state from caches,
// avoiding the expensive StateByRoot disk lookup and singleflight contention.
func (dv *RODataColumnsVerifier) cachedVerifyingState(parentRoot [fieldparams.RootLength]byte, dataColumnEpoch primitives.Epoch) (state.ReadOnlyBeaconState, error) {
targetRoot, err := dv.fc.TargetRootForEpoch(parentRoot, dataColumnEpoch)
if err != nil {
return nil, err
return nil, fmt.Errorf("target root for epoch: %w", err)
}
targetState, err := dv.sr.StateByRoot(ctx, targetRoot)
if err != nil {
return nil, err
if state := dv.sr.StateByRootIfCachedNoCopy(targetRoot); state != nil {
targetEpoch := slots.ToEpoch(state.Slot())
if targetEpoch == dataColumnEpoch || targetEpoch == dataColumnEpoch-1 {
return state, nil
}
}
targetEpoch := slots.ToEpoch(targetState.Slot())
if targetEpoch == dataColumnEpoch || targetEpoch == dataColumnEpoch-1 {
return targetState, nil
if state := dv.sr.StateByRootIfCachedNoCopy(parentRoot); state != nil {
return state, nil
}
return transition.ProcessSlotsUsingNextSlotCache(ctx, targetState, parentRoot[:], dataColumnSlot)
return nil, nil
}
func (dv *RODataColumnsVerifier) SidecarParentSeen(parentSeen func([fieldparams.RootLength]byte) bool) (err error) {

View File

@@ -1059,6 +1059,91 @@ func TestGetVerifyingStateEdgeCases(t *testing.T) {
require.NoError(t, err)
require.Equal(t, true, headStateCalled, "HeadState should be called when head is far ahead")
})
t.Run("different dependent roots - cache hit on target root avoids StateByRoot", func(t *testing.T) {
signatureCache := &mockSignatureCache{
svcb: func(signatureData signatureData) (bool, error) {
return false, nil
},
vscb: func(signatureData signatureData, _ validatorAtIndexer) (err error) {
return nil
},
}
stateByRooter := &mockStateByRooter{
sbr: sbrErrorIfCalled(t), // StateByRoot should NOT be called
cachedNoCopyFunc: func(root [32]byte) state.ReadOnlyBeaconState {
return fuluState // Cache hit for any root
},
}
initializer := Initializer{
shared: &sharedResources{
sc: signatureCache,
sr: stateByRooter,
hsp: &mockHeadStateProvider{
headRoot: []byte{0xff},
headSlot: columnSlot,
},
fc: &mockForkchoicer{
DependentRootForEpochCB: func(root [32]byte, epoch primitives.Epoch) ([32]byte, error) {
return root, nil // Different roots for parent vs head
},
TargetRootForEpochCB: fcReturnsTargetRoot([fieldparams.RootLength]byte{}),
},
},
}
verifier := initializer.NewDataColumnsVerifier(columns, GossipDataColumnSidecarRequirements)
err := verifier.ValidProposerSignature(t.Context())
require.NoError(t, err)
})
t.Run("different dependent roots - cache hit on parent root avoids StateByRoot", func(t *testing.T) {
signatureCache := &mockSignatureCache{
svcb: func(signatureData signatureData) (bool, error) {
return false, nil
},
vscb: func(signatureData signatureData, _ validatorAtIndexer) (err error) {
return nil
},
}
// Only return a cache hit for the parent root, not the target root.
// Also set the target state to a far-away epoch so it doesn't match the epoch check.
farState, _ := util.DeterministicGenesisStateFulu(t, numValidators)
stateByRooter := &mockStateByRooter{
sbr: sbrErrorIfCalled(t),
cachedNoCopyFunc: func(root [32]byte) state.ReadOnlyBeaconState {
if root == parentRoot {
return fuluState // Cache hit only for parent root
}
// Target root returns a state in epoch 0 (won't match epoch 3 or 2)
return farState
},
}
initializer := Initializer{
shared: &sharedResources{
sc: signatureCache,
sr: stateByRooter,
hsp: &mockHeadStateProvider{
headRoot: []byte{0xff},
headSlot: columnSlot,
},
fc: &mockForkchoicer{
DependentRootForEpochCB: func(root [32]byte, epoch primitives.Epoch) ([32]byte, error) {
return root, nil
},
TargetRootForEpochCB: fcReturnsTargetRoot([fieldparams.RootLength]byte{0xaa}),
},
},
}
verifier := initializer.NewDataColumnsVerifier(columns, GossipDataColumnSidecarRequirements)
err := verifier.ValidProposerSignature(t.Context())
require.NoError(t, err)
})
}
// headStateCallTracker wraps mockHeadStateProvider to track HeadState calls.

View File

@@ -34,6 +34,7 @@ type Forkchoicer interface {
// StateByRooter describes a stategen-ish type that can produce arbitrary states by their root
type StateByRooter interface {
StateByRoot(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
StateByRootIfCachedNoCopy(blockRoot [32]byte) state.ReadOnlyBeaconState
}
// HeadStateProvider describes a type that can provide access to the current head state and related methods.

View File

@@ -0,0 +1,4 @@
### Changed
- `getVerifyingState`: Optimize state replay for concurrent data column verification using singleflight deduplication.
- `getVerifyingState`: Check state caches before entering the expensive singleflight state replay.

View File

@@ -0,0 +1,4 @@
### Changed
- `StateByRootIfCachedNoCopy` now also checks the epoch boundary state cache.
- Use `state.ReadOnlyBeaconState` instead of state.BeaconState when possible.

View File

@@ -480,6 +480,65 @@ func (s *Slice[V]) Reset(obj Identifiable) *Slice[V] {
return reset
}
// ForEach iterates over every element for the given object, calling f with a
// pointer to each value. The pointer points directly into the underlying
// storage (shared or individual) so no slice allocation is needed.
//
// This is dramatically more efficient than Value() for read-only iteration
// because it avoids allocating and copying the full materialized slice.
// For ~1M validators this eliminates ~128 MB of allocation per call.
//
// The pointer passed to f is only valid for the duration of the callback.
// Callers must not retain or modify through it.
func (s *Slice[V]) ForEach(obj Identifiable, f func(idx int, val *V) error) error {
s.lock.RLock()
defer s.lock.RUnlock()
// Pre-resolve overrides: scan the sparse individualItems map (typically
// a few hundred entries) and build a small index→*V map for this object.
overrides := make(map[uint64]*V, len(s.individualItems))
objId := obj.Id()
for idx, item := range s.individualItems {
for _, v := range item.Values {
if slices.Contains(v.ids, objId) {
overrides[idx] = &v.val
break
}
}
}
// Iterate shared items with fast override lookup.
for i := range s.sharedItems {
v := &s.sharedItems[i]
if ov, ok := overrides[uint64(i)]; ok {
v = ov
}
if err := f(i, v); err != nil {
return err
}
}
// Iterate appended items.
sharedLen := len(s.sharedItems)
for i, item := range s.appendedItems {
found := false
for _, v := range item.Values {
if slices.Contains(v.ids, objId) {
if err := f(sharedLen+i, &v.val); err != nil {
return err
}
found = true
break
}
}
if !found {
break
}
}
return nil
}
func (s *Slice[V]) fillOriginalItems(obj Identifiable, items *[]V) {
for i, item := range s.sharedItems {
ind, ok := s.individualItems[uint64(i)]

View File

@@ -28,7 +28,7 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -11,7 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v7/api/server/structs"
corehelpers "github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/signing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -452,8 +452,8 @@ func proposeVoluntaryExit(ec *e2etypes.EvaluationContext, conns ...*grpc.ClientC
return errors.Wrap(err, "could not get state")
}
var execIndices []int
err = st.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
if val.GetWithdrawalCredentials()[0] == params.BeaconConfig().ETH1AddressWithdrawalPrefixByte {
err = st.ForEachValidator(func(idx int, val *stateutil.CompactValidator) error {
if val.WithdrawalCredentials[0] == params.BeaconConfig().ETH1AddressWithdrawalPrefixByte {
execIndices = append(execIndices, idx)
}
return nil