Files
prysm/beacon-chain/rpc/lookup/stater.go
james-prysm 4374e709cb fixing state replay caused by REST api duties attester and sync committee endpoints (#16136)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

 Bug fix


**What does this PR do? Why is it needed?**

`s.Stater.StateBySlot` is intended for states that are stored in the DB,
so calling it for the current epoch may trigger a state replay. When the
requested epoch is the current one, we should instead take the head state
and use the next slot cache. The proposer duties endpoint was already
doing this, but the other two duties endpoints (attester and sync
committee) were not. This PR aligns all three and introduces a new
`StateByEpoch` method that wraps this approach.

I tested by running this kurtosis config with and without the fix to
confirm that the replays stop, the blockchain progresses, and the
"upgraded to fulu" message is not printed multiple times.

```
participants:
 # Super-nodes
 - el_type: nethermind
   cl_type: prysm
   cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
   count: 2
   supernode: true
   cl_extra_params:
     - --subscribe-all-subnets
     - --verbosity=debug
   vc_extra_params:
     - --enable-beacon-rest-api
     - --verbosity=debug

 # Full-nodes
 - el_type: nethermind
   cl_type: prysm
   cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
   validator_count: 63
   cl_extra_params:
     - --verbosity=debug
   vc_extra_params:
     - --enable-beacon-rest-api
     - --verbosity=debug

 - el_type: nethermind
   cl_type: prysm
   cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
   cl_extra_params:
     - --verbosity=debug
   vc_extra_params:
     - --enable-beacon-rest-api
     - --verbosity=debug
   validator_count: 13

additional_services:
 - dora
 - spamoor

spamoor_params:
 image: ethpandaops/spamoor:master
 max_mem: 4000
 spammers:
   - scenario: eoatx
     config:
       throughput: 200
   - scenario: blobs
     config:
       throughput: 20

network_params:
  fulu_fork_epoch: 2
  bpo_1_epoch: 8
  bpo_1_max_blobs: 21
  withdrawal_type: "0x02"
  preset: mainnet
  seconds_per_slot: 6

global_log_level: debug
```

**Which issue(s) does this PR fix?**

Fixes # https://github.com/OffchainLabs/prysm/issues/16135

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
2025-12-12 23:18:22 +00:00

406 lines
13 KiB
Go

package lookup
import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
"github.com/OffchainLabs/prysm/v7/beacon-chain/db"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
)
// FetchStateError reports that a state could not be fetched. It carries a
// fixed message and, optionally, the underlying error that caused the failure.
type FetchStateError struct {
	message string
	cause   error
}

// NewFetchStateError builds a FetchStateError with the message
// "could not fetch state" and the given cause. The cause may be nil.
func NewFetchStateError(cause error) *FetchStateError {
	fetchErr := new(FetchStateError)
	fetchErr.message = "could not fetch state"
	fetchErr.cause = cause
	return fetchErr
}

// Error returns the message on its own when there is no cause, and the
// message followed by ": " and the cause's text otherwise.
func (e *FetchStateError) Error() string {
	if e.cause == nil {
		return e.message
	}
	return e.message + ": " + e.cause.Error()
}

// Unwrap returns the underlying cause (possibly nil) so that callers can
// walk the error chain.
func (e *FetchStateError) Unwrap() error {
	return e.cause
}
// StateIdParseError represents an error scenario where a state ID could not be parsed.
type StateIdParseError struct {
	message string
}

// NewStateIdParseError creates a new error instance describing why a state ID
// could not be parsed.
//
// The resulting message is "could not parse state ID: <reason>". A nil reason
// is tolerated and produces the bare message "could not parse state ID"
// instead of panicking.
func NewStateIdParseError(reason error) StateIdParseError {
	const prefix = "could not parse state ID"
	if reason == nil {
		// Wrapping a nil error yields nil, and calling Error() on that would
		// panic, so the no-reason case is handled explicitly.
		return StateIdParseError{message: prefix}
	}
	// Only the text is retained, so build it directly rather than creating a
	// wrapped error just to stringify it.
	return StateIdParseError{message: prefix + ": " + reason.Error()}
}

// Error returns the underlying error message.
func (e *StateIdParseError) Error() string {
	return e.message
}
// StateNotFoundError is returned when no state matching a requested state
// root could be located.
type StateNotFoundError struct {
	message string
}

// NewStateNotFoundError builds a StateNotFoundError. stateRootsSize is the
// number of state roots that were searched and stateRoot is the root that was
// being looked for; both are embedded in the message (the root as 0x-prefixed
// hex).
func NewStateNotFoundError(stateRootsSize int, stateRoot []byte) StateNotFoundError {
	text := fmt.Sprintf(
		"state not found in the last %d state roots, looking for state root: %#x",
		stateRootsSize,
		stateRoot,
	)
	return StateNotFoundError{message: text}
}

// Error implements the error interface by returning the stored message.
func (e *StateNotFoundError) Error() string {
	msg := e.message
	return msg
}
// StateRootNotFoundError is returned when a requested state root is not
// present among the state roots that were searched.
type StateRootNotFoundError struct {
	message string
}

// NewStateRootNotFoundError builds a StateRootNotFoundError whose message
// reports stateRootsSize, the number of state roots that were searched.
func NewStateRootNotFoundError(stateRootsSize int) StateRootNotFoundError {
	var notFound StateRootNotFoundError
	notFound.message = fmt.Sprintf("state root not found in the last %d state roots", stateRootsSize)
	return notFound
}

// Error implements the error interface by returning the stored message.
func (e *StateRootNotFoundError) Error() string {
	msg := e.message
	return msg
}
// Stater is responsible for retrieving states.
//
// The identifier formats accepted by State and StateRoot are defined by the
// implementation; see BeaconDbStater for the one provided in this file.
type Stater interface {
// State returns the beacon state identified by id.
State(ctx context.Context, id []byte) (state.BeaconState, error)
// StateRoot returns the state root identified by id.
StateRoot(ctx context.Context, id []byte) ([]byte, error)
// StateBySlot returns the beacon state for the given slot.
StateBySlot(ctx context.Context, slot primitives.Slot) (state.BeaconState, error)
// StateByEpoch returns the beacon state for the start of the given epoch.
StateByEpoch(ctx context.Context, epoch primitives.Epoch) (state.BeaconState, error)
}
// BeaconDbStater is an implementation of Stater. It retrieves states from the beacon chain database.
type BeaconDbStater struct {
// BeaconDB is used to read the genesis block, the finalized and justified
// checkpoints, and blocks by root or by slot when resolving state roots.
BeaconDB db.ReadOnlyDatabase
// ChainInfoFetcher supplies the head state, head block, cached head root,
// and the finalized / current justified checkpoints.
ChainInfoFetcher blockchain.ChainInfoFetcher
// GenesisTimeFetcher supplies the current slot, used to reject requests
// for future slots and to tell past epochs from current or later ones.
GenesisTimeFetcher blockchain.TimeFetcher
// StateGenService loads a state by block root once a matching state root
// has been found in the head state.
StateGenService stategen.StateManager
// ReplayerBuilder creates the replayers used to rebuild a state at a
// given slot.
ReplayerBuilder stategen.ReplayerBuilder
}
// State resolves a state identifier to a BeaconState. The identifier is
// matched case-insensitively against the keywords below, and otherwise
// interpreted by its shape:
//   - "head": the head state reported by ChainInfoFetcher
//   - "genesis": the state at the configured genesis slot
//   - "finalized": the state replayed to the start slot of the finalized checkpoint's epoch
//   - "justified": the state replayed to the start slot of the current justified checkpoint's epoch
//   - <hex encoded state root with '0x' prefix>
//   - <state root> given as 32 raw bytes
//   - <slot> given as a base-10 number
//
// A *StateIdParseError is returned when a hex identifier cannot be decoded or
// when the identifier is not a valid base-10 slot number.
func (p *BeaconDbStater) State(ctx context.Context, stateId []byte) (state.BeaconState, error) {
	switch id := strings.ToLower(string(stateId)); id {
	case "head":
		headState, err := p.ChainInfoFetcher.HeadState(ctx)
		if err != nil {
			return nil, errors.Wrap(err, "could not get head state")
		}
		return headState, nil

	case "genesis":
		genesisState, err := p.StateBySlot(ctx, params.BeaconConfig().GenesisSlot)
		if err != nil {
			return nil, errors.Wrap(err, "could not get genesis state")
		}
		return genesisState, nil

	case "finalized":
		cp := p.ChainInfoFetcher.FinalizedCheckpt()
		startSlot, err := slots.EpochStart(cp.Epoch)
		if err != nil {
			return nil, errors.Wrap(err, "could not get start slot")
		}
		// The stategen replayer only ever walks canonical history, so
		// replaying to the first slot of the checkpoint's epoch always
		// yields the finalized state for that epoch.
		finalizedState, err := p.ReplayerBuilder.ReplayerForSlot(startSlot).ReplayToSlot(ctx, startSlot)
		if err != nil {
			return nil, errors.Wrap(err, "could not get finalized state")
		}
		return finalizedState, nil

	case "justified":
		cp := p.ChainInfoFetcher.CurrentJustifiedCheckpt()
		startSlot, err := slots.EpochStart(cp.Epoch)
		if err != nil {
			return nil, errors.Wrap(err, "could not get start slot")
		}
		// Same reasoning as for "finalized": the replayer stays on canonical
		// history, so this is the justified state for that epoch.
		justifiedState, err := p.ReplayerBuilder.ReplayerForSlot(startSlot).ReplayToSlot(ctx, startSlot)
		if err != nil {
			return nil, errors.Wrap(err, "could not get justified state")
		}
		return justifiedState, nil

	default:
		switch {
		case bytesutil.IsHex(stateId):
			rootBytes, decodeErr := hexutil.Decode(string(stateId))
			if decodeErr != nil {
				idErr := NewStateIdParseError(decodeErr)
				return nil, &idErr
			}
			return p.stateByRoot(ctx, rootBytes)

		case len(stateId) == 32:
			return p.stateByRoot(ctx, stateId)

		default:
			slotNumber, convErr := strconv.ParseUint(id, 10, 64)
			if convErr != nil {
				// The identifier matches none of the supported formats.
				idErr := NewStateIdParseError(convErr)
				return nil, &idErr
			}
			return p.StateBySlot(ctx, primitives.Slot(slotNumber))
		}
	}
}
// StateRoot resolves a state identifier to a beacon state root. The
// identifier is matched case-insensitively against the keywords below, and
// otherwise interpreted by its shape:
//   - "head" (canonical head in node's view)
//   - "genesis"
//   - "finalized"
//   - "justified"
//   - <hex encoded state root with '0x' prefix>
//   - <state root> given as 32 raw bytes
//   - <slot> given as a base-10 number
//
// A *StateIdParseError is returned when a hex identifier cannot be decoded or
// when the identifier is not a valid base-10 slot number.
func (p *BeaconDbStater) StateRoot(ctx context.Context, stateId []byte) (root []byte, err error) {
	switch id := strings.ToLower(string(stateId)); {
	case id == "head":
		return p.headStateRoot(ctx)
	case id == "genesis":
		return p.genesisStateRoot(ctx)
	case id == "finalized":
		return p.finalizedStateRoot(ctx)
	case id == "justified":
		return p.justifiedStateRoot(ctx)
	case bytesutil.IsHex(stateId):
		rootBytes, decodeErr := hexutil.Decode(string(stateId))
		if decodeErr != nil {
			idErr := NewStateIdParseError(decodeErr)
			return nil, &idErr
		}
		return p.stateRootByRoot(ctx, rootBytes)
	case len(stateId) == 32:
		return p.stateRootByRoot(ctx, stateId)
	default:
		slotNumber, convErr := strconv.ParseUint(id, 10, 64)
		if convErr != nil {
			// The identifier matches none of the supported formats.
			idErr := NewStateIdParseError(convErr)
			return nil, &idErr
		}
		return p.stateRootBySlot(ctx, primitives.Slot(slotNumber))
	}
}
// stateByRoot looks for stateRoot among the state roots recorded in the head
// state. On a match it takes the block root stored at the same index and
// loads the corresponding state through StateGenService. If no entry matches,
// a *StateNotFoundError is returned.
func (p *BeaconDbStater) stateByRoot(ctx context.Context, stateRoot []byte) (state.BeaconState, error) {
	head, err := p.ChainInfoFetcher.HeadStateReadOnly(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "could not get head state")
	}

	for idx, candidate := range head.StateRoots() {
		if !bytes.Equal(candidate, stateRoot) {
			continue
		}
		// State roots and block roots are indexed alike, so the block root
		// at the matching position identifies the state to load.
		matchingBlockRoot := bytesutil.ToBytes32(head.BlockRoots()[idx])
		return p.StateGenService.StateByRoot(ctx, matchingBlockRoot)
	}

	notFound := NewStateNotFoundError(len(head.StateRoots()), stateRoot)
	return nil, &notFound
}
// StateBySlot returns the post-state for the requested slot, rebuilt by
// replaying history. Starting from the most recent canonical state before the
// target slot, every canonical block up to the target is applied
// (process_blocks), and process_slots covers skipped slots as well as any
// slots after the last canonical block, up to and including the target.
//
// Requests for a slot later than the current slot are rejected.
func (p *BeaconDbStater) StateBySlot(ctx context.Context, target primitives.Slot) (state.BeaconState, error) {
	ctx, span := trace.StartSpan(ctx, "statefetcher.StateBySlot")
	defer span.End()

	if now := p.GenesisTimeFetcher.CurrentSlot(); target > now {
		return nil, errors.New("requested slot is in the future")
	}

	replayer := p.ReplayerBuilder.ReplayerForSlot(target)
	replayed, err := replayer.ReplayBlocks(ctx)
	if err != nil {
		return nil, errors.Wrapf(err, "error while replaying history to slot=%d", target)
	}
	return replayed, nil
}
// StateByEpoch returns the state for the start of the requested epoch.
//
// Epochs before the current one are served by StateBySlot, i.e. by replaying
// blocks. For the current epoch and any later epoch the head state is used
// instead: it is returned as-is when it has already reached the epoch's start
// slot, and otherwise it is advanced to that slot through the next slot
// cache, which avoids a replay.
//
// No upper bound on epoch is enforced here; callers are expected to validate
// how far ahead a request may reach.
func (p *BeaconDbStater) StateByEpoch(ctx context.Context, epoch primitives.Epoch) (state.BeaconState, error) {
	ctx, span := trace.StartSpan(ctx, "statefetcher.StateByEpoch")
	defer span.End()

	epochStart, err := slots.EpochStart(epoch)
	if err != nil {
		return nil, errors.Wrap(err, "could not get epoch start slot")
	}

	// Past epochs go through the replay mechanism.
	if nowEpoch := slots.ToEpoch(p.GenesisTimeFetcher.CurrentSlot()); epoch < nowEpoch {
		return p.StateBySlot(ctx, epochStart)
	}

	// Current epoch or later: start from the head state, which is much
	// cheaper than replaying.
	head, err := p.ChainInfoFetcher.HeadState(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "could not get head state")
	}

	if head.Slot() < epochStart {
		// The head has not reached the epoch start yet; advance it using the
		// next slot cache.
		cachedRoot := p.ChainInfoFetcher.CachedHeadRoot()
		advanced, err := transition.ProcessSlotsUsingNextSlotCache(ctx, head, cachedRoot[:], epochStart)
		if err != nil {
			return nil, errors.Wrapf(err, "could not process slots up to %d", epochStart)
		}
		return advanced, nil
	}

	// The head state is already at or past the target slot.
	return head, nil
}
// headStateRoot returns the state root recorded in the current head block,
// as reported by ChainInfoFetcher. A nil head block is reported as an error.
func (p *BeaconDbStater) headStateRoot(ctx context.Context) ([]byte, error) {
	headBlock, err := p.ChainInfoFetcher.HeadBlock(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "could not get head block")
	}
	if nilErr := blocks.BeaconBlockIsNil(headBlock); nilErr != nil {
		return nil, nilErr
	}
	root := headBlock.Block().StateRoot()
	return root[:], nil
}
// genesisStateRoot returns the state root recorded in the genesis block read
// from the database. A nil genesis block is reported as an error.
func (p *BeaconDbStater) genesisStateRoot(ctx context.Context) ([]byte, error) {
	genesisBlock, err := p.BeaconDB.GenesisBlock(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "could not get genesis block")
	}
	if nilErr := blocks.BeaconBlockIsNil(genesisBlock); nilErr != nil {
		return nil, nilErr
	}
	root := genesisBlock.Block().StateRoot()
	return root[:], nil
}
// finalizedStateRoot reads the finalized checkpoint from the database, loads
// the block the checkpoint's root refers to, and returns the state root
// recorded in that block. A nil block is reported as an error.
func (p *BeaconDbStater) finalizedStateRoot(ctx context.Context) ([]byte, error) {
	checkpoint, err := p.BeaconDB.FinalizedCheckpoint(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "could not get finalized checkpoint")
	}

	checkpointRoot := bytesutil.ToBytes32(checkpoint.Root)
	finalizedBlock, err := p.BeaconDB.Block(ctx, checkpointRoot)
	if err != nil {
		return nil, errors.Wrap(err, "could not get finalized block")
	}
	if nilErr := blocks.BeaconBlockIsNil(finalizedBlock); nilErr != nil {
		return nil, nilErr
	}

	root := finalizedBlock.Block().StateRoot()
	return root[:], nil
}
// justifiedStateRoot reads the justified checkpoint from the database, loads
// the block the checkpoint's root refers to, and returns the state root
// recorded in that block. A nil block is reported as an error.
func (p *BeaconDbStater) justifiedStateRoot(ctx context.Context) ([]byte, error) {
	checkpoint, err := p.BeaconDB.JustifiedCheckpoint(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "could not get justified checkpoint")
	}

	checkpointRoot := bytesutil.ToBytes32(checkpoint.Root)
	justifiedBlock, err := p.BeaconDB.Block(ctx, checkpointRoot)
	if err != nil {
		return nil, errors.Wrap(err, "could not get justified block")
	}
	if nilErr := blocks.BeaconBlockIsNil(justifiedBlock); nilErr != nil {
		return nil, nilErr
	}

	root := justifiedBlock.Block().StateRoot()
	return root[:], nil
}
// stateRootByRoot confirms that stateRoot is one of the state roots recorded
// in the head state and, if so, returns a copy of it. If it is not present, a
// *StateRootNotFoundError is returned.
//
// The comparison uses the caller's bytes exactly as given, matching what
// stateByRoot does. The input is deliberately not truncated or zero-padded to
// 32 bytes first: doing so would let an over-long identifier match on its
// first 32 bytes, and a short identifier match a root that happens to end in
// zero bytes.
func (p *BeaconDbStater) stateRootByRoot(ctx context.Context, stateRoot []byte) ([]byte, error) {
	headState, err := p.ChainInfoFetcher.HeadStateReadOnly(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "could not get head state")
	}

	// Fetch the roots once: they are needed for the scan and for the size
	// reported in the not-found error.
	stateRoots := headState.StateRoots()
	for _, root := range stateRoots {
		if bytes.Equal(root, stateRoot) {
			// Return a fresh copy so the result aliases neither the caller's
			// slice nor the state's data.
			found := make([]byte, len(stateRoot))
			copy(found, stateRoot)
			return found, nil
		}
	}

	rootNotFoundErr := NewStateRootNotFoundError(len(stateRoots))
	return nil, &rootNotFoundErr
}
// stateRootBySlot returns the state root recorded in the block stored for the
// given slot. It fails when the slot is later than the current slot, when the
// database holds no block or more than one block for the slot, or when the
// stored block is nil.
func (p *BeaconDbStater) stateRootBySlot(ctx context.Context, slot primitives.Slot) ([]byte, error) {
	if slot > p.GenesisTimeFetcher.CurrentSlot() {
		return nil, errors.New("slot cannot be in the future")
	}

	found, err := p.BeaconDB.BlocksBySlot(ctx, slot)
	if err != nil {
		return nil, errors.Wrap(err, "could not get blocks")
	}

	switch len(found) {
	case 0:
		return nil, errors.New("no block exists")
	case 1:
		// Exactly one block: the only case that can be answered.
	default:
		return nil, errors.New("multiple blocks exist in same slot")
	}

	only := found[0]
	if only == nil || only.Block() == nil {
		return nil, errors.New("nil block")
	}

	root := only.Block().StateRoot()
	return root[:], nil
}