diff --git a/beacon-chain/types/crystallized_state.go b/beacon-chain/types/crystallized_state.go index 1fcd536af1..01dd75fd4c 100644 --- a/beacon-chain/types/crystallized_state.go +++ b/beacon-chain/types/crystallized_state.go @@ -77,6 +77,7 @@ func NewGenesisCrystallizedState(genesisJSONPath string) (*CrystallizedState, er var genesisValidators []*pb.ValidatorRecord var err error if genesisJSONPath != "" { + log.Infof("Initializing crystallized state from %s", genesisJSONPath) genesisValidators, err = initialValidatorsFromJSON(genesisJSONPath) if err != nil { return nil, err diff --git a/validator/beacon/service.go b/validator/beacon/service.go index de0fa23a05..019cfd97e3 100644 --- a/validator/beacon/service.go +++ b/validator/beacon/service.go @@ -198,7 +198,9 @@ func (s *Service) waitForAssignment(ticker <-chan time.Time, client pb.BeaconSer return case <-ticker: - if s.role == pb.ValidatorRole_ATTESTER && s.assignedSlot == s.CurrentBeaconSlot() { + currentSlot := s.CurrentBeaconSlot() + log.Infof("role: %v, assigned slot: %d, current slot: %d", s.role, s.assignedSlot, currentSlot) + if s.role == pb.ValidatorRole_ATTESTER && s.assignedSlot == currentSlot { log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned attest slot number reached") block, err := client.CanonicalHead(s.ctx, &empty.Empty{}) if err != nil { @@ -208,7 +210,7 @@ func (s *Service) waitForAssignment(ticker <-chan time.Time, client pb.BeaconSer // We forward the latest canonical block to the attester service a feed. s.attesterAssignmentFeed.Send(block) - } else if s.role == pb.ValidatorRole_PROPOSER && s.assignedSlot == s.CurrentBeaconSlot() { + } else if s.role == pb.ValidatorRole_PROPOSER && s.assignedSlot == currentSlot { log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned proposal slot number reached") block, err := client.CanonicalHead(s.ctx, &empty.Empty{}) if err != nil { diff --git a/validator/beacon/service_test.go b/validator/beacon/service_test.go index 21a6fed06f..ae4758a1de 100644 --- a/validator/beacon/service_test.go +++ b/validator/beacon/service_test.go @@ -290,3 +290,72 @@ func TestListenForProcessedAttestations(t *testing.T) { b.listenForProcessedAttestations(mockServiceClient) testutil.AssertLogsContain(t, hook, "Context has been canceled so shutting down the loop") } + +func TestListenForAssignmentProposer(t *testing.T) { + hook := logTest.NewGlobal() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + b := NewBeaconValidator(context.Background(), []byte{'A'}, &mockClient{ctrl}) + + // Create mock for the stream returned by LatestAttestation. + stream := internal.NewMockBeaconService_ValidatorAssignmentsClient(ctrl) + + // Testing proposer assignment. + stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{Assignments: []*pb.Assignment{{ + PublicKey: &pb.PublicKey{PublicKey: []byte{'A'}}, + ShardId: 2, + AssignedSlot: 2, + Role: pb.ValidatorRole_PROPOSER}}}, nil) + stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{}, io.EOF) + + mockServiceValidator := internal.NewMockBeaconServiceClient(ctrl) + mockServiceValidator.EXPECT().ValidatorAssignments( + gomock.Any(), + gomock.Any(), + ).Return(stream, nil) + + b.listenForAssignmentChange(mockServiceValidator) + + testutil.AssertLogsContain(t, hook, "Validator with pub key 0xA re-assigned to shard ID 2 for PROPOSER duty at slot 2") + + // Testing an error coming from the stream. + stream = internal.NewMockBeaconService_ValidatorAssignmentsClient(ctrl) + stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{}, errors.New("stream error")) + stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{}, io.EOF) + + mockServiceValidator = internal.NewMockBeaconServiceClient(ctrl) + mockServiceValidator.EXPECT().ValidatorAssignments( + gomock.Any(), + gomock.Any(), + ).Return(stream, nil) + + b.listenForAssignmentChange(mockServiceValidator) + + testutil.AssertLogsContain(t, hook, "stream error") + + // Creating a faulty stream will trigger error. + mockServiceValidator = internal.NewMockBeaconServiceClient(ctrl) + mockServiceValidator.EXPECT().ValidatorAssignments( + gomock.Any(), + gomock.Any(), + ).Return(stream, errors.New("stream creation failed")) + + b.listenForAssignmentChange(mockServiceValidator) + testutil.AssertLogsContain(t, hook, "stream creation failed") + testutil.AssertLogsContain(t, hook, "could not fetch validator assigned slot and responsibility from beacon node") + + // Test that the routine exits when context is closed + stream = internal.NewMockBeaconService_ValidatorAssignmentsClient(ctrl) + stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{}, nil) + + //mockServiceClient = internal.NewMockBeaconServiceClient(ctrl) + mockServiceValidator = internal.NewMockBeaconServiceClient(ctrl) + mockServiceValidator.EXPECT().ValidatorAssignments( + gomock.Any(), + gomock.Any(), + ).Return(stream, nil) + b.cancel() + // + b.listenForAssignmentChange(mockServiceValidator) + testutil.AssertLogsContain(t, hook, "Context has been canceled so shutting down the loop") +}