mirror of https://github.com/OffchainLabs/prysm.git
Optimize pending attestation processing by adding batching (#15801)
* Optimize pending attestation processing by adding batching

* Update beacon-chain/sync/pending_attestations_queue.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/sync/pending_attestations_queue.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Add root for debug

* Change it to map

* Dont need receiver

* Use two slices

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
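The heart of the change is to bucket pending attestations by the hash of their AttestationData and run the shared checks (forkchoice, target state, FFG consistency) and signature verification once per bucket instead of once per attestation. Below is a minimal, self-contained sketch of that grouping step; the attData/attestation types and the key helper are hypothetical stand-ins, not Prysm's ethpb types or its HashTreeRoot-based key.

// Illustrative sketch only (hypothetical stand-in types, not Prysm's).
package main

import (
	"crypto/sha256"
	"fmt"
)

// attData stands in for the fields of an AttestationData that identify a vote.
type attData struct {
	Slot            uint64
	BeaconBlockRoot [32]byte
	TargetEpoch     uint64
}

// attestation pairs a vote with one validator's signature.
type attestation struct {
	Data      attData
	Signature []byte
}

// key derives a stable grouping key from the vote contents, playing the role
// that AttestationData.HashTreeRoot() plays in the actual change.
func key(d attData) [32]byte {
	return sha256.Sum256([]byte(fmt.Sprintf("%d-%x-%d", d.Slot, d.BeaconBlockRoot, d.TargetEpoch)))
}

// bucketByData groups attestations by their data hash, so per-data work
// (forkchoice check, target state lookup, batch signature verification)
// can run once per bucket instead of once per attestation.
func bucketByData(atts []attestation) map[[32]byte][]attestation {
	buckets := make(map[[32]byte][]attestation)
	for _, a := range atts {
		k := key(a.Data)
		buckets[k] = append(buckets[k], a)
	}
	return buckets
}

func main() {
	a := attestation{Data: attData{Slot: 1}, Signature: []byte{0x01}}
	b := attestation{Data: attData{Slot: 1}, Signature: []byte{0x02}} // same vote, different signer
	c := attestation{Data: attData{Slot: 2}, Signature: []byte{0x03}}

	for _, bucket := range bucketByData([]attestation{a, b, c}) {
		fmt.Println("bucket size:", len(bucket)) // prints 2 and 1, in map order
	}
}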
@@ -174,6 +174,7 @@ go_test(
         "fork_watcher_test.go",
         "kzg_batch_verifier_test.go",
         "once_test.go",
+        "pending_attestations_queue_bucket_test.go",
         "pending_attestations_queue_test.go",
         "pending_blocks_queue_test.go",
         "rate_limiter_test.go",
@@ -7,9 +7,11 @@ import (
 	"fmt"
 
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
 	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
+	"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
 	"github.com/OffchainLabs/prysm/v6/config/features"
 	"github.com/OffchainLabs/prysm/v6/config/params"
 	"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -18,6 +20,7 @@ import (
 	"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
 	ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
 	"github.com/OffchainLabs/prysm/v6/runtime/version"
+	"github.com/OffchainLabs/prysm/v6/time"
 	"github.com/OffchainLabs/prysm/v6/time/slots"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/sirupsen/logrus"
@@ -47,12 +50,16 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
 	s.pendingAttsLock.RUnlock()
+
 	if len(attestations) > 0 {
+		start := time.Now()
 		s.processAttestations(ctx, attestations)
+		duration := time.Since(start)
 		log.WithFields(logrus.Fields{
 			"blockRoot":        hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
 			"pendingAttsCount": len(attestations),
+			"duration":         duration,
 		}).Debug("Verified and saved pending attestations to pool")
 	}
 
 	randGen := rand.NewGenerator()
 	// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
 	s.pendingAttsLock.Lock()
@@ -72,18 +79,224 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
 }
 
 func (s *Service) processAttestations(ctx context.Context, attestations []any) {
-	for _, signedAtt := range attestations {
-		// The pending attestations can arrive as both aggregates and attestations,
-		// and each form has to be processed differently.
-		switch t := signedAtt.(type) {
+	if len(attestations) == 0 {
+		return
+	}
+
+	atts := make([]ethpb.Att, 0, len(attestations))
+	for _, att := range attestations {
+		switch v := att.(type) {
 		case ethpb.Att:
-			s.processAtt(ctx, t)
+			atts = append(atts, v)
 		case ethpb.SignedAggregateAttAndProof:
-			s.processAggregate(ctx, t)
+			s.processAggregate(ctx, v)
 		default:
-			log.Warnf("Unexpected item of type %T in pending attestation queue. Item will not be processed", t)
+			log.Warnf("Unexpected attestation type %T, skipping", v)
 		}
 	}
+
+	for _, bucket := range bucketAttestationsByData(atts) {
+		s.processAttestationBucket(ctx, bucket)
+	}
+}
+
+// attestationBucket groups attestations with the same AttestationData for batch processing.
+type attestationBucket struct {
+	dataHash     [32]byte
+	data         *ethpb.AttestationData
+	attestations []ethpb.Att
+}
+
+// processAttestationBucket processes a bucket of attestations with shared AttestationData.
+func (s *Service) processAttestationBucket(ctx context.Context, bucket *attestationBucket) {
+	if bucket == nil || len(bucket.attestations) == 0 {
+		return
+	}
+
+	data := bucket.data
+
+	// Shared validations for the entire bucket.
+	if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
+		log.WithError(blockchain.ErrNotDescendantOfFinalized).WithField("root", fmt.Sprintf("%#x", data.BeaconBlockRoot)).Debug("Failed forkchoice check for bucket")
+		return
+	}
+
+	preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target)
+	if err != nil {
+		log.WithError(err).Debug("Failed to get attestation prestate for bucket")
+		return
+	}
+
+	if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, bucket.attestations[0]); err != nil {
+		log.WithError(err).Debug("Failed FFG consistency check for bucket")
+		return
+	}
+
+	// Collect valid attestations for both single and electra formats.
+	// Broadcast takes single format but attestation pool and batch signature verification take electra format.
+	forBroadcast := make([]ethpb.Att, 0, len(bucket.attestations))
+	forPool := make([]ethpb.Att, 0, len(bucket.attestations))
+
+	for _, att := range bucket.attestations {
+		committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, att.GetCommitteeIndex())
+		if err != nil {
+			log.WithError(err).Debug("Failed to get committee from state")
+			continue
+		}
+
+		valid, err := validateAttesterData(ctx, att, committee)
+		if err != nil {
+			log.WithError(err).Debug("Failed attester data validation")
+			continue
+		}
+		if valid != pubsub.ValidationAccept {
+			log.Debug("Pending attestation rejected due to invalid data")
+			continue
+		}
+
+		var conv ethpb.Att
+		if att.Version() >= version.Electra {
+			single, ok := att.(*ethpb.SingleAttestation)
+			if !ok {
+				log.Debugf("Wrong type: expected %T, got %T", &ethpb.SingleAttestation{}, att)
+				continue
+			}
+			conv = single.ToAttestationElectra(committee)
+		} else {
+			conv = att
+		}
+
+		forBroadcast = append(forBroadcast, att)
+		forPool = append(forPool, conv)
+	}
+
+	if len(forPool) == 0 {
+		return
+	}
+
+	verified := s.batchVerifyAttestationSignatures(ctx, forPool, preState)
+	verifiedSet := make(map[ethpb.Att]struct{}, len(verified))
+	for _, att := range verified {
+		verifiedSet[att] = struct{}{}
+	}
+
+	for i, poolAtt := range forPool {
+		if _, ok := verifiedSet[poolAtt]; ok {
+			s.processVerifiedAttestation(ctx, forBroadcast[i], poolAtt, preState)
+		}
+	}
+}
+
+// batchVerifyAttestationSignatures attempts batch verification, with individual fallback on failure.
+func (s *Service) batchVerifyAttestationSignatures(
+	ctx context.Context,
+	attestations []ethpb.Att,
+	preState state.ReadOnlyBeaconState,
+) []ethpb.Att {
+	const fallbackMsg = "batch verification failed, using individual checks"
+
+	set, err := blocks.AttestationSignatureBatch(ctx, preState, attestations)
+	if err != nil {
+		log.WithError(err).Debug(fallbackMsg)
+		return s.fallbackToIndividualVerification(ctx, attestations, preState)
+	}
+
+	ok, err := set.Verify()
+	if err != nil || !ok {
+		if err != nil {
+			log.WithError(err).Debug(fallbackMsg)
+		} else {
+			log.Debug(fallbackMsg)
+		}
+		return s.fallbackToIndividualVerification(ctx, attestations, preState)
+	}
+
+	return attestations
+}
+
+// fallbackToIndividualVerification verifies each attestation individually if batch verification fails.
+func (s *Service) fallbackToIndividualVerification(
+	ctx context.Context,
+	attestations []ethpb.Att,
+	preState state.ReadOnlyBeaconState,
+) []ethpb.Att {
+	verified := make([]ethpb.Att, 0, len(attestations))
+
+	for _, att := range attestations {
+		res, err := s.validateUnaggregatedAttWithState(ctx, att, preState)
+		if err != nil {
+			log.WithError(err).Debug("Individual signature verification error")
+			continue
+		}
+		if res == pubsub.ValidationAccept {
+			verified = append(verified, att)
+		}
+	}
+
+	return verified
+}
+
+// saveAttestation saves an attestation to the appropriate pool.
+func (s *Service) saveAttestation(att ethpb.Att) error {
+	if features.Get().EnableExperimentalAttestationPool {
+		return s.cfg.attestationCache.Add(att)
+	}
+	return s.cfg.attPool.SaveUnaggregatedAttestation(att)
+}
+
+// processVerifiedAttestation handles a signature-verified attestation.
+func (s *Service) processVerifiedAttestation(
+	ctx context.Context,
+	broadcastAtt ethpb.Att,
+	poolAtt ethpb.Att,
+	preState state.ReadOnlyBeaconState,
+) {
+	data := broadcastAtt.GetData()
+
+	if err := s.saveAttestation(poolAtt); err != nil {
+		log.WithError(err).Debug("Failed to save unaggregated attestation")
+		return
+	}
+
+	if key, err := generateUnaggregatedAttCacheKey(broadcastAtt); err != nil {
+		log.WithError(err).Error("Failed to generate cache key for attestation tracking")
+	} else {
+		s.setSeenUnaggregatedAtt(key)
+	}
+
+	valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot))
+	if err != nil {
+		log.WithError(err).Debug("Failed to retrieve active validator count")
+		return
+	}
+
+	if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, broadcastAtt), broadcastAtt); err != nil {
+		log.WithError(err).Debug("Failed to broadcast attestation")
+	}
+
+	var (
+		eventType feed.EventType
+		eventData any
+	)
+
+	switch {
+	case broadcastAtt.Version() >= version.Electra:
+		if sa, ok := broadcastAtt.(*ethpb.SingleAttestation); ok {
+			eventType = operation.SingleAttReceived
+			eventData = &operation.SingleAttReceivedData{Attestation: sa}
+			break
+		}
+		fallthrough
+	default:
+		eventType = operation.UnaggregatedAttReceived
+		eventData = &operation.UnAggregatedAttReceivedData{Attestation: broadcastAtt}
+	}
+
+	// Send event notification
+	s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
+		Type: eventType,
+		Data: eventData,
+	})
 }
 
 func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAggregateAttAndProof) {
@@ -94,9 +307,10 @@ func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAg
 	valRes, err := s.validateAggregatedAtt(ctx, aggregate)
 	if err != nil {
 		log.WithError(err).Debug("Pending aggregated attestation failed validation")
+		return
 	}
 	aggValid := pubsub.ValidationAccept == valRes
-	if s.validateBlockInAttestation(ctx, aggregate) && aggValid {
+	if aggValid && s.validateBlockInAttestation(ctx, aggregate) {
 		if features.Get().EnableExperimentalAttestationPool {
 			if err = s.cfg.attestationCache.Add(att); err != nil {
 				log.WithError(err).Debug("Could not save aggregated attestation")
@@ -123,114 +337,6 @@ func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAg
 	}
 }
 
-func (s *Service) processAtt(ctx context.Context, att ethpb.Att) {
-	data := att.GetData()
-
-	// This is an important validation before retrieving attestation pre state to defend against
-	// attestation's target intentionally referencing a checkpoint that's long ago.
-	if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
-		log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Could not verify finalized consistency")
-		return
-	}
-	if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, att); err != nil {
-		log.WithError(err).Debug("Could not verify FFG consistency")
-		return
-	}
-	preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target)
-	if err != nil {
-		log.WithError(err).Debug("Could not retrieve attestation prestate")
-		return
-	}
-	committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, att.GetCommitteeIndex())
-	if err != nil {
-		log.WithError(err).Debug("Could not retrieve committee from state")
-		return
-	}
-	valid, err := validateAttesterData(ctx, att, committee)
-	if err != nil {
-		log.WithError(err).Debug("Could not validate attester data")
-		return
-	} else if valid != pubsub.ValidationAccept {
-		log.Debug("Attestation failed attester data validation")
-		return
-	}
-
-	// Decide if the attestation is an Electra SingleAttestation or a Phase0 unaggregated attestation
-	var (
-		attForValidation ethpb.Att
-		broadcastAtt     ethpb.Att
-		eventType        feed.EventType
-		eventData        interface{}
-	)
-
-	if att.Version() >= version.Electra {
-		singleAtt, ok := att.(*ethpb.SingleAttestation)
-		if !ok {
-			log.Debugf("Attestation has wrong type (expected %T, got %T)", &ethpb.SingleAttestation{}, att)
-			return
-		}
-		// Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions.
-		attForValidation = singleAtt.ToAttestationElectra(committee)
-		broadcastAtt = singleAtt
-		eventType = operation.SingleAttReceived
-		eventData = &operation.SingleAttReceivedData{
-			Attestation: singleAtt,
-		}
-	} else {
-		// Phase0 attestation
-		attForValidation = att
-		broadcastAtt = att
-		eventType = operation.UnaggregatedAttReceived
-		eventData = &operation.UnAggregatedAttReceivedData{
-			Attestation: att,
-		}
-	}
-
-	valid, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState)
-	if err != nil {
-		log.WithError(err).Debug("Pending unaggregated attestation failed validation")
-		return
-	}
-
-	if valid == pubsub.ValidationAccept {
-		if features.Get().EnableExperimentalAttestationPool {
-			if err = s.cfg.attestationCache.Add(attForValidation); err != nil {
-				log.WithError(err).Debug("Could not save unaggregated attestation")
-				return
-			}
-		} else {
-			if err := s.cfg.attPool.SaveUnaggregatedAttestation(attForValidation); err != nil {
-				log.WithError(err).Debug("Could not save unaggregated attestation")
-				return
-			}
-		}
-
-		attKey, err := generateUnaggregatedAttCacheKey(att)
-		if err != nil {
-			log.WithError(err).Error("Could not generate cache key for attestation tracking")
-		} else {
-			s.setSeenUnaggregatedAtt(attKey)
-		}
-
-		valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot))
-		if err != nil {
-			log.WithError(err).Debug("Could not retrieve active validator count")
-			return
-		}
-
-		// Broadcast the final 'broadcastAtt' object
-		if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, broadcastAtt), broadcastAtt); err != nil {
-			log.WithError(err).Debug("Could not broadcast")
-		}
-
-		// Feed event notification for other services
-		s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
-			Type: eventType,
-			Data: eventData,
-		})
-	}
-}
-
 // This defines how pending aggregates are saved in the map. The key is the
 // root of the missing block. The value is the list of pending attestations/aggregates
 // that voted for that block root. The caller of this function is responsible
@@ -372,3 +478,29 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot primitives.Slot)
 		}
 	}
 }
+
+// bucketAttestationsByData groups attestations by their AttestationData hash.
+func bucketAttestationsByData(attestations []ethpb.Att) map[[32]byte]*attestationBucket {
+	bucketMap := make(map[[32]byte]*attestationBucket)
+
+	for _, att := range attestations {
+		data := att.GetData()
+		dataHash, err := data.HashTreeRoot()
+		if err != nil {
+			log.WithError(err).Debug("Failed to hash attestation data, skipping attestation")
+			continue
+		}
+
+		if bucket, ok := bucketMap[dataHash]; ok {
+			bucket.attestations = append(bucket.attestations, att)
+		} else {
+			bucketMap[dataHash] = &attestationBucket{
+				dataHash:     dataHash,
+				data:         data,
+				attestations: []ethpb.Att{att},
+			}
+		}
+	}
+
+	return bucketMap
+}
beacon-chain/sync/pending_attestations_queue_bucket_test.go (new file, 417 lines)
@@ -0,0 +1,417 @@
+package sync
+
+import (
+	"context"
+	"testing"
+
+	mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
+	"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
+	ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
+	"github.com/OffchainLabs/prysm/v6/testing/assert"
+	"github.com/OffchainLabs/prysm/v6/testing/require"
+	"github.com/OffchainLabs/prysm/v6/testing/util"
+	logTest "github.com/sirupsen/logrus/hooks/test"
+)
+
+func TestProcessAttestationBucket(t *testing.T) {
+	t.Run("EmptyBucket", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+		s := &Service{}
+
+		s.processAttestationBucket(context.Background(), nil)
+
+		emptyBucket := &attestationBucket{
+			attestations: []ethpb.Att{},
+		}
+		s.processAttestationBucket(context.Background(), emptyBucket)
+
+		require.Equal(t, 0, len(hook.Entries), "Should not log any messages for empty buckets")
+	})
+
+	t.Run("ForkchoiceFailure", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+		chainService := &mockChain.ChainService{
+			NotFinalized: true, // This makes InForkchoice return false
+		}
+
+		s := &Service{
+			cfg: &config{
+				chain: chainService,
+			},
+		}
+
+		attData := &ethpb.AttestationData{
+			BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32),
+		}
+
+		bucket := &attestationBucket{
+			data:         attData,
+			attestations: []ethpb.Att{util.NewAttestation()},
+		}
+
+		s.processAttestationBucket(context.Background(), bucket)
+
+		require.Equal(t, 1, len(hook.Entries))
+		assert.StringContains(t, "Failed forkchoice check for bucket", hook.LastEntry().Message)
+		require.NotNil(t, hook.LastEntry().Data["error"])
+	})
+
+	t.Run("CommitteeFailure", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+		beaconState, err := util.NewBeaconState()
+		require.NoError(t, err)
+		require.NoError(t, beaconState.SetSlot(1))
+
+		chainService := &mockChain.ChainService{
+			State:            beaconState,
+			ValidAttestation: true,
+		}
+
+		s := &Service{
+			cfg: &config{
+				chain: chainService,
+			},
+		}
+
+		attData := &ethpb.AttestationData{
+			BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32),
+			Target: &ethpb.Checkpoint{
+				Epoch: 1,
+				Root:  bytesutil.PadTo([]byte("blockroot"), 32),
+			},
+			CommitteeIndex: 999999,
+		}
+
+		att := util.NewAttestation()
+		att.Data = attData
+
+		bucket := &attestationBucket{
+			data:         attData,
+			attestations: []ethpb.Att{att},
+		}
+
+		s.processAttestationBucket(context.Background(), bucket)
+
+		require.Equal(t, 1, len(hook.Entries))
+		assert.StringContains(t, "Failed to get committee from state", hook.LastEntry().Message)
+	})
+
+	t.Run("FFGConsistencyFailure", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+
+		validators := make([]*ethpb.Validator, 64)
+		for i := range validators {
+			validators[i] = &ethpb.Validator{
+				ExitEpoch:        1000000,
+				EffectiveBalance: 32000000000,
+			}
+		}
+
+		beaconState, err := util.NewBeaconState()
+		require.NoError(t, err)
+		require.NoError(t, beaconState.SetSlot(1))
+		require.NoError(t, beaconState.SetValidators(validators))
+
+		chainService := &mockChain.ChainService{
+			State: beaconState,
+		}
+
+		s := &Service{
+			cfg: &config{
+				chain: chainService,
+			},
+		}
+
+		attData := &ethpb.AttestationData{
+			BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32),
+			Target: &ethpb.Checkpoint{
+				Epoch: 1,
+				Root:  bytesutil.PadTo([]byte("different_target"), 32), // Different from BeaconBlockRoot to trigger FFG failure
+			},
+		}
+
+		att := util.NewAttestation()
+		att.Data = attData
+
+		bucket := &attestationBucket{
+			data:         attData,
+			attestations: []ethpb.Att{att},
+		}
+
+		s.processAttestationBucket(context.Background(), bucket)
+
+		require.Equal(t, 1, len(hook.Entries))
+		assert.StringContains(t, "Failed FFG consistency check for bucket", hook.LastEntry().Message)
+	})
+
+	t.Run("ProcessingSuccess", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+		validators := make([]*ethpb.Validator, 64)
+		for i := range validators {
+			validators[i] = &ethpb.Validator{
+				ExitEpoch:        1000000,
+				EffectiveBalance: 32000000000,
+			}
+		}
+
+		beaconState, err := util.NewBeaconState()
+		require.NoError(t, err)
+		require.NoError(t, beaconState.SetSlot(1))
+		require.NoError(t, beaconState.SetValidators(validators))
+
+		chainService := &mockChain.ChainService{
+			State:            beaconState,
+			ValidAttestation: true,
+		}
+
+		s := &Service{
+			cfg: &config{
+				chain: chainService,
+			},
+		}
+
+		// Test with Phase0 attestation
+		t.Run("Phase0_NoError", func(t *testing.T) {
+			hook.Reset() // Reset logs before test
+			phase0Att := util.NewAttestation()
+			phase0Att.Data.Slot = 1
+			phase0Att.Data.CommitteeIndex = 0
+
+			bucket := &attestationBucket{
+				data:         phase0Att.GetData(),
+				attestations: []ethpb.Att{phase0Att},
+			}
+
+			s.processAttestationBucket(context.Background(), bucket)
+		})
+
+		// Test with SingleAttestation
+		t.Run("Electra_NoError", func(t *testing.T) {
+			hook.Reset() // Reset logs before test
+			attData := &ethpb.AttestationData{
+				Slot:            1,
+				CommitteeIndex:  0,
+				BeaconBlockRoot: bytesutil.PadTo([]byte("blockroot"), 32),
+				Source: &ethpb.Checkpoint{
+					Epoch: 0,
+					Root:  bytesutil.PadTo([]byte("source"), 32),
+				},
+				Target: &ethpb.Checkpoint{
+					Epoch: 1,
+					Root:  bytesutil.PadTo([]byte("blockroot"), 32), // Same as BeaconBlockRoot for LMD/FFG consistency
+				},
+			}
+
+			singleAtt := &ethpb.SingleAttestation{
+				CommitteeId:   0,
+				AttesterIndex: 0,
+				Data:          attData,
+				Signature:     make([]byte, 96),
+			}
+
+			bucket := &attestationBucket{
+				data:         singleAtt.GetData(),
+				attestations: []ethpb.Att{singleAtt},
+			}
+
+			s.processAttestationBucket(context.Background(), bucket)
+		})
+	})
+}
+
+func TestBucketAttestationsByData(t *testing.T) {
+	t.Run("EmptyInput", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+		buckets := bucketAttestationsByData(nil)
+		require.Equal(t, 0, len(buckets))
+		require.Equal(t, 0, len(hook.Entries))
+
+		buckets = bucketAttestationsByData([]ethpb.Att{})
+		require.Equal(t, 0, len(buckets))
+		require.Equal(t, 0, len(hook.Entries))
+	})
+
+	t.Run("SingleAttestation", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+		att := util.NewAttestation()
+		att.Data.Slot = 1
+		att.Data.CommitteeIndex = 0
+
+		buckets := bucketAttestationsByData([]ethpb.Att{att})
+
+		require.Equal(t, 1, len(buckets))
+		var bucket *attestationBucket
+		for _, b := range buckets {
+			bucket = b
+			break
+		}
+		require.NotNil(t, bucket)
+		require.Equal(t, 1, len(bucket.attestations))
+		require.Equal(t, att, bucket.attestations[0])
+		require.Equal(t, att.GetData(), bucket.data)
+		require.Equal(t, 0, len(hook.Entries))
+	})
+
+	t.Run("MultipleAttestationsSameData", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+
+		att1 := util.NewAttestation()
+		att1.Data.Slot = 1
+		att1.Data.CommitteeIndex = 0
+
+		att2 := util.NewAttestation()
+		att2.Data = att1.Data             // Same data
+		att2.Signature = make([]byte, 96) // Different signature
+
+		buckets := bucketAttestationsByData([]ethpb.Att{att1, att2})
+
+		require.Equal(t, 1, len(buckets), "Should have one bucket for same data")
+		var bucket *attestationBucket
+		for _, b := range buckets {
+			bucket = b
+			break
+		}
+		require.NotNil(t, bucket)
+		require.Equal(t, 2, len(bucket.attestations), "Should have both attestations in one bucket")
+		require.Equal(t, att1.GetData(), bucket.data)
+		require.Equal(t, 0, len(hook.Entries))
+	})
+
+	t.Run("MultipleAttestationsDifferentData", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+
+		att1 := util.NewAttestation()
+		att1.Data.Slot = 1
+		att1.Data.CommitteeIndex = 0
+
+		att2 := util.NewAttestation()
+		att2.Data.Slot = 2 // Different slot
+		att2.Data.CommitteeIndex = 1
+
+		buckets := bucketAttestationsByData([]ethpb.Att{att1, att2})
+
+		require.Equal(t, 2, len(buckets), "Should have two buckets for different data")
+		bucketCount := 0
+		for _, bucket := range buckets {
+			require.Equal(t, 1, len(bucket.attestations), "Each bucket should have one attestation")
+			bucketCount++
+		}
+		require.Equal(t, 2, bucketCount, "Should have exactly two buckets")
+		require.Equal(t, 0, len(hook.Entries))
+	})
+
+	t.Run("MixedAttestationTypes", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+
+		// Create Phase0 attestation
+		phase0Att := util.NewAttestation()
+		phase0Att.Data.Slot = 1
+		phase0Att.Data.CommitteeIndex = 0
+
+		electraAtt := &ethpb.SingleAttestation{
+			CommitteeId:   0,
+			AttesterIndex: 1,
+			Data:          phase0Att.Data, // Same data
+			Signature:     make([]byte, 96),
+		}
+
+		buckets := bucketAttestationsByData([]ethpb.Att{phase0Att, electraAtt})
+
+		require.Equal(t, 1, len(buckets), "Should have one bucket for same data")
+		var bucket *attestationBucket
+		for _, b := range buckets {
+			bucket = b
+			break
+		}
+		require.NotNil(t, bucket)
+		require.Equal(t, 2, len(bucket.attestations), "Should have both attestations in one bucket")
+		require.Equal(t, phase0Att.GetData(), bucket.data)
+		require.Equal(t, 0, len(hook.Entries))
+	})
+}
+
+func TestBatchVerifyAttestationSignatures(t *testing.T) {
+	t.Run("EmptyInput", func(t *testing.T) {
+		s := &Service{}
+
+		beaconState, err := util.NewBeaconState()
+		require.NoError(t, err)
+
+		result := s.batchVerifyAttestationSignatures(context.Background(), []ethpb.Att{}, beaconState)
+
+		// Empty input should return empty output
+		require.Equal(t, 0, len(result))
+	})
+
+	t.Run("BatchVerificationWithState", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+		validators := make([]*ethpb.Validator, 64)
+		for i := range validators {
+			validators[i] = &ethpb.Validator{
+				ExitEpoch:        1000000,
+				EffectiveBalance: 32000000000,
+			}
+		}
+
+		beaconState, err := util.NewBeaconState()
+		require.NoError(t, err)
+		require.NoError(t, beaconState.SetSlot(1))
+		require.NoError(t, beaconState.SetValidators(validators))
+
+		s := &Service{}
+
+		att := util.NewAttestation()
+		att.Data.Slot = 1
+		attestations := []ethpb.Att{att}
+
+		result := s.batchVerifyAttestationSignatures(context.Background(), attestations, beaconState)
+		require.NotNil(t, result)
+
+		if len(result) == 0 && len(hook.Entries) > 0 {
+			_ = false // Check if fallback message is logged
+			for _, entry := range hook.Entries {
+				if entry.Message == "batch verification failed, using individual checks" {
+					_ = true // Found the fallback message
+					break
+				}
+			}
+			// It's OK if fallback message is logged - this means the function is working correctly
+		}
+	})
+
+	t.Run("BatchVerificationFailureFallbackToIndividual", func(t *testing.T) {
+		hook := logTest.NewGlobal()
+		beaconState, err := util.NewBeaconState()
+		require.NoError(t, err)
+		require.NoError(t, beaconState.SetSlot(1))
+
+		chainService := &mockChain.ChainService{
+			State:            beaconState,
+			ValidAttestation: false, // This will cause verification to fail
+		}
+
+		s := &Service{
+			cfg: &config{
+				chain: chainService,
+			},
+		}
+
+		att := util.NewAttestation()
+		att.Data.Slot = 1
+		attestations := []ethpb.Att{att}
+
+		result := s.batchVerifyAttestationSignatures(context.Background(), attestations, beaconState)
+
+		require.Equal(t, 0, len(result))
+
+		require.NotEqual(t, 0, len(hook.Entries), "Should have log entries")
+		found := false
+		for _, entry := range hook.Entries {
+			if entry.Message == "batch verification failed, using individual checks" {
+				found = true
+				break
+			}
+		}
+		require.Equal(t, true, found, "Should log fallback message")
+	})
+}
changelog/ttsao_optimize-attestation-batching.md (new file, 3 lines)
@@ -0,0 +1,3 @@
+### Changed
+
+- Optimize pending attestation processing by adding batching