mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
single attestation cleanup (#14984)
* some cleanup and minor bug fix * adding some comments back in * Update beacon-chain/sync/pending_attestations_queue.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/pending_attestations_queue.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/pending_attestations_queue.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * adding comment back in * linting * fixing committeeIndiciesSLot * fixing changelog --------- Co-authored-by: Radosław Kapka <rkapka@wp.pl>
This commit is contained in:
@@ -140,8 +140,7 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
|
||||
data := att.GetData()
|
||||
|
||||
// This is an important validation before retrieving attestation pre state to defend against
|
||||
// attestation's target intentionally reference checkpoint that's long ago.
|
||||
// Verify current finalized checkpoint is an ancestor of the block defined by the attestation's beacon block root.
|
||||
// attestation's target intentionally referencing a checkpoint that's long ago.
|
||||
if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
|
||||
log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Could not verify finalized consistency")
|
||||
return
|
||||
@@ -169,35 +168,57 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
|
||||
return
|
||||
}
|
||||
|
||||
var singleAtt *ethpb.SingleAttestation
|
||||
// Decide if the attestation is an Electra SingleAttestation or a Phase0 unaggregated attestation
|
||||
var (
|
||||
attForValidation ethpb.Att
|
||||
broadcastAtt ethpb.Att
|
||||
eventType feed.EventType
|
||||
eventData interface{}
|
||||
)
|
||||
|
||||
if att.Version() >= version.Electra {
|
||||
var ok bool
|
||||
singleAtt, ok = att.(*ethpb.SingleAttestation)
|
||||
singleAtt, ok := att.(*ethpb.SingleAttestation)
|
||||
if !ok {
|
||||
log.Debugf("Attestation has wrong type (expected %T, got %T)", ðpb.SingleAttestation{}, att)
|
||||
return
|
||||
}
|
||||
att = singleAtt.ToAttestationElectra(committee)
|
||||
// Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions.
|
||||
attForValidation = singleAtt.ToAttestationElectra(committee)
|
||||
broadcastAtt = singleAtt
|
||||
eventType = operation.SingleAttReceived
|
||||
eventData = &operation.SingleAttReceivedData{
|
||||
Attestation: singleAtt,
|
||||
}
|
||||
} else {
|
||||
// Phase0 attestation
|
||||
attForValidation = att
|
||||
broadcastAtt = att
|
||||
eventType = operation.UnaggregatedAttReceived
|
||||
eventData = &operation.UnAggregatedAttReceivedData{
|
||||
Attestation: att,
|
||||
}
|
||||
}
|
||||
|
||||
valid, err = s.validateUnaggregatedAttWithState(ctx, att, preState)
|
||||
valid, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Pending unaggregated attestation failed validation")
|
||||
return
|
||||
}
|
||||
|
||||
if valid == pubsub.ValidationAccept {
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err = s.cfg.attestationCache.Add(att); err != nil {
|
||||
if err = s.cfg.attestationCache.Add(attForValidation); err != nil {
|
||||
log.WithError(err).Debug("Could not save unaggregated attestation")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := s.cfg.attPool.SaveUnaggregatedAttestation(att); err != nil {
|
||||
if err := s.cfg.attPool.SaveUnaggregatedAttestation(attForValidation); err != nil {
|
||||
log.WithError(err).Debug("Could not save unaggregated attestation")
|
||||
return
|
||||
}
|
||||
}
|
||||
s.setSeenCommitteeIndicesSlot(data.Slot, att.GetCommitteeIndex(), att.GetAggregationBits())
|
||||
|
||||
s.setSeenCommitteeIndicesSlot(data.Slot, attForValidation.GetCommitteeIndex(), attForValidation.GetAggregationBits())
|
||||
|
||||
valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot))
|
||||
if err != nil {
|
||||
@@ -205,34 +226,16 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
|
||||
return
|
||||
}
|
||||
|
||||
// Broadcasting the signed attestation again once a node is able to process it.
|
||||
var attToBroadcast ethpb.Att
|
||||
if singleAtt != nil {
|
||||
attToBroadcast = singleAtt
|
||||
} else {
|
||||
attToBroadcast = att
|
||||
}
|
||||
if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, attToBroadcast), attToBroadcast); err != nil {
|
||||
// Broadcast the final 'broadcastAtt' object
|
||||
if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, broadcastAtt), broadcastAtt); err != nil {
|
||||
log.WithError(err).Debug("Could not broadcast")
|
||||
}
|
||||
|
||||
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
|
||||
// of a received unaggregated attestation.
|
||||
if singleAtt != nil {
|
||||
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: operation.SingleAttReceived,
|
||||
Data: &operation.SingleAttReceivedData{
|
||||
Attestation: singleAtt,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: operation.UnaggregatedAttReceived,
|
||||
Data: &operation.UnAggregatedAttReceivedData{
|
||||
Attestation: att,
|
||||
},
|
||||
})
|
||||
}
|
||||
// Feed event notification for other services
|
||||
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: eventType,
|
||||
Data: eventData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,11 @@ import (
|
||||
// - The attestation is unaggregated -- that is, it has exactly one participating validator (len(get_attesting_indices(state, attestation.data, attestation.aggregation_bits)) == 1).
|
||||
// - attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (attestation.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot).
|
||||
// - The signature of attestation is valid.
|
||||
func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
|
||||
func (s *Service) validateCommitteeIndexBeaconAttestation(
|
||||
ctx context.Context,
|
||||
pid peer.ID,
|
||||
msg *pubsub.Message,
|
||||
) (pubsub.ValidationResult, error) {
|
||||
if pid == s.cfg.p2p.PeerID() {
|
||||
return pubsub.ValidationAccept, nil
|
||||
}
|
||||
@@ -64,6 +68,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
if err := helpers.ValidateNilAttestation(att); err != nil {
|
||||
return pubsub.ValidationReject, err
|
||||
}
|
||||
|
||||
data := att.GetData()
|
||||
|
||||
// Do not process slot 0 attestations.
|
||||
@@ -73,8 +78,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
|
||||
// Attestation's slot is within ATTESTATION_PROPAGATION_SLOT_RANGE and early attestation
|
||||
// processing tolerance.
|
||||
if err := helpers.ValidateAttestationTime(data.Slot, s.cfg.clock.GenesisTime(),
|
||||
earlyAttestationProcessingTolerance); err != nil {
|
||||
if err := helpers.ValidateAttestationTime(data.Slot, s.cfg.clock.GenesisTime(), earlyAttestationProcessingTolerance); err != nil {
|
||||
tracing.AnnotateError(span, err)
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
@@ -89,7 +93,6 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
if s.hasSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits()) {
|
||||
return pubsub.ValidationIgnore, nil
|
||||
}
|
||||
|
||||
// Reject an attestation if it references an invalid block.
|
||||
if s.hasBadBlock(bytesutil.ToBytes32(data.BeaconBlockRoot)) ||
|
||||
s.hasBadBlock(bytesutil.ToBytes32(data.Target.Root)) ||
|
||||
@@ -99,15 +102,12 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
}
|
||||
}
|
||||
|
||||
var validationRes pubsub.ValidationResult
|
||||
|
||||
// Verify the block being voted and the processed state is in beaconDB and the block has passed validation if it's in the beaconDB.
|
||||
blockRoot := bytesutil.ToBytes32(data.BeaconBlockRoot)
|
||||
if !s.hasBlockAndState(ctx, blockRoot) {
|
||||
return s.saveToPendingAttPool(att)
|
||||
}
|
||||
|
||||
if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
|
||||
if !s.cfg.chain.InForkchoice(blockRoot) {
|
||||
tracing.AnnotateError(span, blockchain.ErrNotDescendantOfFinalized)
|
||||
return pubsub.ValidationIgnore, blockchain.ErrNotDescendantOfFinalized
|
||||
}
|
||||
@@ -123,12 +123,12 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
|
||||
validationRes, err = s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic)
|
||||
validationRes, err := s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic)
|
||||
if validationRes != pubsub.ValidationAccept {
|
||||
return validationRes, err
|
||||
}
|
||||
|
||||
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, att.GetData().Slot, committeeIndex)
|
||||
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, committeeIndex)
|
||||
if err != nil {
|
||||
tracing.AnnotateError(span, err)
|
||||
return pubsub.ValidationIgnore, err
|
||||
@@ -139,16 +139,37 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
return validationRes, err
|
||||
}
|
||||
|
||||
var singleAtt *eth.SingleAttestation
|
||||
// Consolidated handling of Electra SingleAttestation vs Phase0 unaggregated attestation
|
||||
var (
|
||||
attForValidation eth.Att // what we'll pass to further validation
|
||||
eventType feed.EventType
|
||||
eventData interface{}
|
||||
)
|
||||
|
||||
if att.Version() >= version.Electra {
|
||||
singleAtt, ok = att.(*eth.SingleAttestation)
|
||||
singleAtt, ok := att.(*eth.SingleAttestation)
|
||||
if !ok {
|
||||
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.SingleAttestation{}, att)
|
||||
return pubsub.ValidationIgnore, fmt.Errorf(
|
||||
"attestation has wrong type (expected %T, got %T)",
|
||||
ð.SingleAttestation{}, att,
|
||||
)
|
||||
}
|
||||
// Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions.
|
||||
attForValidation = singleAtt.ToAttestationElectra(committee)
|
||||
eventType = operation.SingleAttReceived
|
||||
eventData = &operation.SingleAttReceivedData{
|
||||
Attestation: singleAtt,
|
||||
}
|
||||
} else {
|
||||
// Phase0 unaggregated attestation
|
||||
attForValidation = att
|
||||
eventType = operation.UnaggregatedAttReceived
|
||||
eventData = &operation.UnAggregatedAttReceivedData{
|
||||
Attestation: att,
|
||||
}
|
||||
att = singleAtt.ToAttestationElectra(committee)
|
||||
}
|
||||
|
||||
validationRes, err = s.validateUnaggregatedAttWithState(ctx, att, preState)
|
||||
validationRes, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState)
|
||||
if validationRes != pubsub.ValidationAccept {
|
||||
return validationRes, err
|
||||
}
|
||||
@@ -172,7 +193,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
tracing.AnnotateError(span, err)
|
||||
return
|
||||
}
|
||||
indexedAtt, err := attestation.ConvertToIndexed(ctx, att, committee)
|
||||
indexedAtt, err := attestation.ConvertToIndexed(ctx, attForValidation, committee)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not convert to indexed attestation")
|
||||
tracing.AnnotateError(span, err)
|
||||
@@ -182,27 +203,16 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
}()
|
||||
}
|
||||
|
||||
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
|
||||
// of a received unaggregated attestation.
|
||||
if singleAtt != nil {
|
||||
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: operation.SingleAttReceived,
|
||||
Data: &operation.SingleAttReceivedData{
|
||||
Attestation: singleAtt,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: operation.UnaggregatedAttReceived,
|
||||
Data: &operation.UnAggregatedAttReceivedData{
|
||||
Attestation: att,
|
||||
},
|
||||
})
|
||||
}
|
||||
// Notify other services in the beacon node
|
||||
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: eventType,
|
||||
Data: eventData,
|
||||
})
|
||||
|
||||
s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits())
|
||||
s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, attForValidation.GetAggregationBits())
|
||||
|
||||
msg.ValidatorData = att
|
||||
// Attach final validated attestation to the message for further pipeline use
|
||||
msg.ValidatorData = attForValidation
|
||||
|
||||
return pubsub.ValidationAccept, nil
|
||||
}
|
||||
|
||||
4
changelog/james-prysm_attestation-cleanup-suggestions.md
Normal file
4
changelog/james-prysm_attestation-cleanup-suggestions.md
Normal file
@@ -0,0 +1,4 @@
|
||||
### Ignored
|
||||
|
||||
- Cleanup single attestation code for readability.
|
||||
|
||||
Reference in New Issue
Block a user