mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
This PR introduces several simplifications to block processing. It calls to notify the engine in the background when forkchoice needs to be updated. It no longer updates the caches and process epoch transition before computing payload attributes, since this is no longer needed after Fulu. It removes a complicated second call to FCU with the same head after processing the last slot of the epoch. Some checks for reviewers: - the single caller of sendFCU held a lock to forkchoice. Since the call now is in the background this helper can aquire the lock. - All paths to handleEpochBoundary are now **NOT** locked. This allows the lock to get the target root to be taken locally in place. - The checkpoint cache is completely useless and thus the target root call could be removed. But removing the proposer ID cache is more complicated and out of scope for this PR. - lateBlockTasks has pre and post-fulu cased, we could remove pre-fulu checks and defer to the update function if deemed cleaner. - Conversely, postBlockProcess does not have this casing and thus pre-Fulu blocks on gossip may fail to get proposed correctly because of the lack of the proposer being correctly computed.
449 lines
17 KiB
Go
449 lines
17 KiB
Go
package blockchain
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
|
|
statefeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/state"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
|
|
doublylinkedtree "github.com/OffchainLabs/prysm/v7/beacon-chain/forkchoice/doubly-linked-tree"
|
|
forkchoicetypes "github.com/OffchainLabs/prysm/v7/beacon-chain/forkchoice/types"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
|
|
field_params "github.com/OffchainLabs/prysm/v7/config/fieldparams"
|
|
"github.com/OffchainLabs/prysm/v7/config/params"
|
|
consensus_blocks "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
|
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
|
|
mathutil "github.com/OffchainLabs/prysm/v7/math"
|
|
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
|
|
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
|
"github.com/OffchainLabs/prysm/v7/time/slots"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// ErrInvalidCheckpointArgs may be returned when the finalized checkpoint has an epoch greater than the justified checkpoint epoch.
|
|
// If you are seeing this error, make sure you haven't mixed up the order of the arguments in the method you are calling.
|
|
var ErrInvalidCheckpointArgs = errors.New("finalized checkpoint cannot be greater than justified checkpoint")
|
|
|
|
// CurrentSlot returns the current slot based on time.
|
|
func (s *Service) CurrentSlot() primitives.Slot {
|
|
return slots.CurrentSlot(s.genesisTime)
|
|
}
|
|
|
|
// getFCUArgs returns the arguments to call forkchoice update
|
|
func (s *Service) getFCUArgs(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
|
|
if err := s.getFCUArgsEarlyBlock(cfg, fcuArgs); err != nil {
|
|
return err
|
|
}
|
|
fcuArgs.attributes = s.getPayloadAttribute(cfg.ctx, fcuArgs.headState, fcuArgs.proposingSlot, cfg.headRoot[:])
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) getFCUArgsEarlyBlock(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
|
|
if cfg.roblock.Root() == cfg.headRoot {
|
|
fcuArgs.headState = cfg.postState
|
|
fcuArgs.headBlock = cfg.roblock
|
|
fcuArgs.headRoot = cfg.headRoot
|
|
fcuArgs.proposingSlot = s.CurrentSlot() + 1
|
|
return nil
|
|
}
|
|
return s.fcuArgsNonCanonicalBlock(cfg, fcuArgs)
|
|
}
|
|
|
|
// logNonCanonicalBlockReceived prints a message informing that the received
|
|
// block is not the head of the chain. It requires the caller holds a lock on
|
|
// Forkchoice.
|
|
func (s *Service) logNonCanonicalBlockReceived(blockRoot [32]byte, headRoot [32]byte) {
|
|
receivedWeight, err := s.cfg.ForkChoiceStore.Weight(blockRoot)
|
|
if err != nil {
|
|
log.WithField("root", fmt.Sprintf("%#x", blockRoot)).Warn("Could not determine node weight")
|
|
}
|
|
headWeight, err := s.cfg.ForkChoiceStore.Weight(headRoot)
|
|
if err != nil {
|
|
log.WithField("root", fmt.Sprintf("%#x", headRoot)).Warn("Could not determine node weight")
|
|
}
|
|
log.WithFields(logrus.Fields{
|
|
"receivedRoot": fmt.Sprintf("%#x", blockRoot),
|
|
"receivedWeight": receivedWeight,
|
|
"headRoot": fmt.Sprintf("%#x", headRoot),
|
|
"headWeight": headWeight,
|
|
}).Debug("Head block is not the received block")
|
|
}
|
|
|
|
// fcuArgsNonCanonicalBlock returns the arguments to the FCU call when the
|
|
// incoming block is non-canonical, that is, based on the head root.
|
|
func (s *Service) fcuArgsNonCanonicalBlock(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) error {
|
|
headState, headBlock, err := s.getStateAndBlock(cfg.ctx, cfg.headRoot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fcuArgs.headState = headState
|
|
fcuArgs.headBlock = headBlock
|
|
fcuArgs.headRoot = cfg.headRoot
|
|
fcuArgs.proposingSlot = s.CurrentSlot() + 1
|
|
return nil
|
|
}
|
|
|
|
// sendStateFeedOnBlock sends an event that a new block has been synced
|
|
func (s *Service) sendStateFeedOnBlock(cfg *postBlockProcessConfig) {
|
|
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(cfg.roblock.Root())
|
|
if err != nil {
|
|
log.WithError(err).Debug("Could not check if block is optimistic")
|
|
optimistic = true
|
|
}
|
|
currEpoch := slots.ToEpoch(s.CurrentSlot())
|
|
currDependenRoot, err := s.cfg.ForkChoiceStore.DependentRoot(currEpoch)
|
|
if err != nil {
|
|
log.WithError(err).Debug("Could not get dependent root")
|
|
}
|
|
prevDependentRoot := [32]byte{}
|
|
if currEpoch > 0 {
|
|
prevDependentRoot, err = s.cfg.ForkChoiceStore.DependentRoot(currEpoch - 1)
|
|
if err != nil {
|
|
log.WithError(err).Debug("Could not get previous dependent root")
|
|
}
|
|
}
|
|
// Send notification of the processed block to the state feed.
|
|
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
|
|
Type: statefeed.BlockProcessed,
|
|
Data: &statefeed.BlockProcessedData{
|
|
Slot: cfg.roblock.Block().Slot(),
|
|
BlockRoot: cfg.roblock.Root(),
|
|
SignedBlock: cfg.roblock,
|
|
CurrDependentRoot: currDependenRoot,
|
|
PrevDependentRoot: prevDependentRoot,
|
|
Verified: true,
|
|
Optimistic: optimistic,
|
|
},
|
|
})
|
|
}
|
|
|
|
// processLightClientUpdates saves the light client data in lcStore, when feature flag is enabled.
|
|
func (s *Service) processLightClientUpdates(cfg *postBlockProcessConfig) {
|
|
attestedRoot := cfg.roblock.Block().ParentRoot()
|
|
attestedBlock, err := s.getBlock(cfg.ctx, attestedRoot)
|
|
if err != nil {
|
|
log.WithError(err).Error("processLightClientUpdates: Could not get attested block")
|
|
return
|
|
}
|
|
if attestedBlock == nil || attestedBlock.IsNil() {
|
|
log.Error("processLightClientUpdates: Could not get attested block")
|
|
return
|
|
}
|
|
attestedState, err := s.cfg.StateGen.StateByRoot(cfg.ctx, attestedRoot)
|
|
if err != nil {
|
|
log.WithError(err).Error("processLightClientUpdates: Could not get attested state")
|
|
return
|
|
}
|
|
if attestedState == nil || attestedState.IsNil() {
|
|
log.Error("processLightClientUpdates: Could not get attested state")
|
|
return
|
|
}
|
|
|
|
finalizedRoot := attestedState.FinalizedCheckpoint().Root
|
|
finalizedBlock, err := s.getBlock(cfg.ctx, [32]byte(finalizedRoot))
|
|
if err != nil {
|
|
if errors.Is(err, errBlockNotFoundInCacheOrDB) {
|
|
log.Debugf("Skipping saving light client update because finalized block is nil for root %#x", finalizedRoot)
|
|
return
|
|
}
|
|
log.WithError(err).Error("processLightClientUpdates: Could not get finalized block")
|
|
return
|
|
}
|
|
|
|
err = s.lcStore.SaveLCData(cfg.ctx, cfg.postState, cfg.roblock, attestedState, attestedBlock, finalizedBlock, s.headRoot())
|
|
if err != nil {
|
|
log.WithError(err).Error("processLightClientUpdates: Could not save light client data")
|
|
}
|
|
log.Debug("Processed light client updates")
|
|
}
|
|
|
|
// updateCachesPostBlockProcessing updates the next slot cache and handles the epoch
|
|
// boundary in order to compute the right proposer indices after processing
|
|
// state transition. The caller of this function must not hold a lock in forkchoice store.
|
|
func (s *Service) updateCachesPostBlockProcessing(cfg *postBlockProcessConfig) {
|
|
slot := cfg.postState.Slot()
|
|
root := cfg.roblock.Root()
|
|
if err := transition.UpdateNextSlotCache(cfg.ctx, root[:], cfg.postState); err != nil {
|
|
log.WithError(err).Error("Could not update next slot state cache")
|
|
return
|
|
}
|
|
if !slots.IsEpochEnd(slot) {
|
|
return
|
|
}
|
|
if err := s.handleEpochBoundary(cfg.ctx, slot, cfg.postState, root[:]); err != nil {
|
|
log.WithError(err).Error("Could not handle epoch boundary")
|
|
}
|
|
}
|
|
|
|
// reportProcessingTime reports the metric of how long it took to process the
|
|
// current block
|
|
func reportProcessingTime(startTime time.Time) {
|
|
onBlockProcessingTime.Observe(float64(time.Since(startTime).Milliseconds()))
|
|
}
|
|
|
|
// getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block
|
|
// to retrieve the state in DB. It verifies the pre state's validity and the incoming block
|
|
// is in the correct time window.
|
|
func (s *Service) getBlockPreState(ctx context.Context, b interfaces.ReadOnlyBeaconBlock) (state.BeaconState, error) {
|
|
ctx, span := trace.StartSpan(ctx, "blockChain.getBlockPreState")
|
|
defer span.End()
|
|
|
|
// Verify incoming block has a valid pre state.
|
|
if err := s.verifyBlkPreState(ctx, b.ParentRoot()); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
preState, err := s.cfg.StateGen.StateByRoot(ctx, b.ParentRoot())
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot())
|
|
}
|
|
if preState == nil || preState.IsNil() {
|
|
return nil, errors.Wrapf(err, "nil pre state for slot %d", b.Slot())
|
|
}
|
|
|
|
// Verify block slot time is not from the future.
|
|
if err := slots.VerifyTime(s.genesisTime, b.Slot(), params.BeaconConfig().MaximumGossipClockDisparityDuration()); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Verify block is later than the finalized epoch slot.
|
|
if err := s.verifyBlkFinalizedSlot(b); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return preState, nil
|
|
}
|
|
|
|
// verifyBlkPreState validates input block has a valid pre-state.
|
|
func (s *Service) verifyBlkPreState(ctx context.Context, parentRoot [field_params.RootLength]byte) error {
|
|
ctx, span := trace.StartSpan(ctx, "blockChain.verifyBlkPreState")
|
|
defer span.End()
|
|
|
|
// Loosen the check to HasBlock because state summary gets saved in batches
|
|
// during initial syncing. There's no risk given a state summary object is just a
|
|
// subset of the block object.
|
|
if !s.cfg.BeaconDB.HasStateSummary(ctx, parentRoot) && !s.cfg.BeaconDB.HasBlock(ctx, parentRoot) {
|
|
return errors.New("could not reconstruct parent state")
|
|
}
|
|
|
|
has, err := s.cfg.StateGen.HasState(ctx, parentRoot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !has {
|
|
if err := s.cfg.BeaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
|
|
return errors.Wrap(err, "could not save initial sync blocks")
|
|
}
|
|
s.clearInitSyncBlocks()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// verifyBlkFinalizedSlot validates input block is not less than or equal
|
|
// to current finalized slot.
|
|
func (s *Service) verifyBlkFinalizedSlot(b interfaces.ReadOnlyBeaconBlock) error {
|
|
finalized := s.cfg.ForkChoiceStore.FinalizedCheckpoint()
|
|
finalizedSlot, err := slots.EpochStart(finalized.Epoch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if finalizedSlot >= b.Slot() {
|
|
err = fmt.Errorf("block is equal or earlier than finalized block, slot %d < slot %d", b.Slot(), finalizedSlot)
|
|
return invalidBlock{error: err}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateFinalized saves the init sync blocks, finalized checkpoint, migrates
|
|
// to cold old states and saves the last validated checkpoint to DB. It returns
|
|
// early if the new checkpoint is older than the one on db.
|
|
func (s *Service) updateFinalized(ctx context.Context, cp *ethpb.Checkpoint) error {
|
|
ctx, span := trace.StartSpan(ctx, "blockChain.updateFinalized")
|
|
defer span.End()
|
|
|
|
// return early if new checkpoint is not newer than the one in DB
|
|
currentFinalized, err := s.cfg.BeaconDB.FinalizedCheckpoint(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if cp.Epoch <= currentFinalized.Epoch {
|
|
return nil
|
|
}
|
|
|
|
// Blocks need to be saved so that we can retrieve finalized block from
|
|
// DB when migrating states.
|
|
if err := s.cfg.BeaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
|
|
return err
|
|
}
|
|
s.clearInitSyncBlocks()
|
|
|
|
if err := s.cfg.BeaconDB.SaveFinalizedCheckpoint(ctx, cp); err != nil {
|
|
return err
|
|
}
|
|
|
|
fRoot := bytesutil.ToBytes32(cp.Root)
|
|
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(fRoot)
|
|
if err != nil && !errors.Is(err, doublylinkedtree.ErrNilNode) {
|
|
return err
|
|
}
|
|
if !optimistic {
|
|
err = s.cfg.BeaconDB.SaveLastValidatedCheckpoint(ctx, cp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
go func() {
|
|
// We do not pass in the parent context from the method as this method call
|
|
// is meant to be asynchronous and run in the background rather than being
|
|
// tied to the execution of a block.
|
|
if err := s.cfg.StateGen.MigrateToCold(s.ctx, fRoot); err != nil {
|
|
log.WithError(err).Error("Could not migrate to cold")
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// This retrieves an ancestor root using DB. The look up is recursively looking up DB. Slower than `ancestorByForkChoiceStore`.
|
|
func (s *Service) ancestorByDB(ctx context.Context, r [32]byte, slot primitives.Slot) (root [32]byte, err error) {
|
|
ctx, span := trace.StartSpan(ctx, "blockChain.ancestorByDB")
|
|
defer span.End()
|
|
|
|
root = [32]byte{}
|
|
// Stop recursive ancestry lookup if context is cancelled.
|
|
if ctx.Err() != nil {
|
|
err = ctx.Err()
|
|
return
|
|
}
|
|
|
|
signed, err := s.getBlock(ctx, r)
|
|
if err != nil {
|
|
return root, err
|
|
}
|
|
b := signed.Block()
|
|
if b.Slot() == slot || b.Slot() < slot {
|
|
return r, nil
|
|
}
|
|
|
|
return s.ancestorByDB(ctx, b.ParentRoot(), slot)
|
|
}
|
|
|
|
// This retrieves missing blocks from DB (ie. the blocks that couldn't be received over sync) and inserts them to fork choice store.
|
|
// This is useful for block tree visualizer and additional vote accounting.
|
|
func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock,
|
|
fCheckpoint, jCheckpoint *ethpb.Checkpoint) error {
|
|
if fCheckpoint.Epoch > jCheckpoint.Epoch {
|
|
return ErrInvalidCheckpointArgs
|
|
}
|
|
pendingNodes := make([]*forkchoicetypes.BlockAndCheckpoints, 0)
|
|
|
|
// Fork choice only matters from last finalized slot.
|
|
finalized := s.cfg.ForkChoiceStore.FinalizedCheckpoint()
|
|
fSlot, err := slots.EpochStart(finalized.Epoch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
root := signed.Block().ParentRoot()
|
|
// As long as parent node is not in fork choice store, and parent node is in DB.
|
|
for !s.cfg.ForkChoiceStore.HasNode(root) && s.cfg.BeaconDB.HasBlock(ctx, root) {
|
|
b, err := s.getBlock(ctx, root)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if b.Block().Slot() <= fSlot {
|
|
break
|
|
}
|
|
roblock, err := consensus_blocks.NewROBlockWithRoot(b, root)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
root = b.Block().ParentRoot()
|
|
args := &forkchoicetypes.BlockAndCheckpoints{Block: roblock,
|
|
JustifiedCheckpoint: jCheckpoint,
|
|
FinalizedCheckpoint: fCheckpoint}
|
|
pendingNodes = append(pendingNodes, args)
|
|
}
|
|
if len(pendingNodes) == 0 {
|
|
return nil
|
|
}
|
|
if root != s.ensureRootNotZeros(finalized.Root) && !s.cfg.ForkChoiceStore.HasNode(root) {
|
|
return ErrNotDescendantOfFinalized
|
|
}
|
|
slices.Reverse(pendingNodes)
|
|
return s.cfg.ForkChoiceStore.InsertChain(ctx, pendingNodes)
|
|
}
|
|
|
|
// inserts finalized deposits into our finalized deposit trie, needs to be
|
|
// called in the background
|
|
// Post-Electra: prunes all proofs and pending deposits in the cache
|
|
func (s *Service) insertFinalizedDepositsAndPrune(ctx context.Context, fRoot [32]byte) {
|
|
ctx, span := trace.StartSpan(ctx, "blockChain.insertFinalizedDeposits")
|
|
defer span.End()
|
|
startTime := time.Now()
|
|
|
|
// Update deposit cache.
|
|
finalizedState, err := s.cfg.StateGen.StateByRoot(ctx, fRoot)
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not fetch finalized state")
|
|
return
|
|
}
|
|
|
|
// Check if we should prune all pending deposits.
|
|
// In post-Electra(after the legacy deposit mechanism is deprecated),
|
|
// we can prune all pending deposits in the deposit cache.
|
|
// See: https://eips.ethereum.org/EIPS/eip-6110#eth1data-poll-deprecation
|
|
if helpers.DepositRequestsStarted(finalizedState) {
|
|
s.pruneAllPendingDepositsAndProofs(ctx)
|
|
return
|
|
}
|
|
|
|
// We update the cache up to the last deposit index in the finalized block's state.
|
|
// We can be confident that these deposits will be included in some block
|
|
// because the Eth1 follow distance makes such long-range reorgs extremely unlikely.
|
|
eth1DepositIndex, err := mathutil.Int(finalizedState.Eth1DepositIndex())
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not cast eth1 deposit index")
|
|
return
|
|
}
|
|
// The deposit index in the state is always the index of the next deposit
|
|
// to be included(rather than the last one to be processed). This was most likely
|
|
// done as the state cannot represent signed integers.
|
|
finalizedEth1DepIdx := eth1DepositIndex - 1
|
|
if err = s.cfg.DepositCache.InsertFinalizedDeposits(ctx, int64(finalizedEth1DepIdx), common.Hash(finalizedState.Eth1Data().BlockHash),
|
|
0 /* Setting a zero value as we have no access to block height */); err != nil {
|
|
log.WithError(err).Error("Could not insert finalized deposits")
|
|
return
|
|
}
|
|
// Deposit proofs are only used during state transition and can be safely removed to save space.
|
|
if err = s.cfg.DepositCache.PruneProofs(ctx, int64(finalizedEth1DepIdx)); err != nil {
|
|
log.WithError(err).Error("Could not prune deposit proofs")
|
|
}
|
|
// Prune deposits which have already been finalized, the below method prunes all pending deposits (non-inclusive) up
|
|
// to the provided eth1 deposit index.
|
|
s.cfg.DepositCache.PrunePendingDeposits(ctx, int64(eth1DepositIndex)) // lint:ignore uintcast -- Deposit index should not exceed int64 in your lifetime.
|
|
|
|
log.WithField("duration", time.Since(startTime).String()).Debugf("Finalized deposit insertion completed at index %d", finalizedEth1DepIdx)
|
|
}
|
|
|
|
// pruneAllPendingDepositsAndProofs prunes all proofs and pending deposits in the cache.
|
|
func (s *Service) pruneAllPendingDepositsAndProofs(ctx context.Context) {
|
|
s.cfg.DepositCache.PruneAllPendingDeposits(ctx)
|
|
s.cfg.DepositCache.PruneAllProofs(ctx)
|
|
}
|
|
|
|
// This ensures that the input root defaults to using genesis root instead of zero hashes. This is needed for handling
|
|
// fork choice justification routine.
|
|
func (s *Service) ensureRootNotZeros(root [32]byte) [32]byte {
|
|
if root == params.BeaconConfig().ZeroHash {
|
|
return s.originBlockRoot
|
|
}
|
|
return root
|
|
}
|