Compare commits

...

1 Commits

Author SHA1 Message Date
terence tsao
c0c49d971c Save pending beacon attestation without using aggregate form 2024-09-19 10:31:34 -07:00
6 changed files with 75 additions and 23 deletions

View File

@@ -384,7 +384,10 @@ func (s *Service) SubmitSignedAggregateSelectionProof(
if agg == nil {
return &RpcError{Err: errors.New("signed aggregate request can't be nil"), Reason: BadRequest}
}
attAndProof := agg.AggregateAttestationAndProof()
attAndProof, err := agg.AggregateAttestationAndProof()
if err != nil {
return &RpcError{Err: err, Reason: BadRequest}
}
if attAndProof == nil {
return &RpcError{Err: errors.New("signed aggregate request can't be nil"), Reason: BadRequest}
}

View File

@@ -90,7 +90,11 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.SignedAggregateAttAndProof) {
for _, signedAtt := range attestations {
aggregate := signedAtt.AggregateAttestationAndProof().AggregateVal()
a, err := signedAtt.AggregateAttestationAndProof()
if err != nil {
continue
}
aggregate := a.AggregateVal()
data := aggregate.GetData()
// The pending attestations can arrive in both aggregated and unaggregated forms,
// each from has distinct validation steps.
@@ -107,7 +111,7 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.
log.WithError(err).Debug("Could not save aggregate attestation")
continue
}
s.setAggregatorIndexEpochSeen(data.Target.Epoch, signedAtt.AggregateAttestationAndProof().GetAggregatorIndex())
s.setAggregatorIndexEpochSeen(data.Target.Epoch, a.GetAggregatorIndex())
// Broadcasting the signed attestation again once a node is able to process it.
if err := s.cfg.p2p.Broadcast(ctx, signedAtt); err != nil {
@@ -163,7 +167,12 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.
// that voted for that block root. The caller of this function is responsible
// for not sending repeated attestations to the pending queue.
func (s *Service) savePendingAtt(att ethpb.SignedAggregateAttAndProof) {
root := bytesutil.ToBytes32(att.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot)
a, err := att.AggregateAttestationAndProof()
if err != nil {
log.WithError(err).Debug("Could not get aggregate attestation and proof")
return
}
root := bytesutil.ToBytes32(a.AggregateVal().GetData().BeaconBlockRoot)
s.pendingAttsLock.Lock()
defer s.pendingAttsLock.Unlock()
@@ -195,15 +204,23 @@ func (s *Service) savePendingAtt(att ethpb.SignedAggregateAttAndProof) {
}
func attsAreEqual(a, b ethpb.SignedAggregateAttAndProof) bool {
aAtt, err := a.AggregateAttestationAndProof()
if err != nil {
return false
}
bAtt, err := b.AggregateAttestationAndProof()
if err != nil {
return false
}
if a.GetSignature() != nil {
return b.GetSignature() != nil && a.AggregateAttestationAndProof().GetAggregatorIndex() == b.AggregateAttestationAndProof().GetAggregatorIndex()
return b.GetSignature() != nil && aAtt.GetAggregatorIndex() == bAtt.GetAggregatorIndex()
}
if b.GetSignature() != nil {
return false
}
aAggregate := a.AggregateAttestationAndProof().AggregateVal()
bAggregate := b.AggregateAttestationAndProof().AggregateVal()
aAggregate := aAtt.AggregateVal()
bAggregate := bAtt.AggregateVal()
aData := aAggregate.GetData()
bData := bAggregate.GetData()
@@ -235,7 +252,11 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot primitives.Slot)
for bRoot, atts := range s.blkRootToPendingAtts {
for i := len(atts) - 1; i >= 0; i-- {
if slot >= atts[i].AggregateAttestationAndProof().AggregateVal().GetData().Slot+params.BeaconConfig().SlotsPerEpoch {
att, err := atts[i].AggregateAttestationAndProof()
if err != nil {
continue
}
if slot >= att.AggregateVal().GetData().Slot+params.BeaconConfig().SlotsPerEpoch {
// Remove the pending attestation from the list in place.
atts = append(atts[:i], atts[i+1:]...)
}

View File

@@ -18,7 +18,11 @@ func (s *Service) beaconAggregateProofSubscriber(_ context.Context, msg proto.Me
return fmt.Errorf("message was not type ethpb.SignedAggregateAttAndProof, type=%T", msg)
}
aggregate := a.AggregateAttestationAndProof().AggregateVal()
agg, err := a.AggregateAttestationAndProof()
if err != nil {
return err
}
aggregate := agg.AggregateVal()
if aggregate == nil || aggregate.GetData() == nil {
return errors.New("nil aggregate")

View File

@@ -52,11 +52,15 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
if !ok {
return pubsub.ValidationReject, errors.Errorf("invalid message type: %T", raw)
}
if m.AggregateAttestationAndProof() == nil {
agg, err := m.AggregateAttestationAndProof()
if err != nil {
return pubsub.ValidationIgnore, err
}
if agg == nil {
return pubsub.ValidationReject, errNilMessage
}
aggregate := m.AggregateAttestationAndProof().AggregateVal()
aggregate := agg.AggregateVal()
data := aggregate.GetData()
if err := helpers.ValidateNilAttestation(aggregate); err != nil {
@@ -98,7 +102,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
}
// Verify this is the first aggregate received from the aggregator with index and slot.
if s.hasSeenAggregatorIndexEpoch(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex()) {
if s.hasSeenAggregatorIndexEpoch(data.Target.Epoch, agg.GetAggregatorIndex()) {
return pubsub.ValidationIgnore, nil
}
// Check that the block being voted on isn't invalid.
@@ -127,7 +131,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
return validationRes, err
}
s.setAggregatorIndexEpochSeen(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex())
s.setAggregatorIndexEpochSeen(data.Target.Epoch, agg.GetAggregatorIndex())
msg.ValidatorData = m
@@ -140,7 +144,10 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed
ctx, span := trace.StartSpan(ctx, "sync.validateAggregatedAtt")
defer span.End()
aggregateAndProof := signed.AggregateAttestationAndProof()
aggregateAndProof, err := signed.AggregateAttestationAndProof()
if err != nil {
return pubsub.ValidationIgnore, err
}
aggregatorIndex := aggregateAndProof.GetAggregatorIndex()
aggregate := aggregateAndProof.AggregateVal()
data := aggregate.GetData()
@@ -225,7 +232,11 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed
func (s *Service) validateBlockInAttestation(ctx context.Context, satt ethpb.SignedAggregateAttAndProof) bool {
// Verify the block being voted and the processed state is in beaconDB. The block should have passed validation if it's in the beaconDB.
blockRoot := bytesutil.ToBytes32(satt.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot)
a, err := satt.AggregateAttestationAndProof()
if err != nil {
return false
}
blockRoot := bytesutil.ToBytes32(a.AggregateVal().GetData().BeaconBlockRoot)
if !s.hasBlockAndState(ctx, blockRoot) {
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
s.savePendingAtt(satt)
@@ -333,7 +344,10 @@ func validateSelectionIndex(
// This returns aggregator signature set which can be used to batch verify.
func aggSigSet(s state.ReadOnlyBeaconState, a ethpb.SignedAggregateAttAndProof) (*bls.SignatureBatch, error) {
aggregateAndProof := a.AggregateAttestationAndProof()
aggregateAndProof, err := a.AggregateAttestationAndProof()
if err != nil {
return nil, err
}
v, err := s.ValidatorAtIndex(aggregateAndProof.GetAggregatorIndex())
if err != nil {

View File

@@ -126,14 +126,14 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.AttestationElectra{}, att)
}
s.savePendingAtt(&eth.SignedAggregateAttestationAndProofElectra{Message: &eth.AggregateAttestationAndProofElectra{Aggregate: a}})
s.savePendingAtt(a)
} else {
a, ok := att.(*eth.Attestation)
// This will never fail in practice because we asserted the version
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.Attestation{}, att)
}
s.savePendingAtt(&eth.SignedAggregateAttestationAndProof{Message: &eth.AggregateAttestationAndProof{Aggregate: a}})
s.savePendingAtt(a)
}
return pubsub.ValidationIgnore, nil
}

View File

@@ -46,7 +46,7 @@ type SignedAggregateAttAndProof interface {
ssz.Unmarshaler
ssz.HashRoot
Version() int
AggregateAttestationAndProof() AggregateAttAndProof
AggregateAttestationAndProof() (AggregateAttAndProof, error)
GetSignature() []byte
}
@@ -135,6 +135,11 @@ func (a *Attestation) GetCommitteeIndex() (primitives.CommitteeIndex, error) {
return a.Data.CommitteeIndex, nil
}
func (a *Attestation) AggregateAttestationAndProof() (AggregateAttAndProof, error) {
//TODO implement me
panic("implement me")
}
// Version --
func (a *PendingAttestation) Version() int {
return version.Phase0
@@ -222,6 +227,11 @@ func (a *AttestationElectra) GetCommitteeIndex() (primitives.CommitteeIndex, err
return primitives.CommitteeIndex(uint64(indices[0])), nil
}
func (a *AttestationElectra) AggregateAttestationAndProof() (AggregateAttAndProof, error) {
//TODO implement me
panic("implement me")
}
// Version --
func (a *IndexedAttestation) Version() int {
return version.Phase0
@@ -341,8 +351,8 @@ func (a *SignedAggregateAttestationAndProof) Version() int {
}
// AggregateAttestationAndProof --
func (a *SignedAggregateAttestationAndProof) AggregateAttestationAndProof() AggregateAttAndProof {
return a.Message
func (a *SignedAggregateAttestationAndProof) AggregateAttestationAndProof() (AggregateAttAndProof, error) {
return a.Message, nil
}
// Version --
@@ -351,6 +361,6 @@ func (a *SignedAggregateAttestationAndProofElectra) Version() int {
}
// AggregateAttestationAndProof --
func (a *SignedAggregateAttestationAndProofElectra) AggregateAttestationAndProof() AggregateAttAndProof {
return a.Message
func (a *SignedAggregateAttestationAndProofElectra) AggregateAttestationAndProof() (AggregateAttAndProof, error) {
return a.Message, nil
}