Compare commits

...

3 Commits

Author SHA1 Message Date
james-prysm
6d9f23eefd optimizations 2025-08-15 16:20:03 -05:00
james-prysm
217971a792 memory optimization 2025-08-15 14:01:20 -05:00
james-prysm
b8cabdaa5b init 2025-08-15 08:46:09 -05:00
6 changed files with 827 additions and 81 deletions

View File

@@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"block_processor.go",
"chain_info.go",
"chain_info_forkchoice.go",
"currently_syncing_block.go",
@@ -22,6 +23,7 @@ go_library(
"process_attestation_helpers.go",
"process_block.go",
"process_block_helpers.go",
"processing_stages.go",
"receive_attestation.go",
"receive_blob.go",
"receive_block.go",

View File

@@ -0,0 +1,130 @@
package blockchain
import (
"context"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/das"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
consensusblocks "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/crypto/bls"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/pkg/errors"
)
// ProcessingMode defines the mode of block processing
type ProcessingMode int
const (
// ModeSingle processes blocks individually with full validation
ModeSingle ProcessingMode = iota
// ModeBatch processes multiple blocks with optimizations for throughput
ModeBatch
)
// ProcessingContext contains all the context needed for block processing
type ProcessingContext struct {
Context context.Context
Mode ProcessingMode
AVS das.AvailabilityStore
BatchSize int
// Single state tracking (reused in batch mode)
CurrentState state.BeaconState
CurrentPreState state.BeaconState
// Boundary states (only epoch boundaries saved)
BoundaryStates map[[32]byte]state.BeaconState
// For single mode only (size 1)
States []state.BeaconState
PreStates []state.BeaconState
// Lightweight data - OK to keep all
Checkpoints [][]*ethpb.Checkpoint
SigSets []*bls.SignatureBatch
IsValidPayloads []bool
BlockRoots [][32]byte
// Timing info
ReceivedTime time.Time
DAWaitedTime time.Duration
// Current block processing info
CurrentBlockIndex int
}
// ProcessingStage represents a stage in the block processing pipeline
type ProcessingStage interface {
Name() string
Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error
SupportsBatch() bool
}
// BlockProcessor handles both single and batch block processing using a pipeline
type BlockProcessor struct {
service *Service
stages []ProcessingStage
}
// NewBlockProcessor creates a new unified block processor with optimal performance
func NewBlockProcessor(service *Service) *BlockProcessor {
stages := []ProcessingStage{
&ValidationStage{service: service},
&StateTransitionExecutionAndDAStage{service: service}, // Combined for single-loop batch performance
&SignatureVerificationStage{service: service},
&ForkchoiceStage{service: service}, // Includes post-processing for single mode
}
return &BlockProcessor{
service: service,
stages: stages,
}
}
// Process executes the block processing pipeline
func (bp *BlockProcessor) Process(pc *ProcessingContext, blocks []consensusblocks.ROBlock) error {
// Set batch size for strategy
pc.BatchSize = len(blocks)
// Initialize based on strategy
strategy := pc.Strategy()
if strategy.IsSingle() {
// Single mode: allocate single-element arrays
pc.States = make([]state.BeaconState, 1)
pc.PreStates = make([]state.BeaconState, 1)
} else {
// Batch mode: use streaming approach with boundary states
pc.BoundaryStates = make(map[[32]byte]state.BeaconState)
}
// Lightweight arrays - always allocate full size
pc.Checkpoints = make([][]*ethpb.Checkpoint, len(blocks))
pc.SigSets = make([]*bls.SignatureBatch, len(blocks))
pc.IsValidPayloads = make([]bool, len(blocks))
pc.BlockRoots = make([][32]byte, len(blocks))
for i, block := range blocks {
pc.BlockRoots[i] = block.Root()
}
// Execute each stage
for _, stage := range bp.stages {
// For batch mode with multiple blocks, non-batch stages need individual processing
if pc.IsBatch() && !stage.SupportsBatch() && len(blocks) > 1 {
// Process individually for stages that don't support batch
for i, block := range blocks {
pc.CurrentBlockIndex = i
if err := stage.Execute(pc, []consensusblocks.ROBlock{block}); err != nil {
return errors.Wrapf(err, "stage %s failed for block %d", stage.Name(), i)
}
}
} else {
if err := stage.Execute(pc, blocks); err != nil {
return errors.Wrapf(err, "stage %s failed", stage.Name())
}
}
}
return nil
}

View File

@@ -0,0 +1,160 @@
package blockchain
import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
)
// ProcessingStrategy encapsulates the differences between single and batch processing
type ProcessingStrategy interface {
// Processing mode queries
IsSingle() bool
IsBatch() bool
NeedsForkchoiceLock() bool
// State accessors
GetPreState(ctx *ProcessingContext, blockIndex int) state.BeaconState
GetPostState(ctx *ProcessingContext, blockIndex int) state.BeaconState
SetPreState(ctx *ProcessingContext, blockIndex int, state state.BeaconState)
SetPostState(ctx *ProcessingContext, blockIndex int, state state.BeaconState)
// Current state management (for streaming)
GetCurrentPreState(ctx *ProcessingContext) state.BeaconState
GetCurrentPostState(ctx *ProcessingContext) state.BeaconState
SetCurrentPreState(ctx *ProcessingContext, state state.BeaconState)
SetCurrentPostState(ctx *ProcessingContext, state state.BeaconState)
// Execution strategy
ShouldProcessInParallel() bool
ShouldStreamStates() bool
}
// SingleBlockStrategy handles single block processing
type SingleBlockStrategy struct{}
func (s *SingleBlockStrategy) IsSingle() bool { return true }
func (s *SingleBlockStrategy) IsBatch() bool { return false }
func (s *SingleBlockStrategy) NeedsForkchoiceLock() bool { return true }
func (s *SingleBlockStrategy) ShouldProcessInParallel() bool { return true }
func (s *SingleBlockStrategy) ShouldStreamStates() bool { return false }
func (s *SingleBlockStrategy) GetPreState(ctx *ProcessingContext, blockIndex int) state.BeaconState {
return ctx.PreStates[0]
}
func (s *SingleBlockStrategy) GetPostState(ctx *ProcessingContext, blockIndex int) state.BeaconState {
return ctx.States[0]
}
func (s *SingleBlockStrategy) SetPreState(ctx *ProcessingContext, blockIndex int, state state.BeaconState) {
ctx.PreStates[0] = state
}
func (s *SingleBlockStrategy) SetPostState(ctx *ProcessingContext, blockIndex int, state state.BeaconState) {
ctx.States[0] = state
}
func (s *SingleBlockStrategy) GetCurrentPreState(ctx *ProcessingContext) state.BeaconState {
return ctx.PreStates[0]
}
func (s *SingleBlockStrategy) GetCurrentPostState(ctx *ProcessingContext) state.BeaconState {
return ctx.States[0]
}
func (s *SingleBlockStrategy) SetCurrentPreState(ctx *ProcessingContext, state state.BeaconState) {
ctx.PreStates[0] = state
ctx.CurrentPreState = state
}
func (s *SingleBlockStrategy) SetCurrentPostState(ctx *ProcessingContext, state state.BeaconState) {
ctx.States[0] = state
ctx.CurrentState = state
}
// BatchBlockStrategy handles batch processing (including single block in batch mode)
type BatchBlockStrategy struct {
blockCount int
}
func NewBatchBlockStrategy(blockCount int) *BatchBlockStrategy {
return &BatchBlockStrategy{blockCount: blockCount}
}
func (b *BatchBlockStrategy) IsSingle() bool { return false }
func (b *BatchBlockStrategy) IsBatch() bool { return true }
func (b *BatchBlockStrategy) NeedsForkchoiceLock() bool { return false } // Lock held by ReceiveBlockBatch
func (b *BatchBlockStrategy) ShouldProcessInParallel() bool { return false } // Batch processing is sequential
func (b *BatchBlockStrategy) ShouldStreamStates() bool { return true }
func (b *BatchBlockStrategy) GetPreState(ctx *ProcessingContext, blockIndex int) state.BeaconState {
return ctx.CurrentPreState
}
func (b *BatchBlockStrategy) GetPostState(ctx *ProcessingContext, blockIndex int) state.BeaconState {
return ctx.CurrentState
}
func (b *BatchBlockStrategy) SetPreState(ctx *ProcessingContext, blockIndex int, state state.BeaconState) {
ctx.CurrentPreState = state
}
func (b *BatchBlockStrategy) SetPostState(ctx *ProcessingContext, blockIndex int, state state.BeaconState) {
ctx.CurrentState = state
}
func (b *BatchBlockStrategy) GetCurrentPreState(ctx *ProcessingContext) state.BeaconState {
return ctx.CurrentPreState
}
func (b *BatchBlockStrategy) GetCurrentPostState(ctx *ProcessingContext) state.BeaconState {
return ctx.CurrentState
}
func (b *BatchBlockStrategy) SetCurrentPreState(ctx *ProcessingContext, state state.BeaconState) {
ctx.CurrentPreState = state
}
func (b *BatchBlockStrategy) SetCurrentPostState(ctx *ProcessingContext, state state.BeaconState) {
ctx.CurrentState = state
}
// Strategy returns the appropriate processing strategy for the context
func (ctx *ProcessingContext) Strategy() ProcessingStrategy {
if ctx.Mode == ModeSingle {
return &SingleBlockStrategy{}
}
return NewBatchBlockStrategy(ctx.BatchSize)
}
// Convenience methods on ProcessingContext that delegate to strategy
func (ctx *ProcessingContext) IsSingle() bool {
return ctx.Strategy().IsSingle()
}
func (ctx *ProcessingContext) IsBatch() bool {
return ctx.Strategy().IsBatch()
}
func (ctx *ProcessingContext) NeedsForkchoiceLock() bool {
return ctx.Strategy().NeedsForkchoiceLock()
}
func (ctx *ProcessingContext) ShouldProcessInParallel() bool {
return ctx.Strategy().ShouldProcessInParallel()
}
func (ctx *ProcessingContext) GetPreStateForBlock(blockIndex int) state.BeaconState {
return ctx.Strategy().GetPreState(ctx, blockIndex)
}
func (ctx *ProcessingContext) GetPostStateForBlock(blockIndex int) state.BeaconState {
return ctx.Strategy().GetPostState(ctx, blockIndex)
}
func (ctx *ProcessingContext) SetPreStateForBlock(blockIndex int, state state.BeaconState) {
ctx.Strategy().SetPreState(ctx, blockIndex, state)
}
func (ctx *ProcessingContext) SetPostStateForBlock(blockIndex int, state state.BeaconState) {
ctx.Strategy().SetPostState(ctx, blockIndex, state)
}

View File

@@ -0,0 +1,513 @@
package blockchain
import (
"fmt"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition"
forkchoicetypes "github.com/OffchainLabs/prysm/v6/beacon-chain/forkchoice/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
"github.com/OffchainLabs/prysm/v6/config/features"
consensusblocks "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/crypto/bls"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// ValidationStage handles initial validation checks
type ValidationStage struct {
service *Service
}
func (s *ValidationStage) Name() string { return "validation" }
func (s *ValidationStage) SupportsBatch() bool { return true }
func (s *ValidationStage) Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
for i, block := range blocks {
blockRoot := ctx.BlockRoots[i]
// Check if block is blacklisted
if features.BlacklistedBlock(blockRoot) {
return errBlacklistedRoot
}
// For single mode, check if already synced
if ctx.IsSingle() {
if s.service.InForkchoice(blockRoot) {
return fmt.Errorf("block already synced: %#x", blockRoot)
}
// Set block as being synced
err := s.service.blockBeingSynced.set(blockRoot)
if errors.Is(err, errBlockBeingSynced) {
return fmt.Errorf("block currently being synced: %#x", blockRoot)
}
defer s.service.blockBeingSynced.unset(blockRoot)
}
// Validate block structure
if err := consensusblocks.BeaconBlockIsNil(block); err != nil {
return invalidBlock{error: err}
}
}
return nil
}
// StateTransitionExecutionAndDAStage combines state transition, execution validation, and DA checks
// into a single stage for optimal batch performance while maintaining code reuse
type StateTransitionExecutionAndDAStage struct {
service *Service
}
func (s *StateTransitionExecutionAndDAStage) Name() string { return "state_transition_execution_and_da" }
func (s *StateTransitionExecutionAndDAStage) SupportsBatch() bool { return true }
func (s *StateTransitionExecutionAndDAStage) Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
if ctx.IsSingle() {
return s.processSingleBlock(ctx, blocks[0])
}
return s.processBatchBlocks(ctx, blocks)
}
// ProcessMode defines how to execute block processing
type ProcessMode int
const (
ProcessParallel ProcessMode = iota // Run state transition and execution in parallel
ProcessSequential // Run state transition and execution sequentially
)
// BlockProcessResult contains the result of processing a single block
type BlockProcessResult struct {
preState state.BeaconState
postState state.BeaconState
isValidPayload bool
sigSet *bls.SignatureBatch
checkpoints []*ethpb.Checkpoint
daWaitedTime time.Duration
}
// processBlock handles the core validation logic shared between single and batch modes
func (s *StateTransitionExecutionAndDAStage) processBlock(
ctx *ProcessingContext,
block consensusblocks.ROBlock,
blockIndex int,
preState state.BeaconState,
mode ProcessMode,
) (*BlockProcessResult, error) {
blockRoot := ctx.BlockRoots[blockIndex]
result := &BlockProcessResult{
preState: preState,
}
// Save current checkpoints (reused logic)
cp := s.service.saveCurrentCheckpoints(preState)
result.checkpoints = []*ethpb.Checkpoint{
{Epoch: cp.j, Root: nil},
{Epoch: cp.f, Root: nil},
{Epoch: cp.c, Root: nil},
}
// Get state version info for execution validation (reused logic)
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
if err != nil {
return nil, err
}
if mode == ProcessParallel {
// Single mode: Run state transition and execution validation IN PARALLEL
eg, _ := errgroup.WithContext(ctx.Context)
var postState state.BeaconState
var isValidPayload bool
eg.Go(func() error {
var err error
postState, err = s.service.validateStateTransition(ctx.Context, preState, block)
if err != nil {
return errors.Wrap(err, "failed to validate consensus state transition function")
}
return nil
})
eg.Go(func() error {
var err error
isValidPayload, err = s.service.validateExecutionOnBlock(ctx.Context, preStateVersion, preStateHeader, block)
if err != nil {
return errors.Wrap(err, "could not notify the engine of the new payload")
}
return nil
})
if err := eg.Wait(); err != nil {
return nil, err
}
result.postState = postState
result.isValidPayload = isValidPayload
} else {
// Batch mode: Run state transition and execution validation SEQUENTIALLY
// Execute state transition without signature verification for batch optimization
var set *bls.SignatureBatch
set, result.postState, err = transition.ExecuteStateTransitionNoVerifyAnySig(ctx.Context, preState, block)
if err != nil {
return nil, invalidBlock{error: err}
}
result.sigSet = set
// Validate execution payload
postVersion, postHeader, err := getStateVersionAndPayload(result.postState)
if err != nil {
return nil, err
}
result.isValidPayload, err = s.service.notifyNewPayload(ctx.Context, postVersion, postHeader, block)
if err != nil {
return nil, s.service.handleInvalidExecutionError(ctx.Context, err, blockRoot, block.Block().ParentRoot())
}
// Validate merge transition if needed (reused logic)
if result.isValidPayload {
if err := s.service.validateMergeTransitionBlock(ctx.Context, preStateVersion, preStateHeader, block); err != nil {
return nil, err
}
}
}
return result, nil
}
// checkDataAvailability handles DA checks for a single block (reused logic)
func (s *StateTransitionExecutionAndDAStage) checkDataAvailability(
ctx *ProcessingContext,
block consensusblocks.ROBlock,
blockIndex int,
) (time.Duration, error) {
blockRoot := ctx.BlockRoots[blockIndex]
start := time.Now()
if ctx.AVS == nil {
blockCopy, err := block.Copy()
if err != nil {
return 0, err
}
return time.Since(start), s.service.isDataAvailable(ctx.Context, blockRoot, blockCopy)
}
return time.Since(start), ctx.AVS.IsDataAvailable(ctx.Context, s.service.CurrentSlot(), block)
}
func (s *StateTransitionExecutionAndDAStage) processSingleBlock(ctx *ProcessingContext, block consensusblocks.ROBlock) error {
idx := ctx.CurrentBlockIndex
// Get pre-state
preState, err := s.service.getBlockPreState(ctx.Context, block.Block())
if err != nil {
return errors.Wrap(err, "could not get block's prestate")
}
ctx.SetPreStateForBlock(0, preState)
// Process block using shared logic with parallel execution
result, err := s.processBlock(ctx, block, idx, preState, ProcessParallel)
if err != nil {
return err
}
// Store results
ctx.SetPostStateForBlock(0, result.postState)
ctx.IsValidPayloads[idx] = result.isValidPayload
ctx.Checkpoints[idx] = result.checkpoints
// Check data availability using shared logic
daWaitedTime, err := s.checkDataAvailability(ctx, block, idx)
if err != nil {
return err
}
ctx.DAWaitedTime = daWaitedTime
return nil
}
func (s *StateTransitionExecutionAndDAStage) processBatchBlocks(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
// Single-loop batch processing for optimal performance
if len(blocks) == 0 {
return errors.New("no blocks provided")
}
// Get initial pre-state
preState, err := s.service.cfg.StateGen.StateByRootInitialSync(ctx.Context, blocks[0].Block().ParentRoot())
if err != nil {
return err
}
if preState == nil || preState.IsNil() {
return fmt.Errorf("nil pre state for slot %d", blocks[0].Block().Slot())
}
// Fill in missing blocks for forkchoice
if err := s.service.fillInForkChoiceMissingBlocks(ctx.Context, blocks[0], preState.CurrentJustifiedCheckpoint(), preState.FinalizedCheckpoint()); err != nil {
return errors.Wrap(err, "could not fill in missing blocks to forkchoice")
}
// SINGLE LOOP: Process each block with state transition, execution validation, and DA checks
for i, block := range blocks {
// Store pre-state for other stages that need it
ctx.CurrentPreState = preState.Copy()
// Process block using shared logic with sequential execution
result, err := s.processBlock(ctx, block, i, preState, ProcessSequential)
if err != nil {
return err
}
// Check data availability using shared logic
_, err = s.checkDataAvailability(ctx, block, i)
if err != nil {
return err
}
// Store results
ctx.IsValidPayloads[i] = result.isValidPayload
ctx.SigSets[i] = result.sigSet
ctx.Checkpoints[i] = result.checkpoints
// Save boundary states at epoch transitions (like original)
if slots.IsEpochStart(result.postState.Slot()) {
ctx.BoundaryStates[block.Root()] = result.postState.Copy()
}
// Update streaming state for next iteration
ctx.CurrentState = result.postState
preState = result.postState
}
return nil
}
// SignatureVerificationStage handles signature verification
type SignatureVerificationStage struct {
service *Service
}
func (s *SignatureVerificationStage) Name() string { return "signature_verification" }
func (s *SignatureVerificationStage) SupportsBatch() bool { return true }
func (s *SignatureVerificationStage) Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
if ctx.IsBatch() {
return s.verifyBatchSignatures(ctx)
}
return s.verifySingleSignatures(ctx, blocks[0])
}
func (s *SignatureVerificationStage) verifyBatchSignatures(ctx *ProcessingContext) error {
sigSet := bls.NewSet()
for _, set := range ctx.SigSets {
if set != nil {
sigSet.Join(set)
}
}
var verify bool
var err error
if features.Get().EnableVerboseSigVerification {
verify, err = sigSet.VerifyVerbosely()
} else {
verify, err = sigSet.Verify()
}
if err != nil {
return invalidBlock{error: err}
}
if !verify {
return errors.New("batch block signature verification failed")
}
return nil
}
func (s *SignatureVerificationStage) verifySingleSignatures(ctx *ProcessingContext, block consensusblocks.ROBlock) error {
// For single blocks, signature verification is done during state transition
// This stage is mainly for batch mode
return nil
}
// ForkchoiceStage handles forkchoice operations
type ForkchoiceStage struct {
service *Service
}
func (s *ForkchoiceStage) Name() string { return "forkchoice" }
func (s *ForkchoiceStage) SupportsBatch() bool { return true }
func (s *ForkchoiceStage) Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
if ctx.IsBatch() {
return s.executeBatchForkchoice(ctx, blocks)
}
return s.executeSingleForkchoice(ctx, blocks[0])
}
func (s *ForkchoiceStage) executeSingleForkchoice(ctx *ProcessingContext, block consensusblocks.ROBlock) error {
idx := ctx.CurrentBlockIndex
blockRoot := ctx.BlockRoots[idx]
// Save post state info
blockCopy, err := block.Copy()
if err != nil {
return err
}
// Take forkchoice lock (single mode always needs it)
s.service.cfg.ForkChoiceStore.Lock()
defer s.service.cfg.ForkChoiceStore.Unlock()
// Get the post state
postState := ctx.GetPostStateForBlock(0)
if err := s.service.savePostStateInfo(ctx.Context, blockRoot, blockCopy, postState); err != nil {
return errors.Wrap(err, "could not save post state info")
}
// Execute post block processing (within the forkchoice lock)
args := &postBlockProcessConfig{
ctx: ctx.Context,
roblock: block,
postState: postState,
isValidPayload: ctx.IsValidPayloads[idx],
}
if err := s.service.postBlockProcess(args); err != nil {
return errors.Wrap(err, "could not process block")
}
// IMPORTANT: Single-mode post-processing MUST happen within forkchoice lock
// Update checkpoints (requires forkchoice lock)
preState := ctx.GetPreStateForBlock(0)
cp := ffgCheckpoints{
j: ctx.Checkpoints[0][0].Epoch,
f: ctx.Checkpoints[0][1].Epoch,
c: ctx.Checkpoints[0][2].Epoch,
}
if err := s.service.updateCheckpoints(ctx.Context, cp, preState, postState, blockRoot); err != nil {
return err
}
// Handle slasher if enabled
if s.service.slasherEnabled {
go s.service.sendBlockAttestationsToSlasher(blockCopy, preState)
}
// Prune operation pools (only if block is head)
if err := s.service.prunePostBlockOperationPools(ctx.Context, blockCopy, blockRoot); err != nil {
log.WithError(err).Error("Could not prune canonical objects from pool")
}
// Check save hot state DB (requires forkchoice lock)
if err := s.service.checkSaveHotStateDB(ctx.Context); err != nil {
return err
}
// Handle caches (requires forkchoice lock)
if err := s.service.handleCaches(); err != nil {
return err
}
// Report processing metrics
s.service.reportPostBlockProcessing(blockCopy, blockRoot, ctx.ReceivedTime, ctx.DAWaitedTime)
return nil
}
func (s *ForkchoiceStage) executeBatchForkchoice(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
// Save blocks and prepare forkchoice nodes
pendingNodes := make([]*forkchoicetypes.BlockAndCheckpoints, len(blocks))
for i, b := range blocks {
root := b.Root()
// Save block to database
if err := s.service.saveInitSyncBlock(ctx.Context, root, b); err != nil {
return err
}
// Save state summary
if err := s.service.cfg.BeaconDB.SaveStateSummary(ctx.Context, &ethpb.StateSummary{
Slot: b.Block().Slot(),
Root: root[:],
}); err != nil {
return err
}
// Prepare forkchoice node
args := &forkchoicetypes.BlockAndCheckpoints{
Block: b,
JustifiedCheckpoint: ctx.Checkpoints[i][0],
FinalizedCheckpoint: ctx.Checkpoints[i][1],
}
pendingNodes[len(blocks)-i-1] = args
// Update justified/finalized checkpoints if needed
if i > 0 && ctx.Checkpoints[i][0].Epoch > ctx.Checkpoints[i-1][0].Epoch {
if err := s.service.cfg.BeaconDB.SaveJustifiedCheckpoint(ctx.Context, ctx.Checkpoints[i][0]); err != nil {
return err
}
}
if i > 0 && ctx.Checkpoints[i][1].Epoch > ctx.Checkpoints[i-1][1].Epoch {
if err := s.service.updateFinalized(ctx.Context, ctx.Checkpoints[i][1]); err != nil {
return err
}
}
}
// Save boundary states
for r, st := range ctx.BoundaryStates {
if err := s.service.cfg.StateGen.SaveState(ctx.Context, r, st); err != nil {
return err
}
}
lastBlock := blocks[len(blocks)-1]
lastRoot := ctx.BlockRoots[len(blocks)-1]
// Save the final state
if err := s.service.cfg.StateGen.SaveState(ctx.Context, lastRoot, ctx.CurrentState); err != nil {
return err
}
// Insert all nodes but the last one to forkchoice
if len(pendingNodes) > 1 {
if err := s.service.cfg.ForkChoiceStore.InsertChain(ctx.Context, pendingNodes[:len(pendingNodes)-1]); err != nil {
return errors.Wrap(err, "could not insert batch to forkchoice")
}
}
// Insert the last block to forkchoice
if err := s.service.cfg.ForkChoiceStore.InsertNode(ctx.Context, ctx.CurrentState, lastBlock); err != nil {
return errors.Wrap(err, "could not insert last block in batch to forkchoice")
}
// Set optimistic status
if ctx.IsValidPayloads[len(blocks)-1] {
if err := s.service.cfg.ForkChoiceStore.SetOptimisticToValid(ctx.Context, lastRoot); err != nil {
return errors.Wrap(err, "could not set optimistic block to valid")
}
}
// Notify forkchoice update
fcuArgs := &fcuConfig{
headState: ctx.CurrentState,
headRoot: lastRoot,
headBlock: lastBlock,
}
if _, err := s.service.notifyForkchoiceUpdate(ctx.Context, fcuArgs); err != nil {
return err
}
return s.service.saveHeadNoDB(ctx.Context, lastBlock, lastRoot, ctx.CurrentState, !ctx.IsValidPayloads[len(blocks)-1])
}

View File

@@ -3,7 +3,6 @@ package blockchain
import (
"bytes"
"context"
"fmt"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/electra"
@@ -73,94 +72,25 @@ type SlashingReceiver interface {
func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error {
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlock")
defer span.End()
// Return early if the block is blacklisted
if features.BlacklistedBlock(blockRoot) {
return errBlacklistedRoot
}
// Return early if the block has been synced
if s.InForkchoice(blockRoot) {
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Ignoring already synced block")
return nil
}
receivedTime := time.Now()
err := s.blockBeingSynced.set(blockRoot)
if errors.Is(err, errBlockBeingSynced) {
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Ignoring block currently being synced")
return nil
}
defer s.blockBeingSynced.unset(blockRoot)
blockCopy, err := block.Copy()
if err != nil {
return err
}
preState, err := s.getBlockPreState(ctx, blockCopy.Block())
if err != nil {
return errors.Wrap(err, "could not get block's prestate")
}
currentCheckpoints := s.saveCurrentCheckpoints(preState)
roblock, err := blocks.NewROBlockWithRoot(blockCopy, blockRoot)
if err != nil {
return err
}
postState, isValidPayload, err := s.validateExecutionAndConsensus(ctx, preState, roblock)
if err != nil {
return err
procCtx := &ProcessingContext{
Context: ctx,
Mode: ModeSingle,
AVS: avs,
ReceivedTime: time.Now(),
}
daWaitedTime, err := s.handleDA(ctx, blockCopy, blockRoot, avs)
if err != nil {
return err
}
// Defragment the state before continuing block processing.
s.defragmentState(postState)
// The rest of block processing takes a lock on forkchoice.
s.cfg.ForkChoiceStore.Lock()
defer s.cfg.ForkChoiceStore.Unlock()
if err := s.savePostStateInfo(ctx, blockRoot, blockCopy, postState); err != nil {
return errors.Wrap(err, "could not save post state info")
}
args := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: postState,
isValidPayload: isValidPayload,
}
if err := s.postBlockProcess(args); err != nil {
err := errors.Wrap(err, "could not process block")
tracing.AnnotateError(span, err)
return err
}
if err := s.updateCheckpoints(ctx, currentCheckpoints, preState, postState, blockRoot); err != nil {
return err
}
// If slasher is configured, forward the attestations in the block via an event feed for processing.
if s.slasherEnabled {
go s.sendBlockAttestationsToSlasher(blockCopy, preState)
}
// Handle post block operations such as pruning exits and bls messages if incoming block is the head
if err := s.prunePostBlockOperationPools(ctx, blockCopy, blockRoot); err != nil {
log.WithError(err).Error("Could not prune canonical objects from pool ")
}
// Have we been finalizing? Should we start saving hot states to db?
if err := s.checkSaveHotStateDB(ctx); err != nil {
return err
}
// We apply the same heuristic to some of our more important caches.
if err := s.handleCaches(); err != nil {
return err
}
s.reportPostBlockProcessing(blockCopy, blockRoot, receivedTime, daWaitedTime)
return nil
return s.blockProcessor.Process(procCtx, []blocks.ROBlock{roblock})
}
type ffgCheckpoints struct {
@@ -340,8 +270,15 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock
s.cfg.ForkChoiceStore.Lock()
defer s.cfg.ForkChoiceStore.Unlock()
procCtx := &ProcessingContext{
Context: ctx,
Mode: ModeBatch,
AVS: avs,
BatchSize: len(blocks),
}
// Apply state transition on the incoming newly received block batches, one by one.
if err := s.onBlockBatch(ctx, blocks, avs); err != nil {
if err := s.blockProcessor.Process(procCtx, blocks); err != nil {
err := errors.Wrap(err, "could not process block in batch")
tracing.AnnotateError(span, err)
return err

View File

@@ -69,6 +69,7 @@ type Service struct {
lcStore *lightClient.Store
startWaitingDataColumnSidecars chan bool // for testing purposes only
syncCommitteeHeadState *cache.SyncCommitteeHeadStateCache
blockProcessor *BlockProcessor
}
// config options for the service.
@@ -189,6 +190,9 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
blockBeingSynced: &currentlySyncingBlock{roots: make(map[[32]byte]struct{})},
syncCommitteeHeadState: cache.NewSyncCommitteeHeadState(),
}
// Initialize the block processor after srv is created but before options are applied
srv.blockProcessor = NewBlockProcessor(srv)
for _, opt := range opts {
if err := opt(srv); err != nil {
return nil, err