Mirror of https://github.com/OffchainLabs/prysm.git
Efficient Historical State Representation (#9155)
* added migration into new schema
* gazelle fix
* goimports fix
* construct state with validator entries from a separate bucket
* save state with validator entry indirection
* fixed save and retrieve issues
* fixed more test cases related to DeepEqual
* added save benchmark
* gazelle fix
* organize benchmarks
* add cache and improve state construction performance
* gazelle fix
* check type
* remove cache checking from Has
* fix decoding when cache is off
* fix slice bugs
* add migration test cases
* linter fix
* fix a few review comments
* fix review feedback
* gazelle fix
* satisfy deepsource
* added the feature flag
* lint fix
* fix usage of featureconfig Init in test cases
* call resetConfig method
* add feature flag in migration test cases
* fix formatting
* change bucket name from blockRootValidatorKeysIndexBucket to blockRootValidatorHashesBucket
* remove from cache when state deleted
* fixed a few more comments
* added to devModeFlags
* added cache delete under the feature flag
* fix lint
* change cache sizes and improve documentation
* fixed a few more review comments
* not using hash anymore and using a new SaveStates function
* satisfy deepsource
* run gazelle
* fix feature flag related stuff
* fixing merge conflict fix
* few goodies
* improve UX and don't swallow error
* merge fix
* import format fix
* fix migration when flag not given issue

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: Nishant Das <nishdas93@gmail.com>
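The core of the change, distilled: instead of serializing the full validator registry inside every stored BeaconState, each validator entry is written once to a dedicated bucket keyed by its hash, and the state's block root maps to a snappy-compressed list of those 32-byte hashes. A minimal, self-contained sketch of that indirection (hypothetical in-memory maps stand in for the real BoltDB buckets, and sha256 stands in for the hashing the PR actually uses):

package main

import (
    "crypto/sha256"
    "fmt"
)

func main() {
    // Stand-ins for stateValidatorsBucket and blockRootValidatorHashesBucket.
    validatorsByHash := map[[32]byte][]byte{}
    hashesByBlockRoot := map[[32]byte][]byte{}

    blockRoot := [32]byte{'A'}
    encodedValidators := [][]byte{[]byte("validator-0"), []byte("validator-1")}

    var keys []byte
    for _, enc := range encodedValidators {
        h := sha256.Sum256(enc)
        if _, ok := validatorsByHash[h]; !ok { // store each unique entry once
            validatorsByHash[h] = enc
        }
        keys = append(keys, h[:]...)
    }
    hashesByBlockRoot[blockRoot] = keys

    fmt.Printf("stored %d unique validators, index is %d bytes\n",
        len(validatorsByHash), len(hashesByBlockRoot[blockRoot]))
}

Since consecutive states share almost all of their validator entries, the per-state cost drops from the full registry to 32 bytes per validator, which the PR further compresses with snappy.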
@@ -16,6 +16,7 @@ go_library(
        "migration.go",
        "migration_archived_index.go",
        "migration_block_slot_index.go",
        "migration_state_validators.go",
        "operations.go",
        "powchain.go",
        "schema.go",
@@ -45,8 +46,11 @@ go_library(
        "//proto/prysm/v2/block:go_default_library",
        "//proto/prysm/v2/state:go_default_library",
        "//shared/bytesutil:go_default_library",
        "//shared/featureconfig:go_default_library",
        "//shared/fileutil:go_default_library",
        "//shared/hashutil:go_default_library",
        "//shared/params:go_default_library",
        "//shared/progressutil:go_default_library",
        "//shared/sliceutil:go_default_library",
        "//shared/traceutil:go_default_library",
        "@com_github_dgraph_io_ristretto//:go_default_library",
@@ -55,6 +59,7 @@ go_library(
        "@com_github_golang_snappy//:go_default_library",
        "@com_github_pkg_errors//:go_default_library",
        "@com_github_prometheus_client_golang//prometheus:go_default_library",
        "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
        "@com_github_prysmaticlabs_eth2_types//:go_default_library",
        "@com_github_prysmaticlabs_prombbolt//:go_default_library",
        "@com_github_sirupsen_logrus//:go_default_library",
@@ -79,6 +84,7 @@ go_test(
        "kv_test.go",
        "migration_archived_index_test.go",
        "migration_block_slot_index_test.go",
        "migration_state_validators_test.go",
        "operations_test.go",
        "powchain_test.go",
        "slashings_test.go",
@@ -92,6 +98,7 @@ go_test(
        "//beacon-chain/db/filters:go_default_library",
        "//beacon-chain/db/iface:go_default_library",
        "//beacon-chain/state:go_default_library",
        "//beacon-chain/state/v1:go_default_library",
        "//proto/prysm/v1alpha1:go_default_library",
        "//proto/prysm/v1alpha1/wrapper:go_default_library",
        "//proto/prysm/v2:go_default_library",
@@ -99,13 +106,14 @@ go_test(
        "//proto/prysm/v2/state:go_default_library",
        "//proto/testing:go_default_library",
        "//shared/bytesutil:go_default_library",
        "//shared/featureconfig:go_default_library",
        "//shared/params:go_default_library",
        "//shared/testutil:go_default_library",
        "//shared/testutil/assert:go_default_library",
        "//shared/testutil/require:go_default_library",
        "@com_github_ethereum_go_ethereum//common:go_default_library",
        "@com_github_golang_snappy//:go_default_library",
        "@com_github_prysmaticlabs_eth2_types//:go_default_library",
        "@in_gopkg_d4l3k_messagediff_v1//:go_default_library",
        "@io_bazel_rules_go//go/tools/bazel:go_default_library",
        "@io_etcd_go_bbolt//:go_default_library",
        "@org_golang_google_protobuf//proto:go_default_library",

@@ -11,6 +11,7 @@ import (
    "github.com/dgraph-io/ristretto"
    "github.com/pkg/errors"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    prombolt "github.com/prysmaticlabs/prombbolt"
    "github.com/prysmaticlabs/prysm/beacon-chain/db/iface"
    "github.com/prysmaticlabs/prysm/shared/fileutil"
@@ -21,12 +22,31 @@ import (
var _ iface.Database = (*Store)(nil)

const (
    // NumOfValidatorEntries is the size of the validator cache entries.
    // We expect to hold a max of 200K validators, so set it to 2 million (10x the capacity).
    NumOfValidatorEntries = 1 << 21
    // ValidatorEntryMaxCost is set to ~64 MB to allow 200K validator entries to be cached.
    ValidatorEntryMaxCost = 1 << 26
    // BeaconNodeDbDirName is the name of the directory containing the beacon node database.
    BeaconNodeDbDirName = "beaconchaindata"
    // DatabaseFileName is the name of the beacon node database.
    DatabaseFileName = "beaconchain.db"

    boltAllocSize = 8 * 1024 * 1024
    // The length of a hash in bytes.
    hashLength = 32
)

var (
    // Metrics for the validator cache.
    validatorEntryCacheHit = promauto.NewCounter(prometheus.CounterOpts{
        Name: "validator_entry_cache_hit",
        Help: "The total number of cache hits on the validator entry cache.",
    })
    validatorEntryCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
        Name: "validator_entry_cache_miss",
        Help: "The total number of cache misses on the validator entry cache.",
    })
)

// BlockCacheSize specifies 1000 slots worth of blocks cached, which
@@ -52,11 +72,12 @@ type Config struct {
// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for Ethereum Beacon Nodes.
type Store struct {
    db                *bolt.DB
    databasePath      string
    blockCache        *ristretto.Cache
    stateSummaryCache *stateSummaryCache
    ctx               context.Context
    db                  *bolt.DB
    databasePath        string
    blockCache          *ristretto.Cache
    validatorEntryCache *ristretto.Cache
    stateSummaryCache   *stateSummaryCache
    ctx                 context.Context
}

// KVStoreDatafilePath is the canonical construction of a full
@@ -104,12 +125,22 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
        return nil, err
    }

    validatorCache, err := ristretto.NewCache(&ristretto.Config{
        NumCounters: NumOfValidatorEntries, // number of keys to track frequency of (2 million, ~10x expected entries).
        MaxCost:     ValidatorEntryMaxCost, // maximum size of the cache (64 MB)
        BufferItems: 64,                    // number of keys per Get buffer.
    })
    if err != nil {
        return nil, err
    }

    kv := &Store{
        db:                boltDB,
        databasePath:      dirPath,
        blockCache:        blockCache,
        stateSummaryCache: newStateSummaryCache(),
        ctx:               ctx,
        db:                  boltDB,
        databasePath:        dirPath,
        blockCache:          blockCache,
        validatorEntryCache: validatorCache,
        stateSummaryCache:   newStateSummaryCache(),
        ctx:                 ctx,
    }

    if err := kv.db.Update(func(tx *bolt.Tx) error {
@@ -125,6 +156,7 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
            checkpointBucket,
            powchainBucket,
            stateSummaryBucket,
            stateValidatorsBucket,
            // Indices buckets.
            attestationHeadBlockRootBucket,
            attestationSourceRootIndicesBucket,
@@ -135,6 +167,7 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
            stateSlotIndicesBucket,
            blockParentRootIndicesBucket,
            finalizedBlockRootsIndexBucket,
            blockRootValidatorHashesBucket,
            // State management service bucket.
            newStateServiceCompatibleBucket,
            // Migrations

@@ -13,6 +13,7 @@ type migration func(*bolt.Tx) error
var migrations = []migration{
    migrateArchivedIndex,
    migrateBlockSlotIndex,
    migrateStateValidators,
}

// RunMigrations runs each migration defined in the migrations array.
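For context, the registry above feeds a runner that applies each migration against the BoltDB handle. A minimal sketch of the pattern, assuming a hypothetical runner that scopes everything to one update transaction (the real RunMigrations may scope transactions differently):

package kvmigrate

import (
    bolt "go.etcd.io/bbolt"
)

type migration func(*bolt.Tx) error

// runMigrations applies every registered migration inside one update
// transaction, so a failing migration rolls back atomically.
func runMigrations(db *bolt.DB, migrations []migration) error {
    return db.Update(func(tx *bolt.Tx) error {
        for _, m := range migrations {
            if err := m(tx); err != nil {
                return err
            }
        }
        return nil
    })
}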
new file: beacon-chain/db/kv/migration_state_validators.go (127 lines)
@@ -0,0 +1,127 @@
package kv

import (
    "bytes"
    "context"

    "github.com/golang/snappy"
    v1alpha1 "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
    statepb "github.com/prysmaticlabs/prysm/proto/prysm/v2/state"
    "github.com/prysmaticlabs/prysm/shared/featureconfig"
    "github.com/prysmaticlabs/prysm/shared/hashutil"
    "github.com/prysmaticlabs/prysm/shared/progressutil"
    bolt "go.etcd.io/bbolt"
)

var migrationStateValidatorsKey = []byte("migration_state_validator")

func migrateStateValidators(tx *bolt.Tx) error {
    mb := tx.Bucket(migrationsBucket)
    // If the feature flag is not enabled:
    //  - migration is complete: don't migrate the DB, but warn that the node will work as if the flag were enabled.
    //  - migration is not complete: don't migrate the DB.
    if !featureconfig.Get().EnableHistoricalSpaceRepresentation {
        b := mb.Get(migrationStateValidatorsKey)
        if bytes.Equal(b, migrationCompleted) {
            log.Warning("Migration of historical states already completed. The node will work as if --enable-historical-state-representation=true.")
        }
        return nil
    }

    // If the migration flag is enabled (checked in the condition above)
    // and the migration is already complete, don't migrate again.
    if b := mb.Get(migrationStateValidatorsKey); bytes.Equal(b, migrationCompleted) {
        return nil
    }
    stateBkt := tx.Bucket(stateBucket)
    if stateBkt == nil {
        return nil
    }

    // Get the count of keys in the state bucket to drive the progress indicator.
    count, err := stateCount(stateBkt)
    if err != nil {
        return err
    }

    // Get the source and destination buckets.
    log.Infof("Performing a one-time migration to a more efficient database schema for %s. It will take a few minutes.", stateBucket)
    bar := progressutil.InitializeProgressBar(count, "Migrating state validators to new schema.")

    valBkt := tx.Bucket(stateValidatorsBucket)
    if valBkt == nil {
        return nil
    }
    indexBkt := tx.Bucket(blockRootValidatorHashesBucket)
    if indexBkt == nil {
        return nil
    }

    // For each state in the stateBucket, do the migration.
    ctx := context.Background()
    c := stateBkt.Cursor()
    for k, v := c.First(); k != nil; k, v = c.Next() {
        state := &statepb.BeaconState{}
        if decodeErr := decode(ctx, v, state); decodeErr != nil {
            return decodeErr
        }

        // Move all the validators in this state's registry out to a new bucket.
        var validatorKeys []byte
        for _, val := range state.Validators {
            valBytes, encodeErr := encode(ctx, val)
            if encodeErr != nil {
                return encodeErr
            }

            // Create the unique hash for that validator entry.
            hash := hashutil.Hash(valBytes)

            // Add the validator to the stateValidatorsBucket, if it is not already present.
            if valEntry := valBkt.Get(hash[:]); valEntry == nil {
                if putErr := valBkt.Put(hash[:], valBytes); putErr != nil {
                    return putErr
                }
            }

            // Note down the key into the stateValidatorsBucket.
            validatorKeys = append(validatorKeys, hash[:]...)
        }

        // Add the validator entry keys for the given block root.
        compValidatorKeys := snappy.Encode(nil, validatorKeys)
        idxErr := indexBkt.Put(k, compValidatorKeys)
        if idxErr != nil {
            return idxErr
        }

        // Zero the validator entries in the BeaconState object.
        state.Validators = make([]*v1alpha1.Validator, 0)
        stateBytes, encodeErr := encode(ctx, state)
        if encodeErr != nil {
            return encodeErr
        }
        if stateErr := stateBkt.Put(k, stateBytes); stateErr != nil {
            return stateErr
        }
        if barErr := bar.Add(1); barErr != nil {
            return barErr
        }
    }

    // Mark migration complete.
    return mb.Put(migrationStateValidatorsKey, migrationCompleted)
}

func stateCount(stateBucket *bolt.Bucket) (int, error) {
    count := 0
    if err := stateBucket.ForEach(func(pubKey, v []byte) error {
        count++
        return nil
    }); err != nil {
        return 0, err
    }
    return count, nil
}
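Reading a state back inverts the loop above: fetch the compressed key list for the block root, decompress it, and resolve each 32-byte hash against the validator bucket. A minimal, self-contained sketch with hypothetical in-memory maps standing in for the two buckets:

package main

import (
    "fmt"

    "github.com/golang/snappy"
)

const hashLen = 32

func validatorsFor(
    blockRoot [32]byte,
    hashesByBlockRoot map[[32]byte][]byte, // snappy-compressed key lists
    validatorsByHash map[string][]byte, // encoded validator entries
) ([][]byte, error) {
    keys, err := snappy.Decode(nil, hashesByBlockRoot[blockRoot])
    if err != nil {
        return nil, err
    }
    if len(keys)%hashLen != 0 {
        return nil, fmt.Errorf("invalid validator keys length: %d", len(keys))
    }
    out := make([][]byte, 0, len(keys)/hashLen)
    for i := 0; i < len(keys); i += hashLen {
        enc, ok := validatorsByHash[string(keys[i:i+hashLen])]
        if !ok {
            return nil, fmt.Errorf("could not find validator entry")
        }
        out = append(out, enc)
    }
    return out, nil
}

func main() {
    compressed := snappy.Encode(nil, make([]byte, hashLen)) // one all-zero key
    roots := map[[32]byte][]byte{{'A'}: compressed}
    vals := map[string][]byte{string(make([]byte, hashLen)): []byte("validator-0")}
    got, err := validatorsFor([32]byte{'A'}, roots, vals)
    fmt.Println(len(got), err)
}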
new file: beacon-chain/db/kv/migration_state_validators_test.go (230 lines)
@@ -0,0 +1,230 @@
package kv

import (
    "bytes"
    "context"
    "testing"

    "github.com/golang/snappy"
    v1 "github.com/prysmaticlabs/prysm/beacon-chain/state/v1"
    v1alpha1 "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
    "github.com/prysmaticlabs/prysm/shared/featureconfig"
    "github.com/prysmaticlabs/prysm/shared/testutil"
    "github.com/prysmaticlabs/prysm/shared/testutil/assert"
    "github.com/prysmaticlabs/prysm/shared/testutil/require"
    "go.etcd.io/bbolt"
)

func Test_migrateStateValidators(t *testing.T) {
    tests := []struct {
        name  string
        setup func(t *testing.T, dbStore *Store, state *v1.BeaconState, vals []*v1alpha1.Validator)
        eval  func(t *testing.T, dbStore *Store, state *v1.BeaconState, vals []*v1alpha1.Validator)
    }{
        {
            name: "only runs once",
            setup: func(t *testing.T, dbStore *Store, state *v1.BeaconState, vals []*v1alpha1.Validator) {
                // create some new buckets that should be present for this migration
                err := dbStore.db.Update(func(tx *bbolt.Tx) error {
                    _, err := tx.CreateBucketIfNotExists(stateValidatorsBucket)
                    assert.NoError(t, err)
                    _, err = tx.CreateBucketIfNotExists(blockRootValidatorHashesBucket)
                    assert.NoError(t, err)
                    return nil
                })
                assert.NoError(t, err)

                // save the state
                blockRoot := [32]byte{'A'}
                require.NoError(t, dbStore.SaveState(context.Background(), state, blockRoot))

                // set the migration as over
                err = dbStore.db.Update(func(tx *bbolt.Tx) error {
                    return tx.Bucket(migrationsBucket).Put(migrationStateValidatorsKey, migrationCompleted)
                })
                assert.NoError(t, err)
            },
            eval: func(t *testing.T, dbStore *Store, state *v1.BeaconState, vals []*v1alpha1.Validator) {
                // check if the migration is completed, per migration table.
                err := dbStore.db.View(func(tx *bbolt.Tx) error {
                    migrationCompleteOrNot := tx.Bucket(migrationsBucket).Get(migrationStateValidatorsKey)
                    assert.DeepEqual(t, migrationCompleted, migrationCompleteOrNot, "migration is not complete")
                    return nil
                })
                assert.NoError(t, err)
            },
        },
        {
            name: "once migrated, always enable flag",
            setup: func(t *testing.T, dbStore *Store, state *v1.BeaconState, vals []*v1alpha1.Validator) {
                // create some new buckets that should be present for this migration
                err := dbStore.db.Update(func(tx *bbolt.Tx) error {
                    _, err := tx.CreateBucketIfNotExists(stateValidatorsBucket)
                    assert.NoError(t, err)
                    _, err = tx.CreateBucketIfNotExists(blockRootValidatorHashesBucket)
                    assert.NoError(t, err)
                    return nil
                })
                assert.NoError(t, err)

                // set the migration as over
                err = dbStore.db.Update(func(tx *bbolt.Tx) error {
                    return tx.Bucket(migrationsBucket).Put(migrationStateValidatorsKey, migrationCompleted)
                })
                assert.NoError(t, err)
            },
            eval: func(t *testing.T, dbStore *Store, state *v1.BeaconState, vals []*v1alpha1.Validator) {
                // disable the flag and see if the code mandates that flag.
                resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
                    EnableHistoricalSpaceRepresentation: false,
                })
                defer resetCfg()

                // check if the migration is completed, per migration table.
                err := dbStore.db.View(func(tx *bbolt.Tx) error {
                    migrationCompleteOrNot := tx.Bucket(migrationsBucket).Get(migrationStateValidatorsKey)
                    assert.DeepEqual(t, migrationCompleted, migrationCompleteOrNot, "migration is not complete")
                    return nil
                })
                assert.NoError(t, err)

                // create a new state and save it
                blockRoot := [32]byte{'B'}
                st, err := testutil.NewBeaconState()
                newValidators := validators(10)
                assert.NoError(t, err)
                assert.NoError(t, st.SetSlot(101))
                assert.NoError(t, st.SetValidators(newValidators))
                assert.NoError(t, dbStore.SaveState(context.Background(), st, blockRoot))
                assert.NoError(t, err)

                // now check if this newly saved state followed the migrated code path
                // by checking if the new validators are saved in the validator bucket.
                var individualHashes [][]byte
                for _, val := range newValidators {
                    hash, hashErr := val.HashTreeRoot()
                    assert.NoError(t, hashErr)
                    individualHashes = append(individualHashes, hash[:])
                }
                pbState, err := v1.ProtobufBeaconState(st.InnerStateUnsafe())
                assert.NoError(t, err)
                validatorsFoundCount := 0
                for _, val := range pbState.Validators {
                    hash, hashErr := val.HashTreeRoot()
                    assert.NoError(t, hashErr)
                    found := false
                    for _, h := range individualHashes {
                        if bytes.Equal(hash[:], h) {
                            found = true
                        }
                    }
                    require.Equal(t, true, found)
                    validatorsFoundCount++
                }
                require.Equal(t, len(vals), validatorsFoundCount)
            },
        },
        {
            name: "migrates validators and adds them to new buckets",
            setup: func(t *testing.T, dbStore *Store, state *v1.BeaconState, vals []*v1alpha1.Validator) {
                // create some new buckets that should be present for this migration
                err := dbStore.db.Update(func(tx *bbolt.Tx) error {
                    _, err := tx.CreateBucketIfNotExists(stateValidatorsBucket)
                    assert.NoError(t, err)
                    _, err = tx.CreateBucketIfNotExists(blockRootValidatorHashesBucket)
                    assert.NoError(t, err)
                    return nil
                })
                assert.NoError(t, err)

                // add a state with the given validators
                blockRoot := [32]byte{'A'}
                st, err := testutil.NewBeaconState()
                assert.NoError(t, err)
                assert.NoError(t, st.SetSlot(100))
                assert.NoError(t, st.SetValidators(vals))
                assert.NoError(t, dbStore.SaveState(context.Background(), st, blockRoot))
                assert.NoError(t, err)
            },
            eval: func(t *testing.T, dbStore *Store, state *v1.BeaconState, vals []*v1alpha1.Validator) {
                // check whether the new buckets are present
                err := dbStore.db.View(func(tx *bbolt.Tx) error {
                    valBkt := tx.Bucket(stateValidatorsBucket)
                    assert.NotNil(t, valBkt)
                    idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
                    assert.NotNil(t, idxBkt)
                    return nil
                })
                assert.NoError(t, err)

                // check if the migration worked
                blockRoot := [32]byte{'A'}
                rcvdState, err := dbStore.State(context.Background(), blockRoot)
                assert.NoError(t, err)
                require.DeepSSZEqual(t, rcvdState.InnerStateUnsafe(), state.InnerStateUnsafe(), "saved state with validators and retrieved state are not matching")

                // find hashes of the validators that are set as part of the state
                var hashes []byte
                var individualHashes [][]byte
                for _, val := range vals {
                    hash, hashErr := val.HashTreeRoot()
                    assert.NoError(t, hashErr)
                    hashes = append(hashes, hash[:]...)
                    individualHashes = append(individualHashes, hash[:])
                }

                // check if all the validators that were in the state are stored properly in the validator bucket
                pbState, err := v1.ProtobufBeaconState(rcvdState.InnerStateUnsafe())
                assert.NoError(t, err)
                validatorsFoundCount := 0
                for _, val := range pbState.Validators {
                    hash, hashErr := val.HashTreeRoot()
                    assert.NoError(t, hashErr)
                    found := false
                    for _, h := range individualHashes {
                        if bytes.Equal(hash[:], h) {
                            found = true
                        }
                    }
                    require.Equal(t, true, found)
                    validatorsFoundCount++
                }
                require.Equal(t, len(vals), validatorsFoundCount)

                // check if the state validator indexes are stored properly
                err = dbStore.db.View(func(tx *bbolt.Tx) error {
                    rcvdValhashBytes := tx.Bucket(blockRootValidatorHashesBucket).Get(blockRoot[:])
                    rcvdValHashes, sErr := snappy.Decode(nil, rcvdValhashBytes)
                    assert.NoError(t, sErr)
                    require.DeepEqual(t, hashes, rcvdValHashes)
                    return nil
                })
                assert.NoError(t, err)
            },
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            dbStore := setupDB(t)

            // enable historical state representation flag to test this
            resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
                EnableHistoricalSpaceRepresentation: true,
            })
            defer resetCfg()

            // add a state with the given validators
            vals := validators(10)
            blockRoot := [32]byte{'A'}
            st, err := testutil.NewBeaconState()
            assert.NoError(t, err)
            assert.NoError(t, st.SetSlot(100))
            assert.NoError(t, st.SetValidators(vals))
            assert.NoError(t, dbStore.SaveState(context.Background(), st, blockRoot))
            assert.NoError(t, err)

            tt.setup(t, dbStore, st, vals)
            tt.eval(t, dbStore, st, vals)
        })
    }
}

@@ -17,6 +17,7 @@ var (
    chainMetadataBucket   = []byte("chain-metadata")
    checkpointBucket      = []byte("check-point")
    powchainBucket        = []byte("powchain")
    stateValidatorsBucket = []byte("state-validators")

    // Deprecated: This bucket was migrated in PR 6461. Do not use, except for migrations.
    slotsHasObjectBucket = []byte("slots-has-objects")
@@ -33,6 +34,7 @@ var (
    attestationTargetRootIndicesBucket  = []byte("attestation-target-root-indices")
    attestationTargetEpochIndicesBucket = []byte("attestation-target-epoch-indices")
    finalizedBlockRootsIndexBucket      = []byte("finalized-block-roots-index")
    blockRootValidatorHashesBucket      = []byte("block-root-validator-hashes")

    // Specific item keys.
    headBlockRootKey = []byte("head-root")
@@ -4,16 +4,18 @@ import (
    "bytes"
    "context"

    "github.com/golang/snappy"
    "github.com/pkg/errors"
    types "github.com/prysmaticlabs/eth2-types"
    "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
    "github.com/prysmaticlabs/prysm/beacon-chain/state"
    "github.com/prysmaticlabs/prysm/beacon-chain/state/genesis"
    v1 "github.com/prysmaticlabs/prysm/beacon-chain/state/v1"
    ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
    v1alpha "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
    "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
    statepb "github.com/prysmaticlabs/prysm/proto/prysm/v2/state"
    "github.com/prysmaticlabs/prysm/shared/bytesutil"
    "github.com/prysmaticlabs/prysm/shared/featureconfig"
    "github.com/prysmaticlabs/prysm/shared/params"
    "github.com/prysmaticlabs/prysm/shared/traceutil"
    bolt "go.etcd.io/bbolt"
@@ -35,7 +37,13 @@ func (s *Store) State(ctx context.Context, blockRoot [32]byte) (state.BeaconStat
        return nil, nil
    }

    st, err = createState(ctx, enc)
    // get the validator entries of the state
    valEntries, valErr := s.validatorEntries(ctx, blockRoot)
    if valErr != nil {
        return nil, valErr
    }

    st, err = s.createState(ctx, enc, valEntries)
    if err != nil {
        return nil, err
    }
@@ -58,7 +66,7 @@ func (s *Store) GenesisState(ctx context.Context) (state.BeaconState, error) {
    }

    var st *statepb.BeaconState
    err = s.db.View(func(tx *bolt.Tx) error {
    if err = s.db.View(func(tx *bolt.Tx) error {
        // Retrieve genesis block's signing root from blocks bucket,
        // to look up what the genesis state is.
        bucket := tx.Bucket(blocksBucket)
@@ -70,11 +78,16 @@ func (s *Store) GenesisState(ctx context.Context) (state.BeaconState, error) {
            return nil
        }

        var err error
        st, err = createState(ctx, enc)
        return err
    })
    if err != nil {
        // get the validator entries of the genesis state
        valEntries, valErr := s.validatorEntries(ctx, bytesutil.ToBytes32(genesisBlockRoot))
        if valErr != nil {
            return valErr
        }

        var crtErr error
        st, crtErr = s.createState(ctx, enc, valEntries)
        return crtErr
    }); err != nil {
        return nil, err
    }
    if st == nil {
@@ -87,7 +100,13 @@ func (s *Store) GenesisState(ctx context.Context) (state.BeaconState, error) {
func (s *Store) SaveState(ctx context.Context, st state.ReadOnlyBeaconState, blockRoot [32]byte) error {
    ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveState")
    defer span.End()

    ok, err := s.isStateValidatorMigrationOver()
    if err != nil {
        return err
    }
    if ok {
        return s.SaveStatesEfficient(ctx, []state.ReadOnlyBeaconState{st}, [][32]byte{blockRoot})
    }
    return s.SaveStates(ctx, []state.ReadOnlyBeaconState{st}, [][32]byte{blockRoot})
}
@@ -125,6 +144,105 @@ func (s *Store) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconSta
    })
}

// SaveStatesEfficient stores multiple states to the db (new schema) using the provided corresponding roots.
func (s *Store) SaveStatesEfficient(ctx context.Context, states []state.ReadOnlyBeaconState, blockRoots [][32]byte) error {
    ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveStatesEfficient")
    defer span.End()
    if states == nil {
        return errors.New("nil state")
    }
    multipleEncs := make([][]byte, len(states))
    validatorsEntries := make(map[string]*v1alpha.Validator)    // It's a map so that only new validator entries are stored.
    validatorKeys := make([][]byte, len(states))                // For every state, this stores a compressed list of validator keys.
    realValidators := make([][]*v1alpha.Validator, len(states)) // A temporary structure used to restore the in-memory state after persisting it.
    for i, st := range states {
        pbState, err := v1.ProtobufBeaconState(st.InnerStateUnsafe())
        if err != nil {
            return err
        }
        // store the real validators to restore the in-memory state later.
        realValidators[i] = pbState.Validators

        // yank out the validators and store them in a separate bucket to save space.
        var hashes []byte
        for _, val := range pbState.Validators {
            // create the unique hash for that validator entry.
            hash, hashErr := val.HashTreeRoot()
            if hashErr != nil {
                return hashErr
            }
            hashes = append(hashes, hash[:]...)

            // note down the hash and the encoded validator entry
            hashStr := string(hash[:])
            validatorsEntries[hashStr] = val
        }

        // zero out the validators list from the state bucket so that it is not stored as part of it.
        pbState.Validators = make([]*v1alpha.Validator, 0)

        validatorKeys[i] = snappy.Encode(nil, hashes)
        multipleEncs[i], err = encode(ctx, pbState)
        if err != nil {
            return err
        }
    }

    if err := s.db.Update(func(tx *bolt.Tx) error {
        bucket := tx.Bucket(stateBucket)
        valIdxBkt := tx.Bucket(blockRootValidatorHashesBucket)
        for i, rt := range blockRoots {
            indicesByBucket := createStateIndicesFromStateSlot(ctx, states[i].Slot())
            if err := updateValueForIndices(ctx, indicesByBucket, rt[:], tx); err != nil {
                return errors.Wrap(err, "could not update DB indices")
            }
            if err := bucket.Put(rt[:], multipleEncs[i]); err != nil {
                return err
            }
            if err := valIdxBkt.Put(rt[:], validatorKeys[i]); err != nil {
                return err
            }
        }

        // store the validator entries separately to save space.
        valBkt := tx.Bucket(stateValidatorsBucket)
        for hashStr, validatorEntry := range validatorsEntries {
            key := []byte(hashStr)
            // if the entry is not in the cache and not in the DB,
            // then insert it in the DB and add it to the cache.
            if _, ok := s.validatorEntryCache.Get(key); !ok {
                validatorEntryCacheMiss.Inc()
                if valEntry := valBkt.Get(key); valEntry == nil {
                    valBytes, encodeErr := encode(ctx, validatorEntry)
                    if encodeErr != nil {
                        return encodeErr
                    }
                    if putErr := valBkt.Put(key, valBytes); putErr != nil {
                        return putErr
                    }
                    s.validatorEntryCache.Set(key, validatorEntry, int64(len(valBytes)))
                }
            } else {
                validatorEntryCacheHit.Inc()
            }
        }
        return nil
    }); err != nil {
        return err
    }

    // restore the in-memory state with the original validators
    for i, vals := range realValidators {
        pbState, err := v1.ProtobufBeaconState(states[i].InnerStateUnsafe())
        if err != nil {
            return err
        }
        pbState.Validators = vals
    }
    return nil
}
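The cache interplay above relies on ristretto's cost-based admission: each validator entry is Set with its encoded size as the cost, so ValidatorEntryMaxCost bounds total cache memory rather than entry count. A minimal, self-contained sketch of that pattern, assuming a ristretto version that provides Wait() (a test-only convenience here, since Set is applied asynchronously):

package main

import (
    "fmt"

    "github.com/dgraph-io/ristretto"
)

func main() {
    cache, err := ristretto.NewCache(&ristretto.Config{
        NumCounters: 1 << 21, // frequency counters, ~10x the expected number of entries
        MaxCost:     1 << 26, // ~64 MB budget, summed over per-entry costs
        BufferItems: 64,      // recommended default
    })
    if err != nil {
        panic(err)
    }

    key := []byte("validator-hash")
    val := []byte("encoded-validator-entry")
    cache.Set(key, val, int64(len(val))) // cost = encoded size
    cache.Wait()                         // flush the async Set buffer

    if v, ok := cache.Get(key); ok {
        fmt.Printf("cache hit: %d bytes\n", len(v.([]byte)))
    }
}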
// HasState checks if a state by root exists in the db.
func (s *Store) HasState(ctx context.Context, blockRoot [32]byte) bool {
    ctx, span := trace.StartSpan(ctx, "BeaconDB.HasState")
@@ -155,9 +273,9 @@ func (s *Store) DeleteState(ctx context.Context, blockRoot [32]byte) error {

        bkt = tx.Bucket(checkpointBucket)
        enc := bkt.Get(finalizedCheckpointKey)
        checkpoint := &ethpb.Checkpoint{}
        checkpoint := &v1alpha.Checkpoint{}
        if enc == nil {
            checkpoint = &ethpb.Checkpoint{Root: genesisBlockRoot}
            checkpoint = &v1alpha.Checkpoint{Root: genesisBlockRoot}
        } else if err := decode(ctx, enc, checkpoint); err != nil {
            return err
        }
@@ -170,7 +288,7 @@ func (s *Store) DeleteState(ctx context.Context, blockRoot [32]byte) error {
            return errors.New("cannot delete genesis, finalized, or head state")
        }

        slot, err := slotByBlockRoot(ctx, tx, blockRoot[:])
        slot, err := s.slotByBlockRoot(ctx, tx, blockRoot[:])
        if err != nil {
            return err
        }
@@ -179,6 +297,36 @@ func (s *Store) DeleteState(ctx context.Context, blockRoot [32]byte) error {
            return errors.Wrap(err, "could not delete root for DB indices")
        }

        ok, err := s.isStateValidatorMigrationOver()
        if err != nil {
            return err
        }
        if ok {
            // remove the validator entry keys for the corresponding state.
            idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
            compressedValidatorHashes := idxBkt.Get(blockRoot[:])
            err = idxBkt.Delete(blockRoot[:])
            if err != nil {
                return err
            }

            // remove the respective validator entries from the cache.
            if len(compressedValidatorHashes) == 0 {
                return errors.Errorf("invalid compressed validator keys length")
            }
            validatorHashes, sErr := snappy.Decode(nil, compressedValidatorHashes)
            if sErr != nil {
                return errors.Wrap(sErr, "failed to uncompress validator keys")
            }
            if len(validatorHashes)%hashLength != 0 {
                return errors.Errorf("invalid validator keys length: %d", len(validatorHashes))
            }
            for i := 0; i < len(validatorHashes); i += hashLength {
                key := validatorHashes[i : i+hashLength]
                s.validatorEntryCache.Del(key)
            }
        }

        return bkt.Delete(blockRoot[:])
    })
}
@@ -197,16 +345,90 @@ func (s *Store) DeleteStates(ctx context.Context, blockRoots [][32]byte) error {
    return nil
}

// creates state from marshaled proto state bytes.
func createState(ctx context.Context, enc []byte) (*statepb.BeaconState, error) {
// creates state from marshaled proto state bytes. Also adds the validator entries retrieved
// from the validator bucket to complete the state construction.
func (s *Store) createState(ctx context.Context, enc []byte, validatorEntries []*v1alpha.Validator) (*statepb.BeaconState, error) {
    protoState := &statepb.BeaconState{}
    if err := decode(ctx, enc, protoState); err != nil {
        return nil, errors.Wrap(err, "failed to unmarshal encoding")
    }
    ok, err := s.isStateValidatorMigrationOver()
    if err != nil {
        return protoState, err
    }
    if ok {
        protoState.Validators = validatorEntries
    }
    return protoState, nil
}

// HasState checks if a state by root exists in the db.
// Retrieve the validator entries for a given block root. These entries are stored in a
// separate bucket to reduce state size.
func (s *Store) validatorEntries(ctx context.Context, blockRoot [32]byte) ([]*v1alpha.Validator, error) {
    ok, err := s.isStateValidatorMigrationOver()
    if err != nil {
        return nil, err
    }
    if !ok {
        return make([]*v1alpha.Validator, 0), nil
    }
    ctx, span := trace.StartSpan(ctx, "BeaconDB.validatorEntries")
    defer span.End()
    var validatorEntries []*v1alpha.Validator
    err = s.db.View(func(tx *bolt.Tx) error {
        // get the validator keys from the index bucket
        idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
        valKey := idxBkt.Get(blockRoot[:])
        if len(valKey) == 0 {
            return errors.Errorf("invalid compressed validator keys length")
        }

        // decompress the keys and check that they are of proper length.
        validatorKeys, sErr := snappy.Decode(nil, valKey)
        if sErr != nil {
            return errors.Wrap(sErr, "failed to uncompress validator keys")
        }
        if len(validatorKeys)%hashLength != 0 {
            return errors.Errorf("invalid validator keys length: %d", len(validatorKeys))
        }

        // get the corresponding validator entries from the validator bucket.
        valBkt := tx.Bucket(stateValidatorsBucket)
        for i := 0; i < len(validatorKeys); i += hashLength {
            key := validatorKeys[i : i+hashLength]
            // get the entry bytes from the cache or from the DB.
            v, ok := s.validatorEntryCache.Get(key)
            if ok {
                valEntry, vType := v.(*v1alpha.Validator)
                if vType {
                    s.validatorEntryCache.Set(key, valEntry, int64(valEntry.SizeSSZ()))
                    validatorEntries = append(validatorEntries, valEntry)
                    validatorEntryCacheHit.Inc()
                } else {
                    // this should never happen, but it's good to bail out if it does.
                    return errors.New("validator cache does not have proper object type")
                }
            } else {
                // not in cache, so get it from the DB, decode it and add it to the entry list.
                valEntryBytes := valBkt.Get(key)
                if len(valEntryBytes) == 0 {
                    return errors.New("could not find validator entry")
                }
                encValEntry := &v1alpha.Validator{}
                decodeErr := decode(ctx, valEntryBytes, encValEntry)
                if decodeErr != nil {
                    return errors.Wrap(decodeErr, "failed to decode validator entry keys")
                }
                validatorEntries = append(validatorEntries, encValEntry)
                validatorEntryCacheMiss.Inc()
            }
        }
        return nil
    })
    return validatorEntries, err
}

// retrieves and assembles the state information from multiple buckets.
func (s *Store) stateBytes(ctx context.Context, blockRoot [32]byte) ([]byte, error) {
    ctx, span := trace.StartSpan(ctx, "BeaconDB.stateBytes")
    defer span.End()
@@ -229,7 +451,7 @@ func (s *Store) stateBytes(ctx context.Context, blockRoot [32]byte) ([]byte, err
}

// slotByBlockRoot retrieves the corresponding slot of the input block root.
func slotByBlockRoot(ctx context.Context, tx *bolt.Tx, blockRoot []byte) (types.Slot, error) {
func (s *Store) slotByBlockRoot(ctx context.Context, tx *bolt.Tx, blockRoot []byte) (types.Slot, error) {
    ctx, span := trace.StartSpan(ctx, "BeaconDB.slotByBlockRoot")
    defer span.End()

@@ -248,7 +470,8 @@ func slotByBlockRoot(ctx context.Context, tx *bolt.Tx, blockRoot []byte) (types.
        if enc == nil {
            return 0, errors.New("state enc can't be nil")
        }
        s, err := createState(ctx, enc)
        // no need to construct the validator entries as they are not used here.
        s, err := s.createState(ctx, enc, nil)
        if err != nil {
            return 0, err
        }
@@ -257,7 +480,7 @@ func slotByBlockRoot(ctx context.Context, tx *bolt.Tx, blockRoot []byte) (types.
        }
        return s.Slot, nil
    }
    b := &ethpb.SignedBeaconBlock{}
    b := &v1alpha.SignedBeaconBlock{}
    err := decode(ctx, enc, b)
    if err != nil {
        return 0, err
@@ -401,3 +624,24 @@ func (s *Store) CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint ty

    return err
}

func (s *Store) isStateValidatorMigrationOver() (bool, error) {
    // if the flag is enabled, then always follow the new code path.
    if featureconfig.Get().EnableHistoricalSpaceRepresentation {
        return true, nil
    }

    // if the flag is not enabled, but the migration is over, then
    // follow the new code path as if the flag were enabled.
    returnFlag := false
    if err := s.db.View(func(tx *bolt.Tx) error {
        mb := tx.Bucket(migrationsBucket)
        b := mb.Get(migrationStateValidatorsKey)
        returnFlag = bytes.Equal(b, migrationCompleted)
        return nil
    }); err != nil {
        return returnFlag, err
    }
    return returnFlag, nil
}
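The effective rule is "use the new schema if the flag is set OR the one-way migration has already completed", which keeps a migrated database readable even if the operator later drops the flag. A one-line sketch of that decision (hypothetical helper, not in the PR):

package kvsketch

// useNewSchema mirrors isStateValidatorMigrationOver's decision:
// the flag opts in, and a completed migration locks the choice in.
func useNewSchema(flagEnabled, migrationDone bool) bool {
    return flagEnabled || migrationDone
}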

@@ -2,8 +2,10 @@ package kv

import (
    "context"
    "reflect"
    "encoding/binary"
    "math/rand"
    "testing"
    "time"

    types "github.com/prysmaticlabs/eth2-types"
    "github.com/prysmaticlabs/prysm/beacon-chain/state"
@@ -11,11 +13,12 @@ import (
    "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
    "github.com/prysmaticlabs/prysm/proto/prysm/v2/block"
    "github.com/prysmaticlabs/prysm/shared/bytesutil"
    "github.com/prysmaticlabs/prysm/shared/featureconfig"
    "github.com/prysmaticlabs/prysm/shared/params"
    "github.com/prysmaticlabs/prysm/shared/testutil"
    "github.com/prysmaticlabs/prysm/shared/testutil/assert"
    "github.com/prysmaticlabs/prysm/shared/testutil/require"
    "gopkg.in/d4l3k/messagediff.v1"
    bolt "go.etcd.io/bbolt"
)

func TestState_CanSaveRetrieve(t *testing.T) {
@@ -35,16 +38,202 @@ func TestState_CanSaveRetrieve(t *testing.T) {
    savedS, err := db.State(context.Background(), r)
    require.NoError(t, err)

    if !reflect.DeepEqual(st.InnerStateUnsafe(), savedS.InnerStateUnsafe()) {
        diff, _ := messagediff.PrettyDiff(st.InnerStateUnsafe(), savedS.InnerStateUnsafe())
        t.Errorf("Did not retrieve saved state: %v", diff)
    }
    require.DeepSSZEqual(t, st.InnerStateUnsafe(), savedS.InnerStateUnsafe(), "saved state and retrieved state are not matching")

    savedS, err = db.State(context.Background(), [32]byte{'B'})
    require.NoError(t, err)
    assert.Equal(t, state.ReadOnlyBeaconState(nil), savedS, "Unsaved state should've been nil")
}
func TestState_CanSaveRetrieveValidatorEntries(t *testing.T) {
    db := setupDB(t)

    // enable historical state representation flag to test this
    resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
        EnableHistoricalSpaceRepresentation: true,
    })
    defer resetCfg()

    r := [32]byte{'A'}

    require.Equal(t, false, db.HasState(context.Background(), r))

    stateValidators := validators(10)
    st, err := testutil.NewBeaconState()
    require.NoError(t, err)
    require.NoError(t, st.SetSlot(100))
    require.NoError(t, st.SetValidators(stateValidators))

    ctx := context.Background()
    require.NoError(t, db.SaveState(ctx, st, r))
    assert.Equal(t, true, db.HasState(context.Background(), r))

    savedS, err := db.State(context.Background(), r)
    require.NoError(t, err)

    require.DeepSSZEqual(t, st.InnerStateUnsafe(), savedS.InnerStateUnsafe(), "saved state with validators and retrieved state are not matching")

    // check if the index of the state is present.
    err = db.db.Update(func(tx *bolt.Tx) error {
        idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
        data := idxBkt.Get(r[:])
        require.NotEqual(t, 0, len(data))
        return nil
    })
    require.NoError(t, err)

    // check if all the validator entries are intact in the validator entry bucket.
    err = db.db.Update(func(tx *bolt.Tx) error {
        valBkt := tx.Bucket(stateValidatorsBucket)
        // if any of the original validator entries is not present, fail the test.
        for _, val := range stateValidators {
            hash, hashErr := val.HashTreeRoot()
            assert.NoError(t, hashErr)
            data := valBkt.Get(hash[:])
            require.NotNil(t, data)
            require.NotEqual(t, 0, len(data))
        }
        return nil
    })
    require.NoError(t, err)
}

func TestState_CanSaveRetrieveValidatorEntriesWithoutCache(t *testing.T) {
    db := setupDB(t)

    // enable historical state representation flag to test this
    resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
        EnableHistoricalSpaceRepresentation: true,
    })
    defer resetCfg()

    r := [32]byte{'A'}

    require.Equal(t, false, db.HasState(context.Background(), r))

    stateValidators := validators(10)
    st, err := testutil.NewBeaconState()
    require.NoError(t, err)
    require.NoError(t, st.SetSlot(100))
    require.NoError(t, st.SetValidators(stateValidators))

    ctx := context.Background()
    require.NoError(t, db.SaveState(ctx, st, r))
    assert.Equal(t, true, db.HasState(context.Background(), r))
    db.validatorEntryCache.Clear()

    savedS, err := db.State(context.Background(), r)
    require.NoError(t, err)

    require.DeepSSZEqual(t, st.InnerStateUnsafe(), savedS.InnerStateUnsafe(), "saved state with validators and retrieved state are not matching")

    // check if the index of the state is present.
    err = db.db.Update(func(tx *bolt.Tx) error {
        idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
        data := idxBkt.Get(r[:])
        require.NotEqual(t, 0, len(data))
        return nil
    })
    require.NoError(t, err)

    // check if all the validator entries are intact in the validator entry bucket.
    err = db.db.Update(func(tx *bolt.Tx) error {
        valBkt := tx.Bucket(stateValidatorsBucket)
        // if any of the original validator entries is not present, fail the test.
        for _, val := range stateValidators {
            hash, hashErr := val.HashTreeRoot()
            assert.NoError(t, hashErr)
            data := valBkt.Get(hash[:])
            require.NotNil(t, data)
            require.NotEqual(t, 0, len(data))
        }
        return nil
    })
    require.NoError(t, err)
}
func TestState_DeleteState(t *testing.T) {
    db := setupDB(t)

    // enable historical state representation flag to test this
    resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
        EnableHistoricalSpaceRepresentation: true,
    })
    defer resetCfg()

    r1 := [32]byte{'A'}
    r2 := [32]byte{'B'}

    require.Equal(t, false, db.HasState(context.Background(), r1))
    require.Equal(t, false, db.HasState(context.Background(), r2))

    // create two states with the same set of validators.
    stateValidators := validators(10)
    st1, err := testutil.NewBeaconState()
    require.NoError(t, err)
    require.NoError(t, st1.SetSlot(100))
    require.NoError(t, st1.SetValidators(stateValidators))

    st2, err := testutil.NewBeaconState()
    require.NoError(t, err)
    require.NoError(t, st2.SetSlot(101))
    require.NoError(t, st2.SetValidators(stateValidators))

    // save both the states.
    ctx := context.Background()
    require.NoError(t, db.SaveState(ctx, st1, r1))
    require.NoError(t, db.SaveState(ctx, st2, r2))

    // delete the first state.
    var deleteBlockRoots [][32]byte
    deleteBlockRoots = append(deleteBlockRoots, r1)
    require.NoError(t, db.DeleteStates(ctx, deleteBlockRoots))

    // check if the validator entries of this state are removed from the cache.
    for _, val := range stateValidators {
        hash, hashErr := val.HashTreeRoot()
        assert.NoError(t, hashErr)
        v, found := db.validatorEntryCache.Get(hash[:])
        require.Equal(t, false, found)
        require.Equal(t, nil, v)
    }

    // check if the index of the first state is deleted.
    err = db.db.Update(func(tx *bolt.Tx) error {
        idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
        data := idxBkt.Get(r1[:])
        require.Equal(t, 0, len(data))
        return nil
    })
    require.NoError(t, err)

    // check if the index of the second state is still present.
    err = db.db.Update(func(tx *bolt.Tx) error {
        idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
        data := idxBkt.Get(r2[:])
        require.NotEqual(t, 0, len(data))
        return nil
    })
    require.NoError(t, err)

    // check if all the validator entries are still intact in the validator entry bucket.
    err = db.db.Update(func(tx *bolt.Tx) error {
        valBkt := tx.Bucket(stateValidatorsBucket)
        // if any of the original validator entries is not present, fail the test.
        for _, val := range stateValidators {
            hash, hashErr := val.HashTreeRoot()
            assert.NoError(t, hashErr)
            data := valBkt.Get(hash[:])
            require.NotNil(t, data)
            require.NotEqual(t, 0, len(data))
        }
        return nil
    })
    require.NoError(t, err)
}
func TestGenesisState_CanSaveRetrieve(t *testing.T) {
    db := setupDB(t)

@@ -342,3 +531,109 @@ func TestStore_CleanUpDirtyStates_DontDeleteNonFinalized(t *testing.T) {
        require.Equal(t, true, db.HasState(context.Background(), rt))
    }
}

func validators(limit int) []*ethpb.Validator {
    var vals []*ethpb.Validator
    for i := 0; i < limit; i++ {
        pubKey := make([]byte, params.BeaconConfig().BLSPubkeyLength)
        binary.LittleEndian.PutUint64(pubKey, rand.Uint64())
        val := &ethpb.Validator{
            PublicKey:                  pubKey,
            WithdrawalCredentials:      bytesutil.ToBytes(rand.Uint64(), 32),
            EffectiveBalance:           rand.Uint64(),
            Slashed:                    i%2 != 0,
            ActivationEligibilityEpoch: types.Epoch(rand.Uint64()),
            ActivationEpoch:            types.Epoch(rand.Uint64()),
            ExitEpoch:                  types.Epoch(rand.Uint64()),
            WithdrawableEpoch:          types.Epoch(rand.Uint64()),
        }
        vals = append(vals, val)
    }
    return vals
}

func checkStateSaveTime(b *testing.B, saveCount int) {
    b.StopTimer()

    db := setupDB(b)
    initialSetOfValidators := validators(100000)

    // construct some states and save them to randomize the benchmark.
    for i := 0; i < saveCount; i++ {
        key := make([]byte, 32)
        _, err := rand.Read(key)
        require.NoError(b, err)
        st, err := testutil.NewBeaconState()
        require.NoError(b, err)

        // add some more new validators to the base set.
        validatorsToAddInTest := validators(10000)
        allValidators := append(initialSetOfValidators, validatorsToAddInTest...)

        // shuffle the validators.
        rand.Seed(time.Now().UnixNano())
        rand.Shuffle(len(allValidators), func(i, j int) { allValidators[i], allValidators[j] = allValidators[j], allValidators[i] })

        require.NoError(b, st.SetValidators(allValidators))
        require.NoError(b, db.SaveState(context.Background(), st, bytesutil.ToBytes32(key)))
    }

    // create a state to save in the benchmark
    r := [32]byte{'A'}
    st, err := testutil.NewBeaconState()
    require.NoError(b, err)
    require.NoError(b, st.SetValidators(initialSetOfValidators))

    b.ReportAllocs()
    b.StartTimer()
    for i := 0; i < b.N; i++ {
        require.NoError(b, db.SaveState(context.Background(), st, r))
    }
}

func checkStateReadTime(b *testing.B, saveCount int) {
    b.StopTimer()

    db := setupDB(b)
    initialSetOfValidators := validators(100000)

    // save a state to read in the benchmark
    r := [32]byte{'A'}
    st, err := testutil.NewBeaconState()
    require.NoError(b, err)
    require.NoError(b, st.SetValidators(initialSetOfValidators))
    require.NoError(b, db.SaveState(context.Background(), st, r))

    // construct some states and save them to randomize the benchmark.
    for i := 0; i < saveCount; i++ {
        key := make([]byte, 32)
        _, err := rand.Read(key)
        require.NoError(b, err)
        st, err = testutil.NewBeaconState()
        require.NoError(b, err)

        // add some more new validators to the base set.
        validatorsToAddInTest := validators(10000)
        allValidators := append(initialSetOfValidators, validatorsToAddInTest...)

        // shuffle the validators.
        rand.Seed(time.Now().UnixNano())
        rand.Shuffle(len(allValidators), func(i, j int) { allValidators[i], allValidators[j] = allValidators[j], allValidators[i] })

        require.NoError(b, st.SetValidators(allValidators))
        require.NoError(b, db.SaveState(context.Background(), st, bytesutil.ToBytes32(key)))
    }

    b.ReportAllocs()
    b.StartTimer()
    for i := 0; i < b.N; i++ {
        _, err := db.State(context.Background(), r)
        require.NoError(b, err)
    }
}

func BenchmarkState_CheckStateSaveTime_1(b *testing.B)  { checkStateSaveTime(b, 1) }
func BenchmarkState_CheckStateSaveTime_10(b *testing.B) { checkStateSaveTime(b, 10) }

func BenchmarkState_CheckStateReadTime_1(b *testing.B)  { checkStateReadTime(b, 1) }
func BenchmarkState_CheckStateReadTime_10(b *testing.B) { checkStateReadTime(b, 10) }
@@ -567,7 +567,7 @@ func TestServer_ListValidators_OnlyActiveValidators(t *testing.T) {
        Active: true,
    })
    require.NoError(t, err)
    assert.DeepEqual(t, activeValidators, received.ValidatorList)
    assert.DeepSSZEqual(t, activeValidators, received.ValidatorList)
}

func TestServer_ListValidators_InactiveInTheMiddle(t *testing.T) {
@@ -1064,7 +1064,7 @@ func TestServer_ListValidators_FromOldEpoch(t *testing.T) {
    }
    res, err = bs.ListValidators(context.Background(), req)
    require.NoError(t, err)
    assert.DeepEqual(t, want, res.ValidatorList, "Incorrect number of validators")
    assert.DeepSSZEqual(t, want, res.ValidatorList, "Incorrect number of validators")
}

func TestServer_ListValidators_ProcessHeadStateSlots(t *testing.T) {
@@ -40,17 +40,18 @@ type Flags struct {
    PyrmontTestnet bool // PyrmontTestnet defines the flag through which we can enable the node to run on the Pyrmont testnet.

    // Feature related flags.
    WriteSSZStateTransitions           bool // WriteSSZStateTransitions to tmp directory.
    SkipBLSVerify                      bool // Skips BLS verification across the runtime.
    SlasherProtection                  bool // SlasherProtection protects validator from sending over a slashable offense over the network using external slasher.
    EnablePeerScorer                   bool // EnablePeerScorer enables experimental peer scoring in p2p.
    EnableLargerGossipHistory          bool // EnableLargerGossipHistory increases the gossip history we store in our caches.
    WriteWalletPasswordOnWebOnboarding bool // WriteWalletPasswordOnWebOnboarding writes the password to disk after Prysm web signup.
    DisableAttestingHistoryDBCache     bool // DisableAttestingHistoryDBCache for the validator client increases disk reads/writes.
    UpdateHeadTimely                   bool // UpdateHeadTimely updates head right after state transition.
    ProposerAttsSelectionUsingMaxCover bool // ProposerAttsSelectionUsingMaxCover enables the max-cover algorithm when selecting attestations for proposing.
    EnableOptimizedBalanceUpdate       bool // EnableOptimizedBalanceUpdate uses an updated method of performing balance updates.
    EnableDoppelGanger                 bool // EnableDoppelGanger enables doppelganger protection on startup for the validator.
    WriteSSZStateTransitions            bool // WriteSSZStateTransitions to tmp directory.
    SkipBLSVerify                       bool // Skips BLS verification across the runtime.
    SlasherProtection                   bool // SlasherProtection protects validator from sending over a slashable offense over the network using external slasher.
    EnablePeerScorer                    bool // EnablePeerScorer enables experimental peer scoring in p2p.
    EnableLargerGossipHistory           bool // EnableLargerGossipHistory increases the gossip history we store in our caches.
    WriteWalletPasswordOnWebOnboarding  bool // WriteWalletPasswordOnWebOnboarding writes the password to disk after Prysm web signup.
    DisableAttestingHistoryDBCache      bool // DisableAttestingHistoryDBCache for the validator client increases disk reads/writes.
    UpdateHeadTimely                    bool // UpdateHeadTimely updates head right after state transition.
    ProposerAttsSelectionUsingMaxCover  bool // ProposerAttsSelectionUsingMaxCover enables the max-cover algorithm when selecting attestations for proposing.
    EnableOptimizedBalanceUpdate        bool // EnableOptimizedBalanceUpdate uses an updated method of performing balance updates.
    EnableDoppelGanger                  bool // EnableDoppelGanger enables doppelganger protection on startup for the validator.
    EnableHistoricalSpaceRepresentation bool // EnableHistoricalSpaceRepresentation enables saving of registry validators in separate buckets to save space.

    // Logging related toggles.
    DisableGRPCConnectionLogs bool // Disables logging when a new grpc client has connected.

@@ -199,6 +200,10 @@ func ConfigureBeaconChain(ctx *cli.Context) {
        logDisabled(disableOptimizedBalanceUpdate)
        cfg.EnableOptimizedBalanceUpdate = false
    }
    if ctx.Bool(enableHistoricalSpaceRepresentation.Name) {
        log.WithField(enableHistoricalSpaceRepresentation.Name, enableHistoricalSpaceRepresentation.Usage).Warn(enabledFeatureFlag)
        cfg.EnableHistoricalSpaceRepresentation = true
    }
    Init(cfg)
}
@@ -128,6 +128,12 @@ var (
            "a foolproof method to find duplicate instances in the network. Your validator will still be" +
            " vulnerable if it is being run in unsafe configurations.",
    }
    enableHistoricalSpaceRepresentation = &cli.BoolFlag{
        Name: "enable-historical-state-representation",
        Usage: "Enables the beacon chain to save historical states in a space-efficient manner." +
            " (Warning): Once enabled, this feature migrates your database into a new schema and " +
            "there is no going back. At worst, your entire database might get corrupted.",
    }
)
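For reference, an operator would opt in with the new flag on the beacon node command line, e.g. (hypothetical invocation; other flags unchanged):

    beacon-chain --enable-historical-state-representation

Since the migration is one-way, the warning in the usage string above is the operative caveat: take a backup of beaconchain.db before the first run with the flag.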

// devModeFlags holds list of flags that are set when development mode is on.
@@ -135,6 +141,7 @@ var devModeFlags = []cli.Flag{
    enableLargerGossipHistory,
    enableNextSlotStateCache,
    forceOptMaxCoverAggregationStategy,
    enableHistoricalSpaceRepresentation,
}

// ValidatorFlags contains a list of all the feature flags that apply to the validator client.
@@ -187,6 +194,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
    disableUpdateHeadTimely,
    disableProposerAttsSelectionUsingMaxCover,
    disableOptimizedBalanceUpdate,
    enableHistoricalSpaceRepresentation,
}...)

// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.