Files
prysm/beacon-chain/db/kv/state.go
Bastin 26100e074d Refactor slot by blockroot (#16040)
**Review after #16033 is merged**

**What type of PR is this?**
Other

**What does this PR do? Why is it needed?**
This PR refactors the code to find the corresponding slot of a given
block root using state summary or the block itself, into its own
function `SlotByBlockRoot(ctx, blockroot)`.

Note that there exists a function `slotByBlockRoot(ctx context.Context,
tx *bolt.Tx, blockRoot []byte)` immediately below the new function. Also
note that this function has two drawbacks, which led to creation of the
new function:
- the old function requires a boltdb tx, which is not necessarily
available to the caller.
- the old function does NOT make use of the state summary cache. 

edit: 
- The old function also uses the state bucket to retrieve the state and
its slot. This is not something we want in the state diff feature,
since there is no state bucket.
2025-11-24 19:59:13 +00:00

1056 lines
32 KiB
Go

package kv
import (
"bytes"
"context"
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
statenative "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/genesis"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/golang/snappy"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)
// State loads the beacon state that was saved under the given block root.
//
// When the state-diff feature flag is enabled the state is obtained through
// getStateUsingStateDiff. Otherwise the encoded state is read from the state
// bucket, its validator entries are fetched separately, and both are combined
// by unmarshalState. A nil state with a nil error is returned when no encoded
// state exists for the root. The read duration is recorded in
// stateReadingTime on every successful, non-empty read.
func (s *Store) State(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.State")
	defer span.End()

	begin := time.Now()
	recordReadTime := func() {
		stateReadingTime.Observe(float64(time.Since(begin).Milliseconds()))
	}

	// With state diff enabled, the state comes from the state-diff db.
	if features.Get().EnableStateDiff {
		diffState, diffErr := s.getStateUsingStateDiff(ctx, blockRoot)
		if diffErr != nil {
			return nil, diffErr
		}
		recordReadTime()
		return diffState, nil
	}

	encoded, err := s.stateBytes(ctx, blockRoot)
	if err != nil {
		return nil, err
	}
	if len(encoded) == 0 {
		return nil, nil
	}

	// The validator registry is stored apart from the state itself.
	validators, err := s.validatorEntries(ctx, blockRoot)
	if err != nil {
		return nil, err
	}

	loaded, err := s.unmarshalState(ctx, encoded, validators)
	if err != nil {
		return nil, err
	}
	recordReadTime()
	return loaded, nil
}
// StateOrError behaves like State, but treats a missing state as a failure:
// when State yields no state (nil or IsNil), ErrNotFoundState is returned,
// wrapped with the requested block root.
func (s *Store) StateOrError(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
	found, err := s.State(ctx, blockRoot)
	switch {
	case err != nil:
		return nil, err
	case found == nil || found.IsNil():
		return nil, errors.Wrap(ErrNotFoundState, fmt.Sprintf("no state with blockroot=%#x", blockRoot))
	}
	return found, nil
}
// GenesisState returns the genesis state provided by the genesis package.
// If that package reports genesis.ErrGenesisStateNotInitialized, the error is
// logged and a nil state with a nil error is returned instead.
func (s *Store) GenesisState(ctx context.Context) (state.BeaconState, error) {
	genesisState, err := genesis.State()
	if !errors.Is(err, genesis.ErrGenesisStateNotInitialized) {
		return genesisState, err
	}
	log.WithError(err).Error("genesis state not initialized, returning nil state. this should only happen in tests")
	return nil, nil
}
// LegacyGenesisState reads the genesis state directly from the database.
// It looks up the genesis block root stored in the blocks bucket, loads the
// encoded state saved under that root from the state bucket, and decodes it
// together with its validator entries. A nil state with a nil error is
// returned when nothing is stored for the genesis root.
func (s *Store) LegacyGenesisState(ctx context.Context) (state.BeaconState, error) {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.LegacyGenesisState")
	defer span.End()

	var genesisState state.BeaconState
	viewErr := s.db.View(func(tx *bolt.Tx) error {
		// The genesis block root is the key of the genesis state.
		root := tx.Bucket(blocksBucket).Get(genesisBlockRootKey)
		encoded := tx.Bucket(stateBucket).Get(root)
		if encoded == nil {
			return nil
		}
		// Validator entries of the genesis state are fetched separately.
		validators, err := s.validatorEntries(ctx, bytesutil.ToBytes32(root))
		if err != nil {
			return err
		}
		genesisState, err = s.unmarshalState(ctx, encoded, validators)
		return err
	})
	if viewErr != nil {
		return nil, viewErr
	}
	if genesisState == nil || genesisState.IsNil() {
		return nil, nil
	}
	return genesisState, nil
}
// SaveState persists a single state keyed by the root of the block that
// produced it. It delegates to SaveStatesEfficient once the state-validator
// migration is reported as over, and to SaveStates otherwise.
func (s *Store) SaveState(ctx context.Context, st state.ReadOnlyBeaconState, blockRoot [32]byte) error {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveState")
	defer span.End()

	migrated, err := s.isStateValidatorMigrationOver()
	if err != nil {
		return err
	}

	single := []state.ReadOnlyBeaconState{st}
	roots := [][32]byte{blockRoot}
	if !migrated {
		return s.SaveStates(ctx, single, roots)
	}
	return s.SaveStatesEfficient(ctx, single, roots)
}
// SaveStates stores multiple states in the db, each keyed by the block root at
// the same position in blockRoots.
//
// All states are marshaled before the write transaction is opened. Inside the
// transaction the slot index is updated and the encoded state is written for
// every root. The elapsed time is recorded in stateSavingTime on success.
//
// An error is returned when states is nil, when states and blockRoots differ
// in length, when a state cannot be marshaled, or when a database write fails.
func (s *Store) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconState, blockRoots [][32]byte) error {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveStates")
	defer span.End()
	if states == nil {
		return errors.New("nil state")
	}
	// states[i] is paired with blockRoots[i] below; a length mismatch would
	// either index out of range or silently drop states.
	if len(states) != len(blockRoots) {
		return errors.Errorf("mismatched input lengths: %d states, %d block roots", len(states), len(blockRoots))
	}
	startTime := time.Now()
	multipleEncs := make([][]byte, len(states))
	for i, st := range states {
		stateBytes, err := marshalState(ctx, st)
		if err != nil {
			return err
		}
		multipleEncs[i] = stateBytes
	}

	if err := s.db.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(stateBucket)
		for i, rt := range blockRoots {
			indicesByBucket := createStateIndicesFromStateSlot(ctx, states[i].Slot())
			if err := updateValueForIndices(ctx, indicesByBucket, rt[:], tx); err != nil {
				return errors.Wrap(err, "could not update DB indices")
			}
			if err := bucket.Put(rt[:], multipleEncs[i]); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return err
	}

	stateSavingTime.Observe(float64(time.Since(startTime).Milliseconds()))
	return nil
}
// withValidators is satisfied by any value exposing its validator list through
// GetValidators. getValidators asserts this interface on the value returned by
// ToProtoUnsafe so it can read validators without a per-fork type switch.
type withValidators interface {
// GetValidators returns the validator entries held by the implementer.
GetValidators() []*ethpb.Validator
}
// SaveStatesEfficient stores multiple states using the new schema, in which
// validator entries are kept outside the encoded state. The validator hashes
// and entries are collected first, then everything is written in a single
// update transaction by saveStatesEfficientInternal.
func (s *Store) SaveStatesEfficient(ctx context.Context, states []state.ReadOnlyBeaconState, blockRoots [][32]byte) error {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveStatesEfficient")
	defer span.End()

	if states == nil {
		return errors.New("nil state")
	}

	keys, entries, err := getValidators(states)
	if err != nil {
		return err
	}

	return s.db.Update(func(tx *bolt.Tx) error {
		return s.saveStatesEfficientInternal(ctx, tx, blockRoots, states, keys, entries)
	})
}
// getValidators extracts the validator data needed to store states with the
// validators separated out.
//
// It returns, for each input state (same order), the snappy-compressed
// concatenation of that state's validator hash tree roots, plus a map from
// hash (as a string) to validator entry covering all input states. Using a
// map means each distinct validator entry appears only once.
func getValidators(states []state.ReadOnlyBeaconState) ([][]byte, map[string]*ethpb.Validator, error) {
	entriesByHash := make(map[string]*ethpb.Validator)
	compressedKeys := make([][]byte, len(states))

	for idx := range states {
		holder, ok := states[idx].ToProtoUnsafe().(withValidators)
		if !ok {
			return nil, nil, errors.New("could not cast state to interface with GetValidators()")
		}

		// Concatenated hash tree roots of this state's validators.
		var concatenated []byte
		for _, validator := range holder.GetValidators() {
			root, err := validator.HashTreeRoot()
			if err != nil {
				return nil, nil, err
			}
			concatenated = append(concatenated, root[:]...)
			// Remember the entry under its hash so it can be stored separately.
			entriesByHash[string(root[:])] = validator
		}
		compressedKeys[idx] = snappy.Encode(nil, concatenated)
	}
	return compressedKeys, entriesByHash, nil
}
// saveStatesEfficientInternal writes the given states inside an existing
// transaction. For each block root it updates the slot index and then
// dispatches to the fork-specific process* helper, which stores the state
// without its validators and records the validator key list for the root.
// After all states are written, the validator entries themselves are stored
// via storeValidatorEntriesSeparately.
func (s *Store) saveStatesEfficientInternal(ctx context.Context, tx *bolt.Tx, blockRoots [][32]byte, states []state.ReadOnlyBeaconState, validatorKeys [][]byte, validatorsEntries map[string]*ethpb.Validator) error {
	stateBkt := tx.Bucket(stateBucket)
	valIdxBkt := tx.Bucket(blockRootValidatorHashesBucket)

	for i, root := range blockRoots {
		key := root[:]
		indices := createStateIndicesFromStateSlot(ctx, states[i].Slot())
		if err := updateValueForIndices(ctx, indices, key, tx); err != nil {
			return errors.Wrap(err, "could not update DB indices")
		}

		// The states passed in may be in use outside this goroutine, yet the
		// stored object must not contain the validator entries. To keep that
		// window small, the helpers empty the validators just before encoding
		// and then put the original validators back.
		// See https://github.com/prysmaticlabs/prysm/issues/9262.
		var err error
		switch pb := states[i].ToProtoUnsafe().(type) {
		case *ethpb.BeaconState:
			err = s.processPhase0(ctx, pb, key, stateBkt, valIdxBkt, validatorKeys[i])
		case *ethpb.BeaconStateAltair:
			err = s.processAltair(ctx, pb, key, stateBkt, valIdxBkt, validatorKeys[i])
		case *ethpb.BeaconStateBellatrix:
			err = s.processBellatrix(ctx, pb, key, stateBkt, valIdxBkt, validatorKeys[i])
		case *ethpb.BeaconStateCapella:
			err = s.processCapella(ctx, pb, key, stateBkt, valIdxBkt, validatorKeys[i])
		case *ethpb.BeaconStateDeneb:
			err = s.processDeneb(ctx, pb, key, stateBkt, valIdxBkt, validatorKeys[i])
		case *ethpb.BeaconStateElectra:
			err = s.processElectra(ctx, pb, key, stateBkt, valIdxBkt, validatorKeys[i])
		case *ethpb.BeaconStateFulu:
			err = s.processFulu(ctx, pb, key, stateBkt, valIdxBkt, validatorKeys[i])
		default:
			err = errors.New("invalid state type")
		}
		if err != nil {
			return err
		}
	}

	return s.storeValidatorEntriesSeparately(ctx, tx, validatorsEntries)
}
// processPhase0 stores a Phase0 state without its validators under rootHash
// in bucket, and stores validatorKey (the compressed validator hash list)
// under the same key in valIdxBkt.
//
// The validators are detached only while the state is being encoded and are
// put back immediately afterwards, including when encoding fails, so the
// caller's state is never left stripped of its validators.
func (s *Store) processPhase0(ctx context.Context, pbState *ethpb.BeaconState, rootHash []byte, bucket, valIdxBkt *bolt.Bucket, validatorKey []byte) error {
	valEntries := pbState.Validators
	pbState.Validators = make([]*ethpb.Validator, 0)
	encodedState, err := encode(ctx, pbState)
	// Restore before the error check so a failed encode does not leak the
	// emptied validator list back to the caller.
	pbState.Validators = valEntries
	if err != nil {
		return err
	}
	if err := bucket.Put(rootHash, encodedState); err != nil {
		return err
	}
	return valIdxBkt.Put(rootHash, validatorKey)
}
// processAltair stores an Altair state without its validators under rootHash
// in bucket (SSZ-encoded, prefixed with altairKey and snappy-compressed), and
// stores validatorKey under the same key in valIdxBkt.
//
// The validators are detached only for the duration of MarshalSSZ and are put
// back immediately afterwards, including on error, so the caller's state is
// never left stripped of its validators.
func (s *Store) processAltair(ctx context.Context, pbState *ethpb.BeaconStateAltair, rootHash []byte, bucket, valIdxBkt *bolt.Bucket, validatorKey []byte) error {
	valEntries := pbState.Validators
	pbState.Validators = make([]*ethpb.Validator, 0)
	rawObj, err := pbState.MarshalSSZ()
	// Restore before the error check: the encoded bytes no longer depend on
	// pbState, and a failure must not leave the validators emptied.
	pbState.Validators = valEntries
	if err != nil {
		return err
	}
	encodedState := snappy.Encode(nil, append(altairKey, rawObj...))
	if err := bucket.Put(rootHash, encodedState); err != nil {
		return err
	}
	return valIdxBkt.Put(rootHash, validatorKey)
}
// processBellatrix stores a Bellatrix state without its validators under
// rootHash in bucket (SSZ-encoded, prefixed with bellatrixKey and
// snappy-compressed), and stores validatorKey under the same key in valIdxBkt.
//
// The validators are detached only for the duration of MarshalSSZ and are put
// back immediately afterwards, including on error, so the caller's state is
// never left stripped of its validators.
func (s *Store) processBellatrix(ctx context.Context, pbState *ethpb.BeaconStateBellatrix, rootHash []byte, bucket, valIdxBkt *bolt.Bucket, validatorKey []byte) error {
	valEntries := pbState.Validators
	pbState.Validators = make([]*ethpb.Validator, 0)
	rawObj, err := pbState.MarshalSSZ()
	// Restore before the error check: the encoded bytes no longer depend on
	// pbState, and a failure must not leave the validators emptied.
	pbState.Validators = valEntries
	if err != nil {
		return err
	}
	encodedState := snappy.Encode(nil, append(bellatrixKey, rawObj...))
	if err := bucket.Put(rootHash, encodedState); err != nil {
		return err
	}
	return valIdxBkt.Put(rootHash, validatorKey)
}
// processCapella stores a Capella state without its validators under rootHash
// in bucket (SSZ-encoded, prefixed with capellaKey and snappy-compressed), and
// stores validatorKey under the same key in valIdxBkt.
//
// The validators are detached only for the duration of MarshalSSZ and are put
// back immediately afterwards, including on error, so the caller's state is
// never left stripped of its validators.
func (s *Store) processCapella(ctx context.Context, pbState *ethpb.BeaconStateCapella, rootHash []byte, bucket, valIdxBkt *bolt.Bucket, validatorKey []byte) error {
	valEntries := pbState.Validators
	pbState.Validators = make([]*ethpb.Validator, 0)
	rawObj, err := pbState.MarshalSSZ()
	// Restore before the error check: the encoded bytes no longer depend on
	// pbState, and a failure must not leave the validators emptied.
	pbState.Validators = valEntries
	if err != nil {
		return err
	}
	encodedState := snappy.Encode(nil, append(capellaKey, rawObj...))
	if err := bucket.Put(rootHash, encodedState); err != nil {
		return err
	}
	return valIdxBkt.Put(rootHash, validatorKey)
}
// processDeneb stores a Deneb state without its validators under rootHash in
// bucket (SSZ-encoded, prefixed with denebKey and snappy-compressed), and
// stores validatorKey under the same key in valIdxBkt.
//
// The validators are detached only for the duration of MarshalSSZ and are put
// back immediately afterwards, including on error, so the caller's state is
// never left stripped of its validators.
func (s *Store) processDeneb(ctx context.Context, pbState *ethpb.BeaconStateDeneb, rootHash []byte, bucket, valIdxBkt *bolt.Bucket, validatorKey []byte) error {
	valEntries := pbState.Validators
	pbState.Validators = make([]*ethpb.Validator, 0)
	rawObj, err := pbState.MarshalSSZ()
	// Restore before the error check: the encoded bytes no longer depend on
	// pbState, and a failure must not leave the validators emptied.
	pbState.Validators = valEntries
	if err != nil {
		return err
	}
	encodedState := snappy.Encode(nil, append(denebKey, rawObj...))
	if err := bucket.Put(rootHash, encodedState); err != nil {
		return err
	}
	return valIdxBkt.Put(rootHash, validatorKey)
}
// processElectra stores an Electra state without its validators under rootHash
// in bucket (SSZ-encoded, prefixed with ElectraKey and snappy-compressed), and
// stores validatorKey under the same key in valIdxBkt.
//
// The validators are detached only for the duration of MarshalSSZ and are put
// back immediately afterwards, including on error, so the caller's state is
// never left stripped of its validators.
func (s *Store) processElectra(ctx context.Context, pbState *ethpb.BeaconStateElectra, rootHash []byte, bucket, valIdxBkt *bolt.Bucket, validatorKey []byte) error {
	valEntries := pbState.Validators
	pbState.Validators = make([]*ethpb.Validator, 0)
	rawObj, err := pbState.MarshalSSZ()
	// Restore before the error check: the encoded bytes no longer depend on
	// pbState, and a failure must not leave the validators emptied.
	pbState.Validators = valEntries
	if err != nil {
		return err
	}
	encodedState := snappy.Encode(nil, append(ElectraKey, rawObj...))
	if err := bucket.Put(rootHash, encodedState); err != nil {
		return err
	}
	return valIdxBkt.Put(rootHash, validatorKey)
}
// processFulu stores a Fulu state without its validators under rootHash in
// bucket (SSZ-encoded, prefixed with fuluKey and snappy-compressed), and
// stores validatorKey under the same key in valIdxBkt.
//
// The validators are detached only for the duration of MarshalSSZ and are put
// back immediately afterwards, including on error, so the caller's state is
// never left stripped of its validators.
func (s *Store) processFulu(ctx context.Context, pbState *ethpb.BeaconStateFulu, rootHash []byte, bucket, valIdxBkt *bolt.Bucket, validatorKey []byte) error {
	valEntries := pbState.Validators
	pbState.Validators = make([]*ethpb.Validator, 0)
	rawObj, err := pbState.MarshalSSZ()
	// Restore before the error check: the encoded bytes no longer depend on
	// pbState, and a failure must not leave the validators emptied.
	pbState.Validators = valEntries
	if err != nil {
		return err
	}
	encodedState := snappy.Encode(nil, append(fuluKey, rawObj...))
	if err := bucket.Put(rootHash, encodedState); err != nil {
		return err
	}
	return valIdxBkt.Put(rootHash, validatorKey)
}
// storeValidatorEntriesSeparately writes validator entries into the state
// validators bucket, keyed by their hash. An entry is written only when it is
// absent from both the validator entry cache and the bucket; after writing it
// is also added to the cache. Cache hit and miss counters are updated for
// every entry examined.
func (s *Store) storeValidatorEntriesSeparately(ctx context.Context, tx *bolt.Tx, validatorsEntries map[string]*ethpb.Validator) error {
	valBkt := tx.Bucket(stateValidatorsBucket)
	for hashStr, entry := range validatorsEntries {
		key := []byte(hashStr)

		if _, cached := s.validatorEntryCache.Get(key); cached {
			validatorEntryCacheHit.Inc()
			continue
		}
		validatorEntryCacheMiss.Inc()

		// Already persisted: nothing to write and the cache is left untouched.
		if valBkt.Get(key) != nil {
			continue
		}

		encoded, err := encode(ctx, entry)
		if err != nil {
			return err
		}
		if err := valBkt.Put(key, encoded); err != nil {
			return err
		}
		s.validatorEntryCache.Set(key, entry, int64(len(encoded)))
	}
	return nil
}
// HasState reports whether a non-empty encoded state is stored in the state
// bucket under the given block root.
func (s *Store) HasState(ctx context.Context, blockRoot [32]byte) bool {
	_, span := trace.StartSpan(ctx, "BeaconDB.HasState")
	defer span.End()

	var exists bool
	if err := s.db.View(func(tx *bolt.Tx) error {
		exists = len(tx.Bucket(stateBucket).Get(blockRoot[:])) > 0
		return nil
	}); err != nil {
		panic(err) // lint:nopanic -- View never returns an error.
	}
	return exists
}
// DeleteState removes the state stored under blockRoot, together with its
// slot index entry and (once the validator migration is over) its validator
// hash list and the matching validator entry cache items.
//
// It refuses to delete the genesis, finalized, or justified state and returns
// ErrDeleteJustifiedAndFinalized in that case. Deleting a root that has no
// stored state is a no-op.
//
// NOTE(review): SlotByBlockRoot and isStateValidatorMigrationOver are called
// from inside this Update transaction, and both open their own read
// transactions (s.db.View) elsewhere in this file — confirm this nesting is
// safe with the bolt version in use.
func (s *Store) DeleteState(ctx context.Context, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteState")
defer span.End()
return s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blocksBucket)
genesisBlockRoot := bkt.Get(genesisBlockRootKey)
bkt = tx.Bucket(checkpointBucket)
// Load the finalized checkpoint; fall back to the genesis root when none is stored.
enc := bkt.Get(finalizedCheckpointKey)
finalized := &ethpb.Checkpoint{}
if enc == nil {
finalized = &ethpb.Checkpoint{Root: genesisBlockRoot}
} else if err := decode(ctx, enc, finalized); err != nil {
return err
}
// Load the justified checkpoint with the same genesis fallback.
enc = bkt.Get(justifiedCheckpointKey)
justified := &ethpb.Checkpoint{}
if enc == nil {
justified = &ethpb.Checkpoint{Root: genesisBlockRoot}
} else if err := decode(ctx, enc, justified); err != nil {
return err
}
bkt = tx.Bucket(stateBucket)
// Safeguard against deleting the genesis, finalized, or justified state.
if bytes.Equal(blockRoot[:], finalized.Root) || bytes.Equal(blockRoot[:], genesisBlockRoot) || bytes.Equal(blockRoot[:], justified.Root) {
return ErrDeleteJustifiedAndFinalized
}
// Nothing to delete if state doesn't exist.
enc = bkt.Get(blockRoot[:])
if enc == nil {
return nil
}
// The slot is needed to locate the slot index entry that points at this root.
slot, err := s.SlotByBlockRoot(ctx, blockRoot)
if err != nil {
return err
}
indicesByBucket := createStateIndicesFromStateSlot(ctx, slot)
if err := deleteValueForIndices(ctx, indicesByBucket, blockRoot[:], tx); err != nil {
return errors.Wrap(err, "could not delete root for DB indices")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return err
}
if ok {
// remove the validator entry keys for the corresponding state.
idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
// Read the compressed hash list before deleting it; it is needed below to evict cache entries.
compressedValidatorHashes := idxBkt.Get(blockRoot[:])
err = idxBkt.Delete(blockRoot[:])
if err != nil {
return err
}
// remove the respective validator entries from the cache.
if len(compressedValidatorHashes) == 0 {
return errors.Errorf("invalid compressed validator keys length")
}
validatorHashes, sErr := snappy.Decode(nil, compressedValidatorHashes)
if sErr != nil {
return errors.Wrap(sErr, "failed to uncompress validator keys")
}
// The decoded buffer is a concatenation of fixed-size hashes.
if len(validatorHashes)%hashLength != 0 {
return errors.Errorf("invalid validator keys length: %d", len(validatorHashes))
}
for i := 0; i < len(validatorHashes); i += hashLength {
key := validatorHashes[i : i+hashLength]
s.validatorEntryCache.Del(key)
validatorEntryCacheDelete.Inc()
}
}
// Finally remove the encoded state itself.
return bkt.Delete(blockRoot[:])
})
}
// DeleteStates removes the states stored under each of the given block roots
// by calling DeleteState for every root in order. It stops at, and returns,
// the first error encountered.
func (s *Store) DeleteStates(ctx context.Context, blockRoots [][32]byte) error {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteStates")
	defer span.End()

	for i := range blockRoots {
		if err := s.DeleteState(ctx, blockRoots[i]); err != nil {
			return err
		}
	}
	return nil
}
// unmarshalState decodes snappy-compressed state bytes into a versioned
// beacon state.
//
// After decompression, the fork is detected from the key prefix of the
// payload, checked from the newest fork (Fulu) down to Altair; a payload with
// no recognized prefix is treated as Phase0. For every fork, when the
// state-validator migration is over, the decoded state's validators are
// replaced with validatorEntries before the native state is constructed.
//
// Returns an error when decompression, SSZ unmarshaling, the migration check,
// or native state initialization fails.
func (s *Store) unmarshalState(_ context.Context, enc []byte, validatorEntries []*ethpb.Validator) (state.BeaconState, error) {
var err error
enc, err = snappy.Decode(nil, enc)
if err != nil {
return nil, err
}
switch {
case hasFuluKey(enc):
// Unmarshal state bytes (after the fork key prefix) to a Fulu beacon state.
protoState := &ethpb.BeaconStateFulu{}
if err := protoState.UnmarshalSSZ(enc[len(fuluKey):]); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding for Fulu")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return nil, err
}
if ok {
protoState.Validators = validatorEntries
}
return statenative.InitializeFromProtoUnsafeFulu(protoState)
case HasElectraKey(enc):
// Unmarshal state bytes (after the fork key prefix) to an Electra beacon state.
protoState := &ethpb.BeaconStateElectra{}
if err := protoState.UnmarshalSSZ(enc[len(ElectraKey):]); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding for Electra")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return nil, err
}
if ok {
protoState.Validators = validatorEntries
}
return statenative.InitializeFromProtoUnsafeElectra(protoState)
case hasDenebKey(enc):
// Unmarshal state bytes (after the fork key prefix) to a Deneb beacon state.
protoState := &ethpb.BeaconStateDeneb{}
if err := protoState.UnmarshalSSZ(enc[len(denebKey):]); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding for Deneb")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return nil, err
}
if ok {
protoState.Validators = validatorEntries
}
return statenative.InitializeFromProtoUnsafeDeneb(protoState)
case hasCapellaKey(enc):
// Unmarshal state bytes (after the fork key prefix) to a Capella beacon state.
protoState := &ethpb.BeaconStateCapella{}
if err := protoState.UnmarshalSSZ(enc[len(capellaKey):]); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding for capella")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return nil, err
}
if ok {
protoState.Validators = validatorEntries
}
return statenative.InitializeFromProtoUnsafeCapella(protoState)
case hasBellatrixKey(enc):
// Unmarshal state bytes (after the fork key prefix) to a Bellatrix beacon state.
protoState := &ethpb.BeaconStateBellatrix{}
if err := protoState.UnmarshalSSZ(enc[len(bellatrixKey):]); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding for bellatrix")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return nil, err
}
if ok {
protoState.Validators = validatorEntries
}
return statenative.InitializeFromProtoUnsafeBellatrix(protoState)
case hasAltairKey(enc):
// Unmarshal state bytes (after the fork key prefix) to an Altair beacon state.
protoState := &ethpb.BeaconStateAltair{}
if err := protoState.UnmarshalSSZ(enc[len(altairKey):]); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding for altair")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return nil, err
}
if ok {
protoState.Validators = validatorEntries
}
return statenative.InitializeFromProtoUnsafeAltair(protoState)
default:
// No fork key prefix: unmarshal the whole payload as a Phase0 beacon state.
protoState := &ethpb.BeaconState{}
if err := protoState.UnmarshalSSZ(enc); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return nil, err
}
if ok {
protoState.Validators = validatorEntries
}
return statenative.InitializeFromProtoUnsafePhase0(protoState)
}
}
// marshalState encodes a versioned beacon state into the bytes stored in the
// database.
//
// A Phase0 state is passed to the encode helper. Every later fork is
// SSZ-marshaled, prefixed with that fork's key, and snappy-compressed. An
// error is returned when the underlying proto is not of the type expected for
// st.Version(), when it is nil (forks after Phase0), when SSZ marshaling
// fails, or when the version is not one of the handled forks.
func marshalState(ctx context.Context, st state.ReadOnlyBeaconState) ([]byte, error) {
switch st.Version() {
case version.Phase0:
rState, ok := st.ToProtoUnsafe().(*ethpb.BeaconState)
if !ok {
return nil, errors.New("non valid inner state")
}
// NOTE(review): unlike the other forks there is no nil check here; presumably encode handles a nil state — confirm.
return encode(ctx, rState)
case version.Altair:
rState, ok := st.ToProtoUnsafe().(*ethpb.BeaconStateAltair)
if !ok {
return nil, errors.New("non valid inner state")
}
if rState == nil {
return nil, errors.New("nil state")
}
rawObj, err := rState.MarshalSSZ()
if err != nil {
return nil, err
}
// Prefix with the fork key so the fork can be identified when reading back.
return snappy.Encode(nil, append(altairKey, rawObj...)), nil
case version.Bellatrix:
rState, ok := st.ToProtoUnsafe().(*ethpb.BeaconStateBellatrix)
if !ok {
return nil, errors.New("non valid inner state")
}
if rState == nil {
return nil, errors.New("nil state")
}
rawObj, err := rState.MarshalSSZ()
if err != nil {
return nil, err
}
return snappy.Encode(nil, append(bellatrixKey, rawObj...)), nil
case version.Capella:
rState, ok := st.ToProtoUnsafe().(*ethpb.BeaconStateCapella)
if !ok {
return nil, errors.New("non valid inner state")
}
if rState == nil {
return nil, errors.New("nil state")
}
rawObj, err := rState.MarshalSSZ()
if err != nil {
return nil, err
}
return snappy.Encode(nil, append(capellaKey, rawObj...)), nil
case version.Deneb:
rState, ok := st.ToProtoUnsafe().(*ethpb.BeaconStateDeneb)
if !ok {
return nil, errors.New("non valid inner state")
}
if rState == nil {
return nil, errors.New("nil state")
}
rawObj, err := rState.MarshalSSZ()
if err != nil {
return nil, err
}
return snappy.Encode(nil, append(denebKey, rawObj...)), nil
case version.Electra:
rState, ok := st.ToProtoUnsafe().(*ethpb.BeaconStateElectra)
if !ok {
return nil, errors.New("non valid inner state")
}
if rState == nil {
return nil, errors.New("nil state")
}
rawObj, err := rState.MarshalSSZ()
if err != nil {
return nil, err
}
return snappy.Encode(nil, append(ElectraKey, rawObj...)), nil
case version.Fulu:
rState, ok := st.ToProtoUnsafe().(*ethpb.BeaconStateFulu)
if !ok {
return nil, errors.New("non valid inner state")
}
if rState == nil {
return nil, errors.New("nil state")
}
rawObj, err := rState.MarshalSSZ()
if err != nil {
return nil, err
}
return snappy.Encode(nil, append(fuluKey, rawObj...)), nil
default:
return nil, errors.New("invalid inner state")
}
}
// validatorEntries returns the validator entries belonging to the state saved
// under blockRoot. Entries live in their own bucket to reduce state size.
//
// When the state-validator migration is not over, an empty slice is returned.
// Otherwise the compressed hash list for the root is read and decompressed,
// and each hash is resolved through the validator entry cache first and the
// validators bucket second; entries read from the bucket are added to the
// cache. On failure, the entries gathered so far are returned alongside the
// error.
func (s *Store) validatorEntries(ctx context.Context, blockRoot [32]byte) ([]*ethpb.Validator, error) {
	migrated, err := s.isStateValidatorMigrationOver()
	if err != nil {
		return nil, err
	}
	if !migrated {
		return make([]*ethpb.Validator, 0), nil
	}

	ctx, span := trace.StartSpan(ctx, "BeaconDB.validatorEntries")
	defer span.End()

	var entries []*ethpb.Validator
	err = s.db.View(func(tx *bolt.Tx) error {
		// Look up the compressed validator hash list for this root.
		compressed := tx.Bucket(blockRootValidatorHashesBucket).Get(blockRoot[:])
		if len(compressed) == 0 {
			return errors.Errorf("validator keys not found for given block root: %x", blockRoot)
		}

		// Decompress and make sure the buffer is a whole number of hashes.
		hashes, decompressErr := snappy.Decode(nil, compressed)
		if decompressErr != nil {
			return errors.Wrap(decompressErr, "failed to uncompress validator keys")
		}
		if len(hashes)%hashLength != 0 {
			return errors.Errorf("invalid validator keys length: %d", len(hashes))
		}

		valBkt := tx.Bucket(stateValidatorsBucket)
		for offset := 0; offset < len(hashes); offset += hashLength {
			key := hashes[offset : offset+hashLength]

			// Serve from the cache when possible.
			if cached, hit := s.validatorEntryCache.Get(key); hit {
				entries = append(entries, cached)
				validatorEntryCacheHit.Inc()
				continue
			}

			// Cache miss: read from the DB, decode, and populate the cache.
			raw := valBkt.Get(key)
			if len(raw) == 0 {
				return errors.New("could not find validator entry")
			}
			decoded := &ethpb.Validator{}
			if decodeErr := decode(ctx, raw, decoded); decodeErr != nil {
				return errors.Wrap(decodeErr, "failed to decode validator entry keys")
			}
			entries = append(entries, decoded)
			validatorEntryCacheMiss.Inc()
			s.validatorEntryCache.Set(key, decoded, int64(decoded.SizeSSZ()))
		}
		return nil
	})
	return entries, err
}
// stateBytes returns a copy of the encoded state stored in the state bucket
// under blockRoot, or nil when nothing (or an empty value) is stored there.
func (s *Store) stateBytes(ctx context.Context, blockRoot [32]byte) ([]byte, error) {
	_, span := trace.StartSpan(ctx, "BeaconDB.stateBytes")
	defer span.End()

	var out []byte
	err := s.db.View(func(tx *bolt.Tx) error {
		if raw := tx.Bucket(stateBucket).Get(blockRoot[:]); len(raw) > 0 {
			// Due to https://github.com/boltdb/bolt/issues/204, the bytes must
			// be copied into a separately allocated slice inside the
			// transaction, or accessing that memory later may panic.
			out = make([]byte, len(raw))
			copy(out, raw)
		}
		return nil
	})
	return out, err
}
// SlotByBlockRoot resolves the slot associated with a block root. Sources are
// consulted in order: the state summary, then the block, then the state
// itself. The state lookup is skipped, and an error returned instead, when the
// state diff feature is enabled. An error is also returned when none of the
// sources knows the root.
func (s *Store) SlotByBlockRoot(ctx context.Context, blockRoot [32]byte) (primitives.Slot, error) {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.SlotByBlockRoot")
	defer span.End()

	// 1. State summary.
	summary, err := s.StateSummary(ctx, blockRoot)
	if err != nil {
		return 0, err
	}
	if summary != nil {
		return summary.Slot, nil
	}

	// 2. Block.
	signedBlock, err := s.Block(ctx, blockRoot)
	if err != nil {
		return 0, err
	}
	if signedBlock != nil && !signedBlock.IsNil() {
		return signedBlock.Block().Slot(), nil
	}

	// 3. State, only when the state diff feature is disabled.
	if features.Get().EnableStateDiff {
		return 0, errors.New("neither state summary nor block found")
	}
	st, err := s.State(ctx, blockRoot)
	if err != nil {
		return 0, err
	}
	if st == nil || st.IsNil() {
		return 0, errors.New("neither state summary, block nor state found")
	}
	return st.Slot(), nil
}
// HighestSlotStatesBelow returns the state with the highest slot strictly
// below the input slot, wrapped in a one-element slice.
//
// The slot index bucket is scanned in key order and the last root whose slot
// is below the input slot is used to load the state. If no such root exists,
// or no state is found for it, the genesis state is returned instead.
// Ideally there is one state per slot, but because a validator can double
// propose, a slot may have several block roots; the slice return type leaves
// room for that.
//
// The scan aborts with ctx.Err() if the context is cancelled.
func (s *Store) HighestSlotStatesBelow(ctx context.Context, slot primitives.Slot) ([]state.ReadOnlyBeaconState, error) {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.HighestSlotStatesBelow")
	defer span.End()

	var best []byte
	if err := s.db.View(func(tx *bolt.Tx) error {
		bkt := tx.Bucket(stateSlotIndicesBucket)
		c := bkt.Cursor()
		var candidate []byte
		for k, root := c.First(); k != nil; k, root = c.Next() {
			if err := ctx.Err(); err != nil {
				return err
			}
			key := bytesutil.BytesToSlotBigEndian(k)
			if root == nil {
				continue
			}
			if key >= slot {
				break
			}
			candidate = root
		}
		// Cursor values are only valid while the transaction is open, so the
		// root is copied before it escapes the closure (as stateBytes does).
		if candidate != nil {
			best = make([]byte, len(candidate))
			copy(best, candidate)
		}
		return nil
	}); err != nil {
		return nil, err
	}

	var st state.ReadOnlyBeaconState
	var err error
	if best != nil {
		st, err = s.State(ctx, bytesutil.ToBytes32(best))
		if err != nil {
			return nil, err
		}
	}
	// Fall back to genesis when no earlier state could be loaded.
	if st == nil || st.IsNil() {
		st, err = s.GenesisState(ctx)
		if err != nil {
			return nil, err
		}
	}

	return []state.ReadOnlyBeaconState{st}, nil
}
// createStateIndicesFromStateSlot builds the index map for a state at the
// given slot: a map from index bucket name to the key used inside that bucket.
// Currently the only index is the slot index, keyed by the big-endian
// encoding of the slot.
func createStateIndicesFromStateSlot(ctx context.Context, slot primitives.Slot) map[string][]byte {
	_, span := trace.StartSpan(ctx, "BeaconDB.createStateIndicesFromState")
	defer span.End()

	// Every index has its own bucket for fast, binary-search range scans
	// when filtering across keys.
	return map[string][]byte{
		string(stateSlotIndicesBucket): bytesutil.SlotToBytesBigEndian(slot),
	}
}
// CleanUpDirtyStates removes states from the DB that are not needed under the
// archived point interval rules. A state is kept when any of the following
// holds:
//
//  1. state_slot % archived_interval == 0
//     (e.g. archived_interval=2048 keeps slots 2048, 4096, ...).
//  2. state_slot % archived_interval > archived_interval - archived_interval/3
//     (e.g. archived_interval=2048 keeps remainders above 1366). This
//     tolerates skipped slots, since not every state lies on the boundary.
//  3. It is the state of the current finalized root.
//  4. It is unfinalized (its slot is after the finalized epoch start slot).
//  5. It is the state of the checkpoint-sync origin root.
//
// All other states are deleted through DeleteStates. An error is returned
// when slotsPerArchivedPoint is zero, since the interval is used as a modulus.
func (s *Store) CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint primitives.Slot) error {
	ctx, span := trace.StartSpan(ctx, "BeaconDB. CleanUpDirtyStates")
	defer span.End()

	// A zero interval would panic in the modulo below.
	if slotsPerArchivedPoint == 0 {
		return errors.New("slots per archived point must be greater than zero")
	}

	f, err := s.FinalizedCheckpoint(ctx)
	if err != nil {
		return err
	}
	finalizedSlot, err := slots.EpochStart(f.Epoch)
	if err != nil {
		return err
	}
	finalizedRoot := bytesutil.ToBytes32(f.Root)

	// If the node did not use checkpoint sync there is no origin block root;
	// that specific error is tolerated and oRoot keeps the value returned
	// alongside it.
	oRoot, err := s.OriginCheckpointBlockRoot(ctx)
	if err != nil && !errors.Is(err, ErrNotFoundOriginBlockRoot) {
		return err
	}

	deletedRoots := make([][32]byte, 0)
	err = s.db.View(func(tx *bolt.Tx) error {
		bkt := tx.Bucket(stateSlotIndicesBucket)
		return bkt.ForEach(func(k, v []byte) error {
			if ctx.Err() != nil {
				return ctx.Err()
			}

			root := bytesutil.ToBytes32(v)
			slot := bytesutil.BytesToSlotBigEndian(k)
			mod := slot % slotsPerArchivedPoint

			switch {
			case mod == 0:
				// Rule 1: on an archived point boundary.
			case mod > slotsPerArchivedPoint-slotsPerArchivedPoint/3:
				// Rule 2: in the last third before the next boundary.
			case finalizedRoot == root:
				// Rule 3: current finalized state.
			case slot > finalizedSlot:
				// Rule 4: unfinalized state.
			case oRoot == root:
				// Rule 5: checkpoint-sync origin state.
			default:
				deletedRoots = append(deletedRoots, root)
			}
			return nil
		})
	})
	if err != nil {
		return err
	}

	// Nothing qualified for deletion.
	if len(deletedRoots) == 0 {
		return nil
	}

	log.WithField("count", len(deletedRoots)).Info("Cleaning up dirty states")
	return s.DeleteStates(ctx, deletedRoots)
}
// isStateValidatorMigrationOver reports whether the new code path, in which
// validators are stored separately from the state, should be used. That is
// the case when the EnableHistoricalSpaceRepresentation feature flag is set,
// or when the migrations bucket marks the state-validators migration as
// completed.
func (s *Store) isStateValidatorMigrationOver() (bool, error) {
	// The feature flag alone forces the new code path.
	if features.Get().EnableHistoricalSpaceRepresentation {
		return true, nil
	}

	// Without the flag, follow the new path only if the migration finished.
	var migrated bool
	err := s.db.View(func(tx *bolt.Tx) error {
		marker := tx.Bucket(migrationsBucket).Get(migrationStateValidatorsKey)
		migrated = bytes.Equal(marker, migrationCompleted)
		return nil
	})
	return migrated, err
}
// getStateUsingStateDiff loads the state for blockRoot through the state-diff
// path: the root is first resolved to a slot via SlotByBlockRoot, and the
// state for that slot is then obtained from stateByDiff. An error is returned
// when the slot cannot be resolved, when stateByDiff fails, or when it yields
// no state.
func (s *Store) getStateUsingStateDiff(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
	slot, err := s.SlotByBlockRoot(ctx, blockRoot)
	if err != nil {
		return nil, err
	}

	rebuilt, err := s.stateByDiff(ctx, slot)
	switch {
	case err != nil:
		return nil, err
	case rebuilt == nil || rebuilt.IsNil():
		return nil, errors.New("state not found")
	}
	return rebuilt, nil
}