Allow Client to Retrieve Multiple Validator Statuses (#2474)

* multiple validator statuses

* gazelle

* context

* fixing bugs

* remove old way of checking

* fix logging

* make activation queue more accurate

* fix rpc test

* add test

* fix remaining tests

* lint

* comment

* review comments
This commit is contained in:
Nishant Das
2019-05-04 01:13:34 +08:00
committed by Raul Jordan
parent a152ba9528
commit 7d88e1e15e
5 changed files with 344 additions and 65 deletions

View File

@@ -16,6 +16,7 @@ go_library(
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/core/state/stateutils:go_default_library",
"//beacon-chain/db:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",

View File

@@ -11,9 +11,11 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/stateutils"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
)
@@ -34,39 +36,38 @@ type ValidatorServer struct {
// beacon state, if not, then it creates a stream which listens for canonical states which contain
// the validator with the public key as an active validator record.
func (vs *ValidatorServer) WaitForActivation(req *pb.ValidatorActivationRequest, stream pb.ValidatorService_WaitForActivationServer) error {
beaconState, err := vs.beaconDB.HeadState(stream.Context())
reply := func() error {
beaconState, err = vs.beaconDB.HeadState(stream.Context())
if err != nil {
return fmt.Errorf("could not retrieve beacon state: %v", err)
}
activeKeys := vs.filterActivePublicKeys(beaconState, req.PublicKeys)
res := &pb.ValidatorActivationResponse{
ActivatedPublicKeys: activeKeys,
}
return stream.Send(res)
}
hasAny, err := vs.beaconDB.HasAnyValidators(beaconState, req.PublicKeys)
activeValidatorExists, validatorStatuses, err := vs.MultipleValidatorStatus(stream.Context(), req.PublicKeys)
if err != nil {
return err
}
if hasAny {
return reply()
res := &pb.ValidatorActivationResponse{
Statuses: validatorStatuses,
}
if activeValidatorExists {
return stream.Send(res)
}
if err := stream.Send(res); err != nil {
return err
}
for {
select {
case <-time.After(3 * time.Second):
hasAny, err := vs.beaconDB.HasAnyValidators(beaconState, req.PublicKeys)
case <-time.After(6 * time.Second):
activeValidatorExists, validatorStatuses, err := vs.MultipleValidatorStatus(stream.Context(), req.PublicKeys)
if err != nil {
return err
}
if !hasAny {
continue
res := &pb.ValidatorActivationResponse{
Statuses: validatorStatuses,
}
return reply()
if activeValidatorExists {
return stream.Send(res)
}
if err := stream.Send(res); err != nil {
return err
}
case <-stream.Context().Done():
return errors.New("stream context closed,exiting gorutine")
case <-vs.ctx.Done():
return errors.New("rpc context closed, exiting goroutine")
}
@@ -186,10 +187,8 @@ func (vs *ValidatorServer) assignment(
if err != nil {
return nil, err
}
status, err := vs.validatorStatus(pubkey, beaconState)
if err != nil {
return nil, err
}
status := vs.validatorStatus(pubkey, beaconState)
return &pb.CommitteeAssignmentResponse_CommitteeAssignment{
Committee: committee,
Shard: shard,
@@ -229,23 +228,11 @@ func (vs *ValidatorServer) ValidatorStatus(
}, nil
}
eth1BlockNum := eth1BlockNumBigInt.Uint64()
addFollowDistance := eth1BlockNum + params.BeaconConfig().Eth1FollowDistance
eth1Timestamp, err := vs.powChainService.BlockTimeByHeight(ctx, big.NewInt(int64(addFollowDistance)))
depositBlockSlot, err := vs.depositBlockSlot(ctx, eth1BlockNumBigInt, beaconState)
if err != nil {
return nil, err
}
votingPeriodSlots := helpers.StartSlot(params.BeaconConfig().EpochsPerEth1VotingPeriod)
votingPeriodSeconds := time.Duration(votingPeriodSlots*params.BeaconConfig().SecondsPerSlot) * time.Second
eth1UnixTime := time.Unix(int64(eth1Timestamp), 0)
timeToInclusion := eth1UnixTime.Add(votingPeriodSeconds)
eth2Genesis := time.Unix(int64(beaconState.GenesisTime), 0)
eth2TimeDifference := timeToInclusion.Sub(eth2Genesis).Seconds()
depositBlockSlot := uint64(eth2TimeDifference) / params.BeaconConfig().SecondsPerSlot
currEpoch := helpers.CurrentEpoch(beaconState)
var validatorInState *pbp2p.Validator
var validatorIndex uint64
@@ -255,7 +242,7 @@ func (vs *ValidatorServer) ValidatorStatus(
return &pb.ValidatorStatusResponse{
Status: pb.ValidatorStatus_ACTIVE,
ActivationEpoch: val.ActivationEpoch - params.BeaconConfig().GenesisEpoch,
Eth1DepositBlockNumber: eth1BlockNum,
Eth1DepositBlockNumber: eth1BlockNumBigInt.Uint64(),
DepositInclusionSlot: depositBlockSlot,
}, nil
}
@@ -276,16 +263,14 @@ func (vs *ValidatorServer) ValidatorStatus(
}
}
// Our position in the activation queue is the above index - our validator index.
positionInQueue = lastActivatedValidatorIdx - validatorIndex
positionInQueue = validatorIndex - lastActivatedValidatorIdx
}
status, err := vs.validatorStatus(req.PublicKey, beaconState)
if err != nil {
return nil, err
}
status := vs.validatorStatus(req.PublicKey, beaconState)
res := &pb.ValidatorStatusResponse{
Status: status,
Eth1DepositBlockNumber: eth1BlockNum,
Eth1DepositBlockNumber: eth1BlockNumBigInt.Uint64(),
PositionInActivationQueue: positionInQueue,
DepositInclusionSlot: depositBlockSlot,
ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch,
@@ -294,10 +279,114 @@ func (vs *ValidatorServer) ValidatorStatus(
return res, nil
}
func (vs *ValidatorServer) validatorStatus(pubkey []byte, beaconState *pbp2p.BeaconState) (pb.ValidatorStatus, error) {
// MultipleValidatorStatus returns the validator status response for the set of validators
// requested by their pubkeys.
func (vs *ValidatorServer) MultipleValidatorStatus(
ctx context.Context,
pubkeys [][]byte) (bool, []*pb.ValidatorActivationResponse_Status, error) {
activeValidatorExists := false
beaconState, err := vs.beaconDB.HeadState(ctx)
if err != nil {
return false, nil, fmt.Errorf("could not fetch beacon state: %v", err)
}
validatorMap := stateutils.ValidatorIndexMap(beaconState)
statusResponses := make([]*pb.ValidatorActivationResponse_Status, len(pubkeys))
for i, key := range pubkeys {
statusResponses[i] = &pb.ValidatorActivationResponse_Status{
PublicKey: key,
Status: &pb.ValidatorStatusResponse{},
}
dep, eth1BlockNumBigInt, err := vs.beaconDB.DepositByPubkey(ctx, key)
if err != nil {
return activeValidatorExists, nil, err
}
if eth1BlockNumBigInt == nil {
statusResponses[i].Status = &pb.ValidatorStatusResponse{
Status: pb.ValidatorStatus_UNKNOWN_STATUS,
ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch,
Eth1DepositBlockNumber: 0,
}
continue
}
depositBlockSlot, err := vs.depositBlockSlot(ctx, eth1BlockNumBigInt, beaconState)
if err != nil {
statusResponses[i].Status = &pb.ValidatorStatusResponse{
Status: pb.ValidatorStatus_UNKNOWN_STATUS,
ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch,
Eth1DepositBlockNumber: 0,
}
continue
}
validatorInState := false
currEpoch := helpers.CurrentEpoch(beaconState)
valIndex, ok := validatorMap[bytesutil.ToBytes32(key)]
if ok {
validator := beaconState.ValidatorRegistry[valIndex]
if helpers.IsActiveValidator(validator, currEpoch) {
activeValidatorExists = true
statusResponses[i].Status = &pb.ValidatorStatusResponse{
Status: pb.ValidatorStatus_ACTIVE,
ActivationEpoch: validator.ActivationEpoch - params.BeaconConfig().GenesisEpoch,
Eth1DepositBlockNumber: eth1BlockNumBigInt.Uint64(),
DepositInclusionSlot: depositBlockSlot,
}
continue
}
validatorInState = true
}
lastValidatorIndex := len(beaconState.ValidatorRegistry) - 1
var lastActivatedValidatorIdx uint64
for j := lastValidatorIndex; j >= 0; j-- {
if helpers.IsActiveValidator(beaconState.ValidatorRegistry[j], currEpoch) {
lastActivatedValidatorIdx = uint64(j)
break
}
}
lastValidator := beaconState.ValidatorRegistry[lastValidatorIndex]
lastValidatorDeposit, _, err := vs.beaconDB.DepositByPubkey(ctx, lastValidator.Pubkey)
if err != nil {
return activeValidatorExists, nil, err
}
var positionInQueue uint64
if dep.MerkleTreeIndex > lastValidatorDeposit.MerkleTreeIndex {
positionInQueue = dep.MerkleTreeIndex - lastValidatorDeposit.MerkleTreeIndex
}
// If the validator has deposited and has been added to the state:
if validatorInState {
// Our position in the activation queue is our previous position added with the
// difference between the last added validator and the last activated validator.
positionInQueue += uint64(lastValidatorIndex) - lastActivatedValidatorIdx
}
status := vs.validatorStatus(key, beaconState)
statusResponses[i].Status = &pb.ValidatorStatusResponse{
Status: status,
Eth1DepositBlockNumber: eth1BlockNumBigInt.Uint64(),
PositionInActivationQueue: positionInQueue,
DepositInclusionSlot: depositBlockSlot,
ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch,
}
}
return activeValidatorExists, statusResponses, nil
}
func (vs *ValidatorServer) validatorStatus(pubkey []byte, beaconState *pbp2p.BeaconState) pb.ValidatorStatus {
idx, err := vs.beaconDB.ValidatorIndex(pubkey)
if err != nil {
return pb.ValidatorStatus_UNKNOWN_STATUS, fmt.Errorf("could not get active validator index: %v", err)
return pb.ValidatorStatus_UNKNOWN_STATUS
}
var status pb.ValidatorStatus
@@ -321,7 +410,7 @@ func (vs *ValidatorServer) validatorStatus(pubkey []byte, beaconState *pbp2p.Bea
status = pb.ValidatorStatus_UNKNOWN_STATUS
}
return status, nil
return status
}
// filterActivePublicKeys takes a list of validator public keys and returns
@@ -358,7 +447,7 @@ func (vs *ValidatorServer) addNonActivePublicKeysAssignmentStatus(
for _, pk := range pubkeys {
hexPk := hex.EncodeToString(pk)
if _, ok := validatorMap[hexPk]; !ok || !helpers.IsActiveValidator(validatorMap[hexPk], currentEpoch) {
status, _ := vs.validatorStatus(pk, beaconState) //nolint:gosec
status := vs.validatorStatus(pk, beaconState) //nolint:gosec
a := &pb.CommitteeAssignmentResponse_CommitteeAssignment{
PublicKey: pk,
Status: status,
@@ -368,3 +457,25 @@ func (vs *ValidatorServer) addNonActivePublicKeysAssignmentStatus(
}
return assignments
}
func (vs *ValidatorServer) depositBlockSlot(ctx context.Context, eth1BlockNumBigInt *big.Int,
beaconState *pbp2p.BeaconState) (uint64, error) {
eth1BlockNum := eth1BlockNumBigInt.Uint64()
addFollowDistance := eth1BlockNum + params.BeaconConfig().Eth1FollowDistance
eth1Timestamp, err := vs.powChainService.BlockTimeByHeight(ctx, big.NewInt(int64(addFollowDistance)))
if err != nil {
return 0, err
}
votingPeriodSlots := helpers.StartSlot(params.BeaconConfig().EpochsPerEth1VotingPeriod)
votingPeriodSeconds := time.Duration(votingPeriodSlots*params.BeaconConfig().SecondsPerSlot) * time.Second
eth1UnixTime := time.Unix(int64(eth1Timestamp), 0)
timeToInclusion := eth1UnixTime.Add(votingPeriodSeconds)
eth2Genesis := time.Unix(int64(beaconState.GenesisTime), 0)
eth2TimeDifference := timeToInclusion.Sub(eth2Genesis).Seconds()
depositBlockSlot := uint64(eth2TimeDifference) / params.BeaconConfig().SecondsPerSlot
return depositBlockSlot, nil
}

View File

@@ -725,6 +725,8 @@ func TestWaitForActivation_ContextClosed(t *testing.T) {
defer ctrl.Finish()
mockStream := internal.NewMockValidatorService_WaitForActivationServer(ctrl)
mockStream.EXPECT().Context().Return(context.Background())
mockStream.EXPECT().Send(gomock.Any()).Return(nil)
mockStream.EXPECT().Context().Return(context.Background())
exitRoutine := make(chan bool)
go func(tt *testing.T) {
want := "context closed"
@@ -765,6 +767,17 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) {
if err := db.SaveState(ctx, beaconState); err != nil {
t.Fatalf("could not save state: %v", err)
}
depData, err := helpers.EncodeDepositData(&pbp2p.DepositInput{
Pubkey: []byte{'A'},
}, 10, 10)
if err != nil {
t.Fatal(err)
}
dep := &pbp2p.Deposit{
DepositData: depData,
}
db.InsertDeposit(context.Background(), dep, big.NewInt(10))
if err := db.SaveValidatorIndex(pubKeys[0], 0); err != nil {
t.Fatalf("could not save validator index: %v", err)
}
@@ -776,6 +789,7 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) {
ctx: context.Background(),
chainService: newMockChainService(),
canonicalStateChan: make(chan *pbp2p.BeaconState, 1),
powChainService: &mockPOWChainService{},
}
req := &pb.ValidatorActivationRequest{
PublicKeys: pubKeys,
@@ -785,10 +799,22 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) {
defer ctrl.Finish()
mockStream := internal.NewMockValidatorService_WaitForActivationServer(ctrl)
mockStream.EXPECT().Context().Return(context.Background())
mockStream.EXPECT().Context().Return(context.Background())
mockStream.EXPECT().Send(
&pb.ValidatorActivationResponse{
ActivatedPublicKeys: pubKeys,
Statuses: []*pb.ValidatorActivationResponse_Status{
{PublicKey: []byte{'A'},
Status: &pb.ValidatorStatusResponse{
Status: pb.ValidatorStatus_ACTIVE,
Eth1DepositBlockNumber: 10,
DepositInclusionSlot: 1024,
},
},
{PublicKey: []byte{'B'},
Status: &pb.ValidatorStatusResponse{
ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch,
},
},
},
},
).Return(nil)
@@ -797,6 +823,97 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) {
}
}
func TestMultipleValidatorStatus_OK(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
ctx := context.Background()
pubKeys := [][]byte{{'A'}, {'B'}, {'C'}}
if err := db.SaveValidatorIndex(pubKeys[0], 0); err != nil {
t.Fatalf("Could not save validator index: %v", err)
}
if err := db.SaveValidatorIndex(pubKeys[1], 0); err != nil {
t.Fatalf("Could not save validator index: %v", err)
}
beaconState := &pbp2p.BeaconState{
Slot: params.BeaconConfig().GenesisSlot,
ValidatorRegistry: []*pbp2p.Validator{{
ActivationEpoch: params.BeaconConfig().GenesisEpoch,
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Pubkey: pubKeys[0]},
{
ActivationEpoch: params.BeaconConfig().GenesisEpoch,
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Pubkey: pubKeys[1]},
{
ActivationEpoch: params.BeaconConfig().GenesisEpoch,
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Pubkey: pubKeys[2]},
},
}
if err := db.SaveState(ctx, beaconState); err != nil {
t.Fatalf("could not save state: %v", err)
}
depData, err := helpers.EncodeDepositData(&pbp2p.DepositInput{
Pubkey: []byte{'A'},
}, 10, 10)
if err != nil {
t.Fatal(err)
}
dep := &pbp2p.Deposit{
DepositData: depData,
}
db.InsertDeposit(context.Background(), dep, big.NewInt(10))
depData, err = helpers.EncodeDepositData(&pbp2p.DepositInput{
Pubkey: []byte{'C'},
}, 10, 10)
if err != nil {
t.Fatal(err)
}
dep = &pbp2p.Deposit{
DepositData: depData,
}
db.InsertDeposit(context.Background(), dep, big.NewInt(15))
if err := db.SaveValidatorIndex(pubKeys[0], 0); err != nil {
t.Fatalf("could not save validator index: %v", err)
}
if err := db.SaveValidatorIndex(pubKeys[1], 1); err != nil {
t.Fatalf("could not save validator index: %v", err)
}
if err := db.SaveValidatorIndex(pubKeys[2], 2); err != nil {
t.Fatalf("could not save validator index: %v", err)
}
vs := &ValidatorServer{
beaconDB: db,
ctx: context.Background(),
chainService: newMockChainService(),
canonicalStateChan: make(chan *pbp2p.BeaconState, 1),
powChainService: &mockPOWChainService{},
}
activeExists, response, err := vs.MultipleValidatorStatus(context.Background(), pubKeys)
if err != nil {
t.Fatal(err)
}
if !activeExists {
t.Fatal("No activated validator exists when there was supposed to be 2")
}
if response[0].Status.Status != pb.ValidatorStatus_ACTIVE {
t.Errorf("Validator with pubkey %#x is not activated and instead has this status: %s",
response[0].PublicKey, response[0].Status.Status.String())
}
if response[1].Status.Status == pb.ValidatorStatus_ACTIVE {
t.Errorf("Validator with pubkey %#x is activated despite not supposed to be", response[1].PublicKey)
}
if response[2].Status.Status != pb.ValidatorStatus_ACTIVE {
t.Errorf("Validator with pubkey %#x is not activated and instead has this status: %s",
response[2].PublicKey, response[2].Status.Status.String())
}
}
func TestFilterActivePublicKeys(t *testing.T) {
currentEpoch := uint64(15)
beaconState := &pbp2p.BeaconState{

View File

@@ -10,6 +10,7 @@ import (
ptypes "github.com/gogo/protobuf/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/keystore"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotutil"
@@ -87,7 +88,6 @@ func (v *validator) WaitForActivation(ctx context.Context) error {
}
var validatorActivatedRecords [][]byte
for {
log.Info("Waiting for validator to be activated in the beacon chain")
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
if err == io.EOF {
@@ -100,8 +100,11 @@ func (v *validator) WaitForActivation(ctx context.Context) error {
if err != nil {
return fmt.Errorf("could not receive validator activation from stream: %v", err)
}
if len(res.ActivatedPublicKeys) > 0 {
validatorActivatedRecords = res.ActivatedPublicKeys
log.Info("Waiting for validator to be activated in the beacon chain")
activatedKeys := v.checkAndLogValidatorStatus(res.Statuses)
if len(activatedKeys) > 0 {
validatorActivatedRecords = activatedKeys
break
}
}
@@ -113,6 +116,39 @@ func (v *validator) WaitForActivation(ctx context.Context) error {
return nil
}
func (v *validator) checkAndLogValidatorStatus(validatorStatuses []*pb.ValidatorActivationResponse_Status) [][]byte {
var activatedKeys [][]byte
for _, status := range validatorStatuses {
if status.Status.Status == pb.ValidatorStatus_ACTIVE {
activatedKeys = append(activatedKeys, status.PublicKey)
}
if status.Status.DepositInclusionSlot == 0 {
log.WithFields(logrus.Fields{
"PublicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.PublicKey)),
"Status": fmt.Sprintf("%s", status.Status.Status.String()),
}).Info("Not Deposited Yet")
continue
}
if status.Status.ActivationEpoch == (params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch) {
log.WithFields(logrus.Fields{
"PublicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.PublicKey)),
"Status": status.Status.Status.String(),
"DepositInclusionSlot": status.Status.DepositInclusionSlot,
"PositionInActivationQueue": status.Status.PositionInActivationQueue,
}).Info("Waiting to be Activated")
continue
}
log.WithFields(logrus.Fields{
"PublicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.PublicKey)),
"Status": status.Status.Status.String(),
"DepositInclusionSlot": status.Status.DepositInclusionSlot,
"ActivationEpoch": status.Status.ActivationEpoch,
"PositionInActivationQueue": status.Status.PositionInActivationQueue,
}).Info("Validator Status")
}
return activatedKeys
}
// CanonicalHeadSlot returns the slot of canonical block currently found in the
// beacon chain via RPC.
func (v *validator) CanonicalHeadSlot(ctx context.Context) (uint64, error) {

View File

@@ -38,6 +38,19 @@ func publicKeys(keys map[string]*keystore.Key) [][]byte {
return pks
}
func generateMockStatusResponse(pubkeys [][]byte) *pb.ValidatorActivationResponse {
multipleStatus := make([]*pb.ValidatorActivationResponse_Status, len(pubkeys))
for i, key := range pubkeys {
multipleStatus[i] = &pb.ValidatorActivationResponse_Status{
PublicKey: key,
Status: &pb.ValidatorStatusResponse{
Status: pb.ValidatorStatus_UNKNOWN_STATUS,
},
}
}
return &pb.ValidatorActivationResponse{Statuses: multipleStatus}
}
func TestWaitForChainStart_SetsChainStartGenesisTime(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -248,6 +261,8 @@ func TestWaitActivation_LogsActivationEpochOK(t *testing.T) {
validatorClient: client,
}
v.pubkeys = publicKeys(v.keys)
resp := generateMockStatusResponse(v.pubkeys)
resp.Statuses[0].Status.Status = pb.ValidatorStatus_ACTIVE
clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
@@ -256,9 +271,7 @@ func TestWaitActivation_LogsActivationEpochOK(t *testing.T) {
},
).Return(clientStream, nil)
clientStream.EXPECT().Recv().Return(
&pb.ValidatorActivationResponse{
ActivatedPublicKeys: publicKeys(v.keys),
},
resp,
nil,
)
if err := v.WaitForActivation(context.Background()); err != nil {
@@ -316,6 +329,9 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) {
validatorClient: client,
}
v.pubkeys = publicKeys(v.keys)
resp := generateMockStatusResponse(v.pubkeys)
resp.Statuses[0].Status.Status = pb.ValidatorStatus_ACTIVE
resp.Statuses[1].Status.Status = pb.ValidatorStatus_ACTIVE
clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
@@ -324,9 +340,7 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) {
},
).Return(clientStream, nil)
clientStream.EXPECT().Recv().Return(
&pb.ValidatorActivationResponse{
ActivatedPublicKeys: v.pubkeys,
},
resp,
nil,
)
if err := v.WaitForActivation(context.Background()); err != nil {
@@ -344,6 +358,8 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) {
validatorClient: client,
pubkeys: publicKeys(keyMapThreeValidators),
}
resp := generateMockStatusResponse(v.pubkeys)
resp.Statuses[0].Status.Status = pb.ValidatorStatus_ACTIVE
clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
@@ -356,9 +372,7 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) {
nil,
)
clientStream.EXPECT().Recv().Return(
&pb.ValidatorActivationResponse{
ActivatedPublicKeys: publicKeys(v.keys),
},
resp,
nil,
)
if err := v.WaitForActivation(context.Background()); err != nil {