mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
When computing payload attributes post-Fulu, we do not need to process slots, nor copy the state if we need to find out if the node is proposing in the next slot. This prevents an immediate epoch processing after block 31 is processed unless we are actually proposing.
463 lines
18 KiB
Go
463 lines
18 KiB
Go
package blockchain
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/OffchainLabs/prysm/v7/async/event"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/blocks"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
|
|
statefeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/state"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/execution"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
|
|
"github.com/OffchainLabs/prysm/v7/config/features"
|
|
"github.com/OffchainLabs/prysm/v7/config/params"
|
|
blocktypes "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
|
|
payloadattribute "github.com/OffchainLabs/prysm/v7/consensus-types/payload-attribute"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
|
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
|
|
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
|
|
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
|
|
"github.com/OffchainLabs/prysm/v7/runtime/version"
|
|
"github.com/OffchainLabs/prysm/v7/time/slots"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
var defaultLatestValidHash = bytesutil.PadTo([]byte{0xff}, 32)
|
|
|
|
// notifyForkchoiceUpdate signals execution engine the fork choice updates. Execution engine should:
|
|
// 1. Re-organizes the execution payload chain and corresponding state to make head_block_hash the head.
|
|
// 2. Applies finality to the execution state: it irreversibly persists the chain of all execution payloads and corresponding state, up to and including finalized_block_hash.
|
|
func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*enginev1.PayloadIDBytes, error) {
|
|
ctx, span := trace.StartSpan(ctx, "blockChain.notifyForkchoiceUpdate")
|
|
defer span.End()
|
|
|
|
if arg.headBlock == nil || arg.headBlock.IsNil() {
|
|
log.Error("Head block is nil")
|
|
return nil, nil
|
|
}
|
|
headBlk := arg.headBlock.Block()
|
|
if headBlk == nil || headBlk.IsNil() || headBlk.Body().IsNil() {
|
|
log.Error("Head block is nil")
|
|
return nil, nil
|
|
}
|
|
// Must not call fork choice updated until the transition conditions are met on the Pow network.
|
|
isExecutionBlk, err := blocks.IsExecutionBlock(headBlk.Body())
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not determine if head block is execution block")
|
|
return nil, nil
|
|
}
|
|
if !isExecutionBlk {
|
|
return nil, nil
|
|
}
|
|
headPayload, err := headBlk.Body().Execution()
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get execution payload for head block")
|
|
return nil, nil
|
|
}
|
|
finalizedHash := s.cfg.ForkChoiceStore.FinalizedPayloadBlockHash()
|
|
justifiedHash := s.cfg.ForkChoiceStore.UnrealizedJustifiedPayloadBlockHash()
|
|
fcs := &enginev1.ForkchoiceState{
|
|
HeadBlockHash: headPayload.BlockHash(),
|
|
SafeBlockHash: justifiedHash[:],
|
|
FinalizedBlockHash: finalizedHash[:],
|
|
}
|
|
if len(fcs.HeadBlockHash) != 32 || [32]byte(fcs.HeadBlockHash) == [32]byte{} {
|
|
// check if we are sending FCU at genesis
|
|
hash, err := s.hashForGenesisBlock(ctx, arg.headRoot)
|
|
if errors.Is(err, errNotGenesisRoot) {
|
|
log.Error("Sending nil head block hash to execution engine")
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not get head block hash")
|
|
}
|
|
fcs.HeadBlockHash = hash
|
|
}
|
|
if arg.attributes == nil {
|
|
arg.attributes = payloadattribute.EmptyWithVersion(headBlk.Version())
|
|
}
|
|
payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, arg.attributes)
|
|
if err != nil {
|
|
switch {
|
|
case errors.Is(err, execution.ErrAcceptedSyncingPayloadStatus):
|
|
forkchoiceUpdatedOptimisticNodeCount.Inc()
|
|
log.WithFields(logrus.Fields{
|
|
"headSlot": headBlk.Slot(),
|
|
"headPayloadBlockHash": fmt.Sprintf("%#x", bytesutil.Trunc(headPayload.BlockHash())),
|
|
"finalizedPayloadBlockHash": fmt.Sprintf("%#x", bytesutil.Trunc(finalizedHash[:])),
|
|
}).Info("Called fork choice updated with optimistic block")
|
|
return payloadID, nil
|
|
case errors.Is(err, execution.ErrInvalidPayloadStatus):
|
|
forkchoiceUpdatedInvalidNodeCount.Inc()
|
|
headRoot := arg.headRoot
|
|
if len(lastValidHash) == 0 {
|
|
lastValidHash = defaultLatestValidHash
|
|
}
|
|
invalidRoots, err := s.cfg.ForkChoiceStore.SetOptimisticToInvalid(ctx, headRoot, headBlk.ParentRoot(), bytesutil.ToBytes32(lastValidHash))
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not set head root to invalid")
|
|
return nil, nil
|
|
}
|
|
if err := s.removeInvalidBlockAndState(ctx, invalidRoots); err != nil {
|
|
log.WithError(err).Error("Could not remove invalid block and state")
|
|
return nil, nil
|
|
}
|
|
|
|
r, err := s.cfg.ForkChoiceStore.Head(ctx)
|
|
if err != nil {
|
|
log.WithFields(logrus.Fields{
|
|
"slot": headBlk.Slot(),
|
|
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(headRoot[:])),
|
|
"invalidChildrenCount": len(invalidRoots),
|
|
}).Warn("Pruned invalid blocks, could not update head root")
|
|
return nil, invalidBlock{error: ErrInvalidPayload, root: arg.headRoot, invalidAncestorRoots: invalidRoots}
|
|
}
|
|
b, err := s.getBlock(ctx, r)
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get head block")
|
|
return nil, nil
|
|
}
|
|
st, err := s.cfg.StateGen.StateByRoot(ctx, r)
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get head state")
|
|
return nil, nil
|
|
}
|
|
pid, err := s.notifyForkchoiceUpdate(ctx, &fcuConfig{
|
|
headState: st,
|
|
headRoot: r,
|
|
headBlock: b,
|
|
attributes: arg.attributes,
|
|
})
|
|
if err != nil {
|
|
return nil, err // Returning err because it's recursive here.
|
|
}
|
|
|
|
if err := s.saveHead(ctx, r, b, st); err != nil {
|
|
log.WithError(err).Error("Could not save head after pruning invalid blocks")
|
|
}
|
|
|
|
log.WithFields(logrus.Fields{
|
|
"slot": headBlk.Slot(),
|
|
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(headRoot[:])),
|
|
"invalidChildrenCount": len(invalidRoots),
|
|
"newHeadRoot": fmt.Sprintf("%#x", bytesutil.Trunc(r[:])),
|
|
}).Warn("Pruned invalid blocks")
|
|
return pid, invalidBlock{error: ErrInvalidPayload, root: arg.headRoot, invalidAncestorRoots: invalidRoots}
|
|
default:
|
|
log.WithError(err).Error(ErrUndefinedExecutionEngineError)
|
|
return nil, nil
|
|
}
|
|
}
|
|
forkchoiceUpdatedValidNodeCount.Inc()
|
|
if err := s.cfg.ForkChoiceStore.SetOptimisticToValid(ctx, arg.headRoot); err != nil {
|
|
log.WithError(err).Error("Could not set head root to valid")
|
|
return nil, nil
|
|
}
|
|
// If the forkchoice update call has an attribute, update the payload ID cache.
|
|
hasAttr := arg.attributes != nil && !arg.attributes.IsEmpty()
|
|
nextSlot := s.CurrentSlot() + 1
|
|
if hasAttr && payloadID != nil {
|
|
var pId [8]byte
|
|
copy(pId[:], payloadID[:])
|
|
log.WithFields(logrus.Fields{
|
|
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(arg.headRoot[:])),
|
|
"headSlot": headBlk.Slot(),
|
|
"nextSlot": nextSlot,
|
|
"payloadID": fmt.Sprintf("%#x", bytesutil.Trunc(payloadID[:])),
|
|
}).Info("Forkchoice updated with payload attributes for proposal")
|
|
s.cfg.PayloadIDCache.Set(nextSlot, arg.headRoot, pId)
|
|
go s.firePayloadAttributesEvent(s.cfg.StateNotifier.StateFeed(), arg.headBlock, arg.headRoot, nextSlot)
|
|
} else if hasAttr && payloadID == nil && !features.Get().PrepareAllPayloads {
|
|
log.WithFields(logrus.Fields{
|
|
"blockHash": fmt.Sprintf("%#x", headPayload.BlockHash()),
|
|
"slot": headBlk.Slot(),
|
|
"nextSlot": nextSlot,
|
|
}).Error("Received nil payload ID on VALID engine response")
|
|
}
|
|
return payloadID, nil
|
|
}
|
|
|
|
func (s *Service) firePayloadAttributesEvent(f event.SubscriberSender, block interfaces.ReadOnlySignedBeaconBlock, root [32]byte, nextSlot primitives.Slot) {
|
|
// If we're syncing a block in the past and init-sync is still running, we shouldn't fire this event.
|
|
if !s.cfg.SyncChecker.Synced() {
|
|
return
|
|
}
|
|
// the fcu args have differing amounts of completeness based on the code path,
|
|
// and there is work we only want to do if a client is actually listening to the events beacon api endpoint.
|
|
// temporary solution: just fire a blank event and fill in the details in the api handler.
|
|
f.Send(&feed.Event{
|
|
Type: statefeed.PayloadAttributes,
|
|
Data: payloadattribute.EventData{HeadBlock: block, HeadRoot: root, ProposalSlot: nextSlot},
|
|
})
|
|
}
|
|
|
|
// getPayloadHash returns the payload hash given the block root.
|
|
// if the block is before bellatrix fork epoch, it returns the zero hash.
|
|
func (s *Service) getPayloadHash(ctx context.Context, root []byte) ([32]byte, error) {
|
|
blk, err := s.getBlock(ctx, s.ensureRootNotZeros(bytesutil.ToBytes32(root)))
|
|
if err != nil {
|
|
return [32]byte{}, err
|
|
}
|
|
if blocks.IsPreBellatrixVersion(blk.Block().Version()) {
|
|
return params.BeaconConfig().ZeroHash, nil
|
|
}
|
|
payload, err := blk.Block().Body().Execution()
|
|
if err != nil {
|
|
return [32]byte{}, errors.Wrap(err, "could not get execution payload")
|
|
}
|
|
return bytesutil.ToBytes32(payload.BlockHash()), nil
|
|
}
|
|
|
|
// notifyNewPayload signals execution engine on a new payload.
|
|
// It returns true if the EL has returned VALID for the block
|
|
// stVersion should represent the version of the pre-state; header should also be from the pre-state.
|
|
func (s *Service) notifyNewPayload(ctx context.Context, stVersion int, header interfaces.ExecutionData, blk blocktypes.ROBlock) (bool, error) {
|
|
ctx, span := trace.StartSpan(ctx, "blockChain.notifyNewPayload")
|
|
defer span.End()
|
|
|
|
// Execution payload is only supported in Bellatrix and beyond. Pre
|
|
// merge blocks are never optimistic
|
|
if stVersion < version.Bellatrix {
|
|
return true, nil
|
|
}
|
|
body := blk.Block().Body()
|
|
enabled, err := blocks.IsExecutionEnabledUsingHeader(header, body)
|
|
if err != nil {
|
|
return false, errors.Wrap(invalidBlock{error: err}, "could not determine if execution is enabled")
|
|
}
|
|
if !enabled {
|
|
return true, nil
|
|
}
|
|
payload, err := body.Execution()
|
|
if err != nil {
|
|
return false, errors.Wrap(invalidBlock{error: err}, "could not get execution payload")
|
|
}
|
|
|
|
var lastValidHash []byte
|
|
var parentRoot *common.Hash
|
|
var versionedHashes []common.Hash
|
|
var requests *enginev1.ExecutionRequests
|
|
if blk.Version() >= version.Deneb {
|
|
versionedHashes, err = kzgCommitmentsToVersionedHashes(blk.Block().Body())
|
|
if err != nil {
|
|
return false, errors.Wrap(err, "could not get versioned hashes to feed the engine")
|
|
}
|
|
prh := common.Hash(blk.Block().ParentRoot())
|
|
parentRoot = &prh
|
|
}
|
|
if blk.Version() >= version.Electra {
|
|
requests, err = blk.Block().Body().ExecutionRequests()
|
|
if err != nil {
|
|
return false, errors.Wrap(err, "could not get execution requests")
|
|
}
|
|
if requests == nil {
|
|
return false, errors.New("nil execution requests")
|
|
}
|
|
}
|
|
|
|
lastValidHash, err = s.cfg.ExecutionEngineCaller.NewPayload(ctx, payload, versionedHashes, parentRoot, requests)
|
|
if err == nil {
|
|
newPayloadValidNodeCount.Inc()
|
|
return true, nil
|
|
}
|
|
logFields := logrus.Fields{
|
|
"slot": blk.Block().Slot(),
|
|
"parentRoot": fmt.Sprintf("%#x", parentRoot),
|
|
"root": fmt.Sprintf("%#x", blk.Root()),
|
|
"payloadBlockHash": fmt.Sprintf("%#x", bytesutil.Trunc(payload.BlockHash())),
|
|
}
|
|
if errors.Is(err, execution.ErrAcceptedSyncingPayloadStatus) {
|
|
newPayloadOptimisticNodeCount.Inc()
|
|
log.WithFields(logFields).Info("Called new payload with optimistic block")
|
|
return false, nil
|
|
}
|
|
if errors.Is(err, execution.ErrInvalidPayloadStatus) {
|
|
log.WithFields(logFields).WithError(err).Error("Invalid payload status")
|
|
return false, invalidBlock{
|
|
error: ErrInvalidPayload,
|
|
lastValidHash: bytesutil.ToBytes32(lastValidHash),
|
|
}
|
|
}
|
|
log.WithFields(logFields).WithError(err).Error("Unexpected execution engine error")
|
|
return false, errors.WithMessage(ErrUndefinedExecutionEngineError, err.Error())
|
|
}
|
|
|
|
// reportInvalidBlock deals with the event that an invalid block was detected by the execution layer
|
|
func (s *Service) pruneInvalidBlock(ctx context.Context, root, parentRoot, lvh [32]byte) error {
|
|
newPayloadInvalidNodeCount.Inc()
|
|
invalidRoots, err := s.cfg.ForkChoiceStore.SetOptimisticToInvalid(ctx, root, parentRoot, lvh)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := s.removeInvalidBlockAndState(ctx, invalidRoots); err != nil {
|
|
return err
|
|
}
|
|
log.WithFields(logrus.Fields{
|
|
"blockRoot": fmt.Sprintf("%#x", root),
|
|
"invalidChildrenCount": len(invalidRoots),
|
|
}).Warn("Pruned invalid blocks")
|
|
return invalidBlock{
|
|
invalidAncestorRoots: invalidRoots,
|
|
error: ErrInvalidPayload,
|
|
lastValidHash: lvh,
|
|
}
|
|
}
|
|
|
|
// getPayloadAttributes returns the payload attributes for the given state and slot.
|
|
// The attribute is required to initiate a payload build process in the context of an `engine_forkchoiceUpdated` call.
|
|
func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState, slot primitives.Slot, headRoot []byte) payloadattribute.Attributer {
|
|
emptyAttri := payloadattribute.EmptyWithVersion(st.Version())
|
|
|
|
// If it is an epoch boundary then process slots to get the right
|
|
// shuffling before checking if the proposer is tracked. Otherwise
|
|
// perform this check before. This is cheap as the NSC has already been updated.
|
|
var val cache.TrackedValidator
|
|
var ok bool
|
|
e := slots.ToEpoch(slot)
|
|
stateEpoch := slots.ToEpoch(st.Slot())
|
|
fuluAndNextEpoch := st.Version() >= version.Fulu && e == stateEpoch+1
|
|
if e == stateEpoch || fuluAndNextEpoch {
|
|
val, ok = s.trackedProposer(st, slot)
|
|
if !ok {
|
|
return emptyAttri
|
|
}
|
|
}
|
|
if slot > st.Slot() {
|
|
// At this point either we know we are proposing on a future slot or we need to still compute the
|
|
// right proposer index pre-Fulu, either way we need to copy the state to process it.
|
|
st = st.Copy()
|
|
var err error
|
|
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, headRoot, slot)
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not process slots to get payload attribute")
|
|
return emptyAttri
|
|
}
|
|
}
|
|
if e > stateEpoch && !fuluAndNextEpoch {
|
|
emptyAttri := payloadattribute.EmptyWithVersion(st.Version())
|
|
val, ok = s.trackedProposer(st, slot)
|
|
if !ok {
|
|
return emptyAttri
|
|
}
|
|
}
|
|
// Get previous randao.
|
|
prevRando, err := helpers.RandaoMix(st, time.CurrentEpoch(st))
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get randao mix to get payload attribute")
|
|
return emptyAttri
|
|
}
|
|
|
|
// Get timestamp.
|
|
t, err := slots.StartTime(s.genesisTime, slot)
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get timestamp to get payload attribute")
|
|
return emptyAttri
|
|
}
|
|
|
|
v := st.Version()
|
|
|
|
if v >= version.Deneb {
|
|
withdrawals, _, err := st.ExpectedWithdrawals()
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get expected withdrawals to get payload attribute")
|
|
return emptyAttri
|
|
}
|
|
|
|
attr, err := payloadattribute.New(&enginev1.PayloadAttributesV3{
|
|
Timestamp: uint64(t.Unix()),
|
|
PrevRandao: prevRando,
|
|
SuggestedFeeRecipient: val.FeeRecipient[:],
|
|
Withdrawals: withdrawals,
|
|
ParentBeaconBlockRoot: headRoot,
|
|
})
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get payload attribute")
|
|
return emptyAttri
|
|
}
|
|
|
|
return attr
|
|
}
|
|
|
|
if v >= version.Capella {
|
|
withdrawals, _, err := st.ExpectedWithdrawals()
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get expected withdrawals to get payload attribute")
|
|
return emptyAttri
|
|
}
|
|
|
|
attr, err := payloadattribute.New(&enginev1.PayloadAttributesV2{
|
|
Timestamp: uint64(t.Unix()),
|
|
PrevRandao: prevRando,
|
|
SuggestedFeeRecipient: val.FeeRecipient[:],
|
|
Withdrawals: withdrawals,
|
|
})
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get payload attribute")
|
|
return emptyAttri
|
|
}
|
|
|
|
return attr
|
|
}
|
|
|
|
if v >= version.Bellatrix {
|
|
attr, err := payloadattribute.New(&enginev1.PayloadAttributes{
|
|
Timestamp: uint64(t.Unix()),
|
|
PrevRandao: prevRando,
|
|
SuggestedFeeRecipient: val.FeeRecipient[:],
|
|
})
|
|
if err != nil {
|
|
log.WithError(err).Error("Could not get payload attribute")
|
|
return emptyAttri
|
|
}
|
|
|
|
return attr
|
|
}
|
|
|
|
log.WithField("version", version.String(st.Version())).Error("Could not get payload attribute due to unknown state version")
|
|
return emptyAttri
|
|
}
|
|
|
|
// removeInvalidBlockAndState removes the invalid block, blob and its corresponding state from the cache and DB.
|
|
func (s *Service) removeInvalidBlockAndState(ctx context.Context, blkRoots [][32]byte) error {
|
|
for _, root := range blkRoots {
|
|
if err := s.cfg.StateGen.DeleteStateFromCaches(ctx, root); err != nil {
|
|
return err
|
|
}
|
|
// Delete block also deletes the state as well.
|
|
if err := s.cfg.BeaconDB.DeleteBlock(ctx, root); err != nil {
|
|
// TODO(10487): If a caller requests to delete a root that's justified and finalized. We should gracefully shutdown.
|
|
// This is an irreparable condition, it would me a justified or finalized block has become invalid.
|
|
return err
|
|
}
|
|
if err := s.blobStorage.Remove(root); err != nil {
|
|
// Blobs may not exist for some blocks, leading to deletion failures. Log such errors at debug level.
|
|
log.WithError(err).Debug("Could not remove blob from blob storage")
|
|
}
|
|
if err := s.dataColumnStorage.Remove(root); err != nil {
|
|
log.WithError(err).Errorf("Could not remove data columns from data column storage for root %#x", root)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func kzgCommitmentsToVersionedHashes(body interfaces.ReadOnlyBeaconBlockBody) ([]common.Hash, error) {
|
|
commitments, err := body.BlobKzgCommitments()
|
|
if err != nil {
|
|
return nil, errors.Wrap(invalidBlock{error: err}, "could not get blob kzg commitments")
|
|
}
|
|
|
|
versionedHashes := make([]common.Hash, len(commitments))
|
|
for i, commitment := range commitments {
|
|
versionedHashes[i] = primitives.ConvertKzgCommitmentToVersionedHash(commitment)
|
|
}
|
|
return versionedHashes, nil
|
|
}
|