mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
Compare commits
8 Commits
fix-backwa
...
cleanup-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
daa9d39511 | ||
|
|
a37bc1cbd0 | ||
|
|
01007d8513 | ||
|
|
3c3740a76f | ||
|
|
9cc41b12d2 | ||
|
|
4764dcb729 | ||
|
|
94b876e26f | ||
|
|
ecd1fc8db8 |
@@ -79,25 +79,38 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
|
||||
}
|
||||
|
||||
func (s *Service) processAttestations(ctx context.Context, attestations []any) {
|
||||
if len(attestations) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
atts := make([]ethpb.Att, 0, len(attestations))
|
||||
bucketMap := make(map[[32]byte]*attestationBucket)
|
||||
var aggs []ethpb.SignedAggregateAttAndProof
|
||||
for _, att := range attestations {
|
||||
switch v := att.(type) {
|
||||
case ethpb.Att:
|
||||
atts = append(atts, v)
|
||||
data := v.GetData()
|
||||
dataHash, err := data.HashTreeRoot()
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed to hash attestation data, skipping attestation")
|
||||
continue
|
||||
}
|
||||
if bucket, ok := bucketMap[dataHash]; ok {
|
||||
bucket.attestations = append(bucket.attestations, v)
|
||||
} else {
|
||||
bucketMap[dataHash] = &attestationBucket{
|
||||
dataHash: dataHash,
|
||||
data: data,
|
||||
attestations: []ethpb.Att{v},
|
||||
}
|
||||
}
|
||||
case ethpb.SignedAggregateAttAndProof:
|
||||
s.processAggregate(ctx, v)
|
||||
aggs = append(aggs, v)
|
||||
default:
|
||||
log.Warnf("Unexpected attestation type %T, skipping", v)
|
||||
}
|
||||
}
|
||||
|
||||
for _, bucket := range bucketAttestationsByData(atts) {
|
||||
for _, bucket := range bucketMap {
|
||||
s.processAttestationBucket(ctx, bucket)
|
||||
}
|
||||
for _, agg := range aggs {
|
||||
s.processAggregate(ctx, agg)
|
||||
}
|
||||
}
|
||||
|
||||
// attestationBucket groups attestations with the same AttestationData for batch processing.
|
||||
@@ -107,82 +120,102 @@ type attestationBucket struct {
|
||||
attestations []ethpb.Att
|
||||
}
|
||||
|
||||
// normalizeAndValidateAtt validates and (if needed) converts an attestation
|
||||
// to Electra format for pool/verification, returning (bcast, pool, ok).
|
||||
func (s *Service) normalizeAndValidateAtt(
|
||||
ctx context.Context,
|
||||
preState state.ReadOnlyBeaconState, // adjust to your concrete type if needed
|
||||
data *ethpb.AttestationData,
|
||||
att ethpb.Att,
|
||||
) (bcast ethpb.Att, pool ethpb.Att, ok bool) {
|
||||
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, att.GetCommitteeIndex())
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed to get committee from state")
|
||||
return nil, nil, false
|
||||
}
|
||||
v, err := validateAttesterData(ctx, att, committee)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed attester data validation")
|
||||
return nil, nil, false
|
||||
}
|
||||
if v != pubsub.ValidationAccept {
|
||||
log.Debug("Pending attestation rejected due to invalid data")
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
var conv ethpb.Att
|
||||
if att.Version() >= version.Electra {
|
||||
single, ok := att.(*ethpb.SingleAttestation)
|
||||
if !ok {
|
||||
log.Debugf("Wrong type: expected %T, got %T", ðpb.SingleAttestation{}, att)
|
||||
return nil, nil, false
|
||||
}
|
||||
conv = single.ToAttestationElectra(committee)
|
||||
} else {
|
||||
conv = att
|
||||
}
|
||||
return att, conv, true
|
||||
}
|
||||
|
||||
// processAttestationBucket processes a bucket of attestations with shared AttestationData.
|
||||
func (s *Service) processAttestationBucket(ctx context.Context, bucket *attestationBucket) {
|
||||
if bucket == nil || len(bucket.attestations) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
data := bucket.data
|
||||
|
||||
// Shared validations for the entire bucket.
|
||||
// 1) Bucket-level checks
|
||||
if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
|
||||
log.WithError(blockchain.ErrNotDescendantOfFinalized).WithField("root", fmt.Sprintf("%#x", data.BeaconBlockRoot)).Debug("Failed forkchoice check for bucket")
|
||||
log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Failed forkchoice check for bucket")
|
||||
return
|
||||
}
|
||||
|
||||
preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed to get attestation prestate for bucket")
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, bucket.attestations[0]); err != nil {
|
||||
log.WithError(err).Debug("Failed FFG consistency check for bucket")
|
||||
return
|
||||
}
|
||||
|
||||
// Collect valid attestations for both single and electra formats.
|
||||
// Broadcast takes single format but attestation pool and batch signature verification take electra format.
|
||||
forBroadcast := make([]ethpb.Att, 0, len(bucket.attestations))
|
||||
forPool := make([]ethpb.Att, 0, len(bucket.attestations))
|
||||
// 2) Normalize/filter into parallel slices
|
||||
bcast := make([]ethpb.Att, 0, len(bucket.attestations))
|
||||
pool := make([]ethpb.Att, 0, len(bucket.attestations))
|
||||
|
||||
for _, att := range bucket.attestations {
|
||||
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, att.GetCommitteeIndex())
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed to get committee from state")
|
||||
orig, conv, ok := s.normalizeAndValidateAtt(ctx, preState, data, att)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
valid, err := validateAttesterData(ctx, att, committee)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed attester data validation")
|
||||
continue
|
||||
}
|
||||
if valid != pubsub.ValidationAccept {
|
||||
log.Debug("Pending attestation rejected due to invalid data")
|
||||
continue
|
||||
}
|
||||
|
||||
var conv ethpb.Att
|
||||
if att.Version() >= version.Electra {
|
||||
single, ok := att.(*ethpb.SingleAttestation)
|
||||
if !ok {
|
||||
log.Debugf("Wrong type: expected %T, got %T", ðpb.SingleAttestation{}, att)
|
||||
continue
|
||||
}
|
||||
conv = single.ToAttestationElectra(committee)
|
||||
} else {
|
||||
conv = att
|
||||
}
|
||||
|
||||
forBroadcast = append(forBroadcast, att)
|
||||
forPool = append(forPool, conv)
|
||||
bcast = append(bcast, orig)
|
||||
pool = append(pool, conv)
|
||||
}
|
||||
|
||||
if len(forPool) == 0 {
|
||||
if len(pool) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
verified := s.batchVerifyAttestationSignatures(ctx, forPool, preState)
|
||||
verifiedSet := make(map[ethpb.Att]struct{}, len(verified))
|
||||
for _, att := range verified {
|
||||
verifiedSet[att] = struct{}{}
|
||||
// 3) Batch verify → returns the same slice if all pass, or a filtered subset
|
||||
passed := s.batchVerifyAttestationSignatures(ctx, pool, preState)
|
||||
|
||||
// 4) Process verified entries
|
||||
// Fast path: if all passed (same slice returned), process all by index
|
||||
if len(passed) == len(pool) {
|
||||
for i := range pool {
|
||||
s.processVerifiedAttestation(ctx, bcast[i], pool[i], preState)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for i, poolAtt := range forPool {
|
||||
if _, ok := verifiedSet[poolAtt]; ok {
|
||||
s.processVerifiedAttestation(ctx, forBroadcast[i], poolAtt, preState)
|
||||
// Slow path: build ID map and match
|
||||
passedSet := make(map[ethpb.Att]struct{}, len(passed))
|
||||
for _, att := range passed {
|
||||
passedSet[att] = struct{}{}
|
||||
}
|
||||
|
||||
for i, poolAtt := range pool {
|
||||
if _, ok := passedSet[poolAtt]; ok {
|
||||
s.processVerifiedAttestation(ctx, bcast[i], poolAtt, preState)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -467,29 +500,3 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot primitives.Slot)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// bucketAttestationsByData groups attestations by their AttestationData hash.
|
||||
func bucketAttestationsByData(attestations []ethpb.Att) map[[32]byte]*attestationBucket {
|
||||
bucketMap := make(map[[32]byte]*attestationBucket)
|
||||
|
||||
for _, att := range attestations {
|
||||
data := att.GetData()
|
||||
dataHash, err := data.HashTreeRoot()
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed to hash attestation data, skipping attestation")
|
||||
continue
|
||||
}
|
||||
|
||||
if bucket, ok := bucketMap[dataHash]; ok {
|
||||
bucket.attestations = append(bucket.attestations, att)
|
||||
} else {
|
||||
bucketMap[dataHash] = &attestationBucket{
|
||||
dataHash: dataHash,
|
||||
data: data,
|
||||
attestations: []ethpb.Att{att},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return bucketMap
|
||||
}
|
||||
|
||||
@@ -219,116 +219,6 @@ func TestProcessAttestationBucket(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestBucketAttestationsByData(t *testing.T) {
|
||||
t.Run("EmptyInput", func(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
buckets := bucketAttestationsByData(nil)
|
||||
require.Equal(t, 0, len(buckets))
|
||||
require.Equal(t, 0, len(hook.Entries))
|
||||
|
||||
buckets = bucketAttestationsByData([]ethpb.Att{})
|
||||
require.Equal(t, 0, len(buckets))
|
||||
require.Equal(t, 0, len(hook.Entries))
|
||||
})
|
||||
|
||||
t.Run("SingleAttestation", func(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
att := util.NewAttestation()
|
||||
att.Data.Slot = 1
|
||||
att.Data.CommitteeIndex = 0
|
||||
|
||||
buckets := bucketAttestationsByData([]ethpb.Att{att})
|
||||
|
||||
require.Equal(t, 1, len(buckets))
|
||||
var bucket *attestationBucket
|
||||
for _, b := range buckets {
|
||||
bucket = b
|
||||
break
|
||||
}
|
||||
require.NotNil(t, bucket)
|
||||
require.Equal(t, 1, len(bucket.attestations))
|
||||
require.Equal(t, att, bucket.attestations[0])
|
||||
require.Equal(t, att.GetData(), bucket.data)
|
||||
require.Equal(t, 0, len(hook.Entries))
|
||||
})
|
||||
|
||||
t.Run("MultipleAttestationsSameData", func(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
|
||||
att1 := util.NewAttestation()
|
||||
att1.Data.Slot = 1
|
||||
att1.Data.CommitteeIndex = 0
|
||||
|
||||
att2 := util.NewAttestation()
|
||||
att2.Data = att1.Data // Same data
|
||||
att2.Signature = make([]byte, 96) // Different signature
|
||||
|
||||
buckets := bucketAttestationsByData([]ethpb.Att{att1, att2})
|
||||
|
||||
require.Equal(t, 1, len(buckets), "Should have one bucket for same data")
|
||||
var bucket *attestationBucket
|
||||
for _, b := range buckets {
|
||||
bucket = b
|
||||
break
|
||||
}
|
||||
require.NotNil(t, bucket)
|
||||
require.Equal(t, 2, len(bucket.attestations), "Should have both attestations in one bucket")
|
||||
require.Equal(t, att1.GetData(), bucket.data)
|
||||
require.Equal(t, 0, len(hook.Entries))
|
||||
})
|
||||
|
||||
t.Run("MultipleAttestationsDifferentData", func(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
|
||||
att1 := util.NewAttestation()
|
||||
att1.Data.Slot = 1
|
||||
att1.Data.CommitteeIndex = 0
|
||||
|
||||
att2 := util.NewAttestation()
|
||||
att2.Data.Slot = 2 // Different slot
|
||||
att2.Data.CommitteeIndex = 1
|
||||
|
||||
buckets := bucketAttestationsByData([]ethpb.Att{att1, att2})
|
||||
|
||||
require.Equal(t, 2, len(buckets), "Should have two buckets for different data")
|
||||
bucketCount := 0
|
||||
for _, bucket := range buckets {
|
||||
require.Equal(t, 1, len(bucket.attestations), "Each bucket should have one attestation")
|
||||
bucketCount++
|
||||
}
|
||||
require.Equal(t, 2, bucketCount, "Should have exactly two buckets")
|
||||
require.Equal(t, 0, len(hook.Entries))
|
||||
})
|
||||
|
||||
t.Run("MixedAttestationTypes", func(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
|
||||
// Create Phase0 attestation
|
||||
phase0Att := util.NewAttestation()
|
||||
phase0Att.Data.Slot = 1
|
||||
phase0Att.Data.CommitteeIndex = 0
|
||||
|
||||
electraAtt := ðpb.SingleAttestation{
|
||||
CommitteeId: 0,
|
||||
AttesterIndex: 1,
|
||||
Data: phase0Att.Data, // Same data
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
|
||||
buckets := bucketAttestationsByData([]ethpb.Att{phase0Att, electraAtt})
|
||||
|
||||
require.Equal(t, 1, len(buckets), "Should have one bucket for same data")
|
||||
var bucket *attestationBucket
|
||||
for _, b := range buckets {
|
||||
bucket = b
|
||||
break
|
||||
}
|
||||
require.NotNil(t, bucket)
|
||||
require.Equal(t, 2, len(bucket.attestations), "Should have both attestations in one bucket")
|
||||
require.Equal(t, phase0Att.GetData(), bucket.data)
|
||||
require.Equal(t, 0, len(hook.Entries))
|
||||
})
|
||||
}
|
||||
|
||||
func TestBatchVerifyAttestationSignatures(t *testing.T) {
|
||||
t.Run("EmptyInput", func(t *testing.T) {
|
||||
|
||||
3
changelog/james-prysm_cleanup-process-attestations.md
Normal file
3
changelog/james-prysm_cleanup-process-attestations.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Ignored
|
||||
|
||||
- some cleanup on process attestations to reduce a loop and increase readability.
|
||||
Reference in New Issue
Block a user