Update Beacon API events to Electra (#14855)

* Update Beacon API events to Electra

* changelog <3

* fix issues

* send notifications from pending att queue

* Revert "send notifications from pending att queue"

This reverts commit 545408f6cf.
This commit is contained in:
Radosław Kapka
2025-02-03 17:16:38 +01:00
committed by GitHub
parent 967e9255a2
commit 177769a1ce
11 changed files with 144 additions and 81 deletions

View File

@@ -433,6 +433,15 @@ func (a *AttestationElectra) ToConsensus() (*eth.AttestationElectra, error) {
}, nil
}
func SingleAttFromConsensus(a *eth.SingleAttestation) *SingleAttestation {
return &SingleAttestation{
CommitteeIndex: fmt.Sprintf("%d", a.CommitteeId),
AttesterIndex: fmt.Sprintf("%d", a.AttesterIndex),
Data: AttDataFromConsensus(a.Data),
Signature: hexutil.Encode(a.Signature),
}
}
func (a *SingleAttestation) ToConsensus() (*eth.SingleAttestation, error) {
ci, err := strconv.ParseUint(a.CommitteeIndex, 10, 64)
if err != nil {

View File

@@ -32,6 +32,9 @@ const (
// AttesterSlashingReceived is sent after an attester slashing is received from gossip or rpc
AttesterSlashingReceived = 8
// SingleAttReceived is sent after a single attestation object is received from gossip or rpc
SingleAttReceived = 9
)
// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
@@ -43,7 +46,7 @@ type UnAggregatedAttReceivedData struct {
// AggregatedAttReceivedData is the data sent with AggregatedAttReceived events.
type AggregatedAttReceivedData struct {
// Attestation is the aggregated attestation object.
Attestation *ethpb.AggregateAttestationAndProof
Attestation ethpb.AggregateAttAndProof
}
// ExitReceivedData is the data sent with ExitReceived events.
@@ -77,3 +80,8 @@ type ProposerSlashingReceivedData struct {
type AttesterSlashingReceivedData struct {
AttesterSlashing ethpb.AttSlashing
}
// SingleAttReceivedData is the data sent with SingleAttReceived events.
type SingleAttReceivedData struct {
Attestation ethpb.Att
}

View File

@@ -236,6 +236,13 @@ func (s *Service) monitorRoutine(stateChannel chan *feed.Event, stateSub event.S
} else {
s.processAggregatedAttestation(s.ctx, data.Attestation)
}
case operation.SingleAttReceived:
data, ok := e.Data.(*operation.SingleAttReceivedData)
if !ok {
log.Error("Event feed data is not of type *operation.SingleAttReceivedData")
} else {
s.processUnaggregatedAttestation(s.ctx, data.Attestation)
}
case operation.ExitReceived:
data, ok := e.Data.(*operation.ExitReceivedData)
if !ok {

View File

@@ -320,6 +320,13 @@ func (s *Server) handleAttestationsElectra(
}
for i, singleAtt := range validAttestations {
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SingleAttReceived,
Data: &operation.SingleAttReceivedData{
Attestation: singleAtt,
},
})
targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get target state for attestation")
@@ -330,13 +337,6 @@ func (s *Server) handleAttestationsElectra(
}
att := singleAtt.ToAttestationElectra(committee)
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
wantedEpoch := slots.ToEpoch(att.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {

View File

@@ -45,6 +45,8 @@ const (
BlockTopic = "block"
// AttestationTopic represents a new submitted attestation event topic.
AttestationTopic = "attestation"
// SingleAttestationTopic represents a new submitted single attestation event topic.
SingleAttestationTopic = "single_attestation"
// VoluntaryExitTopic represents a new performed voluntary exit event topic.
VoluntaryExitTopic = "voluntary_exit"
// FinalizedCheckpointTopic represents a new finalized checkpoint event topic.
@@ -92,6 +94,7 @@ type lazyReader func() io.Reader
var opsFeedEventTopics = map[feed.EventType]string{
operation.AggregatedAttReceived: AttestationTopic,
operation.UnaggregatedAttReceived: AttestationTopic,
operation.SingleAttReceived: SingleAttestationTopic,
operation.ExitReceived: VoluntaryExitTopic,
operation.SyncCommitteeContributionReceived: SyncCommitteeContributionTopic,
operation.BLSToExecutionChangeReceived: BLSToExecutionChangeTopic,
@@ -403,7 +406,7 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w *streamingResponseWr
func jsonMarshalReader(name string, v any) io.Reader {
d, err := json.Marshal(v)
if err != nil {
log.WithError(err).WithField("type_name", fmt.Sprintf("%T", v)).Error("Could not marshal event data.")
log.WithError(err).WithField("type_name", fmt.Sprintf("%T", v)).Error("Could not marshal event data")
return nil
}
return bytes.NewBufferString("event: " + name + "\ndata: " + string(d) + "\n\n")
@@ -415,6 +418,8 @@ func topicForEvent(event *feed.Event) string {
return AttestationTopic
case *operation.UnAggregatedAttReceivedData:
return AttestationTopic
case *operation.SingleAttReceivedData:
return SingleAttestationTopic
case *operation.ExitReceivedData:
return VoluntaryExitTopic
case *operation.SyncCommitteeContributionReceivedData:
@@ -464,10 +469,20 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
return jsonMarshalReader(eventName, structs.HeadEventFromV1(v))
}, nil
case *operation.AggregatedAttReceivedData:
return func() io.Reader {
att := structs.AttFromConsensus(v.Attestation.Aggregate)
return jsonMarshalReader(eventName, att)
}, nil
switch att := v.Attestation.AggregateVal().(type) {
case *eth.Attestation:
return func() io.Reader {
att := structs.AttFromConsensus(att)
return jsonMarshalReader(eventName, att)
}, nil
case *eth.AttestationElectra:
return func() io.Reader {
att := structs.AttElectraFromConsensus(att)
return jsonMarshalReader(eventName, att)
}, nil
default:
return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of AggregatedAttReceivedData", v.Attestation)
}
case *operation.UnAggregatedAttReceivedData:
switch att := v.Attestation.(type) {
case *eth.Attestation:
@@ -483,6 +498,16 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
default:
return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of UnAggregatedAttReceivedData", v.Attestation)
}
case *operation.SingleAttReceivedData:
switch att := v.Attestation.(type) {
case *eth.SingleAttestation:
return func() io.Reader {
att := structs.SingleAttFromConsensus(att)
return jsonMarshalReader(eventName, att)
}, nil
default:
return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of SingleAttReceivedData", v.Attestation)
}
case *operation.ExitReceivedData:
return func() io.Reader {
return jsonMarshalReader(eventName, structs.SignedExitFromConsensus(v.Exit))

View File

@@ -110,6 +110,7 @@ func (tr *topicRequest) testHttpRequest(ctx context.Context, _ *testing.T) *http
func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
topics, err := newTopicRequest([]string{
AttestationTopic,
SingleAttestationTopic,
VoluntaryExitTopic,
SyncCommitteeContributionTopic,
BLSToExecutionChangeTopic,
@@ -123,13 +124,13 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
vblob := blocks.NewVerifiedROBlob(ro)
return topics, []*feed.Event{
&feed.Event{
{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: util.HydrateAttestation(&eth.Attestation{}),
},
},
&feed.Event{
{
Type: operation.AggregatedAttReceived,
Data: &operation.AggregatedAttReceivedData{
Attestation: &eth.AggregateAttestationAndProof{
@@ -139,7 +140,13 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.SingleAttReceived,
Data: &operation.SingleAttReceivedData{
Attestation: util.HydrateSingleAttestation(&eth.SingleAttestation{}),
},
},
{
Type: operation.ExitReceived,
Data: &operation.ExitReceivedData{
Exit: &eth.SignedVoluntaryExit{
@@ -151,7 +158,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.SyncCommitteeContributionReceived,
Data: &operation.SyncCommitteeContributionReceivedData{
Contribution: &eth.SignedContributionAndProof{
@@ -170,7 +177,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.BLSToExecutionChangeReceived,
Data: &operation.BLSToExecutionChangeReceivedData{
Change: &eth.SignedBLSToExecutionChange{
@@ -183,13 +190,13 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.BlobSidecarReceived,
Data: &operation.BlobSidecarReceivedData{
Blob: &vblob,
},
},
&feed.Event{
{
Type: operation.AttesterSlashingReceived,
Data: &operation.AttesterSlashingReceivedData{
AttesterSlashing: &eth.AttesterSlashing{
@@ -222,7 +229,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.AttesterSlashingReceived,
Data: &operation.AttesterSlashingReceivedData{
AttesterSlashing: &eth.AttesterSlashingElectra{
@@ -255,7 +262,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.ProposerSlashingReceived,
Data: &operation.ProposerSlashingReceivedData{
ProposerSlashing: &eth.ProposerSlashing{
@@ -367,7 +374,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlock(&eth.SignedBeaconBlock{}))
require.NoError(t, err)
events := []*feed.Event{
&feed.Event{
{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: 0,
@@ -377,7 +384,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
Optimistic: false,
},
},
&feed.Event{
{
Type: statefeed.NewHead,
Data: &ethpb.EventHead{
Slot: 0,
@@ -389,7 +396,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
ExecutionOptimistic: false,
},
},
&feed.Event{
{
Type: statefeed.Reorg,
Data: &ethpb.EventChainReorg{
Slot: 0,
@@ -402,7 +409,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
ExecutionOptimistic: false,
},
},
&feed.Event{
{
Type: statefeed.FinalizedCheckpoint,
Data: &ethpb.EventFinalizedCheckpoint{
Block: make([]byte, 32),
@@ -525,7 +532,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
request := topics.testHttpRequest(testSync.ctx, t)
w := NewStreamingResponseWriterRecorder(testSync.ctx)
events := []*feed.Event{
&feed.Event{
{
Type: statefeed.PayloadAttributes,
Data: payloadattribute.EventData{
ProposerIndex: 0,
@@ -577,7 +584,7 @@ func TestStuckReaderScenarios(t *testing.T) {
func wedgedWriterTestCase(t *testing.T, queueDepth func([]*feed.Event) int) {
topics, events := operationEventsFixtures(t)
require.Equal(t, 9, len(events))
require.Equal(t, 10, len(events))
// set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader.
stn := mockChain.NewEventFeedWrapper()

View File

@@ -13,7 +13,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/crypto/bls"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -46,7 +45,7 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation")
defer span.End()
resp, err := vs.proposeAtt(ctx, att, nil, att.GetData().CommitteeIndex)
resp, err := vs.proposeAtt(ctx, att, att.GetData().CommitteeIndex)
if err != nil {
return nil, err
}
@@ -74,21 +73,20 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, singleAtt *ethp
ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestationElectra")
defer span.End()
resp, err := vs.proposeAtt(ctx, singleAtt, singleAtt.GetCommitteeIndex())
if err != nil {
return nil, err
}
targetState, err := vs.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
if err != nil {
return nil, status.Error(codes.Internal, "Could not get target state")
}
committeeIndex := singleAtt.GetCommitteeIndex()
committee, err := helpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, committeeIndex)
committee, err := helpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.GetCommitteeIndex())
if err != nil {
return nil, status.Error(codes.Internal, "Could not get committee")
}
resp, err := vs.proposeAtt(ctx, singleAtt, committee, committeeIndex)
if err != nil {
return nil, err
}
singleAttCopy := singleAtt.Copy()
att := singleAttCopy.ToAttestationElectra(committee)
if features.Get().EnableExperimentalAttestationPool {
@@ -158,7 +156,6 @@ func (vs *Server) SubscribeCommitteeSubnets(ctx context.Context, req *ethpb.Comm
func (vs *Server) proposeAtt(
ctx context.Context,
att ethpb.Att,
committee []primitives.ValidatorIndex, // required post-Electra
committeeIndex primitives.CommitteeIndex,
) (*ethpb.AttestResponse, error) {
if _, err := bls.SignatureFromBytes(att.GetSignature()); err != nil {
@@ -170,24 +167,23 @@ func (vs *Server) proposeAtt(
return nil, status.Errorf(codes.Internal, "Could not get attestation root: %v", err)
}
var singleAtt *ethpb.SingleAttestation
if att.Version() >= version.Electra {
var ok bool
singleAtt, ok = att.(*ethpb.SingleAttestation)
if !ok {
return nil, status.Errorf(codes.Internal, "Attestation has wrong type (expected %T, got %T)", &ethpb.SingleAttestation{}, att)
}
att = singleAtt.ToAttestationElectra(committee)
}
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
vs.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
if att.IsSingle() {
vs.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SingleAttReceived,
Data: &operation.SingleAttReceivedData{
Attestation: att,
},
})
} else {
vs.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
}
// Determine subnet to broadcast attestation to
wantedEpoch := slots.ToEpoch(att.GetData().Slot)
@@ -198,13 +194,7 @@ func (vs *Server) proposeAtt(
subnet := helpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), committeeIndex, att.GetData().Slot)
// Broadcast the new attestation to the network.
var attToBroadcast ethpb.Att
if singleAtt != nil {
attToBroadcast = singleAtt
} else {
attToBroadcast = att
}
if err := vs.P2P.BroadcastAttestation(ctx, subnet, attToBroadcast); err != nil {
if err := vs.P2P.BroadcastAttestation(ctx, subnet, att); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast attestation: %v", err)
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
@@ -69,18 +68,12 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
// Broadcast the aggregated attestation on a feed to notify other services in the beacon node
// of a received aggregated attestation.
// TODO: this will be extended to Electra in a later PR
if m.Version() == version.Phase0 {
phase0Att, ok := m.(*ethpb.SignedAggregateAttestationAndProof)
if ok {
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.AggregatedAttReceived,
Data: &operation.AggregatedAttReceivedData{
Attestation: phase0Att.Message,
},
})
}
}
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.AggregatedAttReceived,
Data: &operation.AggregatedAttReceivedData{
Attestation: m.AggregateAttestationAndProof(),
},
})
if err := helpers.ValidateSlotTargetEpoch(data); err != nil {
return pubsub.ValidationReject, err

View File

@@ -139,8 +139,9 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
return validationRes, err
}
var singleAtt *eth.SingleAttestation
if att.Version() >= version.Electra {
singleAtt, ok := att.(*eth.SingleAttestation)
singleAtt, ok = att.(*eth.SingleAttestation)
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.SingleAttestation{}, att)
}
@@ -183,12 +184,21 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
if singleAtt != nil {
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SingleAttReceived,
Data: &operation.SingleAttReceivedData{
Attestation: singleAtt,
},
})
} else {
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
}
s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits())

View File

@@ -0,0 +1,3 @@
### Added
- Beacon API event support for `SingleAttestation` and `SignedAggregateAttestationAndProofElectra`.

View File

@@ -321,6 +321,17 @@ func HydrateAttestationElectra(a *ethpb.AttestationElectra) *ethpb.AttestationEl
return a
}
func HydrateSingleAttestation(a *ethpb.SingleAttestation) *ethpb.SingleAttestation {
if a.Signature == nil {
a.Signature = make([]byte, 96)
}
if a.Data == nil {
a.Data = &ethpb.AttestationData{}
}
a.Data = HydrateAttestationData(a.Data)
return a
}
// HydrateV1Attestation hydrates a v1 attestation object with correct field length sizes
// to comply with fssz marshalling and unmarshalling rules.
func HydrateV1Attestation(a *attv1.Attestation) *attv1.Attestation {