diff --git a/beacon-chain/rpc/BUILD.bazel b/beacon-chain/rpc/BUILD.bazel index 925959a4a8..c17cd8284a 100644 --- a/beacon-chain/rpc/BUILD.bazel +++ b/beacon-chain/rpc/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//beacon-chain/core/blocks:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/state:go_default_library", + "//beacon-chain/core/state/stateutils:go_default_library", "//beacon-chain/db:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//proto/beacon/rpc/v1:go_default_library", diff --git a/beacon-chain/rpc/validator_server.go b/beacon-chain/rpc/validator_server.go index 7ff1dcefda..ab59e15387 100644 --- a/beacon-chain/rpc/validator_server.go +++ b/beacon-chain/rpc/validator_server.go @@ -11,9 +11,11 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" + "github.com/prysmaticlabs/prysm/beacon-chain/core/state/stateutils" "github.com/prysmaticlabs/prysm/beacon-chain/db" pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/params" ) @@ -34,39 +36,38 @@ type ValidatorServer struct { // beacon state, if not, then it creates a stream which listens for canonical states which contain // the validator with the public key as an active validator record. func (vs *ValidatorServer) WaitForActivation(req *pb.ValidatorActivationRequest, stream pb.ValidatorService_WaitForActivationServer) error { - beaconState, err := vs.beaconDB.HeadState(stream.Context()) - - reply := func() error { - beaconState, err = vs.beaconDB.HeadState(stream.Context()) - if err != nil { - return fmt.Errorf("could not retrieve beacon state: %v", err) - } - activeKeys := vs.filterActivePublicKeys(beaconState, req.PublicKeys) - res := &pb.ValidatorActivationResponse{ - ActivatedPublicKeys: activeKeys, - } - return stream.Send(res) - - } - - hasAny, err := vs.beaconDB.HasAnyValidators(beaconState, req.PublicKeys) + activeValidatorExists, validatorStatuses, err := vs.MultipleValidatorStatus(stream.Context(), req.PublicKeys) if err != nil { return err } - if hasAny { - return reply() + res := &pb.ValidatorActivationResponse{ + Statuses: validatorStatuses, } + if activeValidatorExists { + return stream.Send(res) + } + if err := stream.Send(res); err != nil { + return err + } + for { select { - case <-time.After(3 * time.Second): - hasAny, err := vs.beaconDB.HasAnyValidators(beaconState, req.PublicKeys) + case <-time.After(6 * time.Second): + activeValidatorExists, validatorStatuses, err := vs.MultipleValidatorStatus(stream.Context(), req.PublicKeys) if err != nil { return err } - if !hasAny { - continue + res := &pb.ValidatorActivationResponse{ + Statuses: validatorStatuses, } - return reply() + if activeValidatorExists { + return stream.Send(res) + } + if err := stream.Send(res); err != nil { + return err + } + case <-stream.Context().Done(): + return errors.New("stream context closed,exiting gorutine") case <-vs.ctx.Done(): return errors.New("rpc context closed, exiting goroutine") } @@ -186,10 +187,8 @@ func (vs *ValidatorServer) assignment( if err != nil { return nil, err } - status, err := vs.validatorStatus(pubkey, beaconState) - if err != nil { - return nil, err - } + status := vs.validatorStatus(pubkey, beaconState) + return &pb.CommitteeAssignmentResponse_CommitteeAssignment{ Committee: committee, Shard: shard, @@ -229,23 +228,11 @@ func (vs *ValidatorServer) ValidatorStatus( }, nil } - eth1BlockNum := eth1BlockNumBigInt.Uint64() - addFollowDistance := eth1BlockNum + params.BeaconConfig().Eth1FollowDistance - eth1Timestamp, err := vs.powChainService.BlockTimeByHeight(ctx, big.NewInt(int64(addFollowDistance))) + depositBlockSlot, err := vs.depositBlockSlot(ctx, eth1BlockNumBigInt, beaconState) if err != nil { return nil, err } - votingPeriodSlots := helpers.StartSlot(params.BeaconConfig().EpochsPerEth1VotingPeriod) - votingPeriodSeconds := time.Duration(votingPeriodSlots*params.BeaconConfig().SecondsPerSlot) * time.Second - - eth1UnixTime := time.Unix(int64(eth1Timestamp), 0) - timeToInclusion := eth1UnixTime.Add(votingPeriodSeconds) - - eth2Genesis := time.Unix(int64(beaconState.GenesisTime), 0) - eth2TimeDifference := timeToInclusion.Sub(eth2Genesis).Seconds() - depositBlockSlot := uint64(eth2TimeDifference) / params.BeaconConfig().SecondsPerSlot - currEpoch := helpers.CurrentEpoch(beaconState) var validatorInState *pbp2p.Validator var validatorIndex uint64 @@ -255,7 +242,7 @@ func (vs *ValidatorServer) ValidatorStatus( return &pb.ValidatorStatusResponse{ Status: pb.ValidatorStatus_ACTIVE, ActivationEpoch: val.ActivationEpoch - params.BeaconConfig().GenesisEpoch, - Eth1DepositBlockNumber: eth1BlockNum, + Eth1DepositBlockNumber: eth1BlockNumBigInt.Uint64(), DepositInclusionSlot: depositBlockSlot, }, nil } @@ -276,16 +263,14 @@ func (vs *ValidatorServer) ValidatorStatus( } } // Our position in the activation queue is the above index - our validator index. - positionInQueue = lastActivatedValidatorIdx - validatorIndex + positionInQueue = validatorIndex - lastActivatedValidatorIdx } - status, err := vs.validatorStatus(req.PublicKey, beaconState) - if err != nil { - return nil, err - } + status := vs.validatorStatus(req.PublicKey, beaconState) + res := &pb.ValidatorStatusResponse{ Status: status, - Eth1DepositBlockNumber: eth1BlockNum, + Eth1DepositBlockNumber: eth1BlockNumBigInt.Uint64(), PositionInActivationQueue: positionInQueue, DepositInclusionSlot: depositBlockSlot, ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch, @@ -294,10 +279,114 @@ func (vs *ValidatorServer) ValidatorStatus( return res, nil } -func (vs *ValidatorServer) validatorStatus(pubkey []byte, beaconState *pbp2p.BeaconState) (pb.ValidatorStatus, error) { +// MultipleValidatorStatus returns the validator status response for the set of validators +// requested by their pubkeys. +func (vs *ValidatorServer) MultipleValidatorStatus( + ctx context.Context, + pubkeys [][]byte) (bool, []*pb.ValidatorActivationResponse_Status, error) { + + activeValidatorExists := false + + beaconState, err := vs.beaconDB.HeadState(ctx) + if err != nil { + return false, nil, fmt.Errorf("could not fetch beacon state: %v", err) + } + + validatorMap := stateutils.ValidatorIndexMap(beaconState) + statusResponses := make([]*pb.ValidatorActivationResponse_Status, len(pubkeys)) + + for i, key := range pubkeys { + statusResponses[i] = &pb.ValidatorActivationResponse_Status{ + PublicKey: key, + Status: &pb.ValidatorStatusResponse{}, + } + dep, eth1BlockNumBigInt, err := vs.beaconDB.DepositByPubkey(ctx, key) + if err != nil { + return activeValidatorExists, nil, err + } + if eth1BlockNumBigInt == nil { + statusResponses[i].Status = &pb.ValidatorStatusResponse{ + Status: pb.ValidatorStatus_UNKNOWN_STATUS, + ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch, + Eth1DepositBlockNumber: 0, + } + continue + } + + depositBlockSlot, err := vs.depositBlockSlot(ctx, eth1BlockNumBigInt, beaconState) + if err != nil { + statusResponses[i].Status = &pb.ValidatorStatusResponse{ + Status: pb.ValidatorStatus_UNKNOWN_STATUS, + ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch, + Eth1DepositBlockNumber: 0, + } + continue + } + + validatorInState := false + currEpoch := helpers.CurrentEpoch(beaconState) + valIndex, ok := validatorMap[bytesutil.ToBytes32(key)] + if ok { + validator := beaconState.ValidatorRegistry[valIndex] + if helpers.IsActiveValidator(validator, currEpoch) { + activeValidatorExists = true + statusResponses[i].Status = &pb.ValidatorStatusResponse{ + Status: pb.ValidatorStatus_ACTIVE, + ActivationEpoch: validator.ActivationEpoch - params.BeaconConfig().GenesisEpoch, + Eth1DepositBlockNumber: eth1BlockNumBigInt.Uint64(), + DepositInclusionSlot: depositBlockSlot, + } + continue + } + validatorInState = true + } + + lastValidatorIndex := len(beaconState.ValidatorRegistry) - 1 + + var lastActivatedValidatorIdx uint64 + for j := lastValidatorIndex; j >= 0; j-- { + if helpers.IsActiveValidator(beaconState.ValidatorRegistry[j], currEpoch) { + lastActivatedValidatorIdx = uint64(j) + break + } + } + + lastValidator := beaconState.ValidatorRegistry[lastValidatorIndex] + lastValidatorDeposit, _, err := vs.beaconDB.DepositByPubkey(ctx, lastValidator.Pubkey) + if err != nil { + return activeValidatorExists, nil, err + } + + var positionInQueue uint64 + if dep.MerkleTreeIndex > lastValidatorDeposit.MerkleTreeIndex { + positionInQueue = dep.MerkleTreeIndex - lastValidatorDeposit.MerkleTreeIndex + } + + // If the validator has deposited and has been added to the state: + if validatorInState { + // Our position in the activation queue is our previous position added with the + // difference between the last added validator and the last activated validator. + positionInQueue += uint64(lastValidatorIndex) - lastActivatedValidatorIdx + } + + status := vs.validatorStatus(key, beaconState) + statusResponses[i].Status = &pb.ValidatorStatusResponse{ + Status: status, + Eth1DepositBlockNumber: eth1BlockNumBigInt.Uint64(), + PositionInActivationQueue: positionInQueue, + DepositInclusionSlot: depositBlockSlot, + ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch, + } + + } + + return activeValidatorExists, statusResponses, nil +} + +func (vs *ValidatorServer) validatorStatus(pubkey []byte, beaconState *pbp2p.BeaconState) pb.ValidatorStatus { idx, err := vs.beaconDB.ValidatorIndex(pubkey) if err != nil { - return pb.ValidatorStatus_UNKNOWN_STATUS, fmt.Errorf("could not get active validator index: %v", err) + return pb.ValidatorStatus_UNKNOWN_STATUS } var status pb.ValidatorStatus @@ -321,7 +410,7 @@ func (vs *ValidatorServer) validatorStatus(pubkey []byte, beaconState *pbp2p.Bea status = pb.ValidatorStatus_UNKNOWN_STATUS } - return status, nil + return status } // filterActivePublicKeys takes a list of validator public keys and returns @@ -358,7 +447,7 @@ func (vs *ValidatorServer) addNonActivePublicKeysAssignmentStatus( for _, pk := range pubkeys { hexPk := hex.EncodeToString(pk) if _, ok := validatorMap[hexPk]; !ok || !helpers.IsActiveValidator(validatorMap[hexPk], currentEpoch) { - status, _ := vs.validatorStatus(pk, beaconState) //nolint:gosec + status := vs.validatorStatus(pk, beaconState) //nolint:gosec a := &pb.CommitteeAssignmentResponse_CommitteeAssignment{ PublicKey: pk, Status: status, @@ -368,3 +457,25 @@ func (vs *ValidatorServer) addNonActivePublicKeysAssignmentStatus( } return assignments } + +func (vs *ValidatorServer) depositBlockSlot(ctx context.Context, eth1BlockNumBigInt *big.Int, + beaconState *pbp2p.BeaconState) (uint64, error) { + eth1BlockNum := eth1BlockNumBigInt.Uint64() + addFollowDistance := eth1BlockNum + params.BeaconConfig().Eth1FollowDistance + eth1Timestamp, err := vs.powChainService.BlockTimeByHeight(ctx, big.NewInt(int64(addFollowDistance))) + if err != nil { + return 0, err + } + + votingPeriodSlots := helpers.StartSlot(params.BeaconConfig().EpochsPerEth1VotingPeriod) + votingPeriodSeconds := time.Duration(votingPeriodSlots*params.BeaconConfig().SecondsPerSlot) * time.Second + + eth1UnixTime := time.Unix(int64(eth1Timestamp), 0) + timeToInclusion := eth1UnixTime.Add(votingPeriodSeconds) + + eth2Genesis := time.Unix(int64(beaconState.GenesisTime), 0) + eth2TimeDifference := timeToInclusion.Sub(eth2Genesis).Seconds() + depositBlockSlot := uint64(eth2TimeDifference) / params.BeaconConfig().SecondsPerSlot + + return depositBlockSlot, nil +} diff --git a/beacon-chain/rpc/validator_server_test.go b/beacon-chain/rpc/validator_server_test.go index d005882806..fcaed83ee7 100644 --- a/beacon-chain/rpc/validator_server_test.go +++ b/beacon-chain/rpc/validator_server_test.go @@ -725,6 +725,8 @@ func TestWaitForActivation_ContextClosed(t *testing.T) { defer ctrl.Finish() mockStream := internal.NewMockValidatorService_WaitForActivationServer(ctrl) mockStream.EXPECT().Context().Return(context.Background()) + mockStream.EXPECT().Send(gomock.Any()).Return(nil) + mockStream.EXPECT().Context().Return(context.Background()) exitRoutine := make(chan bool) go func(tt *testing.T) { want := "context closed" @@ -765,6 +767,17 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) { if err := db.SaveState(ctx, beaconState); err != nil { t.Fatalf("could not save state: %v", err) } + depData, err := helpers.EncodeDepositData(&pbp2p.DepositInput{ + Pubkey: []byte{'A'}, + }, 10, 10) + if err != nil { + t.Fatal(err) + } + dep := &pbp2p.Deposit{ + DepositData: depData, + } + db.InsertDeposit(context.Background(), dep, big.NewInt(10)) + if err := db.SaveValidatorIndex(pubKeys[0], 0); err != nil { t.Fatalf("could not save validator index: %v", err) } @@ -776,6 +789,7 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) { ctx: context.Background(), chainService: newMockChainService(), canonicalStateChan: make(chan *pbp2p.BeaconState, 1), + powChainService: &mockPOWChainService{}, } req := &pb.ValidatorActivationRequest{ PublicKeys: pubKeys, @@ -785,10 +799,22 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) { defer ctrl.Finish() mockStream := internal.NewMockValidatorService_WaitForActivationServer(ctrl) mockStream.EXPECT().Context().Return(context.Background()) - mockStream.EXPECT().Context().Return(context.Background()) mockStream.EXPECT().Send( &pb.ValidatorActivationResponse{ - ActivatedPublicKeys: pubKeys, + Statuses: []*pb.ValidatorActivationResponse_Status{ + {PublicKey: []byte{'A'}, + Status: &pb.ValidatorStatusResponse{ + Status: pb.ValidatorStatus_ACTIVE, + Eth1DepositBlockNumber: 10, + DepositInclusionSlot: 1024, + }, + }, + {PublicKey: []byte{'B'}, + Status: &pb.ValidatorStatusResponse{ + ActivationEpoch: params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch, + }, + }, + }, }, ).Return(nil) @@ -797,6 +823,97 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) { } } +func TestMultipleValidatorStatus_OK(t *testing.T) { + db := internal.SetupDB(t) + defer internal.TeardownDB(t, db) + ctx := context.Background() + + pubKeys := [][]byte{{'A'}, {'B'}, {'C'}} + if err := db.SaveValidatorIndex(pubKeys[0], 0); err != nil { + t.Fatalf("Could not save validator index: %v", err) + } + if err := db.SaveValidatorIndex(pubKeys[1], 0); err != nil { + t.Fatalf("Could not save validator index: %v", err) + } + + beaconState := &pbp2p.BeaconState{ + Slot: params.BeaconConfig().GenesisSlot, + ValidatorRegistry: []*pbp2p.Validator{{ + ActivationEpoch: params.BeaconConfig().GenesisEpoch, + ExitEpoch: params.BeaconConfig().FarFutureEpoch, + Pubkey: pubKeys[0]}, + { + ActivationEpoch: params.BeaconConfig().GenesisEpoch, + ExitEpoch: params.BeaconConfig().FarFutureEpoch, + Pubkey: pubKeys[1]}, + { + ActivationEpoch: params.BeaconConfig().GenesisEpoch, + ExitEpoch: params.BeaconConfig().FarFutureEpoch, + Pubkey: pubKeys[2]}, + }, + } + if err := db.SaveState(ctx, beaconState); err != nil { + t.Fatalf("could not save state: %v", err) + } + depData, err := helpers.EncodeDepositData(&pbp2p.DepositInput{ + Pubkey: []byte{'A'}, + }, 10, 10) + if err != nil { + t.Fatal(err) + } + dep := &pbp2p.Deposit{ + DepositData: depData, + } + db.InsertDeposit(context.Background(), dep, big.NewInt(10)) + depData, err = helpers.EncodeDepositData(&pbp2p.DepositInput{ + Pubkey: []byte{'C'}, + }, 10, 10) + if err != nil { + t.Fatal(err) + } + dep = &pbp2p.Deposit{ + DepositData: depData, + } + db.InsertDeposit(context.Background(), dep, big.NewInt(15)) + + if err := db.SaveValidatorIndex(pubKeys[0], 0); err != nil { + t.Fatalf("could not save validator index: %v", err) + } + if err := db.SaveValidatorIndex(pubKeys[1], 1); err != nil { + t.Fatalf("could not save validator index: %v", err) + } + if err := db.SaveValidatorIndex(pubKeys[2], 2); err != nil { + t.Fatalf("could not save validator index: %v", err) + } + vs := &ValidatorServer{ + beaconDB: db, + ctx: context.Background(), + chainService: newMockChainService(), + canonicalStateChan: make(chan *pbp2p.BeaconState, 1), + powChainService: &mockPOWChainService{}, + } + activeExists, response, err := vs.MultipleValidatorStatus(context.Background(), pubKeys) + if err != nil { + t.Fatal(err) + } + if !activeExists { + t.Fatal("No activated validator exists when there was supposed to be 2") + } + if response[0].Status.Status != pb.ValidatorStatus_ACTIVE { + t.Errorf("Validator with pubkey %#x is not activated and instead has this status: %s", + response[0].PublicKey, response[0].Status.Status.String()) + } + + if response[1].Status.Status == pb.ValidatorStatus_ACTIVE { + t.Errorf("Validator with pubkey %#x is activated despite not supposed to be", response[1].PublicKey) + } + + if response[2].Status.Status != pb.ValidatorStatus_ACTIVE { + t.Errorf("Validator with pubkey %#x is not activated and instead has this status: %s", + response[2].PublicKey, response[2].Status.Status.String()) + } +} + func TestFilterActivePublicKeys(t *testing.T) { currentEpoch := uint64(15) beaconState := &pbp2p.BeaconState{ diff --git a/validator/client/validator.go b/validator/client/validator.go index 855edc8f05..7ae52628ec 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -10,6 +10,7 @@ import ( ptypes "github.com/gogo/protobuf/types" pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/keystore" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/slotutil" @@ -87,7 +88,6 @@ func (v *validator) WaitForActivation(ctx context.Context) error { } var validatorActivatedRecords [][]byte for { - log.Info("Waiting for validator to be activated in the beacon chain") res, err := stream.Recv() // If the stream is closed, we stop the loop. if err == io.EOF { @@ -100,8 +100,11 @@ func (v *validator) WaitForActivation(ctx context.Context) error { if err != nil { return fmt.Errorf("could not receive validator activation from stream: %v", err) } - if len(res.ActivatedPublicKeys) > 0 { - validatorActivatedRecords = res.ActivatedPublicKeys + log.Info("Waiting for validator to be activated in the beacon chain") + activatedKeys := v.checkAndLogValidatorStatus(res.Statuses) + + if len(activatedKeys) > 0 { + validatorActivatedRecords = activatedKeys break } } @@ -113,6 +116,39 @@ func (v *validator) WaitForActivation(ctx context.Context) error { return nil } +func (v *validator) checkAndLogValidatorStatus(validatorStatuses []*pb.ValidatorActivationResponse_Status) [][]byte { + var activatedKeys [][]byte + for _, status := range validatorStatuses { + if status.Status.Status == pb.ValidatorStatus_ACTIVE { + activatedKeys = append(activatedKeys, status.PublicKey) + } + if status.Status.DepositInclusionSlot == 0 { + log.WithFields(logrus.Fields{ + "PublicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.PublicKey)), + "Status": fmt.Sprintf("%s", status.Status.Status.String()), + }).Info("Not Deposited Yet") + continue + } + if status.Status.ActivationEpoch == (params.BeaconConfig().FarFutureEpoch - params.BeaconConfig().GenesisEpoch) { + log.WithFields(logrus.Fields{ + "PublicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.PublicKey)), + "Status": status.Status.Status.String(), + "DepositInclusionSlot": status.Status.DepositInclusionSlot, + "PositionInActivationQueue": status.Status.PositionInActivationQueue, + }).Info("Waiting to be Activated") + continue + } + log.WithFields(logrus.Fields{ + "PublicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.PublicKey)), + "Status": status.Status.Status.String(), + "DepositInclusionSlot": status.Status.DepositInclusionSlot, + "ActivationEpoch": status.Status.ActivationEpoch, + "PositionInActivationQueue": status.Status.PositionInActivationQueue, + }).Info("Validator Status") + } + return activatedKeys +} + // CanonicalHeadSlot returns the slot of canonical block currently found in the // beacon chain via RPC. func (v *validator) CanonicalHeadSlot(ctx context.Context) (uint64, error) { diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 29a6a571a9..2e061dd08f 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -38,6 +38,19 @@ func publicKeys(keys map[string]*keystore.Key) [][]byte { return pks } +func generateMockStatusResponse(pubkeys [][]byte) *pb.ValidatorActivationResponse { + multipleStatus := make([]*pb.ValidatorActivationResponse_Status, len(pubkeys)) + for i, key := range pubkeys { + multipleStatus[i] = &pb.ValidatorActivationResponse_Status{ + PublicKey: key, + Status: &pb.ValidatorStatusResponse{ + Status: pb.ValidatorStatus_UNKNOWN_STATUS, + }, + } + } + return &pb.ValidatorActivationResponse{Statuses: multipleStatus} +} + func TestWaitForChainStart_SetsChainStartGenesisTime(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -248,6 +261,8 @@ func TestWaitActivation_LogsActivationEpochOK(t *testing.T) { validatorClient: client, } v.pubkeys = publicKeys(v.keys) + resp := generateMockStatusResponse(v.pubkeys) + resp.Statuses[0].Status.Status = pb.ValidatorStatus_ACTIVE clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl) client.EXPECT().WaitForActivation( gomock.Any(), @@ -256,9 +271,7 @@ func TestWaitActivation_LogsActivationEpochOK(t *testing.T) { }, ).Return(clientStream, nil) clientStream.EXPECT().Recv().Return( - &pb.ValidatorActivationResponse{ - ActivatedPublicKeys: publicKeys(v.keys), - }, + resp, nil, ) if err := v.WaitForActivation(context.Background()); err != nil { @@ -316,6 +329,9 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) { validatorClient: client, } v.pubkeys = publicKeys(v.keys) + resp := generateMockStatusResponse(v.pubkeys) + resp.Statuses[0].Status.Status = pb.ValidatorStatus_ACTIVE + resp.Statuses[1].Status.Status = pb.ValidatorStatus_ACTIVE clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl) client.EXPECT().WaitForActivation( gomock.Any(), @@ -324,9 +340,7 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) { }, ).Return(clientStream, nil) clientStream.EXPECT().Recv().Return( - &pb.ValidatorActivationResponse{ - ActivatedPublicKeys: v.pubkeys, - }, + resp, nil, ) if err := v.WaitForActivation(context.Background()); err != nil { @@ -344,6 +358,8 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) { validatorClient: client, pubkeys: publicKeys(keyMapThreeValidators), } + resp := generateMockStatusResponse(v.pubkeys) + resp.Statuses[0].Status.Status = pb.ValidatorStatus_ACTIVE clientStream := internal.NewMockValidatorService_WaitForActivationClient(ctrl) client.EXPECT().WaitForActivation( gomock.Any(), @@ -356,9 +372,7 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) { nil, ) clientStream.EXPECT().Recv().Return( - &pb.ValidatorActivationResponse{ - ActivatedPublicKeys: publicKeys(v.keys), - }, + resp, nil, ) if err := v.WaitForActivation(context.Background()); err != nil {