mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Compare commits
3 Commits
ba2333069a
...
poc-onbloc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6d9f23eefd | ||
|
|
217971a792 | ||
|
|
b8cabdaa5b |
@@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"block_processor.go",
|
||||
"chain_info.go",
|
||||
"chain_info_forkchoice.go",
|
||||
"currently_syncing_block.go",
|
||||
@@ -22,6 +23,7 @@ go_library(
|
||||
"process_attestation_helpers.go",
|
||||
"process_block.go",
|
||||
"process_block_helpers.go",
|
||||
"processing_stages.go",
|
||||
"receive_attestation.go",
|
||||
"receive_blob.go",
|
||||
"receive_block.go",
|
||||
|
||||
130
beacon-chain/blockchain/block_processor.go
Normal file
130
beacon-chain/blockchain/block_processor.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package blockchain
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/das"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
|
||||
consensusblocks "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/bls"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ProcessingMode defines the mode of block processing
|
||||
type ProcessingMode int
|
||||
|
||||
const (
|
||||
// ModeSingle processes blocks individually with full validation
|
||||
ModeSingle ProcessingMode = iota
|
||||
// ModeBatch processes multiple blocks with optimizations for throughput
|
||||
ModeBatch
|
||||
)
|
||||
|
||||
// ProcessingContext contains all the context needed for block processing
|
||||
type ProcessingContext struct {
|
||||
Context context.Context
|
||||
Mode ProcessingMode
|
||||
AVS das.AvailabilityStore
|
||||
BatchSize int
|
||||
|
||||
// Single state tracking (reused in batch mode)
|
||||
CurrentState state.BeaconState
|
||||
CurrentPreState state.BeaconState
|
||||
|
||||
// Boundary states (only epoch boundaries saved)
|
||||
BoundaryStates map[[32]byte]state.BeaconState
|
||||
|
||||
// For single mode only (size 1)
|
||||
States []state.BeaconState
|
||||
PreStates []state.BeaconState
|
||||
|
||||
// Lightweight data - OK to keep all
|
||||
Checkpoints [][]*ethpb.Checkpoint
|
||||
SigSets []*bls.SignatureBatch
|
||||
IsValidPayloads []bool
|
||||
BlockRoots [][32]byte
|
||||
|
||||
// Timing info
|
||||
ReceivedTime time.Time
|
||||
DAWaitedTime time.Duration
|
||||
|
||||
// Current block processing info
|
||||
CurrentBlockIndex int
|
||||
}
|
||||
|
||||
// ProcessingStage represents a stage in the block processing pipeline
|
||||
type ProcessingStage interface {
|
||||
Name() string
|
||||
Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error
|
||||
SupportsBatch() bool
|
||||
}
|
||||
|
||||
// BlockProcessor handles both single and batch block processing using a pipeline
|
||||
type BlockProcessor struct {
|
||||
service *Service
|
||||
stages []ProcessingStage
|
||||
}
|
||||
|
||||
// NewBlockProcessor creates a new unified block processor with optimal performance
|
||||
func NewBlockProcessor(service *Service) *BlockProcessor {
|
||||
stages := []ProcessingStage{
|
||||
&ValidationStage{service: service},
|
||||
&StateTransitionExecutionAndDAStage{service: service}, // Combined for single-loop batch performance
|
||||
&SignatureVerificationStage{service: service},
|
||||
&ForkchoiceStage{service: service}, // Includes post-processing for single mode
|
||||
}
|
||||
|
||||
return &BlockProcessor{
|
||||
service: service,
|
||||
stages: stages,
|
||||
}
|
||||
}
|
||||
|
||||
// Process executes the block processing pipeline
|
||||
func (bp *BlockProcessor) Process(pc *ProcessingContext, blocks []consensusblocks.ROBlock) error {
|
||||
// Set batch size for strategy
|
||||
pc.BatchSize = len(blocks)
|
||||
|
||||
// Initialize based on strategy
|
||||
strategy := pc.Strategy()
|
||||
if strategy.IsSingle() {
|
||||
// Single mode: allocate single-element arrays
|
||||
pc.States = make([]state.BeaconState, 1)
|
||||
pc.PreStates = make([]state.BeaconState, 1)
|
||||
} else {
|
||||
// Batch mode: use streaming approach with boundary states
|
||||
pc.BoundaryStates = make(map[[32]byte]state.BeaconState)
|
||||
}
|
||||
|
||||
// Lightweight arrays - always allocate full size
|
||||
pc.Checkpoints = make([][]*ethpb.Checkpoint, len(blocks))
|
||||
pc.SigSets = make([]*bls.SignatureBatch, len(blocks))
|
||||
pc.IsValidPayloads = make([]bool, len(blocks))
|
||||
pc.BlockRoots = make([][32]byte, len(blocks))
|
||||
|
||||
for i, block := range blocks {
|
||||
pc.BlockRoots[i] = block.Root()
|
||||
}
|
||||
|
||||
// Execute each stage
|
||||
for _, stage := range bp.stages {
|
||||
// For batch mode with multiple blocks, non-batch stages need individual processing
|
||||
if pc.IsBatch() && !stage.SupportsBatch() && len(blocks) > 1 {
|
||||
// Process individually for stages that don't support batch
|
||||
for i, block := range blocks {
|
||||
pc.CurrentBlockIndex = i
|
||||
if err := stage.Execute(pc, []consensusblocks.ROBlock{block}); err != nil {
|
||||
return errors.Wrapf(err, "stage %s failed for block %d", stage.Name(), i)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err := stage.Execute(pc, blocks); err != nil {
|
||||
return errors.Wrapf(err, "stage %s failed", stage.Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
160
beacon-chain/blockchain/processing_context_accessors.go
Normal file
160
beacon-chain/blockchain/processing_context_accessors.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package blockchain
|
||||
|
||||
import (
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
|
||||
)
|
||||
|
||||
// ProcessingStrategy encapsulates the differences between single and batch processing
|
||||
type ProcessingStrategy interface {
|
||||
// Processing mode queries
|
||||
IsSingle() bool
|
||||
IsBatch() bool
|
||||
NeedsForkchoiceLock() bool
|
||||
|
||||
// State accessors
|
||||
GetPreState(ctx *ProcessingContext, blockIndex int) state.BeaconState
|
||||
GetPostState(ctx *ProcessingContext, blockIndex int) state.BeaconState
|
||||
SetPreState(ctx *ProcessingContext, blockIndex int, state state.BeaconState)
|
||||
SetPostState(ctx *ProcessingContext, blockIndex int, state state.BeaconState)
|
||||
|
||||
// Current state management (for streaming)
|
||||
GetCurrentPreState(ctx *ProcessingContext) state.BeaconState
|
||||
GetCurrentPostState(ctx *ProcessingContext) state.BeaconState
|
||||
SetCurrentPreState(ctx *ProcessingContext, state state.BeaconState)
|
||||
SetCurrentPostState(ctx *ProcessingContext, state state.BeaconState)
|
||||
|
||||
// Execution strategy
|
||||
ShouldProcessInParallel() bool
|
||||
ShouldStreamStates() bool
|
||||
}
|
||||
|
||||
// SingleBlockStrategy handles single block processing
|
||||
type SingleBlockStrategy struct{}
|
||||
|
||||
func (s *SingleBlockStrategy) IsSingle() bool { return true }
|
||||
func (s *SingleBlockStrategy) IsBatch() bool { return false }
|
||||
func (s *SingleBlockStrategy) NeedsForkchoiceLock() bool { return true }
|
||||
func (s *SingleBlockStrategy) ShouldProcessInParallel() bool { return true }
|
||||
func (s *SingleBlockStrategy) ShouldStreamStates() bool { return false }
|
||||
|
||||
func (s *SingleBlockStrategy) GetPreState(ctx *ProcessingContext, blockIndex int) state.BeaconState {
|
||||
return ctx.PreStates[0]
|
||||
}
|
||||
|
||||
func (s *SingleBlockStrategy) GetPostState(ctx *ProcessingContext, blockIndex int) state.BeaconState {
|
||||
return ctx.States[0]
|
||||
}
|
||||
|
||||
func (s *SingleBlockStrategy) SetPreState(ctx *ProcessingContext, blockIndex int, state state.BeaconState) {
|
||||
ctx.PreStates[0] = state
|
||||
}
|
||||
|
||||
func (s *SingleBlockStrategy) SetPostState(ctx *ProcessingContext, blockIndex int, state state.BeaconState) {
|
||||
ctx.States[0] = state
|
||||
}
|
||||
|
||||
func (s *SingleBlockStrategy) GetCurrentPreState(ctx *ProcessingContext) state.BeaconState {
|
||||
return ctx.PreStates[0]
|
||||
}
|
||||
|
||||
func (s *SingleBlockStrategy) GetCurrentPostState(ctx *ProcessingContext) state.BeaconState {
|
||||
return ctx.States[0]
|
||||
}
|
||||
|
||||
func (s *SingleBlockStrategy) SetCurrentPreState(ctx *ProcessingContext, state state.BeaconState) {
|
||||
ctx.PreStates[0] = state
|
||||
ctx.CurrentPreState = state
|
||||
}
|
||||
|
||||
func (s *SingleBlockStrategy) SetCurrentPostState(ctx *ProcessingContext, state state.BeaconState) {
|
||||
ctx.States[0] = state
|
||||
ctx.CurrentState = state
|
||||
}
|
||||
|
||||
// BatchBlockStrategy handles batch processing (including single block in batch mode)
|
||||
type BatchBlockStrategy struct {
|
||||
blockCount int
|
||||
}
|
||||
|
||||
func NewBatchBlockStrategy(blockCount int) *BatchBlockStrategy {
|
||||
return &BatchBlockStrategy{blockCount: blockCount}
|
||||
}
|
||||
|
||||
func (b *BatchBlockStrategy) IsSingle() bool { return false }
|
||||
func (b *BatchBlockStrategy) IsBatch() bool { return true }
|
||||
func (b *BatchBlockStrategy) NeedsForkchoiceLock() bool { return false } // Lock held by ReceiveBlockBatch
|
||||
func (b *BatchBlockStrategy) ShouldProcessInParallel() bool { return false } // Batch processing is sequential
|
||||
func (b *BatchBlockStrategy) ShouldStreamStates() bool { return true }
|
||||
|
||||
func (b *BatchBlockStrategy) GetPreState(ctx *ProcessingContext, blockIndex int) state.BeaconState {
|
||||
return ctx.CurrentPreState
|
||||
}
|
||||
|
||||
func (b *BatchBlockStrategy) GetPostState(ctx *ProcessingContext, blockIndex int) state.BeaconState {
|
||||
return ctx.CurrentState
|
||||
}
|
||||
|
||||
func (b *BatchBlockStrategy) SetPreState(ctx *ProcessingContext, blockIndex int, state state.BeaconState) {
|
||||
ctx.CurrentPreState = state
|
||||
}
|
||||
|
||||
func (b *BatchBlockStrategy) SetPostState(ctx *ProcessingContext, blockIndex int, state state.BeaconState) {
|
||||
ctx.CurrentState = state
|
||||
}
|
||||
|
||||
func (b *BatchBlockStrategy) GetCurrentPreState(ctx *ProcessingContext) state.BeaconState {
|
||||
return ctx.CurrentPreState
|
||||
}
|
||||
|
||||
func (b *BatchBlockStrategy) GetCurrentPostState(ctx *ProcessingContext) state.BeaconState {
|
||||
return ctx.CurrentState
|
||||
}
|
||||
|
||||
func (b *BatchBlockStrategy) SetCurrentPreState(ctx *ProcessingContext, state state.BeaconState) {
|
||||
ctx.CurrentPreState = state
|
||||
}
|
||||
|
||||
func (b *BatchBlockStrategy) SetCurrentPostState(ctx *ProcessingContext, state state.BeaconState) {
|
||||
ctx.CurrentState = state
|
||||
}
|
||||
|
||||
// Strategy returns the appropriate processing strategy for the context
|
||||
func (ctx *ProcessingContext) Strategy() ProcessingStrategy {
|
||||
if ctx.Mode == ModeSingle {
|
||||
return &SingleBlockStrategy{}
|
||||
}
|
||||
return NewBatchBlockStrategy(ctx.BatchSize)
|
||||
}
|
||||
|
||||
// Convenience methods on ProcessingContext that delegate to strategy
|
||||
func (ctx *ProcessingContext) IsSingle() bool {
|
||||
return ctx.Strategy().IsSingle()
|
||||
}
|
||||
|
||||
func (ctx *ProcessingContext) IsBatch() bool {
|
||||
return ctx.Strategy().IsBatch()
|
||||
}
|
||||
|
||||
func (ctx *ProcessingContext) NeedsForkchoiceLock() bool {
|
||||
return ctx.Strategy().NeedsForkchoiceLock()
|
||||
}
|
||||
|
||||
func (ctx *ProcessingContext) ShouldProcessInParallel() bool {
|
||||
return ctx.Strategy().ShouldProcessInParallel()
|
||||
}
|
||||
|
||||
func (ctx *ProcessingContext) GetPreStateForBlock(blockIndex int) state.BeaconState {
|
||||
return ctx.Strategy().GetPreState(ctx, blockIndex)
|
||||
}
|
||||
|
||||
func (ctx *ProcessingContext) GetPostStateForBlock(blockIndex int) state.BeaconState {
|
||||
return ctx.Strategy().GetPostState(ctx, blockIndex)
|
||||
}
|
||||
|
||||
func (ctx *ProcessingContext) SetPreStateForBlock(blockIndex int, state state.BeaconState) {
|
||||
ctx.Strategy().SetPreState(ctx, blockIndex, state)
|
||||
}
|
||||
|
||||
func (ctx *ProcessingContext) SetPostStateForBlock(blockIndex int, state state.BeaconState) {
|
||||
ctx.Strategy().SetPostState(ctx, blockIndex, state)
|
||||
}
|
||||
513
beacon-chain/blockchain/processing_stages.go
Normal file
513
beacon-chain/blockchain/processing_stages.go
Normal file
@@ -0,0 +1,513 @@
|
||||
package blockchain
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition"
|
||||
forkchoicetypes "github.com/OffchainLabs/prysm/v6/beacon-chain/forkchoice/types"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
|
||||
"github.com/OffchainLabs/prysm/v6/config/features"
|
||||
consensusblocks "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/bls"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// ValidationStage handles initial validation checks
|
||||
type ValidationStage struct {
|
||||
service *Service
|
||||
}
|
||||
|
||||
func (s *ValidationStage) Name() string { return "validation" }
|
||||
func (s *ValidationStage) SupportsBatch() bool { return true }
|
||||
|
||||
func (s *ValidationStage) Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
|
||||
for i, block := range blocks {
|
||||
blockRoot := ctx.BlockRoots[i]
|
||||
|
||||
// Check if block is blacklisted
|
||||
if features.BlacklistedBlock(blockRoot) {
|
||||
return errBlacklistedRoot
|
||||
}
|
||||
|
||||
// For single mode, check if already synced
|
||||
if ctx.IsSingle() {
|
||||
if s.service.InForkchoice(blockRoot) {
|
||||
return fmt.Errorf("block already synced: %#x", blockRoot)
|
||||
}
|
||||
|
||||
// Set block as being synced
|
||||
err := s.service.blockBeingSynced.set(blockRoot)
|
||||
if errors.Is(err, errBlockBeingSynced) {
|
||||
return fmt.Errorf("block currently being synced: %#x", blockRoot)
|
||||
}
|
||||
defer s.service.blockBeingSynced.unset(blockRoot)
|
||||
}
|
||||
|
||||
// Validate block structure
|
||||
if err := consensusblocks.BeaconBlockIsNil(block); err != nil {
|
||||
return invalidBlock{error: err}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StateTransitionExecutionAndDAStage combines state transition, execution validation, and DA checks
|
||||
// into a single stage for optimal batch performance while maintaining code reuse
|
||||
type StateTransitionExecutionAndDAStage struct {
|
||||
service *Service
|
||||
}
|
||||
|
||||
func (s *StateTransitionExecutionAndDAStage) Name() string { return "state_transition_execution_and_da" }
|
||||
func (s *StateTransitionExecutionAndDAStage) SupportsBatch() bool { return true }
|
||||
|
||||
func (s *StateTransitionExecutionAndDAStage) Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
|
||||
if ctx.IsSingle() {
|
||||
return s.processSingleBlock(ctx, blocks[0])
|
||||
}
|
||||
return s.processBatchBlocks(ctx, blocks)
|
||||
}
|
||||
|
||||
// ProcessMode defines how to execute block processing
|
||||
type ProcessMode int
|
||||
|
||||
const (
|
||||
ProcessParallel ProcessMode = iota // Run state transition and execution in parallel
|
||||
ProcessSequential // Run state transition and execution sequentially
|
||||
)
|
||||
|
||||
// BlockProcessResult contains the result of processing a single block
|
||||
type BlockProcessResult struct {
|
||||
preState state.BeaconState
|
||||
postState state.BeaconState
|
||||
isValidPayload bool
|
||||
sigSet *bls.SignatureBatch
|
||||
checkpoints []*ethpb.Checkpoint
|
||||
daWaitedTime time.Duration
|
||||
}
|
||||
|
||||
// processBlock handles the core validation logic shared between single and batch modes
|
||||
func (s *StateTransitionExecutionAndDAStage) processBlock(
|
||||
ctx *ProcessingContext,
|
||||
block consensusblocks.ROBlock,
|
||||
blockIndex int,
|
||||
preState state.BeaconState,
|
||||
mode ProcessMode,
|
||||
) (*BlockProcessResult, error) {
|
||||
blockRoot := ctx.BlockRoots[blockIndex]
|
||||
result := &BlockProcessResult{
|
||||
preState: preState,
|
||||
}
|
||||
|
||||
// Save current checkpoints (reused logic)
|
||||
cp := s.service.saveCurrentCheckpoints(preState)
|
||||
result.checkpoints = []*ethpb.Checkpoint{
|
||||
{Epoch: cp.j, Root: nil},
|
||||
{Epoch: cp.f, Root: nil},
|
||||
{Epoch: cp.c, Root: nil},
|
||||
}
|
||||
|
||||
// Get state version info for execution validation (reused logic)
|
||||
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if mode == ProcessParallel {
|
||||
// Single mode: Run state transition and execution validation IN PARALLEL
|
||||
eg, _ := errgroup.WithContext(ctx.Context)
|
||||
var postState state.BeaconState
|
||||
var isValidPayload bool
|
||||
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
postState, err = s.service.validateStateTransition(ctx.Context, preState, block)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to validate consensus state transition function")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
isValidPayload, err = s.service.validateExecutionOnBlock(ctx.Context, preStateVersion, preStateHeader, block)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not notify the engine of the new payload")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.postState = postState
|
||||
result.isValidPayload = isValidPayload
|
||||
} else {
|
||||
// Batch mode: Run state transition and execution validation SEQUENTIALLY
|
||||
// Execute state transition without signature verification for batch optimization
|
||||
var set *bls.SignatureBatch
|
||||
set, result.postState, err = transition.ExecuteStateTransitionNoVerifyAnySig(ctx.Context, preState, block)
|
||||
if err != nil {
|
||||
return nil, invalidBlock{error: err}
|
||||
}
|
||||
result.sigSet = set
|
||||
|
||||
// Validate execution payload
|
||||
postVersion, postHeader, err := getStateVersionAndPayload(result.postState)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result.isValidPayload, err = s.service.notifyNewPayload(ctx.Context, postVersion, postHeader, block)
|
||||
if err != nil {
|
||||
return nil, s.service.handleInvalidExecutionError(ctx.Context, err, blockRoot, block.Block().ParentRoot())
|
||||
}
|
||||
|
||||
// Validate merge transition if needed (reused logic)
|
||||
if result.isValidPayload {
|
||||
if err := s.service.validateMergeTransitionBlock(ctx.Context, preStateVersion, preStateHeader, block); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// checkDataAvailability handles DA checks for a single block (reused logic)
|
||||
func (s *StateTransitionExecutionAndDAStage) checkDataAvailability(
|
||||
ctx *ProcessingContext,
|
||||
block consensusblocks.ROBlock,
|
||||
blockIndex int,
|
||||
) (time.Duration, error) {
|
||||
blockRoot := ctx.BlockRoots[blockIndex]
|
||||
start := time.Now()
|
||||
|
||||
if ctx.AVS == nil {
|
||||
blockCopy, err := block.Copy()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return time.Since(start), s.service.isDataAvailable(ctx.Context, blockRoot, blockCopy)
|
||||
}
|
||||
|
||||
return time.Since(start), ctx.AVS.IsDataAvailable(ctx.Context, s.service.CurrentSlot(), block)
|
||||
}
|
||||
|
||||
func (s *StateTransitionExecutionAndDAStage) processSingleBlock(ctx *ProcessingContext, block consensusblocks.ROBlock) error {
|
||||
idx := ctx.CurrentBlockIndex
|
||||
|
||||
// Get pre-state
|
||||
preState, err := s.service.getBlockPreState(ctx.Context, block.Block())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get block's prestate")
|
||||
}
|
||||
ctx.SetPreStateForBlock(0, preState)
|
||||
|
||||
// Process block using shared logic with parallel execution
|
||||
result, err := s.processBlock(ctx, block, idx, preState, ProcessParallel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store results
|
||||
ctx.SetPostStateForBlock(0, result.postState)
|
||||
ctx.IsValidPayloads[idx] = result.isValidPayload
|
||||
ctx.Checkpoints[idx] = result.checkpoints
|
||||
|
||||
// Check data availability using shared logic
|
||||
daWaitedTime, err := s.checkDataAvailability(ctx, block, idx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx.DAWaitedTime = daWaitedTime
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StateTransitionExecutionAndDAStage) processBatchBlocks(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
|
||||
// Single-loop batch processing for optimal performance
|
||||
|
||||
if len(blocks) == 0 {
|
||||
return errors.New("no blocks provided")
|
||||
}
|
||||
|
||||
// Get initial pre-state
|
||||
preState, err := s.service.cfg.StateGen.StateByRootInitialSync(ctx.Context, blocks[0].Block().ParentRoot())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if preState == nil || preState.IsNil() {
|
||||
return fmt.Errorf("nil pre state for slot %d", blocks[0].Block().Slot())
|
||||
}
|
||||
|
||||
// Fill in missing blocks for forkchoice
|
||||
if err := s.service.fillInForkChoiceMissingBlocks(ctx.Context, blocks[0], preState.CurrentJustifiedCheckpoint(), preState.FinalizedCheckpoint()); err != nil {
|
||||
return errors.Wrap(err, "could not fill in missing blocks to forkchoice")
|
||||
}
|
||||
|
||||
// SINGLE LOOP: Process each block with state transition, execution validation, and DA checks
|
||||
for i, block := range blocks {
|
||||
// Store pre-state for other stages that need it
|
||||
ctx.CurrentPreState = preState.Copy()
|
||||
|
||||
// Process block using shared logic with sequential execution
|
||||
result, err := s.processBlock(ctx, block, i, preState, ProcessSequential)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check data availability using shared logic
|
||||
_, err = s.checkDataAvailability(ctx, block, i)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store results
|
||||
ctx.IsValidPayloads[i] = result.isValidPayload
|
||||
ctx.SigSets[i] = result.sigSet
|
||||
ctx.Checkpoints[i] = result.checkpoints
|
||||
|
||||
// Save boundary states at epoch transitions (like original)
|
||||
if slots.IsEpochStart(result.postState.Slot()) {
|
||||
ctx.BoundaryStates[block.Root()] = result.postState.Copy()
|
||||
}
|
||||
|
||||
// Update streaming state for next iteration
|
||||
ctx.CurrentState = result.postState
|
||||
preState = result.postState
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
// SignatureVerificationStage handles signature verification
|
||||
type SignatureVerificationStage struct {
|
||||
service *Service
|
||||
}
|
||||
|
||||
func (s *SignatureVerificationStage) Name() string { return "signature_verification" }
|
||||
func (s *SignatureVerificationStage) SupportsBatch() bool { return true }
|
||||
|
||||
func (s *SignatureVerificationStage) Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
|
||||
if ctx.IsBatch() {
|
||||
return s.verifyBatchSignatures(ctx)
|
||||
}
|
||||
return s.verifySingleSignatures(ctx, blocks[0])
|
||||
}
|
||||
|
||||
func (s *SignatureVerificationStage) verifyBatchSignatures(ctx *ProcessingContext) error {
|
||||
sigSet := bls.NewSet()
|
||||
for _, set := range ctx.SigSets {
|
||||
if set != nil {
|
||||
sigSet.Join(set)
|
||||
}
|
||||
}
|
||||
|
||||
var verify bool
|
||||
var err error
|
||||
if features.Get().EnableVerboseSigVerification {
|
||||
verify, err = sigSet.VerifyVerbosely()
|
||||
} else {
|
||||
verify, err = sigSet.Verify()
|
||||
}
|
||||
if err != nil {
|
||||
return invalidBlock{error: err}
|
||||
}
|
||||
if !verify {
|
||||
return errors.New("batch block signature verification failed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SignatureVerificationStage) verifySingleSignatures(ctx *ProcessingContext, block consensusblocks.ROBlock) error {
|
||||
// For single blocks, signature verification is done during state transition
|
||||
// This stage is mainly for batch mode
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
// ForkchoiceStage handles forkchoice operations
|
||||
type ForkchoiceStage struct {
|
||||
service *Service
|
||||
}
|
||||
|
||||
func (s *ForkchoiceStage) Name() string { return "forkchoice" }
|
||||
func (s *ForkchoiceStage) SupportsBatch() bool { return true }
|
||||
|
||||
func (s *ForkchoiceStage) Execute(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
|
||||
if ctx.IsBatch() {
|
||||
return s.executeBatchForkchoice(ctx, blocks)
|
||||
}
|
||||
return s.executeSingleForkchoice(ctx, blocks[0])
|
||||
}
|
||||
|
||||
func (s *ForkchoiceStage) executeSingleForkchoice(ctx *ProcessingContext, block consensusblocks.ROBlock) error {
|
||||
idx := ctx.CurrentBlockIndex
|
||||
blockRoot := ctx.BlockRoots[idx]
|
||||
|
||||
// Save post state info
|
||||
blockCopy, err := block.Copy()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Take forkchoice lock (single mode always needs it)
|
||||
s.service.cfg.ForkChoiceStore.Lock()
|
||||
defer s.service.cfg.ForkChoiceStore.Unlock()
|
||||
|
||||
// Get the post state
|
||||
postState := ctx.GetPostStateForBlock(0)
|
||||
|
||||
if err := s.service.savePostStateInfo(ctx.Context, blockRoot, blockCopy, postState); err != nil {
|
||||
return errors.Wrap(err, "could not save post state info")
|
||||
}
|
||||
|
||||
// Execute post block processing (within the forkchoice lock)
|
||||
args := &postBlockProcessConfig{
|
||||
ctx: ctx.Context,
|
||||
roblock: block,
|
||||
postState: postState,
|
||||
isValidPayload: ctx.IsValidPayloads[idx],
|
||||
}
|
||||
|
||||
if err := s.service.postBlockProcess(args); err != nil {
|
||||
return errors.Wrap(err, "could not process block")
|
||||
}
|
||||
|
||||
// IMPORTANT: Single-mode post-processing MUST happen within forkchoice lock
|
||||
// Update checkpoints (requires forkchoice lock)
|
||||
preState := ctx.GetPreStateForBlock(0)
|
||||
cp := ffgCheckpoints{
|
||||
j: ctx.Checkpoints[0][0].Epoch,
|
||||
f: ctx.Checkpoints[0][1].Epoch,
|
||||
c: ctx.Checkpoints[0][2].Epoch,
|
||||
}
|
||||
|
||||
if err := s.service.updateCheckpoints(ctx.Context, cp, preState, postState, blockRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Handle slasher if enabled
|
||||
if s.service.slasherEnabled {
|
||||
go s.service.sendBlockAttestationsToSlasher(blockCopy, preState)
|
||||
}
|
||||
|
||||
// Prune operation pools (only if block is head)
|
||||
if err := s.service.prunePostBlockOperationPools(ctx.Context, blockCopy, blockRoot); err != nil {
|
||||
log.WithError(err).Error("Could not prune canonical objects from pool")
|
||||
}
|
||||
|
||||
// Check save hot state DB (requires forkchoice lock)
|
||||
if err := s.service.checkSaveHotStateDB(ctx.Context); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Handle caches (requires forkchoice lock)
|
||||
if err := s.service.handleCaches(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Report processing metrics
|
||||
s.service.reportPostBlockProcessing(blockCopy, blockRoot, ctx.ReceivedTime, ctx.DAWaitedTime)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ForkchoiceStage) executeBatchForkchoice(ctx *ProcessingContext, blocks []consensusblocks.ROBlock) error {
|
||||
// Save blocks and prepare forkchoice nodes
|
||||
pendingNodes := make([]*forkchoicetypes.BlockAndCheckpoints, len(blocks))
|
||||
|
||||
for i, b := range blocks {
|
||||
root := b.Root()
|
||||
|
||||
// Save block to database
|
||||
if err := s.service.saveInitSyncBlock(ctx.Context, root, b); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Save state summary
|
||||
if err := s.service.cfg.BeaconDB.SaveStateSummary(ctx.Context, ðpb.StateSummary{
|
||||
Slot: b.Block().Slot(),
|
||||
Root: root[:],
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Prepare forkchoice node
|
||||
args := &forkchoicetypes.BlockAndCheckpoints{
|
||||
Block: b,
|
||||
JustifiedCheckpoint: ctx.Checkpoints[i][0],
|
||||
FinalizedCheckpoint: ctx.Checkpoints[i][1],
|
||||
}
|
||||
pendingNodes[len(blocks)-i-1] = args
|
||||
|
||||
// Update justified/finalized checkpoints if needed
|
||||
if i > 0 && ctx.Checkpoints[i][0].Epoch > ctx.Checkpoints[i-1][0].Epoch {
|
||||
if err := s.service.cfg.BeaconDB.SaveJustifiedCheckpoint(ctx.Context, ctx.Checkpoints[i][0]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if i > 0 && ctx.Checkpoints[i][1].Epoch > ctx.Checkpoints[i-1][1].Epoch {
|
||||
if err := s.service.updateFinalized(ctx.Context, ctx.Checkpoints[i][1]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save boundary states
|
||||
for r, st := range ctx.BoundaryStates {
|
||||
if err := s.service.cfg.StateGen.SaveState(ctx.Context, r, st); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
lastBlock := blocks[len(blocks)-1]
|
||||
lastRoot := ctx.BlockRoots[len(blocks)-1]
|
||||
|
||||
// Save the final state
|
||||
if err := s.service.cfg.StateGen.SaveState(ctx.Context, lastRoot, ctx.CurrentState); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Insert all nodes but the last one to forkchoice
|
||||
if len(pendingNodes) > 1 {
|
||||
if err := s.service.cfg.ForkChoiceStore.InsertChain(ctx.Context, pendingNodes[:len(pendingNodes)-1]); err != nil {
|
||||
return errors.Wrap(err, "could not insert batch to forkchoice")
|
||||
}
|
||||
}
|
||||
|
||||
// Insert the last block to forkchoice
|
||||
if err := s.service.cfg.ForkChoiceStore.InsertNode(ctx.Context, ctx.CurrentState, lastBlock); err != nil {
|
||||
return errors.Wrap(err, "could not insert last block in batch to forkchoice")
|
||||
}
|
||||
|
||||
// Set optimistic status
|
||||
if ctx.IsValidPayloads[len(blocks)-1] {
|
||||
if err := s.service.cfg.ForkChoiceStore.SetOptimisticToValid(ctx.Context, lastRoot); err != nil {
|
||||
return errors.Wrap(err, "could not set optimistic block to valid")
|
||||
}
|
||||
}
|
||||
|
||||
// Notify forkchoice update
|
||||
fcuArgs := &fcuConfig{
|
||||
headState: ctx.CurrentState,
|
||||
headRoot: lastRoot,
|
||||
headBlock: lastBlock,
|
||||
}
|
||||
if _, err := s.service.notifyForkchoiceUpdate(ctx.Context, fcuArgs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.service.saveHeadNoDB(ctx.Context, lastBlock, lastRoot, ctx.CurrentState, !ctx.IsValidPayloads[len(blocks)-1])
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@ package blockchain
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/electra"
|
||||
@@ -73,94 +72,25 @@ type SlashingReceiver interface {
|
||||
func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error {
|
||||
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlock")
|
||||
defer span.End()
|
||||
// Return early if the block is blacklisted
|
||||
if features.BlacklistedBlock(blockRoot) {
|
||||
return errBlacklistedRoot
|
||||
}
|
||||
// Return early if the block has been synced
|
||||
if s.InForkchoice(blockRoot) {
|
||||
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Ignoring already synced block")
|
||||
return nil
|
||||
}
|
||||
|
||||
receivedTime := time.Now()
|
||||
err := s.blockBeingSynced.set(blockRoot)
|
||||
if errors.Is(err, errBlockBeingSynced) {
|
||||
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Ignoring block currently being synced")
|
||||
return nil
|
||||
}
|
||||
defer s.blockBeingSynced.unset(blockRoot)
|
||||
|
||||
|
||||
blockCopy, err := block.Copy()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
preState, err := s.getBlockPreState(ctx, blockCopy.Block())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get block's prestate")
|
||||
}
|
||||
|
||||
currentCheckpoints := s.saveCurrentCheckpoints(preState)
|
||||
|
||||
roblock, err := blocks.NewROBlockWithRoot(blockCopy, blockRoot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
postState, isValidPayload, err := s.validateExecutionAndConsensus(ctx, preState, roblock)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
procCtx := &ProcessingContext{
|
||||
Context: ctx,
|
||||
Mode: ModeSingle,
|
||||
AVS: avs,
|
||||
ReceivedTime: time.Now(),
|
||||
}
|
||||
|
||||
daWaitedTime, err := s.handleDA(ctx, blockCopy, blockRoot, avs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Defragment the state before continuing block processing.
|
||||
s.defragmentState(postState)
|
||||
|
||||
// The rest of block processing takes a lock on forkchoice.
|
||||
s.cfg.ForkChoiceStore.Lock()
|
||||
defer s.cfg.ForkChoiceStore.Unlock()
|
||||
if err := s.savePostStateInfo(ctx, blockRoot, blockCopy, postState); err != nil {
|
||||
return errors.Wrap(err, "could not save post state info")
|
||||
}
|
||||
args := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: postState,
|
||||
isValidPayload: isValidPayload,
|
||||
}
|
||||
if err := s.postBlockProcess(args); err != nil {
|
||||
err := errors.Wrap(err, "could not process block")
|
||||
tracing.AnnotateError(span, err)
|
||||
return err
|
||||
}
|
||||
if err := s.updateCheckpoints(ctx, currentCheckpoints, preState, postState, blockRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
// If slasher is configured, forward the attestations in the block via an event feed for processing.
|
||||
if s.slasherEnabled {
|
||||
go s.sendBlockAttestationsToSlasher(blockCopy, preState)
|
||||
}
|
||||
|
||||
// Handle post block operations such as pruning exits and bls messages if incoming block is the head
|
||||
if err := s.prunePostBlockOperationPools(ctx, blockCopy, blockRoot); err != nil {
|
||||
log.WithError(err).Error("Could not prune canonical objects from pool ")
|
||||
}
|
||||
|
||||
// Have we been finalizing? Should we start saving hot states to db?
|
||||
if err := s.checkSaveHotStateDB(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We apply the same heuristic to some of our more important caches.
|
||||
if err := s.handleCaches(); err != nil {
|
||||
return err
|
||||
}
|
||||
s.reportPostBlockProcessing(blockCopy, blockRoot, receivedTime, daWaitedTime)
|
||||
return nil
|
||||
|
||||
return s.blockProcessor.Process(procCtx, []blocks.ROBlock{roblock})
|
||||
}
|
||||
|
||||
type ffgCheckpoints struct {
|
||||
@@ -340,8 +270,15 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock
|
||||
s.cfg.ForkChoiceStore.Lock()
|
||||
defer s.cfg.ForkChoiceStore.Unlock()
|
||||
|
||||
procCtx := &ProcessingContext{
|
||||
Context: ctx,
|
||||
Mode: ModeBatch,
|
||||
AVS: avs,
|
||||
BatchSize: len(blocks),
|
||||
}
|
||||
|
||||
// Apply state transition on the incoming newly received block batches, one by one.
|
||||
if err := s.onBlockBatch(ctx, blocks, avs); err != nil {
|
||||
if err := s.blockProcessor.Process(procCtx, blocks); err != nil {
|
||||
err := errors.Wrap(err, "could not process block in batch")
|
||||
tracing.AnnotateError(span, err)
|
||||
return err
|
||||
|
||||
@@ -69,6 +69,7 @@ type Service struct {
|
||||
lcStore *lightClient.Store
|
||||
startWaitingDataColumnSidecars chan bool // for testing purposes only
|
||||
syncCommitteeHeadState *cache.SyncCommitteeHeadStateCache
|
||||
blockProcessor *BlockProcessor
|
||||
}
|
||||
|
||||
// config options for the service.
|
||||
@@ -189,6 +190,9 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
|
||||
blockBeingSynced: ¤tlySyncingBlock{roots: make(map[[32]byte]struct{})},
|
||||
syncCommitteeHeadState: cache.NewSyncCommitteeHeadState(),
|
||||
}
|
||||
|
||||
// Initialize the block processor after srv is created but before options are applied
|
||||
srv.blockProcessor = NewBlockProcessor(srv)
|
||||
for _, opt := range opts {
|
||||
if err := opt(srv); err != nil {
|
||||
return nil, err
|
||||
|
||||
Reference in New Issue
Block a user