diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index 5ee5b71119..96a3546ca1 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -140,8 +140,7 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) { data := att.GetData() // This is an important validation before retrieving attestation pre state to defend against - // attestation's target intentionally reference checkpoint that's long ago. - // Verify current finalized checkpoint is an ancestor of the block defined by the attestation's beacon block root. + // attestation's target intentionally referencing a checkpoint that's long ago. if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) { log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Could not verify finalized consistency") return @@ -169,35 +168,57 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) { return } - var singleAtt *ethpb.SingleAttestation + // Decide if the attestation is an Electra SingleAttestation or a Phase0 unaggregated attestation + var ( + attForValidation ethpb.Att + broadcastAtt ethpb.Att + eventType feed.EventType + eventData interface{} + ) + if att.Version() >= version.Electra { - var ok bool - singleAtt, ok = att.(*ethpb.SingleAttestation) + singleAtt, ok := att.(*ethpb.SingleAttestation) if !ok { log.Debugf("Attestation has wrong type (expected %T, got %T)", ðpb.SingleAttestation{}, att) return } - att = singleAtt.ToAttestationElectra(committee) + // Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions. + attForValidation = singleAtt.ToAttestationElectra(committee) + broadcastAtt = singleAtt + eventType = operation.SingleAttReceived + eventData = &operation.SingleAttReceivedData{ + Attestation: singleAtt, + } + } else { + // Phase0 attestation + attForValidation = att + broadcastAtt = att + eventType = operation.UnaggregatedAttReceived + eventData = &operation.UnAggregatedAttReceivedData{ + Attestation: att, + } } - valid, err = s.validateUnaggregatedAttWithState(ctx, att, preState) + valid, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState) if err != nil { log.WithError(err).Debug("Pending unaggregated attestation failed validation") return } + if valid == pubsub.ValidationAccept { if features.Get().EnableExperimentalAttestationPool { - if err = s.cfg.attestationCache.Add(att); err != nil { + if err = s.cfg.attestationCache.Add(attForValidation); err != nil { log.WithError(err).Debug("Could not save unaggregated attestation") return } } else { - if err := s.cfg.attPool.SaveUnaggregatedAttestation(att); err != nil { + if err := s.cfg.attPool.SaveUnaggregatedAttestation(attForValidation); err != nil { log.WithError(err).Debug("Could not save unaggregated attestation") return } } - s.setSeenCommitteeIndicesSlot(data.Slot, att.GetCommitteeIndex(), att.GetAggregationBits()) + + s.setSeenCommitteeIndicesSlot(data.Slot, attForValidation.GetCommitteeIndex(), attForValidation.GetAggregationBits()) valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot)) if err != nil { @@ -205,34 +226,16 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) { return } - // Broadcasting the signed attestation again once a node is able to process it. - var attToBroadcast ethpb.Att - if singleAtt != nil { - attToBroadcast = singleAtt - } else { - attToBroadcast = att - } - if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, attToBroadcast), attToBroadcast); err != nil { + // Broadcast the final 'broadcastAtt' object + if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, broadcastAtt), broadcastAtt); err != nil { log.WithError(err).Debug("Could not broadcast") } - // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node - // of a received unaggregated attestation. - if singleAtt != nil { - s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.SingleAttReceived, - Data: &operation.SingleAttReceivedData{ - Attestation: singleAtt, - }, - }) - } else { - s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.UnaggregatedAttReceived, - Data: &operation.UnAggregatedAttReceivedData{ - Attestation: att, - }, - }) - } + // Feed event notification for other services + s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ + Type: eventType, + Data: eventData, + }) } } diff --git a/beacon-chain/sync/validate_beacon_attestation.go b/beacon-chain/sync/validate_beacon_attestation.go index 7aaa943bbe..eca8f9c09b 100644 --- a/beacon-chain/sync/validate_beacon_attestation.go +++ b/beacon-chain/sync/validate_beacon_attestation.go @@ -34,7 +34,11 @@ import ( // - The attestation is unaggregated -- that is, it has exactly one participating validator (len(get_attesting_indices(state, attestation.data, attestation.aggregation_bits)) == 1). // - attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (attestation.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot). // - The signature of attestation is valid. -func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { +func (s *Service) validateCommitteeIndexBeaconAttestation( + ctx context.Context, + pid peer.ID, + msg *pubsub.Message, +) (pubsub.ValidationResult, error) { if pid == s.cfg.p2p.PeerID() { return pubsub.ValidationAccept, nil } @@ -64,6 +68,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p if err := helpers.ValidateNilAttestation(att); err != nil { return pubsub.ValidationReject, err } + data := att.GetData() // Do not process slot 0 attestations. @@ -73,8 +78,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p // Attestation's slot is within ATTESTATION_PROPAGATION_SLOT_RANGE and early attestation // processing tolerance. - if err := helpers.ValidateAttestationTime(data.Slot, s.cfg.clock.GenesisTime(), - earlyAttestationProcessingTolerance); err != nil { + if err := helpers.ValidateAttestationTime(data.Slot, s.cfg.clock.GenesisTime(), earlyAttestationProcessingTolerance); err != nil { tracing.AnnotateError(span, err) return pubsub.ValidationIgnore, err } @@ -89,7 +93,6 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p if s.hasSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits()) { return pubsub.ValidationIgnore, nil } - // Reject an attestation if it references an invalid block. if s.hasBadBlock(bytesutil.ToBytes32(data.BeaconBlockRoot)) || s.hasBadBlock(bytesutil.ToBytes32(data.Target.Root)) || @@ -99,15 +102,12 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p } } - var validationRes pubsub.ValidationResult - // Verify the block being voted and the processed state is in beaconDB and the block has passed validation if it's in the beaconDB. blockRoot := bytesutil.ToBytes32(data.BeaconBlockRoot) if !s.hasBlockAndState(ctx, blockRoot) { return s.saveToPendingAttPool(att) } - - if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) { + if !s.cfg.chain.InForkchoice(blockRoot) { tracing.AnnotateError(span, blockchain.ErrNotDescendantOfFinalized) return pubsub.ValidationIgnore, blockchain.ErrNotDescendantOfFinalized } @@ -123,12 +123,12 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p return pubsub.ValidationIgnore, err } - validationRes, err = s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic) + validationRes, err := s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic) if validationRes != pubsub.ValidationAccept { return validationRes, err } - committee, err := helpers.BeaconCommitteeFromState(ctx, preState, att.GetData().Slot, committeeIndex) + committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, committeeIndex) if err != nil { tracing.AnnotateError(span, err) return pubsub.ValidationIgnore, err @@ -139,16 +139,37 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p return validationRes, err } - var singleAtt *eth.SingleAttestation + // Consolidated handling of Electra SingleAttestation vs Phase0 unaggregated attestation + var ( + attForValidation eth.Att // what we'll pass to further validation + eventType feed.EventType + eventData interface{} + ) + if att.Version() >= version.Electra { - singleAtt, ok = att.(*eth.SingleAttestation) + singleAtt, ok := att.(*eth.SingleAttestation) if !ok { - return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.SingleAttestation{}, att) + return pubsub.ValidationIgnore, fmt.Errorf( + "attestation has wrong type (expected %T, got %T)", + ð.SingleAttestation{}, att, + ) + } + // Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions. + attForValidation = singleAtt.ToAttestationElectra(committee) + eventType = operation.SingleAttReceived + eventData = &operation.SingleAttReceivedData{ + Attestation: singleAtt, + } + } else { + // Phase0 unaggregated attestation + attForValidation = att + eventType = operation.UnaggregatedAttReceived + eventData = &operation.UnAggregatedAttReceivedData{ + Attestation: att, } - att = singleAtt.ToAttestationElectra(committee) } - validationRes, err = s.validateUnaggregatedAttWithState(ctx, att, preState) + validationRes, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState) if validationRes != pubsub.ValidationAccept { return validationRes, err } @@ -172,7 +193,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p tracing.AnnotateError(span, err) return } - indexedAtt, err := attestation.ConvertToIndexed(ctx, att, committee) + indexedAtt, err := attestation.ConvertToIndexed(ctx, attForValidation, committee) if err != nil { log.WithError(err).Error("Could not convert to indexed attestation") tracing.AnnotateError(span, err) @@ -182,27 +203,16 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p }() } - // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node - // of a received unaggregated attestation. - if singleAtt != nil { - s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.SingleAttReceived, - Data: &operation.SingleAttReceivedData{ - Attestation: singleAtt, - }, - }) - } else { - s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.UnaggregatedAttReceived, - Data: &operation.UnAggregatedAttReceivedData{ - Attestation: att, - }, - }) - } + // Notify other services in the beacon node + s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ + Type: eventType, + Data: eventData, + }) - s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits()) + s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, attForValidation.GetAggregationBits()) - msg.ValidatorData = att + // Attach final validated attestation to the message for further pipeline use + msg.ValidatorData = attForValidation return pubsub.ValidationAccept, nil } diff --git a/changelog/james-prysm_attestation-cleanup-suggestions.md b/changelog/james-prysm_attestation-cleanup-suggestions.md new file mode 100644 index 0000000000..b28c8614de --- /dev/null +++ b/changelog/james-prysm_attestation-cleanup-suggestions.md @@ -0,0 +1,4 @@ +### Ignored + +- Cleanup single attestation code for readability. +