diff --git a/api/server/structs/conversions.go b/api/server/structs/conversions.go index 25db9676a8..f080584071 100644 --- a/api/server/structs/conversions.go +++ b/api/server/structs/conversions.go @@ -433,6 +433,15 @@ func (a *AttestationElectra) ToConsensus() (*eth.AttestationElectra, error) { }, nil } +func SingleAttFromConsensus(a *eth.SingleAttestation) *SingleAttestation { + return &SingleAttestation{ + CommitteeIndex: fmt.Sprintf("%d", a.CommitteeId), + AttesterIndex: fmt.Sprintf("%d", a.AttesterIndex), + Data: AttDataFromConsensus(a.Data), + Signature: hexutil.Encode(a.Signature), + } +} + func (a *SingleAttestation) ToConsensus() (*eth.SingleAttestation, error) { ci, err := strconv.ParseUint(a.CommitteeIndex, 10, 64) if err != nil { diff --git a/beacon-chain/core/feed/operation/events.go b/beacon-chain/core/feed/operation/events.go index 86287da922..4244253acc 100644 --- a/beacon-chain/core/feed/operation/events.go +++ b/beacon-chain/core/feed/operation/events.go @@ -32,6 +32,9 @@ const ( // AttesterSlashingReceived is sent after an attester slashing is received from gossip or rpc AttesterSlashingReceived = 8 + + // SingleAttReceived is sent after a single attestation object is received from gossip or rpc + SingleAttReceived = 9 ) // UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events. @@ -43,7 +46,7 @@ type UnAggregatedAttReceivedData struct { // AggregatedAttReceivedData is the data sent with AggregatedAttReceived events. type AggregatedAttReceivedData struct { // Attestation is the aggregated attestation object. - Attestation *ethpb.AggregateAttestationAndProof + Attestation ethpb.AggregateAttAndProof } // ExitReceivedData is the data sent with ExitReceived events. @@ -77,3 +80,8 @@ type ProposerSlashingReceivedData struct { type AttesterSlashingReceivedData struct { AttesterSlashing ethpb.AttSlashing } + +// SingleAttReceivedData is the data sent with SingleAttReceived events. +type SingleAttReceivedData struct { + Attestation ethpb.Att +} diff --git a/beacon-chain/monitor/service.go b/beacon-chain/monitor/service.go index 21f96641ac..3996e20764 100644 --- a/beacon-chain/monitor/service.go +++ b/beacon-chain/monitor/service.go @@ -236,6 +236,13 @@ func (s *Service) monitorRoutine(stateChannel chan *feed.Event, stateSub event.S } else { s.processAggregatedAttestation(s.ctx, data.Attestation) } + case operation.SingleAttReceived: + data, ok := e.Data.(*operation.SingleAttReceivedData) + if !ok { + log.Error("Event feed data is not of type *operation.SingleAttReceivedData") + } else { + s.processUnaggregatedAttestation(s.ctx, data.Attestation) + } case operation.ExitReceived: data, ok := e.Data.(*operation.ExitReceivedData) if !ok { diff --git a/beacon-chain/rpc/eth/beacon/handlers_pool.go b/beacon-chain/rpc/eth/beacon/handlers_pool.go index 7e5f319513..67c95d2995 100644 --- a/beacon-chain/rpc/eth/beacon/handlers_pool.go +++ b/beacon-chain/rpc/eth/beacon/handlers_pool.go @@ -320,6 +320,13 @@ func (s *Server) handleAttestationsElectra( } for i, singleAtt := range validAttestations { + s.OperationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.SingleAttReceived, + Data: &operation.SingleAttReceivedData{ + Attestation: singleAtt, + }, + }) + targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target) if err != nil { return nil, nil, errors.Wrap(err, "could not get target state for attestation") @@ -330,13 +337,6 @@ func (s *Server) handleAttestationsElectra( } att := singleAtt.ToAttestationElectra(committee) - s.OperationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.UnaggregatedAttReceived, - Data: &operation.UnAggregatedAttReceivedData{ - Attestation: att, - }, - }) - wantedEpoch := slots.ToEpoch(att.Data.Slot) vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch) if err != nil { diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index d8aaec6b5b..a9f858ad48 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -45,6 +45,8 @@ const ( BlockTopic = "block" // AttestationTopic represents a new submitted attestation event topic. AttestationTopic = "attestation" + // SingleAttestationTopic represents a new submitted single attestation event topic. + SingleAttestationTopic = "single_attestation" // VoluntaryExitTopic represents a new performed voluntary exit event topic. VoluntaryExitTopic = "voluntary_exit" // FinalizedCheckpointTopic represents a new finalized checkpoint event topic. @@ -92,6 +94,7 @@ type lazyReader func() io.Reader var opsFeedEventTopics = map[feed.EventType]string{ operation.AggregatedAttReceived: AttestationTopic, operation.UnaggregatedAttReceived: AttestationTopic, + operation.SingleAttReceived: SingleAttestationTopic, operation.ExitReceived: VoluntaryExitTopic, operation.SyncCommitteeContributionReceived: SyncCommitteeContributionTopic, operation.BLSToExecutionChangeReceived: BLSToExecutionChangeTopic, @@ -403,7 +406,7 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w *streamingResponseWr func jsonMarshalReader(name string, v any) io.Reader { d, err := json.Marshal(v) if err != nil { - log.WithError(err).WithField("type_name", fmt.Sprintf("%T", v)).Error("Could not marshal event data.") + log.WithError(err).WithField("type_name", fmt.Sprintf("%T", v)).Error("Could not marshal event data") return nil } return bytes.NewBufferString("event: " + name + "\ndata: " + string(d) + "\n\n") @@ -415,6 +418,8 @@ func topicForEvent(event *feed.Event) string { return AttestationTopic case *operation.UnAggregatedAttReceivedData: return AttestationTopic + case *operation.SingleAttReceivedData: + return SingleAttestationTopic case *operation.ExitReceivedData: return VoluntaryExitTopic case *operation.SyncCommitteeContributionReceivedData: @@ -464,10 +469,20 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi return jsonMarshalReader(eventName, structs.HeadEventFromV1(v)) }, nil case *operation.AggregatedAttReceivedData: - return func() io.Reader { - att := structs.AttFromConsensus(v.Attestation.Aggregate) - return jsonMarshalReader(eventName, att) - }, nil + switch att := v.Attestation.AggregateVal().(type) { + case *eth.Attestation: + return func() io.Reader { + att := structs.AttFromConsensus(att) + return jsonMarshalReader(eventName, att) + }, nil + case *eth.AttestationElectra: + return func() io.Reader { + att := structs.AttElectraFromConsensus(att) + return jsonMarshalReader(eventName, att) + }, nil + default: + return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of AggregatedAttReceivedData", v.Attestation) + } case *operation.UnAggregatedAttReceivedData: switch att := v.Attestation.(type) { case *eth.Attestation: @@ -483,6 +498,16 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi default: return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of UnAggregatedAttReceivedData", v.Attestation) } + case *operation.SingleAttReceivedData: + switch att := v.Attestation.(type) { + case *eth.SingleAttestation: + return func() io.Reader { + att := structs.SingleAttFromConsensus(att) + return jsonMarshalReader(eventName, att) + }, nil + default: + return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of SingleAttReceivedData", v.Attestation) + } case *operation.ExitReceivedData: return func() io.Reader { return jsonMarshalReader(eventName, structs.SignedExitFromConsensus(v.Exit)) diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index 09c82309d3..82c9708394 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -110,6 +110,7 @@ func (tr *topicRequest) testHttpRequest(ctx context.Context, _ *testing.T) *http func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { topics, err := newTopicRequest([]string{ AttestationTopic, + SingleAttestationTopic, VoluntaryExitTopic, SyncCommitteeContributionTopic, BLSToExecutionChangeTopic, @@ -123,13 +124,13 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { vblob := blocks.NewVerifiedROBlob(ro) return topics, []*feed.Event{ - &feed.Event{ + { Type: operation.UnaggregatedAttReceived, Data: &operation.UnAggregatedAttReceivedData{ Attestation: util.HydrateAttestation(ð.Attestation{}), }, }, - &feed.Event{ + { Type: operation.AggregatedAttReceived, Data: &operation.AggregatedAttReceivedData{ Attestation: ð.AggregateAttestationAndProof{ @@ -139,7 +140,13 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { }, }, }, - &feed.Event{ + { + Type: operation.SingleAttReceived, + Data: &operation.SingleAttReceivedData{ + Attestation: util.HydrateSingleAttestation(ð.SingleAttestation{}), + }, + }, + { Type: operation.ExitReceived, Data: &operation.ExitReceivedData{ Exit: ð.SignedVoluntaryExit{ @@ -151,7 +158,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { }, }, }, - &feed.Event{ + { Type: operation.SyncCommitteeContributionReceived, Data: &operation.SyncCommitteeContributionReceivedData{ Contribution: ð.SignedContributionAndProof{ @@ -170,7 +177,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { }, }, }, - &feed.Event{ + { Type: operation.BLSToExecutionChangeReceived, Data: &operation.BLSToExecutionChangeReceivedData{ Change: ð.SignedBLSToExecutionChange{ @@ -183,13 +190,13 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { }, }, }, - &feed.Event{ + { Type: operation.BlobSidecarReceived, Data: &operation.BlobSidecarReceivedData{ Blob: &vblob, }, }, - &feed.Event{ + { Type: operation.AttesterSlashingReceived, Data: &operation.AttesterSlashingReceivedData{ AttesterSlashing: ð.AttesterSlashing{ @@ -222,7 +229,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { }, }, }, - &feed.Event{ + { Type: operation.AttesterSlashingReceived, Data: &operation.AttesterSlashingReceivedData{ AttesterSlashing: ð.AttesterSlashingElectra{ @@ -255,7 +262,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { }, }, }, - &feed.Event{ + { Type: operation.ProposerSlashingReceived, Data: &operation.ProposerSlashingReceivedData{ ProposerSlashing: ð.ProposerSlashing{ @@ -367,7 +374,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlock(ð.SignedBeaconBlock{})) require.NoError(t, err) events := []*feed.Event{ - &feed.Event{ + { Type: statefeed.BlockProcessed, Data: &statefeed.BlockProcessedData{ Slot: 0, @@ -377,7 +384,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { Optimistic: false, }, }, - &feed.Event{ + { Type: statefeed.NewHead, Data: ðpb.EventHead{ Slot: 0, @@ -389,7 +396,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { ExecutionOptimistic: false, }, }, - &feed.Event{ + { Type: statefeed.Reorg, Data: ðpb.EventChainReorg{ Slot: 0, @@ -402,7 +409,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { ExecutionOptimistic: false, }, }, - &feed.Event{ + { Type: statefeed.FinalizedCheckpoint, Data: ðpb.EventFinalizedCheckpoint{ Block: make([]byte, 32), @@ -525,7 +532,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { request := topics.testHttpRequest(testSync.ctx, t) w := NewStreamingResponseWriterRecorder(testSync.ctx) events := []*feed.Event{ - &feed.Event{ + { Type: statefeed.PayloadAttributes, Data: payloadattribute.EventData{ ProposerIndex: 0, @@ -577,7 +584,7 @@ func TestStuckReaderScenarios(t *testing.T) { func wedgedWriterTestCase(t *testing.T, queueDepth func([]*feed.Event) int) { topics, events := operationEventsFixtures(t) - require.Equal(t, 9, len(events)) + require.Equal(t, 10, len(events)) // set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader. stn := mockChain.NewEventFeedWrapper() diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go b/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go index 9615436816..342bac44e1 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/attester.go @@ -13,7 +13,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/crypto/bls" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -46,7 +45,7 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation") defer span.End() - resp, err := vs.proposeAtt(ctx, att, nil, att.GetData().CommitteeIndex) + resp, err := vs.proposeAtt(ctx, att, att.GetData().CommitteeIndex) if err != nil { return nil, err } @@ -74,21 +73,20 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestationElectra") defer span.End() + resp, err := vs.proposeAtt(ctx, singleAtt, singleAtt.GetCommitteeIndex()) + if err != nil { + return nil, err + } + targetState, err := vs.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target) if err != nil { return nil, status.Error(codes.Internal, "Could not get target state") } - committeeIndex := singleAtt.GetCommitteeIndex() - committee, err := helpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, committeeIndex) + committee, err := helpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.GetCommitteeIndex()) if err != nil { return nil, status.Error(codes.Internal, "Could not get committee") } - resp, err := vs.proposeAtt(ctx, singleAtt, committee, committeeIndex) - if err != nil { - return nil, err - } - singleAttCopy := singleAtt.Copy() att := singleAttCopy.ToAttestationElectra(committee) if features.Get().EnableExperimentalAttestationPool { @@ -158,7 +156,6 @@ func (vs *Server) SubscribeCommitteeSubnets(ctx context.Context, req *ethpb.Comm func (vs *Server) proposeAtt( ctx context.Context, att ethpb.Att, - committee []primitives.ValidatorIndex, // required post-Electra committeeIndex primitives.CommitteeIndex, ) (*ethpb.AttestResponse, error) { if _, err := bls.SignatureFromBytes(att.GetSignature()); err != nil { @@ -170,24 +167,23 @@ func (vs *Server) proposeAtt( return nil, status.Errorf(codes.Internal, "Could not get attestation root: %v", err) } - var singleAtt *ethpb.SingleAttestation - if att.Version() >= version.Electra { - var ok bool - singleAtt, ok = att.(*ethpb.SingleAttestation) - if !ok { - return nil, status.Errorf(codes.Internal, "Attestation has wrong type (expected %T, got %T)", ðpb.SingleAttestation{}, att) - } - att = singleAtt.ToAttestationElectra(committee) - } - // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node // of a received unaggregated attestation. - vs.OperationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.UnaggregatedAttReceived, - Data: &operation.UnAggregatedAttReceivedData{ - Attestation: att, - }, - }) + if att.IsSingle() { + vs.OperationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.SingleAttReceived, + Data: &operation.SingleAttReceivedData{ + Attestation: att, + }, + }) + } else { + vs.OperationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.UnaggregatedAttReceived, + Data: &operation.UnAggregatedAttReceivedData{ + Attestation: att, + }, + }) + } // Determine subnet to broadcast attestation to wantedEpoch := slots.ToEpoch(att.GetData().Slot) @@ -198,13 +194,7 @@ func (vs *Server) proposeAtt( subnet := helpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), committeeIndex, att.GetData().Slot) // Broadcast the new attestation to the network. - var attToBroadcast ethpb.Att - if singleAtt != nil { - attToBroadcast = singleAtt - } else { - attToBroadcast = att - } - if err := vs.P2P.BroadcastAttestation(ctx, subnet, attToBroadcast); err != nil { + if err := vs.P2P.BroadcastAttestation(ctx, subnet, att); err != nil { return nil, status.Errorf(codes.Internal, "Could not broadcast attestation: %v", err) } diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index ec692854fa..b33f1f0193 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -22,7 +22,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/monitoring/tracing" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v5/runtime/version" prysmTime "github.com/prysmaticlabs/prysm/v5/time" "github.com/prysmaticlabs/prysm/v5/time/slots" ) @@ -69,18 +68,12 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms // Broadcast the aggregated attestation on a feed to notify other services in the beacon node // of a received aggregated attestation. - // TODO: this will be extended to Electra in a later PR - if m.Version() == version.Phase0 { - phase0Att, ok := m.(*ethpb.SignedAggregateAttestationAndProof) - if ok { - s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.AggregatedAttReceived, - Data: &operation.AggregatedAttReceivedData{ - Attestation: phase0Att.Message, - }, - }) - } - } + s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.AggregatedAttReceived, + Data: &operation.AggregatedAttReceivedData{ + Attestation: m.AggregateAttestationAndProof(), + }, + }) if err := helpers.ValidateSlotTargetEpoch(data); err != nil { return pubsub.ValidationReject, err diff --git a/beacon-chain/sync/validate_beacon_attestation.go b/beacon-chain/sync/validate_beacon_attestation.go index 7948846cd7..7aaa943bbe 100644 --- a/beacon-chain/sync/validate_beacon_attestation.go +++ b/beacon-chain/sync/validate_beacon_attestation.go @@ -139,8 +139,9 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p return validationRes, err } + var singleAtt *eth.SingleAttestation if att.Version() >= version.Electra { - singleAtt, ok := att.(*eth.SingleAttestation) + singleAtt, ok = att.(*eth.SingleAttestation) if !ok { return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.SingleAttestation{}, att) } @@ -183,12 +184,21 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node // of a received unaggregated attestation. - s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.UnaggregatedAttReceived, - Data: &operation.UnAggregatedAttReceivedData{ - Attestation: att, - }, - }) + if singleAtt != nil { + s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.SingleAttReceived, + Data: &operation.SingleAttReceivedData{ + Attestation: singleAtt, + }, + }) + } else { + s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.UnaggregatedAttReceived, + Data: &operation.UnAggregatedAttReceivedData{ + Attestation: att, + }, + }) + } s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits()) diff --git a/changelog/radek_electra-events-api.md b/changelog/radek_electra-events-api.md new file mode 100644 index 0000000000..e0ca6da707 --- /dev/null +++ b/changelog/radek_electra-events-api.md @@ -0,0 +1,3 @@ +### Added + +- Beacon API event support for `SingleAttestation` and `SignedAggregateAttestationAndProofElectra`. \ No newline at end of file diff --git a/testing/util/attestation.go b/testing/util/attestation.go index 73e92f7c8a..5cdab0231e 100644 --- a/testing/util/attestation.go +++ b/testing/util/attestation.go @@ -321,6 +321,17 @@ func HydrateAttestationElectra(a *ethpb.AttestationElectra) *ethpb.AttestationEl return a } +func HydrateSingleAttestation(a *ethpb.SingleAttestation) *ethpb.SingleAttestation { + if a.Signature == nil { + a.Signature = make([]byte, 96) + } + if a.Data == nil { + a.Data = ðpb.AttestationData{} + } + a.Data = HydrateAttestationData(a.Data) + return a +} + // HydrateV1Attestation hydrates a v1 attestation object with correct field length sizes // to comply with fssz marshalling and unmarshalling rules. func HydrateV1Attestation(a *attv1.Attestation) *attv1.Attestation {