Files
prysm/beacon-chain/state/stategen/replay.go
Bastin 92bd211e4d upgrade v6 to v7 (#15989)
* upgrade v6 to v7

* changelog

* update-go-ssz
2025-11-06 16:16:23 +00:00

167 lines
5.5 KiB
Go

package stategen
import (
"context"
"fmt"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
"github.com/OffchainLabs/prysm/v7/beacon-chain/db/filters"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// ReplayBlocks replays the input blocks on the input state until the target slot is reached.
func (*State) replayBlocks(
ctx context.Context,
state state.BeaconState,
signed []interfaces.ReadOnlySignedBeaconBlock,
targetSlot primitives.Slot,
) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.replayBlocks")
defer span.End()
var err error
start := time.Now()
rLog := log.WithFields(logrus.Fields{
"startSlot": state.Slot(),
"endSlot": targetSlot,
"diff": targetSlot - state.Slot(),
})
rLog.Debug("Replaying state")
// The input block list is sorted in decreasing slots order.
for _, blk := range signed {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if state.Slot() >= targetSlot {
break
}
// A node shouldn't process the block if the block slot is lower than the state slot.
if state.Slot() >= blk.Block().Slot() {
continue
}
state, err = executeStateTransitionStateGen(ctx, state, blk)
if err != nil {
return nil, err
}
}
// If there are skip slots at the end.
if targetSlot > state.Slot() {
state, err = ReplayProcessSlots(ctx, state, targetSlot)
if err != nil {
return nil, err
}
}
duration := time.Since(start)
rLog.WithFields(logrus.Fields{
"duration": duration,
}).Debug("Replayed state")
replayBlocksSummary.Observe(float64(duration.Milliseconds()))
return state, nil
}
// loadBlocks loads the blocks between start slot and end slot by recursively fetching from end block root.
// The Blocks are returned in slot-descending order.
func (s *State) loadBlocks(ctx context.Context, startSlot, endSlot primitives.Slot, endBlockRoot [32]byte) ([]interfaces.ReadOnlySignedBeaconBlock, error) {
// Nothing to load for invalid range.
if startSlot > endSlot {
return nil, fmt.Errorf("start slot %d > end slot %d", startSlot, endSlot)
}
query := filters.AncestryQuery{Earliest: startSlot, Descendent: filters.SlotRoot{Slot: endSlot, Root: endBlockRoot}}
filter := filters.NewFilter().SetAncestryQuery(query)
blocks, _, err := s.beaconDB.Blocks(ctx, filter)
if err != nil {
return nil, err
}
return blocks, nil
}
// executeStateTransitionStateGen applies state transition on input historical state and block for state gen usages.
// There's no signature verification involved given state gen only works with stored block and state in DB.
// If the objects are already in stored in DB, one can omit redundant signature checks and ssz hashing calculations.
//
// WARNING: This method should not be used on an unverified new block.
func executeStateTransitionStateGen(
ctx context.Context,
state state.BeaconState,
signed interfaces.ReadOnlySignedBeaconBlock,
) (state.BeaconState, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if err := blocks.BeaconBlockIsNil(signed); err != nil {
return nil, err
}
ctx, span := trace.StartSpan(ctx, "stategen.executeStateTransitionStateGen")
defer span.End()
var err error
// Execute per slots transition.
// Given this is for state gen, a node uses the version of process slots without skip slots cache.
state, err = ReplayProcessSlots(ctx, state, signed.Block().Slot())
if err != nil {
return nil, errors.Wrap(err, "could not process slot")
}
// Execute per block transition.
// Given this is for state gen, a node only cares about the post state without proposer
// and randao signature verifications.
state, err = transition.ProcessBlockForStateRoot(ctx, state, signed)
if err != nil {
return nil, errors.Wrap(err, "could not process block")
}
return state, nil
}
// ReplayProcessSlots to process old slots for state gen usages.
// There's no skip slot cache involved given state gen only works with already stored block and state in DB.
//
// WARNING: This method should not be used for future slot.
func ReplayProcessSlots(ctx context.Context, state state.BeaconState, slot primitives.Slot) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stategen.ReplayProcessSlots")
defer span.End()
if state == nil || state.IsNil() {
return nil, errUnknownState
}
if state.Slot() > slot {
err := fmt.Errorf("expected state.slot %d <= slot %d", state.Slot(), slot)
return nil, err
}
if state.Slot() == slot {
return state, nil
}
return transition.ProcessSlotsCore(ctx, span, state, slot, nil)
}
// Given the start slot and the end slot, this returns the finalized beacon blocks in between.
// Since hot states don't have finalized blocks, this should ONLY be used for replaying cold state.
func (s *State) loadFinalizedBlocks(ctx context.Context, startSlot, endSlot primitives.Slot) ([]interfaces.ReadOnlySignedBeaconBlock, error) {
f := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot)
bs, bRoots, err := s.beaconDB.Blocks(ctx, f)
if err != nil {
return nil, err
}
if len(bs) != len(bRoots) {
return nil, errors.New("length of blocks and roots don't match")
}
fbs := make([]interfaces.ReadOnlySignedBeaconBlock, 0, len(bs))
for i := len(bs) - 1; i >= 0; i-- {
if s.beaconDB.IsFinalizedBlock(ctx, bRoots[i]) {
fbs = append(fbs, bs[i])
}
}
return fbs, nil
}