From bb319e02e8e5a0e4621cff618aab3a2ef005e500 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Kapka?= Date: Thu, 28 Oct 2021 10:56:22 +0200 Subject: [PATCH] Event support for `contribution_and_proof` and `voluntary_exit` (#9779) * Event support for `contribution_and_prrof` * event test * fix panic in tests * fix * Revert "Auxiliary commit to revert individual files from dc8d01a15f0056c1fb48733219feab6461f71695" This reverts commit f5f198564079781f80e1a045cefad7c27f89af25. * remove receiver * revive test * move sending events to sync package * remove receiver * remove notification test * build file * notifier tests * revert removal of exit event in API * simplify exit test * send notification in contribution API method * test fix Co-authored-by: Raul Jordan Co-authored-by: terence tsao --- beacon-chain/core/feed/operation/events.go | 13 +- .../rpc/apimiddleware/custom_handlers.go | 2 + beacon-chain/rpc/eth/events/BUILD.bazel | 1 + beacon-chain/rpc/eth/events/events.go | 53 ++++--- beacon-chain/rpc/eth/events/events_test.go | 44 ++++++ .../rpc/eth/validator/validator_test.go | 3 +- .../rpc/prysm/v1alpha1/validator/exit.go | 1 - .../rpc/prysm/v1alpha1/validator/exit_test.go | 5 +- .../v1alpha1/validator/sync_committee.go | 12 ++ .../v1alpha1/validator/sync_committee_test.go | 45 ++++++ beacon-chain/sync/BUILD.bazel | 1 + .../sync/validate_sync_contribution_proof.go | 11 ++ .../validate_sync_contribution_proof_test.go | 138 ++++++++++++++++++ beacon-chain/sync/validate_voluntary_exit.go | 11 ++ .../sync/validate_voluntary_exit_test.go | 27 +++- proto/migration/v1alpha1_to_v1.go | 18 +++ proto/migration/v1alpha1_to_v1_test.go | 31 ++++ 17 files changed, 387 insertions(+), 29 deletions(-) diff --git a/beacon-chain/core/feed/operation/events.go b/beacon-chain/core/feed/operation/events.go index 677e4a150e..28b8392744 100644 --- a/beacon-chain/core/feed/operation/events.go +++ b/beacon-chain/core/feed/operation/events.go @@ -1,6 +1,4 @@ -// Package operation contains types for block operation-specific events fired -// during the runtime of a beacon node such as attestations, voluntary -// exits, and slashings. +// Package operation contains types for block operation-specific events fired during the runtime of a beacon node. package operation import ( @@ -18,6 +16,9 @@ const ( // ExitReceived is sent after an voluntary exit object has been received from the outside world (eg in RPC or sync) ExitReceived + + // SyncCommitteeContributionReceived is sent after a sync committee contribution object has been received. + SyncCommitteeContributionReceived ) // UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events. @@ -37,3 +38,9 @@ type ExitReceivedData struct { // Exit is the voluntary exit object. Exit *ethpb.SignedVoluntaryExit } + +// SyncCommitteeContributionReceivedData is the data sent with SyncCommitteeContributionReceived objects. +type SyncCommitteeContributionReceivedData struct { + // Contribution is the sync committee contribution object. + Contribution *ethpb.SignedContributionAndProof +} diff --git a/beacon-chain/rpc/apimiddleware/custom_handlers.go b/beacon-chain/rpc/apimiddleware/custom_handlers.go index 4d6744433c..df8922ef31 100644 --- a/beacon-chain/rpc/apimiddleware/custom_handlers.go +++ b/beacon-chain/rpc/apimiddleware/custom_handlers.go @@ -243,6 +243,8 @@ func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http data = &eventFinalizedCheckpointJson{} case events.ChainReorgTopic: data = &eventChainReorgJson{} + case events.SyncCommitteeContributionTopic: + data = &signedContributionAndProofJson{} case "error": data = &eventErrorJson{} default: diff --git a/beacon-chain/rpc/eth/events/BUILD.bazel b/beacon-chain/rpc/eth/events/BUILD.bazel index a48e96f8f5..6dbee148f0 100644 --- a/beacon-chain/rpc/eth/events/BUILD.bazel +++ b/beacon-chain/rpc/eth/events/BUILD.bazel @@ -46,6 +46,7 @@ go_test( "//testing/util:go_default_library", "@com_github_golang_mock//gomock:go_default_library", "@com_github_grpc_ecosystem_grpc_gateway_v2//proto/gateway:go_default_library", + "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@org_golang_google_protobuf//types/known/anypb:go_default_library", ], ) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index ca174b80a5..e40c475ac6 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -29,15 +29,18 @@ const ( FinalizedCheckpointTopic = "finalized_checkpoint" // ChainReorgTopic represents a chain reorganization event topic. ChainReorgTopic = "chain_reorg" + // SyncCommitteeContributionTopic represents a new sync committee contribution event topic. + SyncCommitteeContributionTopic = "contribution_and_proof" ) var casesHandled = map[string]bool{ - HeadTopic: true, - BlockTopic: true, - AttestationTopic: true, - VoluntaryExitTopic: true, - FinalizedCheckpointTopic: true, - ChainReorgTopic: true, + HeadTopic: true, + BlockTopic: true, + AttestationTopic: true, + VoluntaryExitTopic: true, + FinalizedCheckpointTopic: true, + ChainReorgTopic: true, + SyncCommitteeContributionTopic: true, } // StreamEvents allows requesting all events from a set of topics defined in the Ethereum consensus API standard. @@ -76,15 +79,15 @@ func (s *Server) StreamEvents( for { select { case event := <-blockChan: - if err := s.handleBlockEvents(stream, requestedTopics, event); err != nil { + if err := handleBlockEvents(stream, requestedTopics, event); err != nil { return status.Errorf(codes.Internal, "Could not handle block event: %v", err) } case event := <-opsChan: - if err := s.handleBlockOperationEvents(stream, requestedTopics, event); err != nil { + if err := handleBlockOperationEvents(stream, requestedTopics, event); err != nil { return status.Errorf(codes.Internal, "Could not handle block operations event: %v", err) } case event := <-stateChan: - if err := s.handleStateEvents(stream, requestedTopics, event); err != nil { + if err := handleStateEvents(stream, requestedTopics, event); err != nil { return status.Errorf(codes.Internal, "Could not handle state event: %v", err) } case <-s.Ctx.Done(): @@ -95,7 +98,7 @@ func (s *Server) StreamEvents( } } -func (s *Server) handleBlockEvents( +func handleBlockEvents( stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event, ) error { switch event.Type { @@ -119,13 +122,13 @@ func (s *Server) handleBlockEvents( Slot: v1Data.Message.Slot, Block: item[:], } - return s.streamData(stream, BlockTopic, eventBlock) + return streamData(stream, BlockTopic, eventBlock) default: return nil } } -func (s *Server) handleBlockOperationEvents( +func handleBlockOperationEvents( stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event, ) error { switch event.Type { @@ -138,7 +141,7 @@ func (s *Server) handleBlockOperationEvents( return nil } v1Data := migration.V1Alpha1AggregateAttAndProofToV1(attData.Attestation) - return s.streamData(stream, AttestationTopic, v1Data) + return streamData(stream, AttestationTopic, v1Data) case operation.UnaggregatedAttReceived: if _, ok := requestedTopics[AttestationTopic]; !ok { return nil @@ -148,7 +151,7 @@ func (s *Server) handleBlockOperationEvents( return nil } v1Data := migration.V1Alpha1AttestationToV1(attData.Attestation) - return s.streamData(stream, AttestationTopic, v1Data) + return streamData(stream, AttestationTopic, v1Data) case operation.ExitReceived: if _, ok := requestedTopics[VoluntaryExitTopic]; !ok { return nil @@ -158,13 +161,23 @@ func (s *Server) handleBlockOperationEvents( return nil } v1Data := migration.V1Alpha1ExitToV1(exitData.Exit) - return s.streamData(stream, VoluntaryExitTopic, v1Data) + return streamData(stream, VoluntaryExitTopic, v1Data) + case operation.SyncCommitteeContributionReceived: + if _, ok := requestedTopics[SyncCommitteeContributionTopic]; !ok { + return nil + } + contributionData, ok := event.Data.(*operation.SyncCommitteeContributionReceivedData) + if !ok { + return nil + } + v2Data := migration.V1Alpha1SignedContributionAndProofToV2(contributionData.Contribution) + return streamData(stream, SyncCommitteeContributionTopic, v2Data) default: return nil } } -func (s *Server) handleStateEvents( +func handleStateEvents( stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event, ) error { switch event.Type { @@ -176,7 +189,7 @@ func (s *Server) handleStateEvents( if !ok { return nil } - return s.streamData(stream, HeadTopic, head) + return streamData(stream, HeadTopic, head) case statefeed.FinalizedCheckpoint: if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok { return nil @@ -185,7 +198,7 @@ func (s *Server) handleStateEvents( if !ok { return nil } - return s.streamData(stream, FinalizedCheckpointTopic, finalizedCheckpoint) + return streamData(stream, FinalizedCheckpointTopic, finalizedCheckpoint) case statefeed.Reorg: if _, ok := requestedTopics[ChainReorgTopic]; !ok { return nil @@ -194,13 +207,13 @@ func (s *Server) handleStateEvents( if !ok { return nil } - return s.streamData(stream, ChainReorgTopic, reorg) + return streamData(stream, ChainReorgTopic, reorg) default: return nil } } -func (s *Server) streamData(stream ethpbservice.Events_StreamEventsServer, name string, data proto.Message) error { +func streamData(stream ethpbservice.Events_StreamEventsServer, name string, data proto.Message) error { returnData, err := anypb.New(data) if err != nil { return err diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index c32b82db5b..994207a7f7 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -6,6 +6,7 @@ import ( "github.com/golang/mock/gomock" "github.com/grpc-ecosystem/grpc-gateway/v2/proto/gateway" + "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/prysm/async/event" mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" @@ -185,6 +186,49 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { feed: srv.OperationNotifier.OperationFeed(), }) }) + t.Run(SyncCommitteeContributionTopic, func(t *testing.T) { + ctx := context.Background() + srv, ctrl, mockStream := setupServer(ctx, t) + defer ctrl.Finish() + + wantedContributionV1alpha1 := ð.SignedContributionAndProof{ + Message: ð.ContributionAndProof{ + AggregatorIndex: 1, + Contribution: ð.SyncCommitteeContribution{ + Slot: 1, + BlockRoot: []byte("root"), + SubcommitteeIndex: 1, + AggregationBits: bitfield.NewBitvector128(), + Signature: []byte("sig"), + }, + SelectionProof: []byte("proof"), + }, + Signature: []byte("sig"), + } + wantedContribution := migration.V1Alpha1SignedContributionAndProofToV2(wantedContributionV1alpha1) + genericResponse, err := anypb.New(wantedContribution) + require.NoError(t, err) + + wantedMessage := &gateway.EventSource{ + Event: SyncCommitteeContributionTopic, + Data: genericResponse, + } + + assertFeedSendAndReceive(ctx, &assertFeedArgs{ + t: t, + srv: srv, + topics: []string{SyncCommitteeContributionTopic}, + stream: mockStream, + shouldReceive: wantedMessage, + itemToSend: &feed.Event{ + Type: operation.SyncCommitteeContributionReceived, + Data: &operation.SyncCommitteeContributionReceivedData{ + Contribution: wantedContributionV1alpha1, + }, + }, + feed: srv.OperationNotifier.OperationFeed(), + }) + }) } func TestStreamEvents_StateEvents(t *testing.T) { diff --git a/beacon-chain/rpc/eth/validator/validator_test.go b/beacon-chain/rpc/eth/validator/validator_test.go index a45d32b00b..6b5ff915a7 100644 --- a/beacon-chain/rpc/eth/validator/validator_test.go +++ b/beacon-chain/rpc/eth/validator/validator_test.go @@ -1728,7 +1728,8 @@ func TestSubmitContributionAndProofs(t *testing.T) { aggBits := bitfield.NewBitvector128() aggBits.SetBitAt(0, true) v1Server := &v1alpha1validator.Server{ - P2P: &p2pmock.MockBroadcaster{}, + P2P: &p2pmock.MockBroadcaster{}, + OperationNotifier: (&mockChain.ChainService{}).OperationNotifier(), } server := &Server{ V1Alpha1Server: v1Server, diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/exit.go b/beacon-chain/rpc/prysm/v1alpha1/validator/exit.go index 039afa982e..23feb8dc08 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/exit.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/exit.go @@ -38,7 +38,6 @@ func (vs *Server) ProposeExit(ctx context.Context, req *ethpb.SignedVoluntaryExi return nil, status.Error(codes.InvalidArgument, err.Error()) } - // Send the voluntary exit to the operation feed. vs.OperationNotifier.OperationFeed().Send(&feed.Event{ Type: opfeed.ExitReceived, Data: &opfeed.ExitReceivedData{ diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/exit_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/exit_test.go index a32de35078..efe8cc1ba3 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/exit_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/exit_test.go @@ -79,9 +79,8 @@ func TestProposeExit_Notification(t *testing.T) { if event.Type == opfeed.ExitReceived { notificationFound = true data, ok := event.Data.(*opfeed.ExitReceivedData) - assert.Equal(t, true, ok, "Entity is not of type *opfeed.ExitReceivedData") - assert.Equal(t, epoch, data.Exit.Exit.Epoch, "Unexpected state feed epoch") - assert.Equal(t, validatorIndex, data.Exit.Exit.ValidatorIndex, "Unexpected state feed validator index") + assert.Equal(t, true, ok, "Entity is of the wrong type") + assert.NotNil(t, data.Exit) } case <-opSub.Err(): t.Error("Subscription to state notifier failed") diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee.go b/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee.go index 917011bf23..769ad42f15 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee.go @@ -6,6 +6,8 @@ import ( "github.com/pkg/errors" types "github.com/prysmaticlabs/eth2-types" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/crypto/bls" "github.com/prysmaticlabs/prysm/encoding/bytesutil" @@ -120,6 +122,16 @@ func (vs *Server) SubmitSignedContributionAndProof( // Wait for p2p broadcast to complete and return the first error (if any) err := errs.Wait() + + if err == nil { + vs.OperationNotifier.OperationFeed().Send(&feed.Event{ + Type: opfeed.SyncCommitteeContributionReceived, + Data: &opfeed.SyncCommitteeContributionReceivedData{ + Contribution: s, + }, + }) + } + return &emptypb.Empty{}, err } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee_test.go index 2d68870b07..6a56d7a8d9 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/sync_committee_test.go @@ -6,11 +6,14 @@ import ( types "github.com/prysmaticlabs/eth2-types" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/beacon-chain/operations/synccommittee" mockp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/config/params" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/testing/assert" "github.com/prysmaticlabs/prysm/testing/require" "github.com/prysmaticlabs/prysm/testing/util" "google.golang.org/protobuf/types/known/emptypb" @@ -70,6 +73,7 @@ func TestSubmitSignedContributionAndProof_OK(t *testing.T) { server := &Server{ SyncCommitteePool: synccommittee.NewStore(), P2P: &mockp2p.MockBroadcaster{}, + OperationNotifier: (&mock.ChainService{}).OperationNotifier(), } contribution := ðpb.SignedContributionAndProof{ Message: ðpb.ContributionAndProof{ @@ -85,3 +89,44 @@ func TestSubmitSignedContributionAndProof_OK(t *testing.T) { require.NoError(t, err) require.DeepEqual(t, []*ethpb.SyncCommitteeContribution{contribution.Message.Contribution}, savedMsgs) } + +func TestSubmitSignedContributionAndProof_Notification(t *testing.T) { + server := &Server{ + SyncCommitteePool: synccommittee.NewStore(), + P2P: &mockp2p.MockBroadcaster{}, + OperationNotifier: (&mock.ChainService{}).OperationNotifier(), + } + + // Subscribe to operation notifications. + opChannel := make(chan *feed.Event, 1024) + opSub := server.OperationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + + contribution := ðpb.SignedContributionAndProof{ + Message: ðpb.ContributionAndProof{ + Contribution: ðpb.SyncCommitteeContribution{ + Slot: 1, + SubcommitteeIndex: 2, + }, + }, + } + _, err := server.SubmitSignedContributionAndProof(context.Background(), contribution) + require.NoError(t, err) + + // Ensure the state notification was broadcast. + notificationFound := false + for !notificationFound { + select { + case event := <-opChannel: + if event.Type == opfeed.SyncCommitteeContributionReceived { + notificationFound = true + data, ok := event.Data.(*opfeed.SyncCommitteeContributionReceivedData) + assert.Equal(t, true, ok, "Entity is of the wrong type") + assert.NotNil(t, data.Contribution) + } + case <-opSub.Err(): + t.Error("Subscription to state notifier failed") + return + } + } +} diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 2977ef7270..0d42f2eceb 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -167,6 +167,7 @@ go_test( "//beacon-chain/cache:go_default_library", "//beacon-chain/core/altair:go_default_library", "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/signing:go_default_library", diff --git a/beacon-chain/sync/validate_sync_contribution_proof.go b/beacon-chain/sync/validate_sync_contribution_proof.go index 44e05fc96e..f8a149f4d6 100644 --- a/beacon-chain/sync/validate_sync_contribution_proof.go +++ b/beacon-chain/sync/validate_sync_contribution_proof.go @@ -8,6 +8,8 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/prysm/beacon-chain/core/altair" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/beacon-chain/core/signing" p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/config/features" @@ -80,6 +82,15 @@ func (s *Service) validateSyncContributionAndProof(ctx context.Context, pid peer msg.ValidatorData = m + // Broadcast the contribution on a feed to notify other services in the beacon node + // of a received contribution. + s.cfg.OperationNotifier.OperationFeed().Send(&feed.Event{ + Type: opfeed.SyncCommitteeContributionReceived, + Data: &opfeed.SyncCommitteeContributionReceivedData{ + Contribution: m, + }, + }) + return pubsub.ValidationAccept, nil } diff --git a/beacon-chain/sync/validate_sync_contribution_proof_test.go b/beacon-chain/sync/validate_sync_contribution_proof_test.go index b1015fb6ee..bad7ec756f 100644 --- a/beacon-chain/sync/validate_sync_contribution_proof_test.go +++ b/beacon-chain/sync/validate_sync_contribution_proof_test.go @@ -14,6 +14,8 @@ import ( "github.com/prysmaticlabs/go-bitfield" mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/core/altair" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/beacon-chain/core/signing" "github.com/prysmaticlabs/prysm/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/beacon-chain/db" @@ -888,6 +890,142 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) { } } +func TestService_ValidateSyncContributionAndProof_Broadcast(t *testing.T) { + ctx := context.Background() + db := testingDB.SetupDB(t) + headRoot, keys := fillUpBlocksAndState(ctx, t, db) + defaultTopic := p2p.SyncContributionAndProofSubnetTopicFormat + defaultTopic = fmt.Sprintf(defaultTopic, []byte{0xAB, 0x00, 0xCC, 0x9E}) + defaultTopic = defaultTopic + "/" + encoder.ProtocolSuffixSSZSnappy + emptySig := [96]byte{} + pid := peer.ID("random") + msg := ðpb.SignedContributionAndProof{ + Message: ðpb.ContributionAndProof{ + AggregatorIndex: 1, + Contribution: ðpb.SyncCommitteeContribution{ + Slot: 0, + SubcommitteeIndex: 1, + BlockRoot: params.BeaconConfig().ZeroHash[:], + AggregationBits: bitfield.NewBitvector128(), + Signature: emptySig[:], + }, + SelectionProof: emptySig[:], + }, + Signature: emptySig[:], + } + chainService := &mockChain.ChainService{ + Genesis: time.Now(), + ValidatorsRoot: [32]byte{'A'}, + } + s := NewService(context.Background(), &Config{ + P2P: mockp2p.NewTestP2P(t), + InitialSync: &mockSync.Sync{IsSyncing: false}, + Chain: chainService, + StateNotifier: chainService.StateNotifier(), + OperationNotifier: chainService.OperationNotifier(), + }) + s.cfg.StateGen = stategen.New(db) + msg.Message.Contribution.BlockRoot = headRoot[:] + s.cfg.DB = db + hState, err := db.State(context.Background(), headRoot) + assert.NoError(t, err) + sc, err := hState.CurrentSyncCommittee() + assert.NoError(t, err) + cd, err := signing.Domain(hState.Fork(), slots.ToEpoch(slots.PrevSlot(hState.Slot())), params.BeaconConfig().DomainContributionAndProof, hState.GenesisValidatorRoot()) + assert.NoError(t, err) + d, err := signing.Domain(hState.Fork(), slots.ToEpoch(hState.Slot()), params.BeaconConfig().DomainSyncCommittee, hState.GenesisValidatorRoot()) + assert.NoError(t, err) + var pubkeys [][]byte + for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { + coms, err := altair.SyncSubCommitteePubkeys(sc, types.CommitteeIndex(i)) + pubkeys = coms + assert.NoError(t, err) + for _, p := range coms { + idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p)) + assert.Equal(t, true, ok) + rt, err := syncSelectionProofSigningRoot(hState, slots.PrevSlot(hState.Slot()), types.CommitteeIndex(i)) + assert.NoError(t, err) + sig := keys[idx].Sign(rt[:]) + isAggregator, err := altair.IsSyncCommitteeAggregator(sig.Marshal()) + require.NoError(t, err) + if isAggregator { + msg.Message.AggregatorIndex = idx + msg.Message.SelectionProof = sig.Marshal() + msg.Message.Contribution.Slot = slots.PrevSlot(hState.Slot()) + msg.Message.Contribution.SubcommitteeIndex = i + msg.Message.Contribution.BlockRoot = headRoot[:] + msg.Message.Contribution.AggregationBits = bitfield.NewBitvector128() + // Only Sign for 1 validator. + rawBytes := p2ptypes.SSZBytes(headRoot[:]) + sigRoot, err := signing.ComputeSigningRoot(&rawBytes, d) + assert.NoError(t, err) + valIdx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(coms[0])) + assert.Equal(t, true, ok) + sig = keys[valIdx].Sign(sigRoot[:]) + msg.Message.Contribution.AggregationBits.SetBitAt(uint64(0), true) + msg.Message.Contribution.Signature = sig.Marshal() + + sigRoot, err = signing.ComputeSigningRoot(msg.Message, cd) + assert.NoError(t, err) + contrSig := keys[idx].Sign(sigRoot[:]) + msg.Signature = contrSig.Marshal() + break + } + } + } + + pd, err := signing.Domain(hState.Fork(), slots.ToEpoch(slots.PrevSlot(hState.Slot())), params.BeaconConfig().DomainSyncCommitteeSelectionProof, hState.GenesisValidatorRoot()) + require.NoError(t, err) + subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount + s.cfg.Chain = &mockChain.ChainService{ + ValidatorsRoot: [32]byte{'A'}, + Genesis: time.Now().Add(-time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Duration(msg.Message.Contribution.Slot)), + SyncCommitteeIndices: []types.CommitteeIndex{types.CommitteeIndex(msg.Message.Contribution.SubcommitteeIndex * subCommitteeSize)}, + PublicKey: bytesutil.ToBytes48(keys[msg.Message.AggregatorIndex].PublicKey().Marshal()), + SyncSelectionProofDomain: pd, + SyncContributionProofDomain: cd, + SyncCommitteeDomain: d, + SyncCommitteePubkeys: pubkeys, + } + s.initCaches() + + marshalledObj, err := msg.MarshalSSZ() + assert.NoError(t, err) + marshalledObj = snappy.Encode(nil, marshalledObj) + pubsubMsg := &pubsub.Message{ + Message: &pubsub_pb.Message{ + Data: marshalledObj, + Topic: &defaultTopic, + }, + ReceivedFrom: "", + ValidatorData: nil, + } + + // Subscribe to operation notifications. + opChannel := make(chan *feed.Event, 1) + opSub := s.cfg.OperationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + + _, err = s.validateSyncContributionAndProof(ctx, pid, pubsubMsg) + require.NoError(t, err) + + // Ensure the state notification was broadcast. + notificationFound := false + for !notificationFound { + select { + case event := <-opChannel: + if event.Type == opfeed.SyncCommitteeContributionReceived { + notificationFound = true + _, ok := event.Data.(*opfeed.SyncCommitteeContributionReceivedData) + assert.Equal(t, true, ok, "Entity is not of type *opfeed.SyncCommitteeContributionReceivedData") + } + case <-opSub.Err(): + t.Error("Subscription to state notifier failed") + return + } + } +} + func fillUpBlocksAndState(ctx context.Context, t *testing.T, beaconDB db.Database) ([32]byte, []bls.SecretKey) { gs, keys := util.DeterministicGenesisStateAltair(t, 64) sCom, err := altair.NextSyncCommittee(ctx, gs) diff --git a/beacon-chain/sync/validate_voluntary_exit.go b/beacon-chain/sync/validate_voluntary_exit.go index eec695a535..104b855f58 100644 --- a/beacon-chain/sync/validate_voluntary_exit.go +++ b/beacon-chain/sync/validate_voluntary_exit.go @@ -8,6 +8,8 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" types "github.com/prysmaticlabs/eth2-types" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/monitoring/tracing" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "go.opencensus.io/trace" @@ -66,6 +68,15 @@ func (s *Service) validateVoluntaryExit(ctx context.Context, pid peer.ID, msg *p msg.ValidatorData = exit // Used in downstream subscriber + // Broadcast the voluntary exit on a feed to notify other services in the beacon node + // of a received voluntary exit. + s.cfg.OperationNotifier.OperationFeed().Send(&feed.Event{ + Type: opfeed.ExitReceived, + Data: &opfeed.ExitReceivedData{ + Exit: exit, + }, + }) + return pubsub.ValidationAccept, nil } diff --git a/beacon-chain/sync/validate_voluntary_exit_test.go b/beacon-chain/sync/validate_voluntary_exit_test.go index 5d701bbea1..bcf6436980 100644 --- a/beacon-chain/sync/validate_voluntary_exit_test.go +++ b/beacon-chain/sync/validate_voluntary_exit_test.go @@ -11,6 +11,8 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/beacon-chain/core/signing" coreTime "github.com/prysmaticlabs/prysm/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" @@ -81,7 +83,8 @@ func TestValidateVoluntaryExit_ValidExit(t *testing.T) { State: s, Genesis: time.Now(), }, - InitialSync: &mockSync.Sync{IsSyncing: false}, + InitialSync: &mockSync.Sync{IsSyncing: false}, + OperationNotifier: (&mock.ChainService{}).OperationNotifier(), }, seenExitCache: lruwrpr.New(10), } @@ -99,11 +102,33 @@ func TestValidateVoluntaryExit_ValidExit(t *testing.T) { Topic: &topic, }, } + + // Subscribe to operation notifications. + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.OperationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + res, err := r.validateVoluntaryExit(ctx, "", m) assert.NoError(t, err) valid := res == pubsub.ValidationAccept assert.Equal(t, true, valid, "Failed validation") assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") + + // Ensure the state notification was broadcast. + notificationFound := false + for !notificationFound { + select { + case event := <-opChannel: + if event.Type == opfeed.ExitReceived { + notificationFound = true + _, ok := event.Data.(*opfeed.ExitReceivedData) + assert.Equal(t, true, ok, "Entity is not of type *opfeed.ExitReceivedData") + } + case <-opSub.Err(): + t.Error("Subscription to state notifier failed") + return + } + } } func TestValidateVoluntaryExit_InvalidExitSlot(t *testing.T) { diff --git a/proto/migration/v1alpha1_to_v1.go b/proto/migration/v1alpha1_to_v1.go index 726ca7c7bc..a46d49b7dd 100644 --- a/proto/migration/v1alpha1_to_v1.go +++ b/proto/migration/v1alpha1_to_v1.go @@ -625,3 +625,21 @@ func BeaconStateAltairToV2(altairState *statev2.BeaconState) (*ethpbv2.BeaconSta return result, nil } + +func V1Alpha1SignedContributionAndProofToV2(alphaContribution *ethpbalpha.SignedContributionAndProof) *ethpbv2.SignedContributionAndProof { + result := ðpbv2.SignedContributionAndProof{ + Message: ðpbv2.ContributionAndProof{ + AggregatorIndex: alphaContribution.Message.AggregatorIndex, + Contribution: ðpbv2.SyncCommitteeContribution{ + Slot: alphaContribution.Message.Contribution.Slot, + BeaconBlockRoot: alphaContribution.Message.Contribution.BlockRoot, + SubcommitteeIndex: alphaContribution.Message.Contribution.SubcommitteeIndex, + AggregationBits: alphaContribution.Message.Contribution.AggregationBits, + Signature: alphaContribution.Message.Contribution.Signature, + }, + SelectionProof: alphaContribution.Message.SelectionProof, + }, + Signature: alphaContribution.Signature, + } + return result +} diff --git a/proto/migration/v1alpha1_to_v1_test.go b/proto/migration/v1alpha1_to_v1_test.go index f98c0376d7..810c1181ee 100644 --- a/proto/migration/v1alpha1_to_v1_test.go +++ b/proto/migration/v1alpha1_to_v1_test.go @@ -689,3 +689,34 @@ func TestBeaconStateToV1(t *testing.T) { assert.Equal(t, types.Epoch(32), resultFinalizedCheckpoint.Epoch) assert.DeepEqual(t, bytesutil.PadTo([]byte("fcroot"), 32), resultFinalizedCheckpoint.Root) } + +func TestV1Alpha1SignedContributionAndProofToV2(t *testing.T) { + alphaContribution := ðpbalpha.SignedContributionAndProof{ + Message: ðpbalpha.ContributionAndProof{ + AggregatorIndex: validatorIndex, + Contribution: ðpbalpha.SyncCommitteeContribution{ + Slot: slot, + BlockRoot: blockHash, + SubcommitteeIndex: 1, + AggregationBits: bitfield.NewBitvector128(), + Signature: signature, + }, + SelectionProof: signature, + }, + Signature: signature, + } + v2Contribution := V1Alpha1SignedContributionAndProofToV2(alphaContribution) + require.NotNil(t, v2Contribution) + require.NotNil(t, v2Contribution.Message) + require.NotNil(t, v2Contribution.Message.Contribution) + assert.DeepEqual(t, signature, v2Contribution.Signature) + msg := v2Contribution.Message + assert.Equal(t, validatorIndex, msg.AggregatorIndex) + assert.DeepEqual(t, signature, msg.SelectionProof) + contrib := msg.Contribution + assert.Equal(t, slot, contrib.Slot) + assert.DeepEqual(t, blockHash, contrib.BeaconBlockRoot) + assert.Equal(t, uint64(1), contrib.SubcommitteeIndex) + assert.DeepEqual(t, bitfield.NewBitvector128(), contrib.AggregationBits) + assert.DeepEqual(t, signature, contrib.Signature) +}