Files
prysm/beacon-chain/db/kv/state_diff_cache.go
Preston Van Loon bfbca75862 Hdiff restart support (#16389)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

Feature

**What does this PR do? Why is it needed?**

Prior to this PR, beacon nodes could not resume sync from an hdiff
database. This PR introduces cache re-population to enable successful
restarts of the beacon node when using hdiff.

**Which issue(s) does this PR fix?**

**Other notes for review**

I am syncing from mainnet genesis with this feature. So far, it's synced
nearly 6M slots and I have restarted it multiple times. I've also been
doing some manual testing of state fetch via the beacon API. This
testing includes random sampling of states, performing HTR, and
verifying this root against the appropriate block header.

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: Bastin <43618253+Inspector-Butters@users.noreply.github.com>
2026-03-24 16:12:58 +00:00

232 lines
6.1 KiB
Go

package kv
import (
"context"
"encoding/binary"
"errors"
"sync"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
pkgerrors "github.com/pkg/errors"
"go.etcd.io/bbolt"
)
// stateDiffCache is the in-memory bookkeeping kept alongside the state diff
// bucket. All fields are read and written through the accessor methods in
// this file, which take the embedded RWMutex.
type stateDiffCache struct {
// Guards every field below.
sync.RWMutex
// Cached anchor state per level. Allocated with one entry fewer than the
// number of configured state diff exponents, because the last level
// doesn't need to be cached.
anchors []state.ReadOnlyBeaconState
// One flag per configured state diff exponent; true once that level is
// known to have data.
levelsWithData []bool
// Slot offset of the diff tree; read from offsetKey in the state diff
// bucket or supplied by the caller when the cache is rebuilt.
offset uint64
}
// populateStateDiffCacheFromDB rebuilds a stateDiffCache from the data that is
// already persisted in the state diff bucket, using offset as the tree offset.
//
// For level 0 the flag is set when the key for (0, offset) exists. For every
// other level the first key carrying that level's prefix byte is inspected: its
// slot must decode, must not precede offset, must map back to the same level via
// computeLevel, and must have all three diff parts stored. Any violation yields
// ErrStateDiffCorrupted. A level with no keys is simply left unflagged.
//
// After the scan the full snapshot at offset is loaded. A missing snapshot is
// reported as ErrStateDiffMissingSnapshot, any other load failure as
// ErrStateDiffCorrupted. On success level 0 is marked as having data and the
// snapshot is stored as anchor 0 when there is an anchor slot for it.
func populateStateDiffCacheFromDB(s *Store, offset uint64) (*stateDiffCache, error) {
	numLevels := len(flags.Get().StateDiffExponents)
	cache := &stateDiffCache{
		anchors:        make([]state.ReadOnlyBeaconState, numLevels-1),
		levelsWithData: make([]bool, numLevels),
		offset:         offset,
	}

	scanLevels := func(tx *bbolt.Tx) error {
		diffs := tx.Bucket(stateDiffBucket)
		if diffs == nil {
			return bbolt.ErrBucketNotFound
		}
		for level := range cache.levelsWithData {
			if level == 0 {
				// Level 0 lives at exactly one key: the snapshot at the offset slot.
				cache.levelsWithData[0] = diffs.Get(makeKeyForStateDiffTree(0, offset)) != nil
				continue
			}

			levelByte := byte(level)
			firstKey, _ := diffs.Cursor().Seek([]byte{levelByte})
			if firstKey == nil || firstKey[0] != levelByte {
				// Nothing stored under this level's prefix.
				continue
			}

			slot, ok := slotFromStateDiffKey(firstKey)
			if !ok || slot < offset ||
				computeLevel(offset, primitives.Slot(slot)) != level ||
				!hasCompleteDiffAtLevelSlot(diffs, level, slot) {
				return ErrStateDiffCorrupted
			}
			cache.levelsWithData[level] = true
		}
		return nil
	}
	if err := s.db.View(scanLevels); err != nil {
		return nil, err
	}

	anchor0, err := s.getFullSnapshot(offset)
	switch {
	case err == nil:
		// Snapshot loaded; fall through to caching below.
	case errors.Is(err, errSnapshotNotFound):
		return nil, pkgerrors.Wrapf(ErrStateDiffMissingSnapshot, "offset snapshot at slot %d", offset)
	default:
		return nil, pkgerrors.Wrapf(ErrStateDiffCorrupted, "failed to load offset snapshot at slot %d: %v", offset, err)
	}

	// With a single exponent there are no anchor slots, so there is nothing to
	// cache; otherwise higher levels need the offset snapshot as anchor 0.
	if len(cache.anchors) != 0 {
		cache.anchors[0] = anchor0
	}
	cache.levelsWithData[0] = true
	return cache, nil
}
// validateStateDiffCache checks that, for every level above 0 flagged as having
// data, the state at the highest slot stored for that level can be rebuilt via
// stateByDiff. A failure to rebuild is returned wrapped in ErrStateDiffCorrupted;
// an error from latestSlotForLevel is returned as is.
func validateStateDiffCache(ctx context.Context, s *Store, cache *stateDiffCache) error {
	// Take a private copy of the flags and drop the lock before doing any work:
	// stateByDiff may consult cache metadata and must never run under cache locks.
	cache.RLock()
	flagged := append([]bool(nil), cache.levelsWithData...)
	cache.RUnlock()

	// Level 0 is intentionally skipped.
	for level := 1; level < len(flagged); level++ {
		if !flagged[level] {
			continue
		}
		latest, err := latestSlotForLevel(s, level)
		if err != nil {
			return err
		}
		_, err = s.stateByDiff(ctx, primitives.Slot(latest))
		if err != nil {
			return pkgerrors.Wrapf(ErrStateDiffCorrupted, "state diff validation failed for level %d slot %d: %v", level, latest, err)
		}
	}
	return nil
}
// latestSlotForLevel walks every key in the state diff bucket whose first byte
// equals level and returns the largest slot decoded from those keys.
//
// It returns bbolt.ErrBucketNotFound when the bucket is missing, and
// ErrStateDiffCorrupted when a key is too short to hold a slot or when no key
// exists for the level at all.
func latestSlotForLevel(s *Store, level int) (uint64, error) {
	var (
		latest uint64
		seen   bool
	)
	scan := func(tx *bbolt.Tx) error {
		diffs := tx.Bucket(stateDiffBucket)
		if diffs == nil {
			return bbolt.ErrBucketNotFound
		}
		levelByte := byte(level)
		cur := diffs.Cursor()
		// The slot is decoded little-endian, so key order is not slot order and
		// every key under the prefix has to be visited.
		k, _ := cur.Seek([]byte{levelByte})
		for k != nil && k[0] == levelByte {
			slot, ok := slotFromStateDiffKey(k)
			if !ok {
				return ErrStateDiffCorrupted
			}
			seen = true
			if slot > latest {
				latest = slot
			}
			k, _ = cur.Next()
		}
		return nil
	}
	if err := s.db.View(scan); err != nil {
		return 0, err
	}
	if !seen {
		return 0, ErrStateDiffCorrupted
	}
	return latest, nil
}
// slotFromStateDiffKey decodes the slot stored in bytes 1 through 8 of a state
// diff key as a little-endian uint64. Byte 0 and anything after byte 8 are
// ignored. The boolean is false when the key is too short to contain a slot.
func slotFromStateDiffKey(key []byte) (uint64, bool) {
	const (
		slotStart = 1
		slotEnd   = slotStart + 8
	)
	if len(key) >= slotEnd {
		return binary.LittleEndian.Uint64(key[slotStart:slotEnd]), true
	}
	return 0, false
}
// hasCompleteDiffAtLevelSlot reports whether the bucket holds all three entries
// for the diff at (level, slot): the base tree key extended with stateSuffix,
// validatorSuffix and balancesSuffix respectively.
func hasCompleteDiffAtLevelSlot(bucket *bbolt.Bucket, level int, slot uint64) bool {
	base := makeKeyForStateDiffTree(level, slot)
	stored := func(k []byte) bool { return bucket.Get(k) != nil }

	// Each lookup key is built on a fresh copy so the appends never share a
	// backing array with base or with each other.
	if !stored(append(append([]byte{}, base...), stateSuffix...)) {
		return false
	}
	if !stored(append(append([]byte{}, base...), validatorSuffix...)) {
		return false
	}
	return stored(append(append([]byte{}, base...), balancesSuffix...))
}
// newStateDiffCache creates an empty cache whose offset is read from offsetKey
// in the state diff bucket. No anchors are loaded and no level is flagged as
// having data.
//
// It returns bbolt.ErrBucketNotFound when the bucket does not exist, and an
// error when the offset entry is absent or too short to hold a uint64.
func newStateDiffCache(s *Store) (*stateDiffCache, error) {
	var offset uint64
	err := s.db.View(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket(stateDiffBucket)
		if bucket == nil {
			return bbolt.ErrBucketNotFound
		}
		offsetBytes := bucket.Get(offsetKey)
		if offsetBytes == nil {
			return errors.New("state diff cache: offset not found")
		}
		// binary.LittleEndian.Uint64 panics on fewer than 8 bytes; a truncated
		// value must surface as an error rather than crash the node.
		if len(offsetBytes) < 8 {
			return errors.New("state diff cache: offset value is shorter than 8 bytes")
		}
		offset = binary.LittleEndian.Uint64(offsetBytes)
		return nil
	})
	if err != nil {
		return nil, err
	}

	numLevels := len(flags.Get().StateDiffExponents)
	return &stateDiffCache{
		anchors:        make([]state.ReadOnlyBeaconState, numLevels-1), // -1 because last level doesn't need to be cached
		levelsWithData: make([]bool, numLevels),
		offset:         offset,
	}, nil
}
// getAnchor returns the cached anchor state for level, or nil when no anchor
// has been stored for it. A level outside the anchors slice (negative, or at or
// beyond its length) also yields nil instead of panicking, matching the bounds
// handling of setAnchor and levelHasData.
func (c *stateDiffCache) getAnchor(level int) state.ReadOnlyBeaconState {
	c.RLock()
	defer c.RUnlock()
	if level < 0 || level >= len(c.anchors) {
		return nil
	}
	return c.anchors[level]
}
// setAnchor stores anchor as the cached state for level. It returns an error
// and leaves the cache untouched when level does not index the anchors slice.
func (c *stateDiffCache) setAnchor(level int, anchor state.ReadOnlyBeaconState) error {
	c.Lock()
	defer c.Unlock()

	if inRange := level >= 0 && level < len(c.anchors); !inRange {
		return errors.New("state diff cache: anchor level out of range")
	}
	c.anchors[level] = anchor
	return nil
}
// levelHasData reports whether level is flagged as having data. Levels outside
// the tracked range report false.
func (c *stateDiffCache) levelHasData(level int) bool {
	c.RLock()
	defer c.RUnlock()

	// The range test short-circuits before the index, so it can never panic.
	return level >= 0 && level < len(c.levelsWithData) && c.levelsWithData[level]
}
// setLevelHasData flags level as having data. It returns an error and changes
// nothing when level is outside the tracked range.
func (c *stateDiffCache) setLevelHasData(level int) error {
	c.Lock()
	defer c.Unlock()

	if level >= 0 && level < len(c.levelsWithData) {
		c.levelsWithData[level] = true
		return nil
	}
	return errors.New("state diff cache: level data index out of range")
}
// getOffset returns the cached offset, read under the read lock.
func (c *stateDiffCache) getOffset() uint64 {
	c.RLock()
	current := c.offset
	c.RUnlock()
	return current
}
// setOffset replaces the cached offset, written under the write lock.
func (c *stateDiffCache) setOffset(offset uint64) {
	c.Lock()
	c.offset = offset
	c.Unlock()
}
// clearAnchors drops every cached anchor by swapping in a freshly allocated
// slice sized from the currently configured state diff exponents.
func (c *stateDiffCache) clearAnchors() {
	c.Lock()
	defer c.Unlock()

	// One slot fewer than the number of exponents: the last level doesn't need
	// to be cached.
	numAnchors := len(flags.Get().StateDiffExponents) - 1
	c.anchors = make([]state.ReadOnlyBeaconState, numAnchors)
}