Files
prysm/testing/endtoend/evaluators/validator.go
james-prysm d4613aee0c skipping slot 1 sync committee check e2e (#16145)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

Tests

**What does this PR do? Why is it needed?**

```

--- PASS: TestEndToEnd_MinimalConfig/chain_started (0.50s)
--
--- PASS: TestEndToEnd_MinimalConfig/finished_syncing_0 (0.00s)
--- PASS: TestEndToEnd_MinimalConfig/all_nodes_have_same_head_0 (0.00s)
--- PASS: TestEndToEnd_MinimalConfig/validators_active_epoch_0 (0.00s)
--- FAIL: TestEndToEnd_MinimalConfig/validator_sync_participation_0 (0.01s)
--- PASS: TestEndToEnd_MinimalConfig/peers_connect_epoch_0 (0.11s)


```
This PR attempts to reduce flakes on validator sync participation
failures by skipping the first slot of the block after startup

**Which issues(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-15 20:00:34 +00:00

397 lines
14 KiB
Go

package evaluators
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/altair"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/network/httputil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
e2eparams "github.com/OffchainLabs/prysm/v7/testing/endtoend/params"
"github.com/OffchainLabs/prysm/v7/testing/endtoend/policies"
"github.com/OffchainLabs/prysm/v7/testing/endtoend/types"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
var expectedParticipation = 0.99
var expectedMulticlientParticipation = 0.95
var expectedSyncParticipation = 0.99
// ValidatorsAreActive ensures the expected amount of validators are active.
var ValidatorsAreActive = types.Evaluator{
Name: "validators_active_epoch_%d",
Policy: policies.AllEpochs,
Evaluation: validatorsAreActive,
}
// ValidatorsParticipatingAtEpoch ensures the expected amount of validators are participating.
var ValidatorsParticipatingAtEpoch = func(epoch primitives.Epoch) types.Evaluator {
return types.Evaluator{
Name: "validators_participating_epoch_%d",
Policy: policies.AfterNthEpoch(epoch),
Evaluation: validatorsParticipating,
}
}
// ValidatorSyncParticipation ensures the expected amount of sync committee participants
// are active.
var ValidatorSyncParticipation = types.Evaluator{
Name: "validator_sync_participation_%d",
Policy: func(e primitives.Epoch) bool {
fEpoch := params.BeaconConfig().AltairForkEpoch
return policies.OnwardsNthEpoch(fEpoch)(e)
},
Evaluation: validatorsSyncParticipation,
}
func validatorsAreActive(ec *types.EvaluationContext, conns ...*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewBeaconChainClient(conn)
// Balances actually fluctuate but we just want to check initial balance.
validatorRequest := &ethpb.ListValidatorsRequest{
PageSize: int32(params.BeaconConfig().MinGenesisActiveValidatorCount),
Active: true,
}
validators, err := client.ListValidators(context.Background(), validatorRequest)
if err != nil {
return errors.Wrap(err, "failed to get validators")
}
expectedCount := params.BeaconConfig().MinGenesisActiveValidatorCount
receivedCount := uint64(len(validators.ValidatorList))
if expectedCount != receivedCount {
return fmt.Errorf("expected validator count to be %d, received %d", expectedCount, receivedCount)
}
effBalanceLowCount := 0
exitEpochWrongCount := 0
withdrawEpochWrongCount := 0
for _, item := range validators.ValidatorList {
if ec.ExitedVals[bytesutil.ToBytes48(item.Validator.PublicKey)] {
continue
}
if item.Validator.EffectiveBalance < params.BeaconConfig().MaxEffectiveBalance {
effBalanceLowCount++
}
if item.Validator.ExitEpoch != params.BeaconConfig().FarFutureEpoch {
exitEpochWrongCount++
}
if item.Validator.WithdrawableEpoch != params.BeaconConfig().FarFutureEpoch {
withdrawEpochWrongCount++
}
}
if effBalanceLowCount > 0 {
return fmt.Errorf(
"%d validators did not have genesis validator effective balance of %d",
effBalanceLowCount,
params.BeaconConfig().MaxEffectiveBalance,
)
} else if exitEpochWrongCount > 0 {
return fmt.Errorf("%d validators did not have genesis validator exit epoch of far future epoch", exitEpochWrongCount)
} else if withdrawEpochWrongCount > 0 {
return fmt.Errorf("%d validators did not have genesis validator withdrawable epoch of far future epoch", withdrawEpochWrongCount)
}
return nil
}
// validatorsParticipating ensures the validators have an acceptable participation rate.
func validatorsParticipating(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewBeaconChainClient(conn)
validatorRequest := &ethpb.GetValidatorParticipationRequest{}
participation, err := client.GetValidatorParticipation(context.Background(), validatorRequest)
if err != nil {
return errors.Wrap(err, "failed to get validator participation")
}
partRate := participation.Participation.GlobalParticipationRate
expected := float32(expectedParticipation)
if e2eparams.TestParams.LighthouseBeaconNodeCount != 0 {
expected = float32(expectedMulticlientParticipation)
}
if participation.Epoch == params.BeaconConfig().ElectraForkEpoch {
// The first slot of Electra will be missed due to the switching of attestation types
// 5/6 slots =~0.83
// validator REST always is slightly reduced at ~0.82
expected = 0.82
}
if participation.Epoch > 0 && participation.Epoch.Sub(1) == params.BeaconConfig().BellatrixForkEpoch {
// Reduce Participation requirement to 95% to account for longer EE calls for
// the merge block. Target and head will likely be missed for a few validators at
// slot 0.
expected = 0.95
}
if partRate < expected {
path := fmt.Sprintf("http://localhost:%d/eth/v2/debug/beacon/states/head", e2eparams.TestParams.Ports.PrysmBeaconNodeHTTPPort)
resp := structs.GetBeaconStateV2Response{}
httpResp, err := http.Get(path) // #nosec G107 -- path can't be constant because it depends on port param
if err != nil {
return err
}
if httpResp.StatusCode != http.StatusOK {
e := httputil.DefaultJsonError{}
if err = json.NewDecoder(httpResp.Body).Decode(&e); err != nil {
return err
}
return fmt.Errorf("%s (status code %d)", e.Message, e.Code)
}
if err = json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
return err
}
var respPrevEpochParticipation []string
switch resp.Version {
case version.String(version.Phase0):
// Do Nothing
case version.String(version.Altair):
st := &structs.BeaconStateAltair{}
if err = json.Unmarshal(resp.Data, st); err != nil {
return err
}
respPrevEpochParticipation = st.PreviousEpochParticipation
case version.String(version.Bellatrix):
st := &structs.BeaconStateBellatrix{}
if err = json.Unmarshal(resp.Data, st); err != nil {
return err
}
respPrevEpochParticipation = st.PreviousEpochParticipation
case version.String(version.Capella):
st := &structs.BeaconStateCapella{}
if err = json.Unmarshal(resp.Data, st); err != nil {
return err
}
respPrevEpochParticipation = st.PreviousEpochParticipation
case version.String(version.Deneb):
st := &structs.BeaconStateDeneb{}
if err = json.Unmarshal(resp.Data, st); err != nil {
return err
}
respPrevEpochParticipation = st.PreviousEpochParticipation
case version.String(version.Electra):
st := &structs.BeaconStateElectra{}
if err = json.Unmarshal(resp.Data, st); err != nil {
return err
}
respPrevEpochParticipation = st.PreviousEpochParticipation
default:
return fmt.Errorf("unrecognized version %s", resp.Version)
}
prevEpochParticipation := make([]byte, len(respPrevEpochParticipation))
for i, p := range respPrevEpochParticipation {
n, err := strconv.ParseUint(p, 10, 64)
if err != nil {
return err
}
prevEpochParticipation[i] = byte(n)
}
missSrcVals, missTgtVals, missHeadVals, err := findMissingValidators(prevEpochParticipation)
if err != nil {
return errors.Wrap(err, "failed to get missing validators")
}
return fmt.Errorf(
"validator participation was below for epoch %d, expected %f, received: %f."+
" Missing Source,Target and Head validators are %v, %v, %v",
participation.Epoch,
expected,
partRate,
missSrcVals,
missTgtVals,
missHeadVals,
)
}
return nil
}
// validatorsSyncParticipation ensures the validators have an acceptable participation rate for
// sync committee assignments.
func validatorsSyncParticipation(_ *types.EvaluationContext, conns ...*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewNodeClient(conn)
altairClient := ethpb.NewBeaconChainClient(conn)
genesis, err := client.GetGenesis(context.Background(), &emptypb.Empty{})
if err != nil {
return errors.Wrap(err, "failed to get genesis data")
}
currSlot := slots.CurrentSlot(genesis.GenesisTime.AsTime())
currEpoch := slots.ToEpoch(currSlot)
lowestBound := primitives.Epoch(0)
if currEpoch >= 1 {
lowestBound = currEpoch - 1
}
if lowestBound < params.BeaconConfig().AltairForkEpoch {
lowestBound = params.BeaconConfig().AltairForkEpoch
}
blockCtrs, err := altairClient.ListBeaconBlocks(context.Background(), &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: lowestBound}})
if err != nil {
return errors.Wrap(err, "failed to get validator participation")
}
for _, ctr := range blockCtrs.BlockContainers {
b, err := syncCompatibleBlockFromCtr(ctr)
if err != nil {
return errors.Wrapf(err, "block type doesn't exist for block at epoch %d", lowestBound)
}
if b == nil || b.IsNil() {
return errors.New("nil block provided")
}
forkStartSlot, err := slots.EpochStart(params.BeaconConfig().AltairForkEpoch)
if err != nil {
return err
}
if forkStartSlot == b.Block().Slot() {
// Skip fork slot.
continue
}
// Skip slot 1 at genesis - validators need time to ramp up after chain start.
// This is a startup timing issue, not a fork transition issue.
if b.Block().Slot() == 1 {
continue
}
expectedParticipation := expectedSyncParticipation
switch slots.ToEpoch(b.Block().Slot()) {
case params.BeaconConfig().AltairForkEpoch:
// Drop expected sync participation figure.
expectedParticipation = 0.90
default:
// no-op
}
syncAgg, err := b.Block().Body().SyncAggregate()
if err != nil {
return err
}
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
}
}
if lowestBound == currEpoch {
return nil
}
blockCtrs, err = altairClient.ListBeaconBlocks(context.Background(), &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: currEpoch}})
if err != nil {
return errors.Wrap(err, "failed to get validator participation")
}
for _, ctr := range blockCtrs.BlockContainers {
b, err := syncCompatibleBlockFromCtr(ctr)
if err != nil {
return errors.Wrapf(err, "block type doesn't exist for block at epoch %d", lowestBound)
}
if b == nil || b.IsNil() {
return errors.New("nil block provided")
}
forkSlot, err := slots.EpochStart(params.BeaconConfig().AltairForkEpoch)
if err != nil {
return err
}
nexForkSlot, err := slots.EpochStart(params.BeaconConfig().BellatrixForkEpoch)
if err != nil {
return err
}
switch b.Block().Slot() {
case forkSlot, forkSlot + 1, nexForkSlot:
// Skip evaluation of the slot.
continue
default:
// no-op
}
syncAgg, err := b.Block().Body().SyncAggregate()
if err != nil {
return err
}
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedSyncParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", b.Block().Slot(), threshold, syncAgg.SyncCommitteeBits.Count())
}
}
return nil
}
func syncCompatibleBlockFromCtr(container *ethpb.BeaconBlockContainer) (interfaces.ReadOnlySignedBeaconBlock, error) {
if container.GetPhase0Block() != nil {
return nil, errors.New("block doesn't support sync committees")
}
if container.GetAltairBlock() != nil {
return blocks.NewSignedBeaconBlock(container.GetAltairBlock())
}
if container.GetBellatrixBlock() != nil {
return blocks.NewSignedBeaconBlock(container.GetBellatrixBlock())
}
if container.GetBlindedBellatrixBlock() != nil {
return blocks.NewSignedBeaconBlock(container.GetBlindedBellatrixBlock())
}
if container.GetCapellaBlock() != nil {
return blocks.NewSignedBeaconBlock(container.GetCapellaBlock())
}
if container.GetBlindedCapellaBlock() != nil {
return blocks.NewSignedBeaconBlock(container.GetBlindedCapellaBlock())
}
if container.GetDenebBlock() != nil {
return blocks.NewSignedBeaconBlock(container.GetDenebBlock())
}
if container.GetBlindedDenebBlock() != nil {
return blocks.NewSignedBeaconBlock(container.GetBlindedDenebBlock())
}
if container.GetElectraBlock() != nil {
return blocks.NewSignedBeaconBlock(container.GetElectraBlock())
}
if container.GetBlindedElectraBlock() != nil {
return blocks.NewSignedBeaconBlock(container.GetBlindedElectraBlock())
}
return nil, errors.New("no supported block type in container")
}
func findMissingValidators(participation []byte) ([]uint64, []uint64, []uint64, error) {
cfg := params.BeaconConfig()
sourceFlagIndex := cfg.TimelySourceFlagIndex
targetFlagIndex := cfg.TimelyTargetFlagIndex
headFlagIndex := cfg.TimelyHeadFlagIndex
var missingSourceValidators []uint64
var missingHeadValidators []uint64
var missingTargetValidators []uint64
for i, b := range participation {
hasSource, err := altair.HasValidatorFlag(b, sourceFlagIndex)
if err != nil {
return nil, nil, nil, err
}
if !hasSource {
missingSourceValidators = append(missingSourceValidators, uint64(i))
}
hasTarget, err := altair.HasValidatorFlag(b, targetFlagIndex)
if err != nil {
return nil, nil, nil, err
}
if !hasTarget {
missingTargetValidators = append(missingTargetValidators, uint64(i))
}
hasHead, err := altair.HasValidatorFlag(b, headFlagIndex)
if err != nil {
return nil, nil, nil, err
}
if !hasHead {
missingHeadValidators = append(missingHeadValidators, uint64(i))
}
}
return missingSourceValidators, missingTargetValidators, missingHeadValidators, nil
}