mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-05-02 03:02:54 -04:00
<!-- Thanks for sending a PR! Before submitting: 1. If this is your first PR, check out our contribution guide here https://docs.prylabs.network/docs/contribute/contribution-guidelines You will then need to sign our Contributor License Agreement (CLA), which will show up as a comment from a bot in this pull request after you open it. We cannot review code without a signed CLA. 2. Please file an associated tracking issue if this pull request is non-trivial and requires context for our team to understand. All features and most bug fixes should have an associated issue with a design discussed and decided upon. Small bug fixes and documentation improvements don't need issues. 3. New features and bug fixes must have tests. Documentation may need to be updated. If you're unsure what to update, send the PR, and we'll discuss in review. 4. Note that PRs updating dependencies and new Go versions are not accepted. Please file an issue instead. 5. A changelog entry is required for user facing issues. --> **What type of PR is this?** Feature **What does this PR do? Why is it needed?** Prior to this PR, beacon nodes could not resume sync from an hdiff database. This PR introduces cache re-population to enable successful restarts of the beacon node when using hdiff. **Which issues(s) does this PR fix?** **Other notes for review** I am syncing from mainnet genesis with this feature. So far, it's synced nearly 6M slots and I have restarted it multiple times. I've also been doing some manual testing of state fetch via the beacon API. This testing includes random sampling of states, performing HTR, and verifying this root against the appropriate block header. **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). 
- [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable). --------- Co-authored-by: Bastin <43618253+Inspector-Butters@users.noreply.github.com>
232 lines
6.1 KiB
Go
232 lines
6.1 KiB
Go
package kv
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"sync"
|
|
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
|
|
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
|
pkgerrors "github.com/pkg/errors"
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
type stateDiffCache struct {
|
|
sync.RWMutex
|
|
anchors []state.ReadOnlyBeaconState
|
|
levelsWithData []bool
|
|
offset uint64
|
|
}
|
|
|
|
func populateStateDiffCacheFromDB(s *Store, offset uint64) (*stateDiffCache, error) {
|
|
cache := &stateDiffCache{
|
|
anchors: make([]state.ReadOnlyBeaconState, len(flags.Get().StateDiffExponents)-1),
|
|
levelsWithData: make([]bool, len(flags.Get().StateDiffExponents)),
|
|
offset: offset,
|
|
}
|
|
|
|
if err := s.db.View(func(tx *bbolt.Tx) error {
|
|
bucket := tx.Bucket(stateDiffBucket)
|
|
if bucket == nil {
|
|
return bbolt.ErrBucketNotFound
|
|
}
|
|
for level := range cache.levelsWithData {
|
|
if level == 0 {
|
|
if bucket.Get(makeKeyForStateDiffTree(0, offset)) != nil {
|
|
cache.levelsWithData[level] = true
|
|
}
|
|
continue
|
|
}
|
|
cursor := bucket.Cursor()
|
|
prefix := []byte{byte(level)}
|
|
key, _ := cursor.Seek(prefix)
|
|
if key != nil && key[0] == byte(level) {
|
|
slot, ok := slotFromStateDiffKey(key)
|
|
if !ok {
|
|
return ErrStateDiffCorrupted
|
|
}
|
|
if slot < offset {
|
|
return ErrStateDiffCorrupted
|
|
}
|
|
if computeLevel(offset, primitives.Slot(slot)) != level {
|
|
return ErrStateDiffCorrupted
|
|
}
|
|
if !hasCompleteDiffAtLevelSlot(bucket, level, slot) {
|
|
return ErrStateDiffCorrupted
|
|
}
|
|
cache.levelsWithData[level] = true
|
|
}
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
anchor0, err := s.getFullSnapshot(offset)
|
|
if err != nil {
|
|
if errors.Is(err, errSnapshotNotFound) {
|
|
return nil, pkgerrors.Wrapf(ErrStateDiffMissingSnapshot, "offset snapshot at slot %d", offset)
|
|
}
|
|
return nil, pkgerrors.Wrapf(ErrStateDiffCorrupted, "failed to load offset snapshot at slot %d: %v", offset, err)
|
|
}
|
|
// Only cache anchor if there are higher levels that need it.
|
|
// With a single exponent, len(anchors)==0 and no caching is needed.
|
|
if len(cache.anchors) > 0 {
|
|
cache.anchors[0] = anchor0
|
|
}
|
|
cache.levelsWithData[0] = true
|
|
|
|
return cache, nil
|
|
}
|
|
|
|
func validateStateDiffCache(ctx context.Context, s *Store, cache *stateDiffCache) error {
|
|
// Copy level flags under lock, then release before validation work.
|
|
// stateByDiff may consult cache metadata and should never be called while holding cache locks.
|
|
cache.RLock()
|
|
levels := make([]bool, len(cache.levelsWithData))
|
|
copy(levels, cache.levelsWithData)
|
|
cache.RUnlock()
|
|
|
|
for level, hasData := range levels {
|
|
if !hasData || level == 0 {
|
|
continue
|
|
}
|
|
maxSlot, err := latestSlotForLevel(s, level)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := s.stateByDiff(ctx, primitives.Slot(maxSlot)); err != nil {
|
|
return pkgerrors.Wrapf(ErrStateDiffCorrupted, "state diff validation failed for level %d slot %d: %v", level, maxSlot, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func latestSlotForLevel(s *Store, level int) (uint64, error) {
|
|
var maxSlot uint64
|
|
found := false
|
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
|
bucket := tx.Bucket(stateDiffBucket)
|
|
if bucket == nil {
|
|
return bbolt.ErrBucketNotFound
|
|
}
|
|
cursor := bucket.Cursor()
|
|
prefix := []byte{byte(level)}
|
|
for key, _ := cursor.Seek(prefix); key != nil && key[0] == byte(level); key, _ = cursor.Next() {
|
|
slot, ok := slotFromStateDiffKey(key)
|
|
if !ok {
|
|
return ErrStateDiffCorrupted
|
|
}
|
|
if !found || slot > maxSlot {
|
|
maxSlot = slot
|
|
found = true
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if !found {
|
|
return 0, ErrStateDiffCorrupted
|
|
}
|
|
return maxSlot, nil
|
|
}
|
|
|
|
// slotFromStateDiffKey decodes the slot from a state-diff key: byte 0 is
// skipped and bytes 1 through 8 are read as a little-endian uint64. Any bytes
// after index 8 are ignored. The boolean is false, and the slot 0, when the
// key has fewer than 9 bytes.
func slotFromStateDiffKey(key []byte) (uint64, bool) {
	const (
		slotStart = 1
		slotEnd   = slotStart + 8
	)
	if len(key) >= slotEnd {
		return binary.LittleEndian.Uint64(key[slotStart:slotEnd]), true
	}
	return 0, false
}
|
|
|
|
func hasCompleteDiffAtLevelSlot(bucket *bbolt.Bucket, level int, slot uint64) bool {
|
|
key := makeKeyForStateDiffTree(level, slot)
|
|
stateKey := append(append([]byte{}, key...), stateSuffix...)
|
|
validatorKey := append(append([]byte{}, key...), validatorSuffix...)
|
|
balancesKey := append(append([]byte{}, key...), balancesSuffix...)
|
|
return bucket.Get(stateKey) != nil && bucket.Get(validatorKey) != nil && bucket.Get(balancesKey) != nil
|
|
}
|
|
|
|
func newStateDiffCache(s *Store) (*stateDiffCache, error) {
|
|
var offset uint64
|
|
|
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
|
bucket := tx.Bucket(stateDiffBucket)
|
|
if bucket == nil {
|
|
return bbolt.ErrBucketNotFound
|
|
}
|
|
|
|
offsetBytes := bucket.Get(offsetKey)
|
|
if offsetBytes == nil {
|
|
return errors.New("state diff cache: offset not found")
|
|
}
|
|
offset = binary.LittleEndian.Uint64(offsetBytes)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &stateDiffCache{
|
|
anchors: make([]state.ReadOnlyBeaconState, len(flags.Get().StateDiffExponents)-1), // -1 because last level doesn't need to be cached
|
|
levelsWithData: make([]bool, len(flags.Get().StateDiffExponents)),
|
|
offset: offset,
|
|
}, nil
|
|
}
|
|
|
|
func (c *stateDiffCache) getAnchor(level int) state.ReadOnlyBeaconState {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
return c.anchors[level]
|
|
}
|
|
|
|
func (c *stateDiffCache) setAnchor(level int, anchor state.ReadOnlyBeaconState) error {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
if level >= len(c.anchors) || level < 0 {
|
|
return errors.New("state diff cache: anchor level out of range")
|
|
}
|
|
c.anchors[level] = anchor
|
|
return nil
|
|
}
|
|
|
|
func (c *stateDiffCache) levelHasData(level int) bool {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
if level < 0 || level >= len(c.levelsWithData) {
|
|
return false
|
|
}
|
|
return c.levelsWithData[level]
|
|
}
|
|
|
|
func (c *stateDiffCache) setLevelHasData(level int) error {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
if level < 0 || level >= len(c.levelsWithData) {
|
|
return errors.New("state diff cache: level data index out of range")
|
|
}
|
|
c.levelsWithData[level] = true
|
|
return nil
|
|
}
|
|
|
|
func (c *stateDiffCache) getOffset() uint64 {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
return c.offset
|
|
}
|
|
|
|
func (c *stateDiffCache) setOffset(offset uint64) {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
c.offset = offset
|
|
}
|
|
|
|
func (c *stateDiffCache) clearAnchors() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
c.anchors = make([]state.ReadOnlyBeaconState, len(flags.Get().StateDiffExponents)-1) // -1 because last level doesn't need to be cached
|
|
}
|