Files
prysm/beacon-chain/blockchain/receive_attestation.go
Potuz d4ea8fafd6 Call FCU in the background (#16149)
This PR introduces several simplifications to block processing.

It calls to notify the engine in the background when forkchoice needs to
be updated.

It no longer updates the caches and process epoch transition before
computing payload attributes, since this is no longer needed after Fulu.

It removes a complicated second call to FCU with the same head after
processing the last slot of the epoch.

Some checks for reviewers:

- the single caller of sendFCU held a lock to forkchoice. Since the call
now is in the background this helper can aquire the lock.
- All paths to handleEpochBoundary are now **NOT** locked. This allows
the lock to get the target root to be taken locally in place.
- The checkpoint cache is completely useless and thus the target root
call could be removed. But removing the proposer ID cache is more
complicated and out of scope for this PR.
- lateBlockTasks has pre and post-fulu cased, we could remove pre-fulu
checks and defer to the update function if deemed cleaner.
- Conversely, postBlockProcess does not have this casing and thus
pre-Fulu blocks on gossip may fail to get proposed correctly because of
the lack of the proposer being correctly computed.
2025-12-30 21:01:34 +00:00

246 lines
9.2 KiB
Go

package blockchain
import (
"bytes"
"context"
"fmt"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// reorgLateBlockCountAttestations is the time until the end of the slot in which we count
// attestations to see if we will reorg the incoming block
const reorgLateBlockCountAttestations = 2 * time.Second
// AttestationStateFetcher allows for retrieving a beacon state corresponding to the block
// root of an attestation's target checkpoint.
type AttestationStateFetcher interface {
AttestationTargetState(ctx context.Context, target *ethpb.Checkpoint) (state.ReadOnlyBeaconState, error)
}
// AttestationReceiver interface defines the methods of chain service receive and processing new attestations.
type AttestationReceiver interface {
AttestationStateFetcher
VerifyLmdFfgConsistency(ctx context.Context, att ethpb.Att) error
InForkchoice([32]byte) bool
}
// AttestationTargetState returns the pre state of attestation.
func (s *Service) AttestationTargetState(ctx context.Context, target *ethpb.Checkpoint) (state.ReadOnlyBeaconState, error) {
ss, err := slots.EpochStart(target.Epoch)
if err != nil {
return nil, err
}
if err := slots.ValidateClock(ss, s.genesisTime); err != nil {
return nil, err
}
// We acquire the lock here instead than on gettAttPreState because that function gets called from UpdateHead that holds a write lock
s.cfg.ForkChoiceStore.RLock()
defer s.cfg.ForkChoiceStore.RUnlock()
return s.getAttPreState(ctx, target)
}
// VerifyLmdFfgConsistency verifies that attestation's LMD and FFG votes are consistency to each other.
func (s *Service) VerifyLmdFfgConsistency(ctx context.Context, a ethpb.Att) error {
r, err := s.TargetRootForEpoch([32]byte(a.GetData().BeaconBlockRoot), a.GetData().Target.Epoch)
if err != nil {
return err
}
if !bytes.Equal(a.GetData().Target.Root, r[:]) {
return fmt.Errorf("FFG and LMD votes are not consistent, block root: %#x, target root: %#x, canonical target root: %#x", a.GetData().BeaconBlockRoot, a.GetData().Target.Root, r)
}
return nil
}
// This routine processes fork choice attestations from the pool to account for validator votes and fork choice.
func (s *Service) spawnProcessAttestationsRoutine() {
go func() {
_, err := s.clockWaiter.WaitForClock(s.ctx)
if err != nil {
log.WithError(err).Error("Failed to receive genesis data")
return
}
if s.genesisTime.IsZero() {
log.Warn("ProcessAttestations routine waiting for genesis time")
for s.genesisTime.IsZero() {
if err := s.ctx.Err(); err != nil {
log.WithError(err).Error("Giving up waiting for genesis time")
return
}
time.Sleep(1 * time.Second)
}
log.Warn("Genesis time received, now available to process attestations")
}
// Wait for node to be synced before running the routine.
if err := s.waitForSync(); err != nil {
log.WithError(err).Error("Could not wait to sync")
return
}
reorgInterval := time.Second*time.Duration(params.BeaconConfig().SecondsPerSlot) - reorgLateBlockCountAttestations
ticker := slots.NewSlotTickerWithIntervals(s.genesisTime, []time.Duration{0, reorgInterval})
for {
select {
case <-s.ctx.Done():
return
case slotInterval := <-ticker.C():
if slotInterval.Interval > 0 {
if s.validating() {
s.UpdateHead(s.ctx, slotInterval.Slot+1)
}
} else {
s.cfg.ForkChoiceStore.Lock()
if err := s.cfg.ForkChoiceStore.NewSlot(s.ctx, slotInterval.Slot); err != nil {
log.WithError(err).Error("Could not process new slot")
}
s.cfg.ForkChoiceStore.Unlock()
s.UpdateHead(s.ctx, slotInterval.Slot)
}
}
}
}()
}
// UpdateHead updates the canonical head of the chain based on information from fork-choice attestations and votes.
// The caller of this function MUST hold a lock in forkchoice
func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.UpdateHead")
defer span.End()
start := time.Now()
s.cfg.ForkChoiceStore.Lock()
defer s.cfg.ForkChoiceStore.Unlock()
// This function is only called at 10 seconds or 0 seconds into the slot
disparity := params.BeaconConfig().MaximumGossipClockDisparityDuration()
disparity += reorgLateBlockCountAttestations
s.processAttestations(ctx, disparity)
processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
start = time.Now()
// return early if we haven't changed head
newHeadRoot, err := s.cfg.ForkChoiceStore.Head(ctx)
if err != nil {
log.WithError(err).Error("Could not compute head from new attestations")
return
}
if !s.isNewHead(newHeadRoot) {
return
}
log.WithField("newHeadRoot", fmt.Sprintf("%#x", newHeadRoot)).Debug("Head changed due to attestations")
headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("Could not get head block")
return
}
newAttHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))
fcuArgs := &fcuConfig{
headState: headState,
headRoot: newHeadRoot,
headBlock: headBlock,
proposingSlot: proposingSlot,
}
if s.inRegularSync() {
fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, proposingSlot, newHeadRoot[:])
if fcuArgs.attributes != nil && s.shouldOverrideFCU(newHeadRoot, proposingSlot) {
return
}
go s.forkchoiceUpdateWithExecution(s.ctx, fcuArgs)
}
if err := s.saveHead(s.ctx, fcuArgs.headRoot, fcuArgs.headBlock, fcuArgs.headState); err != nil {
log.WithError(err).Error("Could not save head")
}
s.pruneAttsFromPool(s.ctx, fcuArgs.headState, fcuArgs.headBlock)
}
// This processes fork choice attestations from the pool to account for validator votes and fork choice.
func (s *Service) processAttestations(ctx context.Context, disparity time.Duration) {
var atts []ethpb.Att
if features.Get().EnableExperimentalAttestationPool {
atts = s.cfg.AttestationCache.ForkchoiceAttestations()
} else {
atts = s.cfg.AttPool.ForkchoiceAttestations()
}
for _, a := range atts {
// Based on the spec, don't process the attestation until the subsequent slot.
// This delays consideration in the fork choice until their slot is in the past.
// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/fork-choice.md#validate_on_attestation
nextSlot := a.GetData().Slot + 1
if err := slots.VerifyTime(s.genesisTime, nextSlot, disparity); err != nil {
continue
}
hasState := s.cfg.BeaconDB.HasStateSummary(ctx, bytesutil.ToBytes32(a.GetData().BeaconBlockRoot))
hasBlock := s.hasBlock(ctx, bytesutil.ToBytes32(a.GetData().BeaconBlockRoot))
if !(hasState && hasBlock) {
continue
}
if features.Get().EnableExperimentalAttestationPool {
if err := s.cfg.AttestationCache.DeleteForkchoiceAttestation(a); err != nil {
log.WithError(err).Error("Could not delete fork choice attestation in pool")
}
} else if err := s.cfg.AttPool.DeleteForkchoiceAttestation(a); err != nil {
log.WithError(err).Error("Could not delete fork choice attestation in pool")
}
if !helpers.VerifyCheckpointEpoch(a.GetData().Target, s.genesisTime) {
continue
}
if err := s.receiveAttestationNoPubsub(ctx, a, disparity); err != nil {
var fields logrus.Fields
if a.Version() >= version.Electra {
fields = logrus.Fields{
"slot": a.GetData().Slot,
"committeeCount": a.CommitteeBitsVal().Count(),
"committeeIndices": a.CommitteeBitsVal().BitIndices(),
"beaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(a.GetData().BeaconBlockRoot)),
"targetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(a.GetData().Target.Root)),
"aggregatedCount": a.GetAggregationBits().Count(),
}
} else {
fields = logrus.Fields{
"slot": a.GetData().Slot,
"committeeIndex": a.GetData().CommitteeIndex,
"beaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(a.GetData().BeaconBlockRoot)),
"targetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(a.GetData().Target.Root)),
"aggregatedCount": a.GetAggregationBits().Count(),
}
}
log.WithFields(fields).WithError(err).Warn("Could not process attestation for fork choice")
}
}
}
// receiveAttestationNoPubsub is a function that defines the operations that are performed on
// attestation that is received from regular sync. The operations consist of:
// 1. Validate attestation, update validator's latest vote
// 2. Apply fork choice to the processed attestation
// 3. Save latest head info
func (s *Service) receiveAttestationNoPubsub(ctx context.Context, att ethpb.Att, disparity time.Duration) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.receiveAttestationNoPubsub")
defer span.End()
if err := s.OnAttestation(ctx, att, disparity); err != nil {
return errors.Wrap(err, "could not process attestation")
}
return nil
}