Event support for contribution_and_proof and voluntary_exit (#9779)

* Event support for `contribution_and_prrof`

* event test

* fix panic in tests

* fix

* Revert "Auxiliary commit to revert individual files from dc8d01a15f0056c1fb48733219feab6461f71695"

This reverts commit f5f198564079781f80e1a045cefad7c27f89af25.

* remove receiver

* revive test

* move sending events to sync package

* remove receiver

* remove notification test

* build file

* notifier tests

* revert removal of exit event in API

* simplify exit test

* send notification in contribution API method

* test fix

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: terence tsao <terence@prysmaticlabs.com>
This commit is contained in:
Radosław Kapka
2021-10-28 10:56:22 +02:00
committed by GitHub
parent 53c86429e4
commit bb319e02e8
17 changed files with 387 additions and 29 deletions

View File

@@ -1,6 +1,4 @@
// Package operation contains types for block operation-specific events fired
// during the runtime of a beacon node such as attestations, voluntary
// exits, and slashings.
// Package operation contains types for block operation-specific events fired during the runtime of a beacon node.
package operation
import (
@@ -18,6 +16,9 @@ const (
// ExitReceived is sent after an voluntary exit object has been received from the outside world (eg in RPC or sync)
ExitReceived
// SyncCommitteeContributionReceived is sent after a sync committee contribution object has been received.
SyncCommitteeContributionReceived
)
// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
@@ -37,3 +38,9 @@ type ExitReceivedData struct {
// Exit is the voluntary exit object.
Exit *ethpb.SignedVoluntaryExit
}
// SyncCommitteeContributionReceivedData is the data sent with SyncCommitteeContributionReceived objects.
type SyncCommitteeContributionReceivedData struct {
// Contribution is the sync committee contribution object.
Contribution *ethpb.SignedContributionAndProof
}

View File

@@ -243,6 +243,8 @@ func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http
data = &eventFinalizedCheckpointJson{}
case events.ChainReorgTopic:
data = &eventChainReorgJson{}
case events.SyncCommitteeContributionTopic:
data = &signedContributionAndProofJson{}
case "error":
data = &eventErrorJson{}
default:

View File

@@ -46,6 +46,7 @@ go_test(
"//testing/util:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_grpc_ecosystem_grpc_gateway_v2//proto/gateway:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@org_golang_google_protobuf//types/known/anypb:go_default_library",
],
)

View File

@@ -29,15 +29,18 @@ const (
FinalizedCheckpointTopic = "finalized_checkpoint"
// ChainReorgTopic represents a chain reorganization event topic.
ChainReorgTopic = "chain_reorg"
// SyncCommitteeContributionTopic represents a new sync committee contribution event topic.
SyncCommitteeContributionTopic = "contribution_and_proof"
)
var casesHandled = map[string]bool{
HeadTopic: true,
BlockTopic: true,
AttestationTopic: true,
VoluntaryExitTopic: true,
FinalizedCheckpointTopic: true,
ChainReorgTopic: true,
HeadTopic: true,
BlockTopic: true,
AttestationTopic: true,
VoluntaryExitTopic: true,
FinalizedCheckpointTopic: true,
ChainReorgTopic: true,
SyncCommitteeContributionTopic: true,
}
// StreamEvents allows requesting all events from a set of topics defined in the Ethereum consensus API standard.
@@ -76,15 +79,15 @@ func (s *Server) StreamEvents(
for {
select {
case event := <-blockChan:
if err := s.handleBlockEvents(stream, requestedTopics, event); err != nil {
if err := handleBlockEvents(stream, requestedTopics, event); err != nil {
return status.Errorf(codes.Internal, "Could not handle block event: %v", err)
}
case event := <-opsChan:
if err := s.handleBlockOperationEvents(stream, requestedTopics, event); err != nil {
if err := handleBlockOperationEvents(stream, requestedTopics, event); err != nil {
return status.Errorf(codes.Internal, "Could not handle block operations event: %v", err)
}
case event := <-stateChan:
if err := s.handleStateEvents(stream, requestedTopics, event); err != nil {
if err := handleStateEvents(stream, requestedTopics, event); err != nil {
return status.Errorf(codes.Internal, "Could not handle state event: %v", err)
}
case <-s.Ctx.Done():
@@ -95,7 +98,7 @@ func (s *Server) StreamEvents(
}
}
func (s *Server) handleBlockEvents(
func handleBlockEvents(
stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event,
) error {
switch event.Type {
@@ -119,13 +122,13 @@ func (s *Server) handleBlockEvents(
Slot: v1Data.Message.Slot,
Block: item[:],
}
return s.streamData(stream, BlockTopic, eventBlock)
return streamData(stream, BlockTopic, eventBlock)
default:
return nil
}
}
func (s *Server) handleBlockOperationEvents(
func handleBlockOperationEvents(
stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event,
) error {
switch event.Type {
@@ -138,7 +141,7 @@ func (s *Server) handleBlockOperationEvents(
return nil
}
v1Data := migration.V1Alpha1AggregateAttAndProofToV1(attData.Attestation)
return s.streamData(stream, AttestationTopic, v1Data)
return streamData(stream, AttestationTopic, v1Data)
case operation.UnaggregatedAttReceived:
if _, ok := requestedTopics[AttestationTopic]; !ok {
return nil
@@ -148,7 +151,7 @@ func (s *Server) handleBlockOperationEvents(
return nil
}
v1Data := migration.V1Alpha1AttestationToV1(attData.Attestation)
return s.streamData(stream, AttestationTopic, v1Data)
return streamData(stream, AttestationTopic, v1Data)
case operation.ExitReceived:
if _, ok := requestedTopics[VoluntaryExitTopic]; !ok {
return nil
@@ -158,13 +161,23 @@ func (s *Server) handleBlockOperationEvents(
return nil
}
v1Data := migration.V1Alpha1ExitToV1(exitData.Exit)
return s.streamData(stream, VoluntaryExitTopic, v1Data)
return streamData(stream, VoluntaryExitTopic, v1Data)
case operation.SyncCommitteeContributionReceived:
if _, ok := requestedTopics[SyncCommitteeContributionTopic]; !ok {
return nil
}
contributionData, ok := event.Data.(*operation.SyncCommitteeContributionReceivedData)
if !ok {
return nil
}
v2Data := migration.V1Alpha1SignedContributionAndProofToV2(contributionData.Contribution)
return streamData(stream, SyncCommitteeContributionTopic, v2Data)
default:
return nil
}
}
func (s *Server) handleStateEvents(
func handleStateEvents(
stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event,
) error {
switch event.Type {
@@ -176,7 +189,7 @@ func (s *Server) handleStateEvents(
if !ok {
return nil
}
return s.streamData(stream, HeadTopic, head)
return streamData(stream, HeadTopic, head)
case statefeed.FinalizedCheckpoint:
if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok {
return nil
@@ -185,7 +198,7 @@ func (s *Server) handleStateEvents(
if !ok {
return nil
}
return s.streamData(stream, FinalizedCheckpointTopic, finalizedCheckpoint)
return streamData(stream, FinalizedCheckpointTopic, finalizedCheckpoint)
case statefeed.Reorg:
if _, ok := requestedTopics[ChainReorgTopic]; !ok {
return nil
@@ -194,13 +207,13 @@ func (s *Server) handleStateEvents(
if !ok {
return nil
}
return s.streamData(stream, ChainReorgTopic, reorg)
return streamData(stream, ChainReorgTopic, reorg)
default:
return nil
}
}
func (s *Server) streamData(stream ethpbservice.Events_StreamEventsServer, name string, data proto.Message) error {
func streamData(stream ethpbservice.Events_StreamEventsServer, name string, data proto.Message) error {
returnData, err := anypb.New(data)
if err != nil {
return err

View File

@@ -6,6 +6,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/grpc-ecosystem/grpc-gateway/v2/proto/gateway"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/async/event"
mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
@@ -185,6 +186,49 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
feed: srv.OperationNotifier.OperationFeed(),
})
})
t.Run(SyncCommitteeContributionTopic, func(t *testing.T) {
ctx := context.Background()
srv, ctrl, mockStream := setupServer(ctx, t)
defer ctrl.Finish()
wantedContributionV1alpha1 := &eth.SignedContributionAndProof{
Message: &eth.ContributionAndProof{
AggregatorIndex: 1,
Contribution: &eth.SyncCommitteeContribution{
Slot: 1,
BlockRoot: []byte("root"),
SubcommitteeIndex: 1,
AggregationBits: bitfield.NewBitvector128(),
Signature: []byte("sig"),
},
SelectionProof: []byte("proof"),
},
Signature: []byte("sig"),
}
wantedContribution := migration.V1Alpha1SignedContributionAndProofToV2(wantedContributionV1alpha1)
genericResponse, err := anypb.New(wantedContribution)
require.NoError(t, err)
wantedMessage := &gateway.EventSource{
Event: SyncCommitteeContributionTopic,
Data: genericResponse,
}
assertFeedSendAndReceive(ctx, &assertFeedArgs{
t: t,
srv: srv,
topics: []string{SyncCommitteeContributionTopic},
stream: mockStream,
shouldReceive: wantedMessage,
itemToSend: &feed.Event{
Type: operation.SyncCommitteeContributionReceived,
Data: &operation.SyncCommitteeContributionReceivedData{
Contribution: wantedContributionV1alpha1,
},
},
feed: srv.OperationNotifier.OperationFeed(),
})
})
}
func TestStreamEvents_StateEvents(t *testing.T) {

View File

@@ -1728,7 +1728,8 @@ func TestSubmitContributionAndProofs(t *testing.T) {
aggBits := bitfield.NewBitvector128()
aggBits.SetBitAt(0, true)
v1Server := &v1alpha1validator.Server{
P2P: &p2pmock.MockBroadcaster{},
P2P: &p2pmock.MockBroadcaster{},
OperationNotifier: (&mockChain.ChainService{}).OperationNotifier(),
}
server := &Server{
V1Alpha1Server: v1Server,

View File

@@ -38,7 +38,6 @@ func (vs *Server) ProposeExit(ctx context.Context, req *ethpb.SignedVoluntaryExi
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// Send the voluntary exit to the operation feed.
vs.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.ExitReceived,
Data: &opfeed.ExitReceivedData{

View File

@@ -79,9 +79,8 @@ func TestProposeExit_Notification(t *testing.T) {
if event.Type == opfeed.ExitReceived {
notificationFound = true
data, ok := event.Data.(*opfeed.ExitReceivedData)
assert.Equal(t, true, ok, "Entity is not of type *opfeed.ExitReceivedData")
assert.Equal(t, epoch, data.Exit.Exit.Epoch, "Unexpected state feed epoch")
assert.Equal(t, validatorIndex, data.Exit.Exit.ValidatorIndex, "Unexpected state feed validator index")
assert.Equal(t, true, ok, "Entity is of the wrong type")
assert.NotNil(t, data.Exit)
}
case <-opSub.Err():
t.Error("Subscription to state notifier failed")

View File

@@ -6,6 +6,8 @@ import (
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/crypto/bls"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
@@ -120,6 +122,16 @@ func (vs *Server) SubmitSignedContributionAndProof(
// Wait for p2p broadcast to complete and return the first error (if any)
err := errs.Wait()
if err == nil {
vs.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.SyncCommitteeContributionReceived,
Data: &opfeed.SyncCommitteeContributionReceivedData{
Contribution: s,
},
})
}
return &emptypb.Empty{}, err
}

View File

@@ -6,11 +6,14 @@ import (
types "github.com/prysmaticlabs/eth2-types"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/synccommittee"
mockp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/config/params"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/testing/assert"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/testing/util"
"google.golang.org/protobuf/types/known/emptypb"
@@ -70,6 +73,7 @@ func TestSubmitSignedContributionAndProof_OK(t *testing.T) {
server := &Server{
SyncCommitteePool: synccommittee.NewStore(),
P2P: &mockp2p.MockBroadcaster{},
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
}
contribution := &ethpb.SignedContributionAndProof{
Message: &ethpb.ContributionAndProof{
@@ -85,3 +89,44 @@ func TestSubmitSignedContributionAndProof_OK(t *testing.T) {
require.NoError(t, err)
require.DeepEqual(t, []*ethpb.SyncCommitteeContribution{contribution.Message.Contribution}, savedMsgs)
}
func TestSubmitSignedContributionAndProof_Notification(t *testing.T) {
server := &Server{
SyncCommitteePool: synccommittee.NewStore(),
P2P: &mockp2p.MockBroadcaster{},
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
}
// Subscribe to operation notifications.
opChannel := make(chan *feed.Event, 1024)
opSub := server.OperationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
contribution := &ethpb.SignedContributionAndProof{
Message: &ethpb.ContributionAndProof{
Contribution: &ethpb.SyncCommitteeContribution{
Slot: 1,
SubcommitteeIndex: 2,
},
},
}
_, err := server.SubmitSignedContributionAndProof(context.Background(), contribution)
require.NoError(t, err)
// Ensure the state notification was broadcast.
notificationFound := false
for !notificationFound {
select {
case event := <-opChannel:
if event.Type == opfeed.SyncCommitteeContributionReceived {
notificationFound = true
data, ok := event.Data.(*opfeed.SyncCommitteeContributionReceivedData)
assert.Equal(t, true, ok, "Entity is of the wrong type")
assert.NotNil(t, data.Contribution)
}
case <-opSub.Err():
t.Error("Subscription to state notifier failed")
return
}
}
}

View File

@@ -167,6 +167,7 @@ go_test(
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",

View File

@@ -8,6 +8,8 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/signing"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/config/features"
@@ -80,6 +82,15 @@ func (s *Service) validateSyncContributionAndProof(ctx context.Context, pid peer
msg.ValidatorData = m
// Broadcast the contribution on a feed to notify other services in the beacon node
// of a received contribution.
s.cfg.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.SyncCommitteeContributionReceived,
Data: &opfeed.SyncCommitteeContributionReceivedData{
Contribution: m,
},
})
return pubsub.ValidationAccept, nil
}

View File

@@ -14,6 +14,8 @@ import (
"github.com/prysmaticlabs/go-bitfield"
mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
@@ -888,6 +890,142 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
}
}
func TestService_ValidateSyncContributionAndProof_Broadcast(t *testing.T) {
ctx := context.Background()
db := testingDB.SetupDB(t)
headRoot, keys := fillUpBlocksAndState(ctx, t, db)
defaultTopic := p2p.SyncContributionAndProofSubnetTopicFormat
defaultTopic = fmt.Sprintf(defaultTopic, []byte{0xAB, 0x00, 0xCC, 0x9E})
defaultTopic = defaultTopic + "/" + encoder.ProtocolSuffixSSZSnappy
emptySig := [96]byte{}
pid := peer.ID("random")
msg := &ethpb.SignedContributionAndProof{
Message: &ethpb.ContributionAndProof{
AggregatorIndex: 1,
Contribution: &ethpb.SyncCommitteeContribution{
Slot: 0,
SubcommitteeIndex: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:],
AggregationBits: bitfield.NewBitvector128(),
Signature: emptySig[:],
},
SelectionProof: emptySig[:],
},
Signature: emptySig[:],
}
chainService := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}
s := NewService(context.Background(), &Config{
P2P: mockp2p.NewTestP2P(t),
InitialSync: &mockSync.Sync{IsSyncing: false},
Chain: chainService,
StateNotifier: chainService.StateNotifier(),
OperationNotifier: chainService.OperationNotifier(),
})
s.cfg.StateGen = stategen.New(db)
msg.Message.Contribution.BlockRoot = headRoot[:]
s.cfg.DB = db
hState, err := db.State(context.Background(), headRoot)
assert.NoError(t, err)
sc, err := hState.CurrentSyncCommittee()
assert.NoError(t, err)
cd, err := signing.Domain(hState.Fork(), slots.ToEpoch(slots.PrevSlot(hState.Slot())), params.BeaconConfig().DomainContributionAndProof, hState.GenesisValidatorRoot())
assert.NoError(t, err)
d, err := signing.Domain(hState.Fork(), slots.ToEpoch(hState.Slot()), params.BeaconConfig().DomainSyncCommittee, hState.GenesisValidatorRoot())
assert.NoError(t, err)
var pubkeys [][]byte
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
coms, err := altair.SyncSubCommitteePubkeys(sc, types.CommitteeIndex(i))
pubkeys = coms
assert.NoError(t, err)
for _, p := range coms {
idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p))
assert.Equal(t, true, ok)
rt, err := syncSelectionProofSigningRoot(hState, slots.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
assert.NoError(t, err)
sig := keys[idx].Sign(rt[:])
isAggregator, err := altair.IsSyncCommitteeAggregator(sig.Marshal())
require.NoError(t, err)
if isAggregator {
msg.Message.AggregatorIndex = idx
msg.Message.SelectionProof = sig.Marshal()
msg.Message.Contribution.Slot = slots.PrevSlot(hState.Slot())
msg.Message.Contribution.SubcommitteeIndex = i
msg.Message.Contribution.BlockRoot = headRoot[:]
msg.Message.Contribution.AggregationBits = bitfield.NewBitvector128()
// Only Sign for 1 validator.
rawBytes := p2ptypes.SSZBytes(headRoot[:])
sigRoot, err := signing.ComputeSigningRoot(&rawBytes, d)
assert.NoError(t, err)
valIdx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(coms[0]))
assert.Equal(t, true, ok)
sig = keys[valIdx].Sign(sigRoot[:])
msg.Message.Contribution.AggregationBits.SetBitAt(uint64(0), true)
msg.Message.Contribution.Signature = sig.Marshal()
sigRoot, err = signing.ComputeSigningRoot(msg.Message, cd)
assert.NoError(t, err)
contrSig := keys[idx].Sign(sigRoot[:])
msg.Signature = contrSig.Marshal()
break
}
}
}
pd, err := signing.Domain(hState.Fork(), slots.ToEpoch(slots.PrevSlot(hState.Slot())), params.BeaconConfig().DomainSyncCommitteeSelectionProof, hState.GenesisValidatorRoot())
require.NoError(t, err)
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
s.cfg.Chain = &mockChain.ChainService{
ValidatorsRoot: [32]byte{'A'},
Genesis: time.Now().Add(-time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Duration(msg.Message.Contribution.Slot)),
SyncCommitteeIndices: []types.CommitteeIndex{types.CommitteeIndex(msg.Message.Contribution.SubcommitteeIndex * subCommitteeSize)},
PublicKey: bytesutil.ToBytes48(keys[msg.Message.AggregatorIndex].PublicKey().Marshal()),
SyncSelectionProofDomain: pd,
SyncContributionProofDomain: cd,
SyncCommitteeDomain: d,
SyncCommitteePubkeys: pubkeys,
}
s.initCaches()
marshalledObj, err := msg.MarshalSSZ()
assert.NoError(t, err)
marshalledObj = snappy.Encode(nil, marshalledObj)
pubsubMsg := &pubsub.Message{
Message: &pubsub_pb.Message{
Data: marshalledObj,
Topic: &defaultTopic,
},
ReceivedFrom: "",
ValidatorData: nil,
}
// Subscribe to operation notifications.
opChannel := make(chan *feed.Event, 1)
opSub := s.cfg.OperationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
_, err = s.validateSyncContributionAndProof(ctx, pid, pubsubMsg)
require.NoError(t, err)
// Ensure the state notification was broadcast.
notificationFound := false
for !notificationFound {
select {
case event := <-opChannel:
if event.Type == opfeed.SyncCommitteeContributionReceived {
notificationFound = true
_, ok := event.Data.(*opfeed.SyncCommitteeContributionReceivedData)
assert.Equal(t, true, ok, "Entity is not of type *opfeed.SyncCommitteeContributionReceivedData")
}
case <-opSub.Err():
t.Error("Subscription to state notifier failed")
return
}
}
}
func fillUpBlocksAndState(ctx context.Context, t *testing.T, beaconDB db.Database) ([32]byte, []bls.SecretKey) {
gs, keys := util.DeterministicGenesisStateAltair(t, 64)
sCom, err := altair.NextSyncCommittee(ctx, gs)

View File

@@ -8,6 +8,8 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"go.opencensus.io/trace"
@@ -66,6 +68,15 @@ func (s *Service) validateVoluntaryExit(ctx context.Context, pid peer.ID, msg *p
msg.ValidatorData = exit // Used in downstream subscriber
// Broadcast the voluntary exit on a feed to notify other services in the beacon node
// of a received voluntary exit.
s.cfg.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.ExitReceived,
Data: &opfeed.ExitReceivedData{
Exit: exit,
},
})
return pubsub.ValidationAccept, nil
}

View File

@@ -11,6 +11,8 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/signing"
coreTime "github.com/prysmaticlabs/prysm/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
@@ -81,7 +83,8 @@ func TestValidateVoluntaryExit_ValidExit(t *testing.T) {
State: s,
Genesis: time.Now(),
},
InitialSync: &mockSync.Sync{IsSyncing: false},
InitialSync: &mockSync.Sync{IsSyncing: false},
OperationNotifier: (&mock.ChainService{}).OperationNotifier(),
},
seenExitCache: lruwrpr.New(10),
}
@@ -99,11 +102,33 @@ func TestValidateVoluntaryExit_ValidExit(t *testing.T) {
Topic: &topic,
},
}
// Subscribe to operation notifications.
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.OperationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
res, err := r.validateVoluntaryExit(ctx, "", m)
assert.NoError(t, err)
valid := res == pubsub.ValidationAccept
assert.Equal(t, true, valid, "Failed validation")
assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data")
// Ensure the state notification was broadcast.
notificationFound := false
for !notificationFound {
select {
case event := <-opChannel:
if event.Type == opfeed.ExitReceived {
notificationFound = true
_, ok := event.Data.(*opfeed.ExitReceivedData)
assert.Equal(t, true, ok, "Entity is not of type *opfeed.ExitReceivedData")
}
case <-opSub.Err():
t.Error("Subscription to state notifier failed")
return
}
}
}
func TestValidateVoluntaryExit_InvalidExitSlot(t *testing.T) {

View File

@@ -625,3 +625,21 @@ func BeaconStateAltairToV2(altairState *statev2.BeaconState) (*ethpbv2.BeaconSta
return result, nil
}
func V1Alpha1SignedContributionAndProofToV2(alphaContribution *ethpbalpha.SignedContributionAndProof) *ethpbv2.SignedContributionAndProof {
result := &ethpbv2.SignedContributionAndProof{
Message: &ethpbv2.ContributionAndProof{
AggregatorIndex: alphaContribution.Message.AggregatorIndex,
Contribution: &ethpbv2.SyncCommitteeContribution{
Slot: alphaContribution.Message.Contribution.Slot,
BeaconBlockRoot: alphaContribution.Message.Contribution.BlockRoot,
SubcommitteeIndex: alphaContribution.Message.Contribution.SubcommitteeIndex,
AggregationBits: alphaContribution.Message.Contribution.AggregationBits,
Signature: alphaContribution.Message.Contribution.Signature,
},
SelectionProof: alphaContribution.Message.SelectionProof,
},
Signature: alphaContribution.Signature,
}
return result
}

View File

@@ -689,3 +689,34 @@ func TestBeaconStateToV1(t *testing.T) {
assert.Equal(t, types.Epoch(32), resultFinalizedCheckpoint.Epoch)
assert.DeepEqual(t, bytesutil.PadTo([]byte("fcroot"), 32), resultFinalizedCheckpoint.Root)
}
func TestV1Alpha1SignedContributionAndProofToV2(t *testing.T) {
alphaContribution := &ethpbalpha.SignedContributionAndProof{
Message: &ethpbalpha.ContributionAndProof{
AggregatorIndex: validatorIndex,
Contribution: &ethpbalpha.SyncCommitteeContribution{
Slot: slot,
BlockRoot: blockHash,
SubcommitteeIndex: 1,
AggregationBits: bitfield.NewBitvector128(),
Signature: signature,
},
SelectionProof: signature,
},
Signature: signature,
}
v2Contribution := V1Alpha1SignedContributionAndProofToV2(alphaContribution)
require.NotNil(t, v2Contribution)
require.NotNil(t, v2Contribution.Message)
require.NotNil(t, v2Contribution.Message.Contribution)
assert.DeepEqual(t, signature, v2Contribution.Signature)
msg := v2Contribution.Message
assert.Equal(t, validatorIndex, msg.AggregatorIndex)
assert.DeepEqual(t, signature, msg.SelectionProof)
contrib := msg.Contribution
assert.Equal(t, slot, contrib.Slot)
assert.DeepEqual(t, blockHash, contrib.BeaconBlockRoot)
assert.Equal(t, uint64(1), contrib.SubcommitteeIndex)
assert.DeepEqual(t, bitfield.NewBitvector128(), contrib.AggregationBits)
assert.DeepEqual(t, signature, contrib.Signature)
}