cleaning up processAggregate (#15823)

* cleaning up some code

* kasey feedback

* further simplifying

* kasey's suggestion
This commit is contained in:
james-prysm
2025-10-08 13:34:32 -05:00
committed by GitHub
parent d5ca327c30
commit 5b20352ac6
2 changed files with 20 additions and 28 deletions

View File

@@ -241,6 +241,9 @@ func (s *Service) saveAttestation(att ethpb.Att) error {
if features.Get().EnableExperimentalAttestationPool {
return s.cfg.attestationCache.Add(att)
}
if att.IsAggregated() {
return s.cfg.attPool.SaveAggregatedAttestation(att)
}
return s.cfg.attPool.SaveUnaggregatedAttestation(att)
}
@@ -300,40 +303,26 @@ func (s *Service) processVerifiedAttestation(
}
func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) {
att := aggregate.AggregateAttestationAndProof().AggregateVal()
// Save the pending aggregated attestation to the pool if it passes the aggregated
// validation steps.
valRes, err := s.validateAggregatedAtt(ctx, aggregate)
res, err := s.validateAggregatedAtt(ctx, aggregate)
if err != nil {
log.WithError(err).Debug("Pending aggregated attestation failed validation")
return
}
aggValid := pubsub.ValidationAccept == valRes
if aggValid && s.validateBlockInAttestation(ctx, aggregate) {
if features.Get().EnableExperimentalAttestationPool {
if err = s.cfg.attestationCache.Add(att); err != nil {
log.WithError(err).Debug("Could not save aggregated attestation")
return
}
} else {
if att.IsAggregated() {
if err = s.cfg.attPool.SaveAggregatedAttestation(att); err != nil {
log.WithError(err).Debug("Could not save aggregated attestation")
return
}
} else if err = s.cfg.attPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Debug("Could not save unaggregated attestation")
return
}
}
if res != pubsub.ValidationAccept || !s.validateBlockInAttestation(ctx, aggregate) {
log.Debug("Pending aggregated attestation failed validation")
return
}
s.setAggregatorIndexEpochSeen(att.GetData().Target.Epoch, aggregate.AggregateAttestationAndProof().GetAggregatorIndex())
att := aggregate.AggregateAttestationAndProof().AggregateVal()
if err := s.saveAttestation(att); err != nil {
log.WithError(err).Debug("Could not save aggregated attestation")
return
}
// Broadcasting the signed attestation again once a node is able to process it.
if err := s.cfg.p2p.Broadcast(ctx, aggregate); err != nil {
log.WithError(err).Debug("Could not broadcast")
}
s.setAggregatorIndexEpochSeen(att.GetData().Target.Epoch, aggregate.AggregateAttestationAndProof().GetAggregatorIndex())
if err := s.cfg.p2p.Broadcast(ctx, aggregate); err != nil {
log.WithError(err).Debug("Could not broadcast aggregated attestation")
}
}

View File

@@ -0,0 +1,3 @@
### Ignored
- Small code changes for reusability and readability to processAggregate.