mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 23:48:06 -05:00
Move consensus and execution validation outside of onBlock (#12589)
* Move consensus and execution validation outside of onBlock * reviews * fix unit test * revert version change * fix tests --------- Co-authored-by: terencechain <terence@prysmaticlabs.com>
This commit is contained in:
@@ -22,7 +22,6 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/crypto/bls"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
|
||||
ethpbv1 "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation"
|
||||
"github.com/prysmaticlabs/prysm/v4/runtime/version"
|
||||
@@ -40,59 +39,11 @@ const depositDeadline = 20 * time.Second
|
||||
// This defines size of the upper bound for initial sync block cache.
|
||||
var initialSyncBlockCacheSize = uint64(2 * params.BeaconConfig().SlotsPerEpoch)
|
||||
|
||||
// onBlock is called when a gossip block is received. It runs regular state transition on the block.
|
||||
// The block's signing root should be computed before calling this method to avoid redundant
|
||||
// computation in this method and methods it calls into.
|
||||
//
|
||||
// Spec pseudocode definition:
|
||||
//
|
||||
// def on_block(store: Store, signed_block: ReadOnlySignedBeaconBlock) -> None:
|
||||
// block = signed_block.message
|
||||
// # Parent block must be known
|
||||
// assert block.parent_root in store.block_states
|
||||
// # Make a copy of the state to avoid mutability issues
|
||||
// pre_state = copy(store.block_states[block.parent_root])
|
||||
// # Blocks cannot be in the future. If they are, their consideration must be delayed until the are in the past.
|
||||
// assert get_current_slot(store) >= block.slot
|
||||
//
|
||||
// # Check that block is later than the finalized epoch slot (optimization to reduce calls to get_ancestor)
|
||||
// finalized_slot = compute_start_slot_at_epoch(store.finalized_checkpoint.epoch)
|
||||
// assert block.slot > finalized_slot
|
||||
// # Check block is a descendant of the finalized block at the checkpoint finalized slot
|
||||
// assert get_ancestor(store, block.parent_root, finalized_slot) == store.finalized_checkpoint.root
|
||||
//
|
||||
// # Check the block is valid and compute the post-state
|
||||
// state = pre_state.copy()
|
||||
// state_transition(state, signed_block, True)
|
||||
// # Add new block to the store
|
||||
// store.blocks[hash_tree_root(block)] = block
|
||||
// # Add new state for this block to the store
|
||||
// store.block_states[hash_tree_root(block)] = state
|
||||
//
|
||||
// # Update justified checkpoint
|
||||
// if state.current_justified_checkpoint.epoch > store.justified_checkpoint.epoch:
|
||||
// if state.current_justified_checkpoint.epoch > store.best_justified_checkpoint.epoch:
|
||||
// store.best_justified_checkpoint = state.current_justified_checkpoint
|
||||
// if should_update_justified_checkpoint(store, state.current_justified_checkpoint):
|
||||
// store.justified_checkpoint = state.current_justified_checkpoint
|
||||
//
|
||||
// # Update finalized checkpoint
|
||||
// if state.finalized_checkpoint.epoch > store.finalized_checkpoint.epoch:
|
||||
// store.finalized_checkpoint = state.finalized_checkpoint
|
||||
//
|
||||
// # Potentially update justified if different from store
|
||||
// if store.justified_checkpoint != state.current_justified_checkpoint:
|
||||
// # Update justified if new justified is later than store justified
|
||||
// if state.current_justified_checkpoint.epoch > store.justified_checkpoint.epoch:
|
||||
// store.justified_checkpoint = state.current_justified_checkpoint
|
||||
// return
|
||||
//
|
||||
// # Update justified if store justified is not in chain with finalized checkpoint
|
||||
// finalized_slot = compute_start_slot_at_epoch(store.finalized_checkpoint.epoch)
|
||||
// ancestor_at_finalized_slot = get_ancestor(store, store.justified_checkpoint.root, finalized_slot)
|
||||
// if ancestor_at_finalized_slot != store.finalized_checkpoint.root:
|
||||
// store.justified_checkpoint = state.current_justified_checkpoint
|
||||
func (s *Service) onBlock(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error {
|
||||
// postBlockProcess is called when a gossip block is received. This function performs
|
||||
// several duties most importantly informing the engine if head was updated,
|
||||
// saving the new head information to the blockchain package and database and
|
||||
// handling attestations, slashings and similar included in the block.
|
||||
func (s *Service) postBlockProcess(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, postState state.BeaconState, isValidPayload bool) error {
|
||||
ctx, span := trace.StartSpan(ctx, "blockChain.onBlock")
|
||||
defer span.End()
|
||||
if err := consensusblocks.BeaconBlockIsNil(signed); err != nil {
|
||||
@@ -101,54 +52,6 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.ReadOnlySignedB
|
||||
startTime := time.Now()
|
||||
b := signed.Block()
|
||||
|
||||
preState, err := s.getBlockPreState(ctx, b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Verify that the parent block is in forkchoice
|
||||
parentRoot := b.ParentRoot()
|
||||
if !s.cfg.ForkChoiceStore.HasNode(parentRoot) {
|
||||
return ErrNotDescendantOfFinalized
|
||||
}
|
||||
|
||||
// Save current justified and finalized epochs for future use.
|
||||
currStoreJustifiedEpoch := s.cfg.ForkChoiceStore.JustifiedCheckpoint().Epoch
|
||||
currStoreFinalizedEpoch := s.cfg.ForkChoiceStore.FinalizedCheckpoint().Epoch
|
||||
preStateFinalizedEpoch := preState.FinalizedCheckpoint().Epoch
|
||||
preStateJustifiedEpoch := preState.CurrentJustifiedCheckpoint().Epoch
|
||||
|
||||
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stateTransitionStartTime := time.Now()
|
||||
postState, err := transition.ExecuteStateTransition(ctx, preState, signed)
|
||||
if err != nil {
|
||||
return invalidBlock{error: err}
|
||||
}
|
||||
stateTransitionProcessingTime.Observe(float64(time.Since(stateTransitionStartTime).Milliseconds()))
|
||||
|
||||
postStateVersion, postStateHeader, err := getStateVersionAndPayload(postState)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
isValidPayload, err := s.notifyNewPayload(ctx, postStateVersion, postStateHeader, signed)
|
||||
if err != nil {
|
||||
if IsInvalidBlock(err) && InvalidBlockLVH(err) != [32]byte{} {
|
||||
return s.reportInvalidBlock(ctx, blockRoot, parentRoot, InvalidBlockLVH(err))
|
||||
}
|
||||
return errors.Wrap(err, "could not validate new payload")
|
||||
}
|
||||
if signed.Version() < version.Capella && isValidPayload {
|
||||
if err := s.validateMergeTransitionBlock(ctx, preStateVersion, preStateHeader, signed); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.savePostStateInfo(ctx, blockRoot, signed, postState); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.cfg.ForkChoiceStore.InsertNode(ctx, postState, blockRoot); err != nil {
|
||||
return errors.Wrapf(err, "could not insert block %d to fork choice store", signed.Block().Slot())
|
||||
}
|
||||
@@ -163,33 +66,6 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.ReadOnlySignedB
|
||||
}
|
||||
}
|
||||
|
||||
// If slasher is configured, forward the attestations in the block via
|
||||
// an event feed for processing.
|
||||
if features.Get().EnableSlasher {
|
||||
// Feed the indexed attestation to slasher if enabled. This action
|
||||
// is done in the background to avoid adding more load to this critical code path.
|
||||
go func() {
|
||||
// Using a different context to prevent timeouts as this operation can be expensive
|
||||
// and we want to avoid affecting the critical code path.
|
||||
ctx := context.TODO()
|
||||
for _, att := range signed.Block().Body().Attestations() {
|
||||
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, att.Data.Slot, att.Data.CommitteeIndex)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get attestation committee")
|
||||
tracing.AnnotateError(span, err)
|
||||
return
|
||||
}
|
||||
indexedAtt, err := attestation.ConvertToIndexed(ctx, att, committee)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not convert to indexed attestation")
|
||||
tracing.AnnotateError(span, err)
|
||||
return
|
||||
}
|
||||
s.cfg.SlasherAttestationsFeed.Send(indexedAtt)
|
||||
}
|
||||
}()
|
||||
}
|
||||
justified := s.cfg.ForkChoiceStore.JustifiedCheckpoint()
|
||||
start := time.Now()
|
||||
headRoot, err := s.cfg.ForkChoiceStore.Head(ctx)
|
||||
if err != nil {
|
||||
@@ -242,46 +118,6 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.ReadOnlySignedB
|
||||
},
|
||||
})
|
||||
|
||||
// Save justified check point to db.
|
||||
postStateJustifiedEpoch := postState.CurrentJustifiedCheckpoint().Epoch
|
||||
if justified.Epoch > currStoreJustifiedEpoch || (justified.Epoch == postStateJustifiedEpoch && justified.Epoch > preStateJustifiedEpoch) {
|
||||
if err := s.cfg.BeaconDB.SaveJustifiedCheckpoint(ctx, ðpb.Checkpoint{
|
||||
Epoch: justified.Epoch, Root: justified.Root[:],
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Save finalized check point to db and more.
|
||||
postStateFinalizedEpoch := postState.FinalizedCheckpoint().Epoch
|
||||
finalized := s.cfg.ForkChoiceStore.FinalizedCheckpoint()
|
||||
if finalized.Epoch > currStoreFinalizedEpoch || (finalized.Epoch == postStateFinalizedEpoch && finalized.Epoch > preStateFinalizedEpoch) {
|
||||
if err := s.updateFinalized(ctx, ðpb.Checkpoint{Epoch: finalized.Epoch, Root: finalized.Root[:]}); err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
// Send an event regarding the new finalized checkpoint over a common event feed.
|
||||
stateRoot := signed.Block().StateRoot()
|
||||
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
|
||||
Type: statefeed.FinalizedCheckpoint,
|
||||
Data: ðpbv1.EventFinalizedCheckpoint{
|
||||
Epoch: postState.FinalizedCheckpoint().Epoch,
|
||||
Block: postState.FinalizedCheckpoint().Root,
|
||||
State: stateRoot[:],
|
||||
ExecutionOptimistic: isValidPayload,
|
||||
},
|
||||
})
|
||||
|
||||
// Use a custom deadline here, since this method runs asynchronously.
|
||||
// We ignore the parent method's context and instead create a new one
|
||||
// with a custom deadline, therefore using the background context instead.
|
||||
depCtx, cancel := context.WithTimeout(context.Background(), depositDeadline)
|
||||
defer cancel()
|
||||
if err := s.insertFinalizedDeposits(depCtx, finalized.Root); err != nil {
|
||||
log.WithError(err).Error("Could not insert finalized deposits.")
|
||||
}
|
||||
}()
|
||||
}
|
||||
defer reportAttestationInclusion(b)
|
||||
if err := s.handleEpochBoundary(ctx, postState, blockRoot[:]); err != nil {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user