diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 00b9354aa1..d151af3d3e 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -174,6 +174,7 @@ go_test( "fork_watcher_test.go", "kzg_batch_verifier_test.go", "once_test.go", + "pending_attestations_queue_bucket_test.go", "pending_attestations_queue_test.go", "pending_blocks_queue_test.go", "rate_limiter_test.go", diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index 1270ffac71..3153ec6ed3 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -7,9 +7,11 @@ import ( "fmt" "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" + "github.com/OffchainLabs/prysm/v6/beacon-chain/state" "github.com/OffchainLabs/prysm/v6/config/features" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" @@ -18,6 +20,7 @@ import ( "github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/runtime/version" + "github.com/OffchainLabs/prysm/v6/time" "github.com/OffchainLabs/prysm/v6/time/slots" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/sirupsen/logrus" @@ -47,12 +50,16 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte s.pendingAttsLock.RUnlock() if len(attestations) > 0 { + start := time.Now() s.processAttestations(ctx, attestations) + duration := time.Since(start) log.WithFields(logrus.Fields{ "blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])), "pendingAttsCount": len(attestations), + "duration": duration, }).Debug("Verified and saved pending attestations to pool") } + randGen := rand.NewGenerator() // Delete the missing block root key from pending attestation queue so a node will not request for the block again. s.pendingAttsLock.Lock() @@ -72,18 +79,224 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte } func (s *Service) processAttestations(ctx context.Context, attestations []any) { - for _, signedAtt := range attestations { - // The pending attestations can arrive as both aggregates and attestations, - // and each form has to be processed differently. - switch t := signedAtt.(type) { + if len(attestations) == 0 { + return + } + + atts := make([]ethpb.Att, 0, len(attestations)) + for _, att := range attestations { + switch v := att.(type) { case ethpb.Att: - s.processAtt(ctx, t) + atts = append(atts, v) case ethpb.SignedAggregateAttAndProof: - s.processAggregate(ctx, t) + s.processAggregate(ctx, v) default: - log.Warnf("Unexpected item of type %T in pending attestation queue. Item will not be processed", t) + log.Warnf("Unexpected attestation type %T, skipping", v) } } + + for _, bucket := range bucketAttestationsByData(atts) { + s.processAttestationBucket(ctx, bucket) + } +} + +// attestationBucket groups attestations with the same AttestationData for batch processing. +type attestationBucket struct { + dataHash [32]byte + data *ethpb.AttestationData + attestations []ethpb.Att +} + +// processAttestationBucket processes a bucket of attestations with shared AttestationData. +func (s *Service) processAttestationBucket(ctx context.Context, bucket *attestationBucket) { + if bucket == nil || len(bucket.attestations) == 0 { + return + } + + data := bucket.data + + // Shared validations for the entire bucket. + if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) { + log.WithError(blockchain.ErrNotDescendantOfFinalized).WithField("root", fmt.Sprintf("%#x", data.BeaconBlockRoot)).Debug("Failed forkchoice check for bucket") + return + } + + preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target) + if err != nil { + log.WithError(err).Debug("Failed to get attestation prestate for bucket") + return + } + + if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, bucket.attestations[0]); err != nil { + log.WithError(err).Debug("Failed FFG consistency check for bucket") + return + } + + // Collect valid attestations for both single and electra formats. + // Broadcast takes single format but attestation pool and batch signature verification take electra format. + forBroadcast := make([]ethpb.Att, 0, len(bucket.attestations)) + forPool := make([]ethpb.Att, 0, len(bucket.attestations)) + + for _, att := range bucket.attestations { + committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, att.GetCommitteeIndex()) + if err != nil { + log.WithError(err).Debug("Failed to get committee from state") + continue + } + + valid, err := validateAttesterData(ctx, att, committee) + if err != nil { + log.WithError(err).Debug("Failed attester data validation") + continue + } + if valid != pubsub.ValidationAccept { + log.Debug("Pending attestation rejected due to invalid data") + continue + } + + var conv ethpb.Att + if att.Version() >= version.Electra { + single, ok := att.(*ethpb.SingleAttestation) + if !ok { + log.Debugf("Wrong type: expected %T, got %T", ðpb.SingleAttestation{}, att) + continue + } + conv = single.ToAttestationElectra(committee) + } else { + conv = att + } + + forBroadcast = append(forBroadcast, att) + forPool = append(forPool, conv) + } + + if len(forPool) == 0 { + return + } + + verified := s.batchVerifyAttestationSignatures(ctx, forPool, preState) + verifiedSet := make(map[ethpb.Att]struct{}, len(verified)) + for _, att := range verified { + verifiedSet[att] = struct{}{} + } + + for i, poolAtt := range forPool { + if _, ok := verifiedSet[poolAtt]; ok { + s.processVerifiedAttestation(ctx, forBroadcast[i], poolAtt, preState) + } + } +} + +// batchVerifyAttestationSignatures attempts batch verification, with individual fallback on failure. +func (s *Service) batchVerifyAttestationSignatures( + ctx context.Context, + attestations []ethpb.Att, + preState state.ReadOnlyBeaconState, +) []ethpb.Att { + const fallbackMsg = "batch verification failed, using individual checks" + + set, err := blocks.AttestationSignatureBatch(ctx, preState, attestations) + if err != nil { + log.WithError(err).Debug(fallbackMsg) + return s.fallbackToIndividualVerification(ctx, attestations, preState) + } + + ok, err := set.Verify() + if err != nil || !ok { + if err != nil { + log.WithError(err).Debug(fallbackMsg) + } else { + log.Debug(fallbackMsg) + } + return s.fallbackToIndividualVerification(ctx, attestations, preState) + } + + return attestations +} + +// fallbackToIndividualVerification verifies each attestation individually if batch verification fails. +func (s *Service) fallbackToIndividualVerification( + ctx context.Context, + attestations []ethpb.Att, + preState state.ReadOnlyBeaconState, +) []ethpb.Att { + verified := make([]ethpb.Att, 0, len(attestations)) + + for _, att := range attestations { + res, err := s.validateUnaggregatedAttWithState(ctx, att, preState) + if err != nil { + log.WithError(err).Debug("Individual signature verification error") + continue + } + if res == pubsub.ValidationAccept { + verified = append(verified, att) + } + } + + return verified +} + +// saveAttestation saves an attestation to the appropriate pool. +func (s *Service) saveAttestation(att ethpb.Att) error { + if features.Get().EnableExperimentalAttestationPool { + return s.cfg.attestationCache.Add(att) + } + return s.cfg.attPool.SaveUnaggregatedAttestation(att) +} + +// processVerifiedAttestation handles a signature-verified attestation. +func (s *Service) processVerifiedAttestation( + ctx context.Context, + broadcastAtt ethpb.Att, + poolAtt ethpb.Att, + preState state.ReadOnlyBeaconState, +) { + data := broadcastAtt.GetData() + + if err := s.saveAttestation(poolAtt); err != nil { + log.WithError(err).Debug("Failed to save unaggregated attestation") + return + } + + if key, err := generateUnaggregatedAttCacheKey(broadcastAtt); err != nil { + log.WithError(err).Error("Failed to generate cache key for attestation tracking") + } else { + s.setSeenUnaggregatedAtt(key) + } + + valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot)) + if err != nil { + log.WithError(err).Debug("Failed to retrieve active validator count") + return + } + + if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, broadcastAtt), broadcastAtt); err != nil { + log.WithError(err).Debug("Failed to broadcast attestation") + } + + var ( + eventType feed.EventType + eventData any + ) + + switch { + case broadcastAtt.Version() >= version.Electra: + if sa, ok := broadcastAtt.(*ethpb.SingleAttestation); ok { + eventType = operation.SingleAttReceived + eventData = &operation.SingleAttReceivedData{Attestation: sa} + break + } + fallthrough + default: + eventType = operation.UnaggregatedAttReceived + eventData = &operation.UnAggregatedAttReceivedData{Attestation: broadcastAtt} + } + + // Send event notification + s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ + Type: eventType, + Data: eventData, + }) } func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) { @@ -94,9 +307,10 @@ func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAg valRes, err := s.validateAggregatedAtt(ctx, aggregate) if err != nil { log.WithError(err).Debug("Pending aggregated attestation failed validation") + return } aggValid := pubsub.ValidationAccept == valRes - if s.validateBlockInAttestation(ctx, aggregate) && aggValid { + if aggValid && s.validateBlockInAttestation(ctx, aggregate) { if features.Get().EnableExperimentalAttestationPool { if err = s.cfg.attestationCache.Add(att); err != nil { log.WithError(err).Debug("Could not save aggregated attestation") @@ -123,114 +337,6 @@ func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAg } } -func (s *Service) processAtt(ctx context.Context, att ethpb.Att) { - data := att.GetData() - - // This is an important validation before retrieving attestation pre state to defend against - // attestation's target intentionally referencing a checkpoint that's long ago. - if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) { - log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Could not verify finalized consistency") - return - } - if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, att); err != nil { - log.WithError(err).Debug("Could not verify FFG consistency") - return - } - preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target) - if err != nil { - log.WithError(err).Debug("Could not retrieve attestation prestate") - return - } - committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, att.GetCommitteeIndex()) - if err != nil { - log.WithError(err).Debug("Could not retrieve committee from state") - return - } - valid, err := validateAttesterData(ctx, att, committee) - if err != nil { - log.WithError(err).Debug("Could not validate attester data") - return - } else if valid != pubsub.ValidationAccept { - log.Debug("Attestation failed attester data validation") - return - } - - // Decide if the attestation is an Electra SingleAttestation or a Phase0 unaggregated attestation - var ( - attForValidation ethpb.Att - broadcastAtt ethpb.Att - eventType feed.EventType - eventData interface{} - ) - - if att.Version() >= version.Electra { - singleAtt, ok := att.(*ethpb.SingleAttestation) - if !ok { - log.Debugf("Attestation has wrong type (expected %T, got %T)", ðpb.SingleAttestation{}, att) - return - } - // Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions. - attForValidation = singleAtt.ToAttestationElectra(committee) - broadcastAtt = singleAtt - eventType = operation.SingleAttReceived - eventData = &operation.SingleAttReceivedData{ - Attestation: singleAtt, - } - } else { - // Phase0 attestation - attForValidation = att - broadcastAtt = att - eventType = operation.UnaggregatedAttReceived - eventData = &operation.UnAggregatedAttReceivedData{ - Attestation: att, - } - } - - valid, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState) - if err != nil { - log.WithError(err).Debug("Pending unaggregated attestation failed validation") - return - } - - if valid == pubsub.ValidationAccept { - if features.Get().EnableExperimentalAttestationPool { - if err = s.cfg.attestationCache.Add(attForValidation); err != nil { - log.WithError(err).Debug("Could not save unaggregated attestation") - return - } - } else { - if err := s.cfg.attPool.SaveUnaggregatedAttestation(attForValidation); err != nil { - log.WithError(err).Debug("Could not save unaggregated attestation") - return - } - } - - attKey, err := generateUnaggregatedAttCacheKey(att) - if err != nil { - log.WithError(err).Error("Could not generate cache key for attestation tracking") - } else { - s.setSeenUnaggregatedAtt(attKey) - } - - valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot)) - if err != nil { - log.WithError(err).Debug("Could not retrieve active validator count") - return - } - - // Broadcast the final 'broadcastAtt' object - if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, broadcastAtt), broadcastAtt); err != nil { - log.WithError(err).Debug("Could not broadcast") - } - - // Feed event notification for other services - s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ - Type: eventType, - Data: eventData, - }) - } -} - // This defines how pending aggregates are saved in the map. The key is the // root of the missing block. The value is the list of pending attestations/aggregates // that voted for that block root. The caller of this function is responsible @@ -372,3 +478,29 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot primitives.Slot) } } } + +// bucketAttestationsByData groups attestations by their AttestationData hash. +func bucketAttestationsByData(attestations []ethpb.Att) map[[32]byte]*attestationBucket { + bucketMap := make(map[[32]byte]*attestationBucket) + + for _, att := range attestations { + data := att.GetData() + dataHash, err := data.HashTreeRoot() + if err != nil { + log.WithError(err).Debug("Failed to hash attestation data, skipping attestation") + continue + } + + if bucket, ok := bucketMap[dataHash]; ok { + bucket.attestations = append(bucket.attestations, att) + } else { + bucketMap[dataHash] = &attestationBucket{ + dataHash: dataHash, + data: data, + attestations: []ethpb.Att{att}, + } + } + } + + return bucketMap +} diff --git a/beacon-chain/sync/pending_attestations_queue_bucket_test.go b/beacon-chain/sync/pending_attestations_queue_bucket_test.go new file mode 100644 index 0000000000..59b72e71c1 --- /dev/null +++ b/beacon-chain/sync/pending_attestations_queue_bucket_test.go @@ -0,0 +1,417 @@ +package sync + +import ( + "context" + "testing" + + mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" + "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" + ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/testing/assert" + "github.com/OffchainLabs/prysm/v6/testing/require" + "github.com/OffchainLabs/prysm/v6/testing/util" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func TestProcessAttestationBucket(t *testing.T) { + t.Run("EmptyBucket", func(t *testing.T) { + hook := logTest.NewGlobal() + s := &Service{} + + s.processAttestationBucket(context.Background(), nil) + + emptyBucket := &attestationBucket{ + attestations: []ethpb.Att{}, + } + s.processAttestationBucket(context.Background(), emptyBucket) + + require.Equal(t, 0, len(hook.Entries), "Should not log any messages for empty buckets") + }) + + t.Run("ForkchoiceFailure", func(t *testing.T) { + hook := logTest.NewGlobal() + chainService := &mockChain.ChainService{ + NotFinalized: true, // This makes InForkchoice return false + } + + s := &Service{ + cfg: &config{ + chain: chainService, + }, + } + + attData := ðpb.AttestationData{ + BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32), + } + + bucket := &attestationBucket{ + data: attData, + attestations: []ethpb.Att{util.NewAttestation()}, + } + + s.processAttestationBucket(context.Background(), bucket) + + require.Equal(t, 1, len(hook.Entries)) + assert.StringContains(t, "Failed forkchoice check for bucket", hook.LastEntry().Message) + require.NotNil(t, hook.LastEntry().Data["error"]) + }) + + t.Run("CommitteeFailure", func(t *testing.T) { + hook := logTest.NewGlobal() + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + require.NoError(t, beaconState.SetSlot(1)) + + chainService := &mockChain.ChainService{ + State: beaconState, + ValidAttestation: true, + } + + s := &Service{ + cfg: &config{ + chain: chainService, + }, + } + + attData := ðpb.AttestationData{ + BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32), + Target: ðpb.Checkpoint{ + Epoch: 1, + Root: bytesutil.PadTo([]byte("blockroot"), 32), + }, + CommitteeIndex: 999999, + } + + att := util.NewAttestation() + att.Data = attData + + bucket := &attestationBucket{ + data: attData, + attestations: []ethpb.Att{att}, + } + + s.processAttestationBucket(context.Background(), bucket) + + require.Equal(t, 1, len(hook.Entries)) + assert.StringContains(t, "Failed to get committee from state", hook.LastEntry().Message) + }) + + t.Run("FFGConsistencyFailure", func(t *testing.T) { + hook := logTest.NewGlobal() + + validators := make([]*ethpb.Validator, 64) + for i := range validators { + validators[i] = ðpb.Validator{ + ExitEpoch: 1000000, + EffectiveBalance: 32000000000, + } + } + + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + require.NoError(t, beaconState.SetSlot(1)) + require.NoError(t, beaconState.SetValidators(validators)) + + chainService := &mockChain.ChainService{ + State: beaconState, + } + + s := &Service{ + cfg: &config{ + chain: chainService, + }, + } + + attData := ðpb.AttestationData{ + BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32), + Target: ðpb.Checkpoint{ + Epoch: 1, + Root: bytesutil.PadTo([]byte("different_target"), 32), // Different from BeaconBlockRoot to trigger FFG failure + }, + } + + att := util.NewAttestation() + att.Data = attData + + bucket := &attestationBucket{ + data: attData, + attestations: []ethpb.Att{att}, + } + + s.processAttestationBucket(context.Background(), bucket) + + require.Equal(t, 1, len(hook.Entries)) + assert.StringContains(t, "Failed FFG consistency check for bucket", hook.LastEntry().Message) + }) + + t.Run("ProcessingSuccess", func(t *testing.T) { + hook := logTest.NewGlobal() + validators := make([]*ethpb.Validator, 64) + for i := range validators { + validators[i] = ðpb.Validator{ + ExitEpoch: 1000000, + EffectiveBalance: 32000000000, + } + } + + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + require.NoError(t, beaconState.SetSlot(1)) + require.NoError(t, beaconState.SetValidators(validators)) + + chainService := &mockChain.ChainService{ + State: beaconState, + ValidAttestation: true, + } + + s := &Service{ + cfg: &config{ + chain: chainService, + }, + } + + // Test with Phase0 attestation + t.Run("Phase0_NoError", func(t *testing.T) { + hook.Reset() // Reset logs before test + phase0Att := util.NewAttestation() + phase0Att.Data.Slot = 1 + phase0Att.Data.CommitteeIndex = 0 + + bucket := &attestationBucket{ + data: phase0Att.GetData(), + attestations: []ethpb.Att{phase0Att}, + } + + s.processAttestationBucket(context.Background(), bucket) + }) + + // Test with SingleAttestation + t.Run("Electra_NoError", func(t *testing.T) { + hook.Reset() // Reset logs before test + attData := ðpb.AttestationData{ + Slot: 1, + CommitteeIndex: 0, + BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32), + Source: ðpb.Checkpoint{ + Epoch: 0, + Root: bytesutil.PadTo([]byte("source"), 32), + }, + Target: ðpb.Checkpoint{ + Epoch: 1, + Root: bytesutil.PadTo([]byte("blockroot"), 32), // Same as BeaconBlockRoot for LMD/FFG consistency + }, + } + + singleAtt := ðpb.SingleAttestation{ + CommitteeId: 0, + AttesterIndex: 0, + Data: attData, + Signature: make([]byte, 96), + } + + bucket := &attestationBucket{ + data: singleAtt.GetData(), + attestations: []ethpb.Att{singleAtt}, + } + + s.processAttestationBucket(context.Background(), bucket) + }) + }) +} + +func TestBucketAttestationsByData(t *testing.T) { + t.Run("EmptyInput", func(t *testing.T) { + hook := logTest.NewGlobal() + buckets := bucketAttestationsByData(nil) + require.Equal(t, 0, len(buckets)) + require.Equal(t, 0, len(hook.Entries)) + + buckets = bucketAttestationsByData([]ethpb.Att{}) + require.Equal(t, 0, len(buckets)) + require.Equal(t, 0, len(hook.Entries)) + }) + + t.Run("SingleAttestation", func(t *testing.T) { + hook := logTest.NewGlobal() + att := util.NewAttestation() + att.Data.Slot = 1 + att.Data.CommitteeIndex = 0 + + buckets := bucketAttestationsByData([]ethpb.Att{att}) + + require.Equal(t, 1, len(buckets)) + var bucket *attestationBucket + for _, b := range buckets { + bucket = b + break + } + require.NotNil(t, bucket) + require.Equal(t, 1, len(bucket.attestations)) + require.Equal(t, att, bucket.attestations[0]) + require.Equal(t, att.GetData(), bucket.data) + require.Equal(t, 0, len(hook.Entries)) + }) + + t.Run("MultipleAttestationsSameData", func(t *testing.T) { + hook := logTest.NewGlobal() + + att1 := util.NewAttestation() + att1.Data.Slot = 1 + att1.Data.CommitteeIndex = 0 + + att2 := util.NewAttestation() + att2.Data = att1.Data // Same data + att2.Signature = make([]byte, 96) // Different signature + + buckets := bucketAttestationsByData([]ethpb.Att{att1, att2}) + + require.Equal(t, 1, len(buckets), "Should have one bucket for same data") + var bucket *attestationBucket + for _, b := range buckets { + bucket = b + break + } + require.NotNil(t, bucket) + require.Equal(t, 2, len(bucket.attestations), "Should have both attestations in one bucket") + require.Equal(t, att1.GetData(), bucket.data) + require.Equal(t, 0, len(hook.Entries)) + }) + + t.Run("MultipleAttestationsDifferentData", func(t *testing.T) { + hook := logTest.NewGlobal() + + att1 := util.NewAttestation() + att1.Data.Slot = 1 + att1.Data.CommitteeIndex = 0 + + att2 := util.NewAttestation() + att2.Data.Slot = 2 // Different slot + att2.Data.CommitteeIndex = 1 + + buckets := bucketAttestationsByData([]ethpb.Att{att1, att2}) + + require.Equal(t, 2, len(buckets), "Should have two buckets for different data") + bucketCount := 0 + for _, bucket := range buckets { + require.Equal(t, 1, len(bucket.attestations), "Each bucket should have one attestation") + bucketCount++ + } + require.Equal(t, 2, bucketCount, "Should have exactly two buckets") + require.Equal(t, 0, len(hook.Entries)) + }) + + t.Run("MixedAttestationTypes", func(t *testing.T) { + hook := logTest.NewGlobal() + + // Create Phase0 attestation + phase0Att := util.NewAttestation() + phase0Att.Data.Slot = 1 + phase0Att.Data.CommitteeIndex = 0 + + electraAtt := ðpb.SingleAttestation{ + CommitteeId: 0, + AttesterIndex: 1, + Data: phase0Att.Data, // Same data + Signature: make([]byte, 96), + } + + buckets := bucketAttestationsByData([]ethpb.Att{phase0Att, electraAtt}) + + require.Equal(t, 1, len(buckets), "Should have one bucket for same data") + var bucket *attestationBucket + for _, b := range buckets { + bucket = b + break + } + require.NotNil(t, bucket) + require.Equal(t, 2, len(bucket.attestations), "Should have both attestations in one bucket") + require.Equal(t, phase0Att.GetData(), bucket.data) + require.Equal(t, 0, len(hook.Entries)) + }) +} + +func TestBatchVerifyAttestationSignatures(t *testing.T) { + t.Run("EmptyInput", func(t *testing.T) { + s := &Service{} + + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + + result := s.batchVerifyAttestationSignatures(context.Background(), []ethpb.Att{}, beaconState) + + // Empty input should return empty output + require.Equal(t, 0, len(result)) + }) + + t.Run("BatchVerificationWithState", func(t *testing.T) { + hook := logTest.NewGlobal() + validators := make([]*ethpb.Validator, 64) + for i := range validators { + validators[i] = ðpb.Validator{ + ExitEpoch: 1000000, + EffectiveBalance: 32000000000, + } + } + + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + require.NoError(t, beaconState.SetSlot(1)) + require.NoError(t, beaconState.SetValidators(validators)) + + s := &Service{} + + att := util.NewAttestation() + att.Data.Slot = 1 + attestations := []ethpb.Att{att} + + result := s.batchVerifyAttestationSignatures(context.Background(), attestations, beaconState) + require.NotNil(t, result) + + if len(result) == 0 && len(hook.Entries) > 0 { + _ = false // Check if fallback message is logged + for _, entry := range hook.Entries { + if entry.Message == "batch verification failed, using individual checks" { + _ = true // Found the fallback message + break + } + } + // It's OK if fallback message is logged - this means the function is working correctly + } + }) + + t.Run("BatchVerificationFailureFallbackToIndividual", func(t *testing.T) { + hook := logTest.NewGlobal() + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + require.NoError(t, beaconState.SetSlot(1)) + + chainService := &mockChain.ChainService{ + State: beaconState, + ValidAttestation: false, // This will cause verification to fail + } + + s := &Service{ + cfg: &config{ + chain: chainService, + }, + } + + att := util.NewAttestation() + att.Data.Slot = 1 + attestations := []ethpb.Att{att} + + result := s.batchVerifyAttestationSignatures(context.Background(), attestations, beaconState) + + require.Equal(t, 0, len(result)) + + require.NotEqual(t, 0, len(hook.Entries), "Should have log entries") + found := false + for _, entry := range hook.Entries { + if entry.Message == "batch verification failed, using individual checks" { + found = true + break + } + } + require.Equal(t, true, found, "Should log fallback message") + }) +} diff --git a/changelog/ttsao_optimize-attestation-batching.md b/changelog/ttsao_optimize-attestation-batching.md new file mode 100644 index 0000000000..a03c9039c0 --- /dev/null +++ b/changelog/ttsao_optimize-attestation-batching.md @@ -0,0 +1,3 @@ +### Changed + +- Optimize pending attestation processing by adding batching