Move End-to-End Tests for Altair Into Develop (#9564)

* end to end from hf1

* remove duplicate import

* skip sync eval

* conditional sync participation

* altair fork epoch to 6

* preston feedback

* proper fork epoch

* run for 6

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Raul Jordan
2021-09-10 14:59:43 -05:00
committed by GitHub
parent 2d9ae57378
commit dcc1f7c0ec
7 changed files with 253 additions and 77 deletions

View File

@@ -11,6 +11,7 @@ import (
"os"
"path"
"strings"
"sync"
"testing"
"time"
@@ -222,17 +223,27 @@ func (r *testRunner) runEvaluators(conns []*grpc.ClientConn, tickingStartTime ti
secondsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
ticker := helpers.NewEpochTicker(tickingStartTime, secondsPerEpoch)
for currentEpoch := range ticker.C() {
for _, evaluator := range config.Evaluators {
wg := new(sync.WaitGroup)
for _, ev := range config.Evaluators {
// Fix reference to evaluator as it will be running
// in a separate goroutine.
evaluator := ev
// Only run if the policy says so.
if !evaluator.Policy(types.Epoch(currentEpoch)) {
continue
}
t.Run(fmt.Sprintf(evaluator.Name, currentEpoch), func(t *testing.T) {
// Add evaluator to our waitgroup.
wg.Add(1)
go t.Run(fmt.Sprintf(evaluator.Name, currentEpoch), func(t *testing.T) {
err := evaluator.Evaluation(conns...)
assert.NoError(t, err, "Evaluation failed for epoch %d: %v", currentEpoch, err)
wg.Done()
})
}
// Wait for all evaluators to finish their evaluation for the epoch.
wg.Wait()
if t.Failed() || currentEpoch >= config.EpochsToRun-1 {
ticker.Done()
if t.Failed() {

View File

@@ -8,6 +8,7 @@ go_library(
"api_gateway_v1alpha1.go",
"data.go",
"finality.go",
"fork.go",
"metrics.go",
"node.go",
"operations.go",
@@ -17,6 +18,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/endtoend/evaluators",
visibility = ["//endtoend:__subpackages__"],
deps = [
"//beacon-chain/core:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//endtoend/helpers:go_default_library",
@@ -24,6 +26,8 @@ go_library(
"//endtoend/policies:go_default_library",
"//endtoend/types:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/block:go_default_library",
"//proto/prysm/v1alpha1/wrapper:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/p2putils:go_default_library",
"//shared/params:go_default_library",

View File

@@ -0,0 +1,63 @@
package evaluators
import (
"context"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core"
"github.com/prysmaticlabs/prysm/endtoend/policies"
"github.com/prysmaticlabs/prysm/endtoend/types"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
wrapperv2 "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/grpc"
)
// ForkTransition ensures that the hard fork has occurred successfully.
var ForkTransition = types.Evaluator{
Name: "fork_transition_%d",
Policy: policies.OnEpoch(params.AltairE2EForkEpoch),
Evaluation: forkOccurs,
}
func forkOccurs(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewBeaconNodeValidatorClient(conn)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.StreamBlocksAltair(ctx, &ethpb.StreamBlocksRequest{VerifiedOnly: true})
if err != nil {
return errors.Wrap(err, "failed to get stream")
}
fSlot, err := core.StartSlot(params.AltairE2EForkEpoch)
if err != nil {
return err
}
if ctx.Err() == context.Canceled {
return errors.New("context canceled prematurely")
}
res, err := stream.Recv()
if err != nil {
return err
}
if res == nil || res.Block == nil {
return errors.New("nil block returned by beacon node")
}
if res.GetPhase0Block() == nil && res.GetAltairBlock() == nil {
return errors.New("nil block returned by beacon node")
}
if res.GetPhase0Block() != nil {
return errors.New("phase 0 block returned after altair fork has occurred")
}
blk, err := wrapperv2.WrappedAltairSignedBeaconBlock(res.GetAltairBlock())
if err != nil {
return err
}
if blk == nil || blk.IsNil() {
return errors.New("nil altair block received from stream")
}
if blk.Block().Slot() < fSlot {
return errors.Errorf("wanted a block >= %d but received %d", fSlot, blk.Block().Slot())
}
return nil
}

View File

@@ -13,7 +13,9 @@ import (
e2e "github.com/prysmaticlabs/prysm/endtoend/params"
"github.com/prysmaticlabs/prysm/endtoend/policies"
e2etypes "github.com/prysmaticlabs/prysm/endtoend/types"
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -91,14 +93,13 @@ var ValidatorsVoteWithTheMajority = e2etypes.Evaluator{
func processesDepositsInBlocks(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := eth.NewBeaconChainClient(conn)
client := ethpb.NewBeaconChainClient(conn)
chainHead, err := client.GetChainHead(context.Background(), &emptypb.Empty{})
if err != nil {
return errors.Wrap(err, "failed to get chain head")
}
req := &eth.ListBlocksRequest{QueryFilter: &eth.ListBlocksRequest_Epoch{Epoch: chainHead.HeadEpoch - 1}}
req := &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: chainHead.HeadEpoch - 1}}
blks, err := client.ListBeaconBlocks(context.Background(), req)
if err != nil {
return errors.Wrap(err, "failed to get blocks from beacon-chain")
@@ -106,15 +107,15 @@ func processesDepositsInBlocks(conns ...*grpc.ClientConn) error {
var numDeposits uint64
for _, blk := range blks.BlockContainers {
var slot types.Slot
var eth1Data *eth.Eth1Data
var deposits []*eth.Deposit
var eth1Data *ethpb.Eth1Data
var deposits []*ethpb.Deposit
switch blk.Block.(type) {
case *eth.BeaconBlockContainer_Phase0Block:
case *ethpb.BeaconBlockContainer_Phase0Block:
b := blk.GetPhase0Block().Block
slot = b.Slot
eth1Data = b.Body.Eth1Data
deposits = b.Body.Deposits
case *eth.BeaconBlockContainer_AltairBlock:
case *ethpb.BeaconBlockContainer_AltairBlock:
b := blk.GetAltairBlock().Block
slot = b.Slot
eth1Data = b.Body.Eth1Data
@@ -138,34 +139,25 @@ func processesDepositsInBlocks(conns ...*grpc.ClientConn) error {
func verifyGraffitiInBlocks(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := eth.NewBeaconChainClient(conn)
client := ethpb.NewBeaconChainClient(conn)
chainHead, err := client.GetChainHead(context.Background(), &emptypb.Empty{})
if err != nil {
return errors.Wrap(err, "failed to get chain head")
}
req := &eth.ListBlocksRequest{QueryFilter: &eth.ListBlocksRequest_Epoch{Epoch: chainHead.HeadEpoch - 1}}
req := &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: chainHead.HeadEpoch - 1}}
blks, err := client.ListBeaconBlocks(context.Background(), req)
if err != nil {
return errors.Wrap(err, "failed to get blocks from beacon-chain")
}
for _, blk := range blks.BlockContainers {
var e bool
var slot types.Slot
var graffitiInBlock []byte
switch blk.Block.(type) {
case *eth.BeaconBlockContainer_Phase0Block:
b := blk.GetPhase0Block().Block
slot = b.Slot
graffitiInBlock = b.Body.Graffiti
case *eth.BeaconBlockContainer_AltairBlock:
b := blk.GetAltairBlock().Block
slot = b.Slot
graffitiInBlock = b.Body.Graffiti
default:
return errors.New("block neither phase0 nor altair")
for _, ctr := range blks.BlockContainers {
blk, err := convertToBlockInterface(ctr)
if err != nil {
return err
}
var e bool
slot := blk.Block().Slot()
graffitiInBlock := blk.Block().Body().Graffiti()
for _, graffiti := range helpers.Graffiti {
if bytes.Equal(bytesutil.PadTo([]byte(graffiti), 32), graffitiInBlock) {
e = true
@@ -182,14 +174,14 @@ func verifyGraffitiInBlocks(conns ...*grpc.ClientConn) error {
func activatesDepositedValidators(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := eth.NewBeaconChainClient(conn)
client := ethpb.NewBeaconChainClient(conn)
chainHead, err := client.GetChainHead(context.Background(), &emptypb.Empty{})
if err != nil {
return errors.Wrap(err, "failed to get chain head")
}
validatorRequest := &eth.ListValidatorsRequest{
validatorRequest := &ethpb.ListValidatorsRequest{
PageSize: int32(params.BeaconConfig().MinGenesisActiveValidatorCount),
PageToken: "1",
}
@@ -241,8 +233,8 @@ func activatesDepositedValidators(conns ...*grpc.ClientConn) error {
func depositedValidatorsAreActive(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := eth.NewBeaconChainClient(conn)
validatorRequest := &eth.ListValidatorsRequest{
client := ethpb.NewBeaconChainClient(conn)
validatorRequest := &ethpb.ListValidatorsRequest{
PageSize: int32(params.BeaconConfig().MinGenesisActiveValidatorCount),
PageToken: "1",
}
@@ -291,8 +283,8 @@ func depositedValidatorsAreActive(conns ...*grpc.ClientConn) error {
func proposeVoluntaryExit(conns ...*grpc.ClientConn) error {
conn := conns[0]
valClient := eth.NewBeaconNodeValidatorClient(conn)
beaconClient := eth.NewBeaconChainClient(conn)
valClient := ethpb.NewBeaconNodeValidatorClient(conn)
beaconClient := ethpb.NewBeaconChainClient(conn)
ctx := context.Background()
chainHead, err := beaconClient.GetChainHead(ctx, &emptypb.Empty{})
@@ -308,11 +300,11 @@ func proposeVoluntaryExit(conns ...*grpc.ClientConn) error {
exitedIndex = types.ValidatorIndex(rand.Uint64() % params.BeaconConfig().MinGenesisActiveValidatorCount)
valExited = true
voluntaryExit := &eth.VoluntaryExit{
voluntaryExit := &ethpb.VoluntaryExit{
Epoch: chainHead.HeadEpoch,
ValidatorIndex: exitedIndex,
}
req := &eth.DomainRequest{
req := &ethpb.DomainRequest{
Epoch: chainHead.HeadEpoch,
Domain: params.BeaconConfig().DomainVoluntaryExit[:],
}
@@ -325,7 +317,7 @@ func proposeVoluntaryExit(conns ...*grpc.ClientConn) error {
return err
}
signature := privKeys[exitedIndex].Sign(signingData[:])
signedExit := &eth.SignedVoluntaryExit{
signedExit := &ethpb.SignedVoluntaryExit{
Exit: voluntaryExit,
Signature: signature.Marshal(),
}
@@ -338,9 +330,9 @@ func proposeVoluntaryExit(conns ...*grpc.ClientConn) error {
func validatorIsExited(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := eth.NewBeaconChainClient(conn)
validatorRequest := &eth.GetValidatorRequest{
QueryFilter: &eth.GetValidatorRequest_Index{
client := ethpb.NewBeaconChainClient(conn)
validatorRequest := &ethpb.GetValidatorRequest{
QueryFilter: &ethpb.GetValidatorRequest_Index{
Index: exitedIndex,
},
}
@@ -356,15 +348,14 @@ func validatorIsExited(conns ...*grpc.ClientConn) error {
func validatorsVoteWithTheMajority(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := eth.NewBeaconChainClient(conn)
client := ethpb.NewBeaconChainClient(conn)
chainHead, err := client.GetChainHead(context.Background(), &emptypb.Empty{})
if err != nil {
return errors.Wrap(err, "failed to get chain head")
}
req := &eth.ListBlocksRequest{QueryFilter: &eth.ListBlocksRequest_Epoch{Epoch: chainHead.HeadEpoch - 1}}
blks, err := client.ListBlocks(context.Background(), req)
req := &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: chainHead.HeadEpoch - 1}}
blks, err := client.ListBeaconBlocks(context.Background(), req)
if err != nil {
return errors.Wrap(err, "failed to get blocks from beacon-chain")
}
@@ -373,11 +364,11 @@ func validatorsVoteWithTheMajority(conns ...*grpc.ClientConn) error {
var slot types.Slot
var vote []byte
switch blk.Block.(type) {
case *eth.BeaconBlockContainer_Phase0Block:
case *ethpb.BeaconBlockContainer_Phase0Block:
b := blk.GetPhase0Block().Block
slot = b.Slot
vote = b.Body.Eth1Data.BlockHash
case *eth.BeaconBlockContainer_AltairBlock:
case *ethpb.BeaconBlockContainer_AltairBlock:
b := blk.GetAltairBlock().Block
slot = b.Slot
vote = b.Body.Eth1Data.BlockHash
@@ -413,3 +404,13 @@ func validatorsVoteWithTheMajority(conns ...*grpc.ClientConn) error {
}
var expectedEth1DataVote []byte
func convertToBlockInterface(obj *ethpb.BeaconBlockContainer) (block.SignedBeaconBlock, error) {
if obj.GetPhase0Block() != nil {
return wrapper.WrappedPhase0SignedBeaconBlock(obj.GetPhase0Block()), nil
}
if obj.GetAltairBlock() != nil {
return wrapper.WrappedAltairSignedBeaconBlock(obj.GetAltairBlock())
}
return nil, errors.New("container has no block")
}

View File

@@ -5,15 +5,19 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core"
"github.com/prysmaticlabs/prysm/endtoend/policies"
"github.com/prysmaticlabs/prysm/endtoend/types"
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
var expectedParticipation = 0.95 // 95% participation to make room for minor issues.
var expectedSyncParticipation = 0.95 // 95% participation for sync committee members.
// ValidatorsAreActive ensures the expected amount of validators are active.
var ValidatorsAreActive = types.Evaluator{
Name: "validators_active_epoch_%d",
@@ -28,11 +32,19 @@ var ValidatorsParticipating = types.Evaluator{
Evaluation: validatorsParticipating,
}
// ValidatorSyncParticipation ensures the expected amount of sync committee participants
// are active.
var ValidatorSyncParticipation = types.Evaluator{
Name: "validator_sync_participation_%d",
Policy: policies.AfterNthEpoch(params.AltairE2EForkEpoch - 1),
Evaluation: validatorsSyncParticipation,
}
func validatorsAreActive(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := eth.NewBeaconChainClient(conn)
client := ethpb.NewBeaconChainClient(conn)
// Balances actually fluctuate but we just want to check initial balance.
validatorRequest := &eth.ListValidatorsRequest{
validatorRequest := &ethpb.ListValidatorsRequest{
PageSize: int32(params.BeaconConfig().MinGenesisActiveValidatorCount),
Active: true,
}
@@ -83,8 +95,8 @@ func validatorsAreActive(conns ...*grpc.ClientConn) error {
// validatorsParticipating ensures the validators have an acceptable participation rate.
func validatorsParticipating(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := eth.NewBeaconChainClient(conn)
validatorRequest := &eth.GetValidatorParticipationRequest{}
client := ethpb.NewBeaconChainClient(conn)
validatorRequest := &ethpb.GetValidatorParticipationRequest{}
participation, err := client.GetValidatorParticipation(context.Background(), validatorRequest)
if err != nil {
return errors.Wrap(err, "failed to get validator participation")
@@ -102,3 +114,78 @@ func validatorsParticipating(conns ...*grpc.ClientConn) error {
}
return nil
}
// validatorsSyncParticipation ensures the validators have an acceptable participation rate for
// sync committee assignments.
func validatorsSyncParticipation(conns ...*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewNodeClient(conn)
altairClient := ethpb.NewBeaconChainClient(conn)
genesis, err := client.GetGenesis(context.Background(), &emptypb.Empty{})
if err != nil {
return errors.Wrap(err, "failed to get genesis data")
}
currSlot := core.CurrentSlot(uint64(genesis.GenesisTime.AsTime().Unix()))
currEpoch := core.SlotToEpoch(currSlot)
lowestBound := currEpoch - 1
if lowestBound < params.AltairE2EForkEpoch {
lowestBound = params.AltairE2EForkEpoch
}
blockCtrs, err := altairClient.ListBeaconBlocks(context.Background(), &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: lowestBound}})
if err != nil {
return errors.Wrap(err, "failed to get validator participation")
}
for _, ctr := range blockCtrs.BlockContainers {
if ctr.GetAltairBlock() == nil {
return errors.Errorf("Altair block type doesn't exist for block at epoch %d", lowestBound)
}
blk := ctr.GetAltairBlock()
if blk.Block == nil || blk.Block.Body == nil || blk.Block.Body.SyncAggregate == nil {
return errors.New("nil block provided")
}
forkSlot, err := core.StartSlot(params.AltairE2EForkEpoch)
if err != nil {
return err
}
// Skip evaluation of the fork slot.
if blk.Block.Slot == forkSlot {
continue
}
syncAgg := blk.Block.Body.SyncAggregate
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedSyncParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", blk.Block.Slot, threshold, syncAgg.SyncCommitteeBits.Count())
}
}
if lowestBound == currEpoch {
return nil
}
blockCtrs, err = altairClient.ListBeaconBlocks(context.Background(), &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: currEpoch}})
if err != nil {
return errors.Wrap(err, "failed to get validator participation")
}
for _, ctr := range blockCtrs.BlockContainers {
if ctr.GetAltairBlock() == nil {
return errors.Errorf("Altair block type doesn't exist for block at epoch %d", lowestBound)
}
blk := ctr.GetAltairBlock()
if blk.Block == nil || blk.Block.Body == nil || blk.Block.Body.SyncAggregate == nil {
return errors.New("nil block provided")
}
forkSlot, err := core.StartSlot(params.AltairE2EForkEpoch)
if err != nil {
return err
}
// Skip evaluation of the fork slot.
if blk.Block.Slot == forkSlot {
continue
}
syncAgg := blk.Block.Body.SyncAggregate
threshold := uint64(float64(syncAgg.SyncCommitteeBits.Len()) * expectedSyncParticipation)
if syncAgg.SyncCommitteeBits.Count() < threshold {
return errors.Errorf("In block of slot %d ,the aggregate bitvector with length of %d only got a count of %d", blk.Block.Slot, threshold, syncAgg.SyncCommitteeBits.Count())
}
}
return nil
}

View File

@@ -27,14 +27,45 @@ func e2eMinimal(t *testing.T, usePrysmSh bool) {
require.NoError(t, e2eParams.Init(e2eParams.StandardBeaconCount))
// Run for 10 epochs if not in long-running to confirm long-running has no issues.
epochsToRun := 10
var err error
epochsToRun := 10
if usePrysmSh {
// If using prysm.sh, run for 6 epochs.
// TODO(#9166): remove this block once v2 changes are live.
epochsToRun = 6
}
epochStr, longRunning := os.LookupEnv("E2E_EPOCHS")
if longRunning {
epochsToRun, err = strconv.Atoi(epochStr)
require.NoError(t, err)
}
const tracingEndpoint = "127.0.0.1:9411"
evals := []types.Evaluator{
ev.PeersConnect,
ev.HealthzCheck,
ev.MetricsCheck,
ev.ValidatorsAreActive,
ev.ValidatorsParticipating,
ev.FinalizationOccurs,
ev.ProcessesDepositsInBlocks,
ev.VerifyBlockGraffiti,
ev.ActivatesDepositedValidators,
ev.DepositedValidatorsAreActive,
ev.ProposeVoluntaryExit,
ev.ValidatorHasExited,
ev.ValidatorsVoteWithTheMajority,
ev.ColdStateCheckpoint,
ev.ForkTransition,
ev.APIGatewayV1VerifyIntegrity,
ev.APIGatewayV1Alpha1VerifyIntegrity,
}
// TODO(#9166): remove this block once v2 changes are live.
if !usePrysmSh {
evals = append(evals, ev.ValidatorSyncParticipation)
} else {
t.Log("Warning: Skipping v2 specific evaluators for prior release")
}
testConfig := &types.E2EConfig{
BeaconFlags: []string{
fmt.Sprintf("--slots-per-archive-point=%d", params.BeaconConfig().SlotsPerEpoch*16),
@@ -46,28 +77,11 @@ func e2eMinimal(t *testing.T, usePrysmSh bool) {
EpochsToRun: uint64(epochsToRun),
TestSync: true,
TestDeposits: true,
TestSlasher: true,
TestSlasher: false,
UsePrysmShValidator: usePrysmSh,
UsePprof: !longRunning,
TracingSinkEndpoint: tracingEndpoint,
Evaluators: []types.Evaluator{
ev.PeersConnect,
ev.HealthzCheck,
ev.MetricsCheck,
ev.ValidatorsAreActive,
ev.ValidatorsParticipating,
ev.FinalizationOccurs,
ev.ProcessesDepositsInBlocks,
ev.VerifyBlockGraffiti,
ev.ActivatesDepositedValidators,
ev.DepositedValidatorsAreActive,
ev.ProposeVoluntaryExit,
ev.ValidatorHasExited,
ev.ValidatorsVoteWithTheMajority,
ev.ColdStateCheckpoint,
ev.APIGatewayV1VerifyIntegrity,
ev.APIGatewayV1Alpha1VerifyIntegrity,
},
Evaluators: evals,
}
newTestRunner(t, testConfig).run()

View File

@@ -1,10 +1,6 @@
package params
import "math"
// AltairE2EForkEpoch is the pre-determined altair fork epoch in our
// E2E test.
const AltairE2EForkEpoch = math.MaxUint64
const AltairE2EForkEpoch = 6
// UseE2EConfig for beacon chain services.
func UseE2EConfig() {