Compare commits

...

8 Commits

Author SHA1 Message Date
james-prysm
daa9d39511 accidently checked in code 2025-10-13 12:01:48 -05:00
james-prysm
a37bc1cbd0 we don't actually need to use the ids 2025-10-13 11:41:15 -05:00
james-prysm
01007d8513 Merge branch 'develop' into cleanup-process-attestations 2025-10-13 09:56:23 -05:00
james-prysm
3c3740a76f deleting benchmark tests 2025-10-10 16:59:07 -05:00
james-prysm
9cc41b12d2 editing test 2025-10-10 16:56:31 -05:00
james-prysm
4764dcb729 Merge branch 'develop' into cleanup-process-attestations 2025-10-10 16:41:54 -05:00
james-prysm
94b876e26f fixing tests and removing unused function 2025-10-10 16:18:42 -05:00
james-prysm
ecd1fc8db8 suggestions for rewriting process attestations to increase readability 2025-10-10 15:31:50 -05:00
3 changed files with 91 additions and 191 deletions

View File

@@ -79,25 +79,38 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
}
func (s *Service) processAttestations(ctx context.Context, attestations []any) {
if len(attestations) == 0 {
return
}
atts := make([]ethpb.Att, 0, len(attestations))
bucketMap := make(map[[32]byte]*attestationBucket)
var aggs []ethpb.SignedAggregateAttAndProof
for _, att := range attestations {
switch v := att.(type) {
case ethpb.Att:
atts = append(atts, v)
data := v.GetData()
dataHash, err := data.HashTreeRoot()
if err != nil {
log.WithError(err).Debug("Failed to hash attestation data, skipping attestation")
continue
}
if bucket, ok := bucketMap[dataHash]; ok {
bucket.attestations = append(bucket.attestations, v)
} else {
bucketMap[dataHash] = &attestationBucket{
dataHash: dataHash,
data: data,
attestations: []ethpb.Att{v},
}
}
case ethpb.SignedAggregateAttAndProof:
s.processAggregate(ctx, v)
aggs = append(aggs, v)
default:
log.Warnf("Unexpected attestation type %T, skipping", v)
}
}
for _, bucket := range bucketAttestationsByData(atts) {
for _, bucket := range bucketMap {
s.processAttestationBucket(ctx, bucket)
}
for _, agg := range aggs {
s.processAggregate(ctx, agg)
}
}
// attestationBucket groups attestations with the same AttestationData for batch processing.
@@ -107,82 +120,102 @@ type attestationBucket struct {
attestations []ethpb.Att
}
// normalizeAndValidateAtt validates and (if needed) converts an attestation
// to Electra format for pool/verification, returning (bcast, pool, ok).
func (s *Service) normalizeAndValidateAtt(
ctx context.Context,
preState state.ReadOnlyBeaconState, // adjust to your concrete type if needed
data *ethpb.AttestationData,
att ethpb.Att,
) (bcast ethpb.Att, pool ethpb.Att, ok bool) {
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, att.GetCommitteeIndex())
if err != nil {
log.WithError(err).Debug("Failed to get committee from state")
return nil, nil, false
}
v, err := validateAttesterData(ctx, att, committee)
if err != nil {
log.WithError(err).Debug("Failed attester data validation")
return nil, nil, false
}
if v != pubsub.ValidationAccept {
log.Debug("Pending attestation rejected due to invalid data")
return nil, nil, false
}
var conv ethpb.Att
if att.Version() >= version.Electra {
single, ok := att.(*ethpb.SingleAttestation)
if !ok {
log.Debugf("Wrong type: expected %T, got %T", &ethpb.SingleAttestation{}, att)
return nil, nil, false
}
conv = single.ToAttestationElectra(committee)
} else {
conv = att
}
return att, conv, true
}
// processAttestationBucket processes a bucket of attestations with shared AttestationData.
func (s *Service) processAttestationBucket(ctx context.Context, bucket *attestationBucket) {
if bucket == nil || len(bucket.attestations) == 0 {
return
}
data := bucket.data
// Shared validations for the entire bucket.
// 1) Bucket-level checks
if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
log.WithError(blockchain.ErrNotDescendantOfFinalized).WithField("root", fmt.Sprintf("%#x", data.BeaconBlockRoot)).Debug("Failed forkchoice check for bucket")
log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Failed forkchoice check for bucket")
return
}
preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target)
if err != nil {
log.WithError(err).Debug("Failed to get attestation prestate for bucket")
return
}
if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, bucket.attestations[0]); err != nil {
log.WithError(err).Debug("Failed FFG consistency check for bucket")
return
}
// Collect valid attestations for both single and electra formats.
// Broadcast takes single format but attestation pool and batch signature verification take electra format.
forBroadcast := make([]ethpb.Att, 0, len(bucket.attestations))
forPool := make([]ethpb.Att, 0, len(bucket.attestations))
// 2) Normalize/filter into parallel slices
bcast := make([]ethpb.Att, 0, len(bucket.attestations))
pool := make([]ethpb.Att, 0, len(bucket.attestations))
for _, att := range bucket.attestations {
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, att.GetCommitteeIndex())
if err != nil {
log.WithError(err).Debug("Failed to get committee from state")
orig, conv, ok := s.normalizeAndValidateAtt(ctx, preState, data, att)
if !ok {
continue
}
valid, err := validateAttesterData(ctx, att, committee)
if err != nil {
log.WithError(err).Debug("Failed attester data validation")
continue
}
if valid != pubsub.ValidationAccept {
log.Debug("Pending attestation rejected due to invalid data")
continue
}
var conv ethpb.Att
if att.Version() >= version.Electra {
single, ok := att.(*ethpb.SingleAttestation)
if !ok {
log.Debugf("Wrong type: expected %T, got %T", &ethpb.SingleAttestation{}, att)
continue
}
conv = single.ToAttestationElectra(committee)
} else {
conv = att
}
forBroadcast = append(forBroadcast, att)
forPool = append(forPool, conv)
bcast = append(bcast, orig)
pool = append(pool, conv)
}
if len(forPool) == 0 {
if len(pool) == 0 {
return
}
verified := s.batchVerifyAttestationSignatures(ctx, forPool, preState)
verifiedSet := make(map[ethpb.Att]struct{}, len(verified))
for _, att := range verified {
verifiedSet[att] = struct{}{}
// 3) Batch verify → returns the same slice if all pass, or a filtered subset
passed := s.batchVerifyAttestationSignatures(ctx, pool, preState)
// 4) Process verified entries
// Fast path: if all passed (same slice returned), process all by index
if len(passed) == len(pool) {
for i := range pool {
s.processVerifiedAttestation(ctx, bcast[i], pool[i], preState)
}
return
}
for i, poolAtt := range forPool {
if _, ok := verifiedSet[poolAtt]; ok {
s.processVerifiedAttestation(ctx, forBroadcast[i], poolAtt, preState)
// Slow path: build ID map and match
passedSet := make(map[ethpb.Att]struct{}, len(passed))
for _, att := range passed {
passedSet[att] = struct{}{}
}
for i, poolAtt := range pool {
if _, ok := passedSet[poolAtt]; ok {
s.processVerifiedAttestation(ctx, bcast[i], poolAtt, preState)
}
}
}
@@ -467,29 +500,3 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot primitives.Slot)
}
}
}
// bucketAttestationsByData groups attestations by their AttestationData hash.
func bucketAttestationsByData(attestations []ethpb.Att) map[[32]byte]*attestationBucket {
bucketMap := make(map[[32]byte]*attestationBucket)
for _, att := range attestations {
data := att.GetData()
dataHash, err := data.HashTreeRoot()
if err != nil {
log.WithError(err).Debug("Failed to hash attestation data, skipping attestation")
continue
}
if bucket, ok := bucketMap[dataHash]; ok {
bucket.attestations = append(bucket.attestations, att)
} else {
bucketMap[dataHash] = &attestationBucket{
dataHash: dataHash,
data: data,
attestations: []ethpb.Att{att},
}
}
}
return bucketMap
}

View File

@@ -219,116 +219,6 @@ func TestProcessAttestationBucket(t *testing.T) {
})
}
func TestBucketAttestationsByData(t *testing.T) {
t.Run("EmptyInput", func(t *testing.T) {
hook := logTest.NewGlobal()
buckets := bucketAttestationsByData(nil)
require.Equal(t, 0, len(buckets))
require.Equal(t, 0, len(hook.Entries))
buckets = bucketAttestationsByData([]ethpb.Att{})
require.Equal(t, 0, len(buckets))
require.Equal(t, 0, len(hook.Entries))
})
t.Run("SingleAttestation", func(t *testing.T) {
hook := logTest.NewGlobal()
att := util.NewAttestation()
att.Data.Slot = 1
att.Data.CommitteeIndex = 0
buckets := bucketAttestationsByData([]ethpb.Att{att})
require.Equal(t, 1, len(buckets))
var bucket *attestationBucket
for _, b := range buckets {
bucket = b
break
}
require.NotNil(t, bucket)
require.Equal(t, 1, len(bucket.attestations))
require.Equal(t, att, bucket.attestations[0])
require.Equal(t, att.GetData(), bucket.data)
require.Equal(t, 0, len(hook.Entries))
})
t.Run("MultipleAttestationsSameData", func(t *testing.T) {
hook := logTest.NewGlobal()
att1 := util.NewAttestation()
att1.Data.Slot = 1
att1.Data.CommitteeIndex = 0
att2 := util.NewAttestation()
att2.Data = att1.Data // Same data
att2.Signature = make([]byte, 96) // Different signature
buckets := bucketAttestationsByData([]ethpb.Att{att1, att2})
require.Equal(t, 1, len(buckets), "Should have one bucket for same data")
var bucket *attestationBucket
for _, b := range buckets {
bucket = b
break
}
require.NotNil(t, bucket)
require.Equal(t, 2, len(bucket.attestations), "Should have both attestations in one bucket")
require.Equal(t, att1.GetData(), bucket.data)
require.Equal(t, 0, len(hook.Entries))
})
t.Run("MultipleAttestationsDifferentData", func(t *testing.T) {
hook := logTest.NewGlobal()
att1 := util.NewAttestation()
att1.Data.Slot = 1
att1.Data.CommitteeIndex = 0
att2 := util.NewAttestation()
att2.Data.Slot = 2 // Different slot
att2.Data.CommitteeIndex = 1
buckets := bucketAttestationsByData([]ethpb.Att{att1, att2})
require.Equal(t, 2, len(buckets), "Should have two buckets for different data")
bucketCount := 0
for _, bucket := range buckets {
require.Equal(t, 1, len(bucket.attestations), "Each bucket should have one attestation")
bucketCount++
}
require.Equal(t, 2, bucketCount, "Should have exactly two buckets")
require.Equal(t, 0, len(hook.Entries))
})
t.Run("MixedAttestationTypes", func(t *testing.T) {
hook := logTest.NewGlobal()
// Create Phase0 attestation
phase0Att := util.NewAttestation()
phase0Att.Data.Slot = 1
phase0Att.Data.CommitteeIndex = 0
electraAtt := &ethpb.SingleAttestation{
CommitteeId: 0,
AttesterIndex: 1,
Data: phase0Att.Data, // Same data
Signature: make([]byte, 96),
}
buckets := bucketAttestationsByData([]ethpb.Att{phase0Att, electraAtt})
require.Equal(t, 1, len(buckets), "Should have one bucket for same data")
var bucket *attestationBucket
for _, b := range buckets {
bucket = b
break
}
require.NotNil(t, bucket)
require.Equal(t, 2, len(bucket.attestations), "Should have both attestations in one bucket")
require.Equal(t, phase0Att.GetData(), bucket.data)
require.Equal(t, 0, len(hook.Entries))
})
}
func TestBatchVerifyAttestationSignatures(t *testing.T) {
t.Run("EmptyInput", func(t *testing.T) {

View File

@@ -0,0 +1,3 @@
### Ignored
- some cleanup on process attestations to reduce a loop and increase readability.