Compare commits

...

2 Commits

Author SHA1 Message Date
terence tsao
8b85494c3e Update validator in the same loop 2024-06-20 15:58:40 -07:00
Preston Van Loon
40ceb2eada Avoid copying validator set in ProcessRegistryUpdates 2024-06-20 08:34:13 -05:00
5 changed files with 89 additions and 43 deletions

View File

@@ -13,6 +13,7 @@ go_library(
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/validators:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",

View File

@@ -14,6 +14,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/validators"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
state_native "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/state-native"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stateutil"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -27,8 +28,8 @@ import (
// sortableIndices implements the Sort interface to sort newly activated validator indices
// by activation epoch and by index number.
type sortableIndices struct {
indices []primitives.ValidatorIndex
validators []*ethpb.Validator
indices []primitives.ValidatorIndex
state state.BeaconState
}
// Len is the number of elements in the collection.
@@ -39,10 +40,18 @@ func (s sortableIndices) Swap(i, j int) { s.indices[i], s.indices[j] = s.indices
// Less reports whether the element with index i must sort before the element with index j.
func (s sortableIndices) Less(i, j int) bool {
if s.validators[s.indices[i]].ActivationEligibilityEpoch == s.validators[s.indices[j]].ActivationEligibilityEpoch {
vi, erri := s.state.ValidatorAtIndexReadOnly(primitives.ValidatorIndex(s.indices[i]))
vj, errj := s.state.ValidatorAtIndexReadOnly(primitives.ValidatorIndex(s.indices[j]))
if erri != nil || errj != nil {
return false
}
a, b := vi.ActivationEligibilityEpoch(), vj.ActivationEligibilityEpoch()
if a == b {
return s.indices[i] < s.indices[j]
}
return s.validators[s.indices[i]].ActivationEligibilityEpoch < s.validators[s.indices[j]].ActivationEligibilityEpoch
return a < b
}
// AttestingBalance returns the total balance from all the attesting indices.
@@ -91,55 +100,87 @@ func AttestingBalance(ctx context.Context, state state.ReadOnlyBeaconState, atts
// for index in activation_queue[:get_validator_churn_limit(state)]:
// validator = state.validators[index]
// validator.activation_epoch = compute_activation_exit_epoch(get_current_epoch(state))
func ProcessRegistryUpdates(ctx context.Context, state state.BeaconState) (state.BeaconState, error) {
currentEpoch := time.CurrentEpoch(state)
vals := state.Validators()
func ProcessRegistryUpdates(ctx context.Context, st state.BeaconState) (state.BeaconState, error) {
currentEpoch := time.CurrentEpoch(st)
var err error
ejectionBal := params.BeaconConfig().EjectionBalance
activationEligibilityEpoch := time.CurrentEpoch(state) + 1
for idx, validator := range vals {
// Process the validators for activation eligibility.
if helpers.IsEligibleForActivationQueue(validator, currentEpoch) {
validator.ActivationEligibilityEpoch = activationEligibilityEpoch
if err := state.UpdateValidatorAtIndex(primitives.ValidatorIndex(idx), validator); err != nil {
return nil, err
// To avoid copying the state validator set via st.Validators(), we will perform a read only pass
// over the validator set while collecting validator indices where the validator copy is actually
// necessary, then we will process these operations.
eligibleForActivationQ := make([]primitives.ValidatorIndex, 0)
eligibleForActivation := make([]primitives.ValidatorIndex, 0)
eligibleForEjection := make([]primitives.ValidatorIndex, 0)
activationEligibilityEpoch := time.CurrentEpoch(st) + 1
if err := st.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
// Collect validators eligible to enter the activation queue.
if helpers.IsEligibleForActivationQueue(val, currentEpoch) {
eligibleForActivationQ = append(eligibleForActivationQ, primitives.ValidatorIndex(idx))
v, err := st.ValidatorAtIndex(primitives.ValidatorIndex(idx))
if err != nil {
return err
}
v.ActivationEligibilityEpoch = activationEligibilityEpoch
val, err = state_native.NewValidator(v)
if err != nil {
return err
}
}
// Process the validators for ejection.
isActive := helpers.IsActiveValidator(validator, currentEpoch)
belowEjectionBalance := validator.EffectiveBalance <= ejectionBal
// Collect validators to eject.
isActive := helpers.IsActiveValidatorUsingTrie(val, currentEpoch)
belowEjectionBalance := val.EffectiveBalance() <= ejectionBal
if isActive && belowEjectionBalance {
// Here is fine to do a quadratic loop since this should
// barely happen
maxExitEpoch, churn := validators.MaxExitEpochAndChurn(state)
state, _, err = validators.InitiateValidatorExit(ctx, state, primitives.ValidatorIndex(idx), maxExitEpoch, churn)
if err != nil && !errors.Is(err, validators.ErrValidatorAlreadyExited) {
return nil, errors.Wrapf(err, "could not initiate exit for validator %d", idx)
}
eligibleForEjection = append(eligibleForEjection, primitives.ValidatorIndex(idx))
}
// Collect validators eligible for activation and not yet dequeued for activation.
if helpers.IsEligibleForActivationUsingTrie(st, val) {
eligibleForActivation = append(eligibleForActivation, primitives.ValidatorIndex(idx))
}
return nil
}); err != nil {
return st, fmt.Errorf("failed to read validators: %w", err)
}
// Process validators for activation eligibility.
for _, idx := range eligibleForActivationQ {
v, err := st.ValidatorAtIndex(idx)
if err != nil {
return nil, err
}
v.ActivationEligibilityEpoch = activationEligibilityEpoch
if err := st.UpdateValidatorAtIndex(idx, v); err != nil {
return nil, err
}
}
// Process validators eligible for ejection.
for _, idx := range eligibleForEjection {
// Here is fine to do a quadratic loop since this should
// barely happen
maxExitEpoch, churn := validators.MaxExitEpochAndChurn(st)
st, _, err = validators.InitiateValidatorExit(ctx, st, idx, maxExitEpoch, churn)
if err != nil && !errors.Is(err, validators.ErrValidatorAlreadyExited) {
return nil, errors.Wrapf(err, "could not initiate exit for validator %d", idx)
}
}
// Queue validators eligible for activation and not yet dequeued for activation.
var activationQ []primitives.ValidatorIndex
for idx, validator := range vals {
if helpers.IsEligibleForActivation(state, validator) {
activationQ = append(activationQ, primitives.ValidatorIndex(idx))
}
}
sort.Sort(sortableIndices{indices: activationQ, validators: vals})
sort.Sort(sortableIndices{indices: eligibleForActivation, state: st})
// Only activate just enough validators according to the activation churn limit.
limit := uint64(len(activationQ))
activeValidatorCount, err := helpers.ActiveValidatorCount(ctx, state, currentEpoch)
limit := uint64(len(eligibleForActivation))
activeValidatorCount, err := helpers.ActiveValidatorCount(ctx, st, currentEpoch)
if err != nil {
return nil, errors.Wrap(err, "could not get active validator count")
}
churnLimit := helpers.ValidatorActivationChurnLimit(activeValidatorCount)
if state.Version() >= version.Deneb {
if st.Version() >= version.Deneb {
churnLimit = helpers.ValidatorActivationChurnLimitDeneb(activeValidatorCount)
}
@@ -149,17 +190,17 @@ func ProcessRegistryUpdates(ctx context.Context, state state.BeaconState) (state
}
activationExitEpoch := helpers.ActivationExitEpoch(currentEpoch)
for _, index := range activationQ[:limit] {
validator, err := state.ValidatorAtIndex(index)
for _, index := range eligibleForActivation[:limit] {
validator, err := st.ValidatorAtIndex(index)
if err != nil {
return nil, err
}
validator.ActivationEpoch = activationExitEpoch
if err := state.UpdateValidatorAtIndex(index, validator); err != nil {
if err := st.UpdateValidatorAtIndex(index, validator); err != nil {
return nil, err
}
}
return state, nil
return st, nil
}
// ProcessSlashings processes the slashed validators during epoch processing,

View File

@@ -63,6 +63,7 @@ go_test(
"validators_test.go",
"weak_subjectivity_test.go",
],
data = glob(["testdata/**"]),
embed = [":go_default_library"],
shard_count = 2,
tags = ["CI_race_detection"],

View File

@@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
@@ -41,7 +42,7 @@ var (
// Check if ``validator`` is active.
// """
// return validator.activation_epoch <= epoch < validator.exit_epoch
func IsActiveValidator(validator *ethpb.Validator, epoch primitives.Epoch) bool {
func IsActiveValidator(validator *eth.Validator, epoch primitives.Epoch) bool {
return checkValidatorActiveStatus(validator.ActivationEpoch, validator.ExitEpoch, epoch)
}
@@ -404,11 +405,11 @@ func ComputeProposerIndex(bState state.ReadOnlyValidators, activeIndices []primi
// validator.activation_eligibility_epoch == FAR_FUTURE_EPOCH
// and validator.effective_balance >= MIN_ACTIVATION_BALANCE # [Modified in Electra:EIP7251]
// )
func IsEligibleForActivationQueue(validator *ethpb.Validator, currentEpoch primitives.Epoch) bool {
func IsEligibleForActivationQueue(validator state.ReadOnlyValidator, currentEpoch primitives.Epoch) bool {
if currentEpoch >= params.BeaconConfig().ElectraForkEpoch {
return isEligibleForActivationQueueElectra(validator.ActivationEligibilityEpoch, validator.EffectiveBalance)
return isEligibleForActivationQueueElectra(validator.ActivationEligibilityEpoch(), validator.EffectiveBalance())
}
return isEligibleForActivationQueue(validator.ActivationEligibilityEpoch, validator.EffectiveBalance)
return isEligibleForActivationQueue(validator.ActivationEligibilityEpoch(), validator.EffectiveBalance())
}
// isEligibleForActivationQueue carries out the logic for IsEligibleForActivationQueue

View File

@@ -744,7 +744,9 @@ func TestIsEligibleForActivationQueue(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
helpers.ClearCache()
assert.Equal(t, tt.want, helpers.IsEligibleForActivationQueue(tt.validator, tt.currentEpoch), "IsEligibleForActivationQueue()")
v, err := state_native.NewValidator(tt.validator)
assert.NoError(t, err)
assert.Equal(t, tt.want, helpers.IsEligibleForActivationQueue(v, tt.currentEpoch), "IsEligibleForActivationQueue()")
})
}
}