Add att duration.

This commit is contained in:
Manu NALEPA
2025-12-16 15:52:11 +01:00
parent 04a95d75a2
commit 84830bf97a

View File

@@ -3,9 +3,9 @@ package sync
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"slices"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"
@@ -21,7 +21,6 @@ import (
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time"
"github.com/OffchainLabs/prysm/v7/time/slots"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/sirupsen/logrus"
@@ -50,16 +49,7 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
attestations := s.blkRootToPendingAtts[bRoot]
s.pendingAttsLock.RUnlock()
if len(attestations) > 0 {
start := time.Now()
s.processAttestations(ctx, attestations)
duration := time.Since(start)
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
"pendingAttsCount": len(attestations),
"duration": duration,
}).Debug("Verified and saved pending attestations to pool")
}
s.processAttestations(ctx, attestations, bRoot)
randGen := rand.NewGenerator()
// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
@@ -79,26 +69,43 @@ func (s *Service) processPendingAttsForBlock(ctx context.Context, bRoot [32]byte
return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
}
func (s *Service) processAttestations(ctx context.Context, attestations []any) {
func (s *Service) processAttestations(ctx context.Context, attestations []any, blockRoot [32]byte) {
if len(attestations) == 0 {
return
}
startAggregate := time.Now()
atts := make([]ethpb.Att, 0, len(attestations))
aggregateAttAndProofCount := 0
for _, att := range attestations {
switch v := att.(type) {
case ethpb.Att:
atts = append(atts, v)
case ethpb.SignedAggregateAttAndProof:
s.processAggregate(ctx, v)
aggregateAttAndProofCount++
default:
log.Warnf("Unexpected attestation type %T, skipping", v)
}
}
durationAggregateAttAndProof := time.Since(startAggregate)
startAtts := time.Now()
for _, bucket := range bucketAttestationsByData(atts) {
s.processAttestationBucket(ctx, bucket)
}
durationAtts := time.Since(startAtts)
log.WithFields(logrus.Fields{
"blockRoot": fmt.Sprintf("%#x", blockRoot),
"pendingTotalCount": len(attestations),
"pendingAggregateAttAndProofCount": aggregateAttAndProofCount,
"pendingAttCount": len(atts),
"durationTotal": durationAggregateAttAndProof + durationAtts,
"durationAggregateAttAndProof": durationAggregateAttAndProof,
"durationAtts": durationAtts,
}).Debug("Verified and saved pending attestations to pool")
}
// attestationBucket groups attestations with the same AttestationData for batch processing.