Compare commits

...

24 Commits

Author SHA1 Message Date
Kasey Kirkham
cfbc5e93e9 sg->sm 2022-09-06 10:21:11 -05:00
kasey
e8143682a8 Merge branch 'develop' into rm-archive-pt 2022-08-30 08:04:06 -05:00
Kasey Kirkham
8153338dbe radek feedback 2022-08-30 08:03:02 -05:00
Kasey Kirkham
a7e64d699a new type to count finalized validators for p2p 2022-08-29 14:37:33 -05:00
Kasey Kirkham
4e7720ef8b handle pre-genesis (starting from deposits) 2022-08-26 13:09:31 -05:00
Kasey Kirkham
186dd753d9 linting 2022-08-26 12:09:26 -05:00
Kasey Kirkham
980188f260 rm dead code (for deep source) 2022-08-26 12:01:32 -05:00
kasey
aa7d571d40 Merge branch 'develop' into rm-archive-pt 2022-08-26 11:42:54 -05:00
Kasey Kirkham
dfb9a1e575 lint+fmt and move error 2022-08-26 09:31:20 -05:00
Kasey Kirkham
dbd44dd42a relocate error to common location 2022-08-26 09:05:47 -05:00
Kasey Kirkham
841be3369e rewrite of state cleanup w/o slot index 2022-08-25 22:21:19 -05:00
Kasey Kirkham
80bd557afb Revert "rm CleanUpDirtyStates"
This reverts commit d2c950e15c.
2022-08-25 22:21:19 -05:00
kasey
d2b329f33e Merge branch 'develop' into rm-archive-pt 2022-08-25 14:55:06 -05:00
Kasey Kirkham
c2f40e2ed5 remove some more dead code 2022-08-25 14:29:22 -05:00
Kasey Kirkham
01d28016b1 state-slot-indices
- rm createStateIndicesFromStateSlot
- rm state-slot-indices bucket declarations
2022-08-25 14:24:45 -05:00
Kasey Kirkham
fecc081537 rm migrateArchivedIndex 2022-08-24 21:39:35 -05:00
Kasey Kirkham
fe98b8b8fd rm LastArchivedSlot 2022-08-24 21:33:23 -05:00
Kasey Kirkham
dd2ad28474 rm ArchivedPointRoot 2022-08-24 21:32:13 -05:00
Kasey Kirkham
4b26679224 rm HasArchivedPoint 2022-08-24 21:25:55 -05:00
Kasey Kirkham
03f10d5a89 rm HighestSlotStatesBelow 2022-08-24 21:24:37 -05:00
Kasey Kirkham
7922043cbc cleanup imports 2022-08-24 21:22:29 -05:00
Kasey Kirkham
b0d5ecec0b rm LastArchivedRoot 2022-08-24 21:21:45 -05:00
Kasey Kirkham
d2c950e15c rm CleanUpDirtyStates 2022-08-24 21:19:29 -05:00
Kasey Kirkham
ac2f238a60 p2p: use finalized checkpoint, not archive points 2022-08-24 21:14:55 -05:00
25 changed files with 308 additions and 737 deletions

View File

@@ -38,14 +38,9 @@ type ReadOnlyDatabase interface {
HasState(ctx context.Context, blockRoot [32]byte) bool
StateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.StateSummary, error)
HasStateSummary(ctx context.Context, blockRoot [32]byte) bool
HighestSlotStatesBelow(ctx context.Context, slot types.Slot) ([]state.ReadOnlyBeaconState, error)
// Checkpoint operations.
JustifiedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error)
FinalizedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error)
ArchivedPointRoot(ctx context.Context, slot types.Slot) [32]byte
HasArchivedPoint(ctx context.Context, slot types.Slot) bool
LastArchivedRoot(ctx context.Context) [32]byte
LastArchivedSlot(ctx context.Context) (types.Slot, error)
LastValidatedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error)
// Deposit contract related handlers.
DepositContractAddress(ctx context.Context) ([]byte, error)

View File

@@ -3,7 +3,6 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"archived_point.go",
"backup.go",
"blocks.go",
"checkpoint.go",
@@ -17,7 +16,6 @@ go_library(
"kv.go",
"log.go",
"migration.go",
"migration_archived_index.go",
"migration_blinded_beacon_blocks.go",
"migration_block_slot_index.go",
"migration_state_validators.go",
@@ -78,7 +76,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"archived_point_test.go",
"backup_test.go",
"blocks_test.go",
"checkpoint_test.go",
@@ -89,7 +86,6 @@ go_test(
"genesis_test.go",
"init_test.go",
"kv_test.go",
"migration_archived_index_test.go",
"migration_block_slot_index_test.go",
"migration_state_validators_test.go",
"state_summary_test.go",

View File

@@ -1,75 +0,0 @@
package kv
import (
"context"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
// LastArchivedSlot from the db.
func (s *Store) LastArchivedSlot(ctx context.Context) (types.Slot, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedSlot")
defer span.End()
var index types.Slot
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(stateSlotIndicesBucket)
b, _ := bkt.Cursor().Last()
index = bytesutil.BytesToSlotBigEndian(b)
return nil
})
return index, err
}
// LastArchivedRoot from the db.
func (s *Store) LastArchivedRoot(ctx context.Context) [32]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedRoot")
defer span.End()
var blockRoot []byte
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(stateSlotIndicesBucket)
_, blockRoot = bkt.Cursor().Last()
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err)
}
return bytesutil.ToBytes32(blockRoot)
}
// ArchivedPointRoot returns the block root of an archived point from the DB.
// This is essential for cold state management and to restore a cold state.
func (s *Store) ArchivedPointRoot(ctx context.Context, slot types.Slot) [32]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.ArchivedPointRoot")
defer span.End()
var blockRoot []byte
if err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateSlotIndicesBucket)
blockRoot = bucket.Get(bytesutil.SlotToBytesBigEndian(slot))
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err)
}
return bytesutil.ToBytes32(blockRoot)
}
// HasArchivedPoint returns true if an archived point exists in DB.
func (s *Store) HasArchivedPoint(ctx context.Context, slot types.Slot) bool {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HasArchivedPoint")
defer span.End()
var exists bool
if err := s.db.View(func(tx *bolt.Tx) error {
iBucket := tx.Bucket(stateSlotIndicesBucket)
exists = iBucket.Get(bytesutil.SlotToBytesBigEndian(slot)) != nil
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err)
}
return exists
}

View File

@@ -1,51 +0,0 @@
package kv
import (
"context"
"testing"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
)
func TestArchivedPointIndexRoot_CanSaveRetrieve(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
i1 := types.Slot(100)
r1 := [32]byte{'A'}
received := db.ArchivedPointRoot(ctx, i1)
require.NotEqual(t, r1, received, "Should not have been saved")
st, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, st.SetSlot(i1))
require.NoError(t, db.SaveState(ctx, st, r1))
received = db.ArchivedPointRoot(ctx, i1)
assert.Equal(t, r1, received, "Should have been saved")
}
func TestLastArchivedPoint_CanRetrieve(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
i, err := db.LastArchivedSlot(ctx)
require.NoError(t, err)
assert.Equal(t, types.Slot(0), i, "Did not get correct index")
st, err := util.NewBeaconState()
require.NoError(t, err)
assert.NoError(t, db.SaveState(ctx, st, [32]byte{'A'}))
assert.Equal(t, [32]byte{'A'}, db.LastArchivedRoot(ctx), "Did not get wanted root")
assert.NoError(t, st.SetSlot(2))
assert.NoError(t, db.SaveState(ctx, st, [32]byte{'B'}))
assert.Equal(t, [32]byte{'B'}, db.LastArchivedRoot(ctx))
assert.NoError(t, st.SetSlot(3))
assert.NoError(t, db.SaveState(ctx, st, [32]byte{'C'}))
i, err = db.LastArchivedSlot(ctx)
require.NoError(t, err)
assert.Equal(t, types.Slot(3), i, "Did not get correct index")
}

View File

@@ -21,3 +21,5 @@ var ErrNotFoundBackfillBlockRoot = errors.Wrap(ErrNotFound, "BackfillBlockRoot")
// ErrNotFoundFeeRecipient is a not found error specifically for the fee recipient getter
var ErrNotFoundFeeRecipient = errors.Wrap(ErrNotFound, "fee recipient")
var errSavedStateMissingBlock = errors.New("could not find block corresponding to saved state")

View File

@@ -167,7 +167,6 @@ func NewKVStore(ctx context.Context, dirPath string) (*Store, error) {
attestationTargetRootIndicesBucket,
attestationTargetEpochIndicesBucket,
blockSlotIndicesBucket,
stateSlotIndicesBucket,
blockParentRootIndicesBucket,
finalizedBlockRootsIndexBucket,
blockRootValidatorHashesBucket,

View File

@@ -11,7 +11,6 @@ var migrationCompleted = []byte("done")
type migration func(context.Context, *bolt.DB) error
var migrations = []migration{
migrateArchivedIndex,
migrateBlockSlotIndex,
migrateStateValidators,
migrateBlindedBeaconBlocksEnabled,

View File

@@ -1,72 +0,0 @@
package kv
import (
"bytes"
"context"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
bolt "go.etcd.io/bbolt"
)
var migrationArchivedIndex0Key = []byte("archive_index_0")
func migrateArchivedIndex(ctx context.Context, db *bolt.DB) error {
if updateErr := db.Update(func(tx *bolt.Tx) error {
mb := tx.Bucket(migrationsBucket)
if b := mb.Get(migrationArchivedIndex0Key); bytes.Equal(b, migrationCompleted) {
return nil // Migration already completed.
}
bkt := tx.Bucket(archivedRootBucket)
if bkt == nil {
return nil
}
// Remove "last archived index" key before iterating over all keys.
if err := bkt.Delete(lastArchivedIndexKey); err != nil {
return err
}
var highest types.Slot
c := bkt.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
// Look up actual slot from block
b := tx.Bucket(blocksBucket).Get(v)
// Skip this key if there is no block for whatever reason.
if b == nil {
continue
}
blk := &ethpb.SignedBeaconBlock{}
if err := decode(context.TODO(), b, blk); err != nil {
return err
}
if err := tx.Bucket(stateSlotIndicesBucket).Put(bytesutil.SlotToBytesBigEndian(blk.Block.Slot), v); err != nil {
return err
}
if blk.Block.Slot > highest {
highest = blk.Block.Slot
}
// check if context is cancelled in between
if ctx.Err() != nil {
return ctx.Err()
}
}
// Delete deprecated buckets.
for _, bkt := range [][]byte{slotsHasObjectBucket, archivedRootBucket} {
if tx.Bucket(bkt) != nil {
if err := tx.DeleteBucket(bkt); err != nil {
return err
}
}
}
// Mark migration complete.
return mb.Put(migrationArchivedIndex0Key, migrationCompleted)
}); updateErr != nil {
log.WithError(updateErr).Errorf("could not migrate bucket: %s", archivedRootBucket)
return updateErr
}
return nil
}

View File

@@ -1,102 +0,0 @@
package kv
import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/util"
"go.etcd.io/bbolt"
)
func Test_migrateArchivedIndex(t *testing.T) {
tests := []struct {
name string
setup func(t *testing.T, db *bbolt.DB)
eval func(t *testing.T, db *bbolt.DB)
}{
{
name: "only runs once",
setup: func(t *testing.T, db *bbolt.DB) {
err := db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(archivedRootBucket)
assert.NoError(t, err)
if err := tx.Bucket(archivedRootBucket).Put(bytesutil.Uint64ToBytesLittleEndian(2048), []byte("foo")); err != nil {
return err
}
return tx.Bucket(migrationsBucket).Put(migrationArchivedIndex0Key, migrationCompleted)
})
assert.NoError(t, err)
},
eval: func(t *testing.T, db *bbolt.DB) {
err := db.View(func(tx *bbolt.Tx) error {
v := tx.Bucket(archivedRootBucket).Get(bytesutil.Uint64ToBytesLittleEndian(2048))
assert.DeepEqual(t, []byte("foo"), v, "Did not receive correct data for key 2048")
return nil
})
assert.NoError(t, err)
},
},
{
name: "migrates and deletes entries",
setup: func(t *testing.T, db *bbolt.DB) {
err := db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(archivedRootBucket)
assert.NoError(t, err)
_, err = tx.CreateBucketIfNotExists(slotsHasObjectBucket)
assert.NoError(t, err)
if err := tx.Bucket(archivedRootBucket).Put(bytesutil.Uint64ToBytesLittleEndian(2048), []byte("foo")); err != nil {
return err
}
sb := util.NewBeaconBlock()
sb.Block.Slot = 2048
b, err := encode(context.Background(), sb)
if err != nil {
return err
}
return tx.Bucket(blocksBucket).Put([]byte("foo"), b)
})
assert.NoError(t, err)
},
eval: func(t *testing.T, db *bbolt.DB) {
err := db.View(func(tx *bbolt.Tx) error {
k := uint64(2048)
v := tx.Bucket(stateSlotIndicesBucket).Get(bytesutil.Uint64ToBytesBigEndian(k))
assert.DeepEqual(t, []byte("foo"), v, "Did not receive correct data for key %d", k)
return nil
})
assert.NoError(t, err)
},
},
{
name: "deletes old buckets",
setup: func(t *testing.T, db *bbolt.DB) {
err := db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(archivedRootBucket)
assert.NoError(t, err)
_, err = tx.CreateBucketIfNotExists(slotsHasObjectBucket)
assert.NoError(t, err)
return tx.Bucket(slotsHasObjectBucket).Put(savedStateSlotsKey, []byte("foo"))
})
assert.NoError(t, err)
},
eval: func(t *testing.T, db *bbolt.DB) {
err := db.View(func(tx *bbolt.Tx) error {
assert.Equal(t, (*bbolt.Bucket)(nil), tx.Bucket(slotsHasObjectBucket), "Expected %v to be deleted", savedStateSlotsKey)
assert.Equal(t, (*bbolt.Bucket)(nil), tx.Bucket(archivedRootBucket), "Expected %v to be deleted", savedStateSlotsKey)
return nil
})
assert.NoError(t, err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := setupDB(t).db
tt.setup(t, db)
assert.NoError(t, migrateArchivedIndex(context.Background(), db), "migrateArchivedIndex(tx) error")
tt.eval(t, db)
})
}
}

View File

@@ -202,7 +202,7 @@ func Test_migrateStateValidators(t *testing.T) {
defer resetCfg()
tt.setup(t, dbStore, st, vals)
assert.NoError(t, migrateStateValidators(context.Background(), dbStore.db), "migrateArchivedIndex(tx) error")
assert.NoError(t, migrateStateValidators(context.Background(), dbStore.db), "migrateStateValidators(tx) error")
tt.eval(t, dbStore, st, vals)
})
}
@@ -309,7 +309,7 @@ func Test_migrateAltairStateValidators(t *testing.T) {
defer resetCfg()
tt.setup(t, dbStore, st, vals)
assert.NoError(t, migrateStateValidators(context.Background(), dbStore.db), "migrateArchivedIndex(tx) error")
assert.NoError(t, migrateStateValidators(context.Background(), dbStore.db), "migrateStateValidators(tx) error")
tt.eval(t, dbStore, st, vals)
})
}

View File

@@ -21,15 +21,9 @@ var (
feeRecipientBucket = []byte("fee-recipient")
registrationBucket = []byte("registration")
// Deprecated: This bucket was migrated in PR 6461. Do not use, except for migrations.
slotsHasObjectBucket = []byte("slots-has-objects")
// Deprecated: This bucket was migrated in PR 6461. Do not use, except for migrations.
archivedRootBucket = []byte("archived-index-root")
// Key indices buckets.
blockParentRootIndicesBucket = []byte("block-parent-root-indices")
blockSlotIndicesBucket = []byte("block-slot-indices")
stateSlotIndicesBucket = []byte("state-slot-indices")
attestationHeadBlockRootBucket = []byte("attestation-head-block-root-indices")
attestationSourceRootIndicesBucket = []byte("attestation-source-root-indices")
attestationSourceEpochIndicesBucket = []byte("attestation-source-epoch-indices")
@@ -57,11 +51,6 @@ var (
// block root tracking the progress of backfill, or pointing at genesis if backfill has not been initiated
backfillBlockRootKey = []byte("backfill-block-root")
// Deprecated: This index key was migrated in PR 6461. Do not use, except for migrations.
lastArchivedIndexKey = []byte("last-archived")
// Deprecated: This index key was migrated in PR 6461. Do not use, except for migrations.
savedStateSlotsKey = []byte("saved-state-slots")
// New state management service compatibility bucket.
newStateServiceCompatibleBucket = []byte("new-state-compatible")

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
@@ -15,9 +16,9 @@ import (
v3 "github.com/prysmaticlabs/prysm/v3/beacon-chain/state/v3"
"github.com/prysmaticlabs/prysm/v3/config/features"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v3/encoding/ssz/detect"
"github.com/prysmaticlabs/prysm/v3/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/time/slots"
@@ -139,10 +140,6 @@ func (s *Store) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconSta
return s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateBucket)
for i, rt := range blockRoots {
indicesByBucket := createStateIndicesFromStateSlot(ctx, states[i].Slot())
if err := updateValueForIndices(ctx, indicesByBucket, rt[:], tx); err != nil {
return errors.Wrap(err, "could not update DB indices")
}
if err := bucket.Put(rt[:], multipleEncs[i]); err != nil {
return err
}
@@ -209,11 +206,6 @@ func (s *Store) saveStatesEfficientInternal(ctx context.Context, tx *bolt.Tx, bl
bucket := tx.Bucket(stateBucket)
valIdxBkt := tx.Bucket(blockRootValidatorHashesBucket)
for i, rt := range blockRoots {
indicesByBucket := createStateIndicesFromStateSlot(ctx, states[i].Slot())
if err := updateValueForIndices(ctx, indicesByBucket, rt[:], tx); err != nil {
return errors.Wrap(err, "could not update DB indices")
}
// There is a gap when the states that are passed are used outside this
// thread. But while storing the state object, we should not store the
// validator entries. To bring the gap closer, we empty the validators
@@ -393,15 +385,6 @@ func (s *Store) DeleteState(ctx context.Context, blockRoot [32]byte) error {
return nil
}
slot, err := s.slotByBlockRoot(ctx, tx, blockRoot[:])
if err != nil {
return err
}
indicesByBucket := createStateIndicesFromStateSlot(ctx, slot)
if err := deleteValueForIndices(ctx, indicesByBucket, blockRoot[:], tx); err != nil {
return errors.Wrap(err, "could not delete root for DB indices")
}
ok, err := s.isStateValidatorMigrationOver()
if err != nil {
return err
@@ -635,135 +618,17 @@ func (s *Store) stateBytes(ctx context.Context, blockRoot [32]byte) ([]byte, err
return dst, err
}
// slotByBlockRoot retrieves the corresponding slot of the input block root.
func (s *Store) slotByBlockRoot(ctx context.Context, tx *bolt.Tx, blockRoot []byte) (types.Slot, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.slotByBlockRoot")
defer span.End()
bkt := tx.Bucket(stateSummaryBucket)
enc := bkt.Get(blockRoot)
if enc == nil {
// Fall back to check the block.
bkt := tx.Bucket(blocksBucket)
enc := bkt.Get(blockRoot)
if enc == nil {
// Fallback and check the state.
bkt = tx.Bucket(stateBucket)
enc = bkt.Get(blockRoot)
if enc == nil {
return 0, errors.New("state enc can't be nil")
}
// no need to construct the validator entries as it is not used here.
s, err := s.unmarshalState(ctx, enc, nil)
if err != nil {
return 0, err
}
if s == nil || s.IsNil() {
return 0, errors.New("state can't be nil")
}
return s.Slot(), nil
}
b := &ethpb.SignedBeaconBlock{}
err := decode(ctx, enc, b)
if err != nil {
return 0, err
}
wsb, err := blocks.NewSignedBeaconBlock(b)
if err != nil {
return 0, err
}
if err := blocks.BeaconBlockIsNil(wsb); err != nil {
return 0, err
}
return b.Block.Slot, nil
}
stateSummary := &ethpb.StateSummary{}
if err := decode(ctx, enc, stateSummary); err != nil {
return 0, err
}
return stateSummary.Slot, nil
}
// HighestSlotStatesBelow returns the states with the highest slot below the input slot
// from the db. Ideally there should just be one state per slot, but since a validator
// can double propose, a single slot could have multiple block roots and
// resulting states. This returns a list of states.
func (s *Store) HighestSlotStatesBelow(ctx context.Context, slot types.Slot) ([]state.ReadOnlyBeaconState, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HighestSlotStatesBelow")
defer span.End()
var best []byte
if err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(stateSlotIndicesBucket)
c := bkt.Cursor()
for s, root := c.First(); s != nil; s, root = c.Next() {
if ctx.Err() != nil {
return ctx.Err()
}
key := bytesutil.BytesToSlotBigEndian(s)
if root == nil {
continue
}
if key >= slot {
break
}
best = root
}
return nil
}); err != nil {
return nil, err
}
var st state.ReadOnlyBeaconState
var err error
if best != nil {
st, err = s.State(ctx, bytesutil.ToBytes32(best))
if err != nil {
return nil, err
}
}
if st == nil || st.IsNil() {
st, err = s.GenesisState(ctx)
if err != nil {
return nil, err
}
}
return []state.ReadOnlyBeaconState{st}, nil
}
// createStateIndicesFromStateSlot takes in a state slot and returns
// a map of bolt DB index buckets corresponding to each particular key for indices for
// data, such as (shard indices bucket -> shard 5).
func createStateIndicesFromStateSlot(ctx context.Context, slot types.Slot) map[string][]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.createStateIndicesFromState")
defer span.End()
indicesByBucket := make(map[string][]byte)
// Every index has a unique bucket for fast, binary-search
// range scans for filtering across keys.
buckets := [][]byte{
stateSlotIndicesBucket,
}
indices := [][]byte{
bytesutil.SlotToBytesBigEndian(slot),
}
for i := 0; i < len(buckets); i++ {
indicesByBucket[string(buckets[i])] = indices[i]
}
return indicesByBucket
}
// CleanUpDirtyStates removes states in the DB that fall outside the archived point interval rules.
// Only following states would be kept:
// 1.) state_slot % archived_interval == 0. (e.g. archived_interval=2048, states with slot 2048, 4096... etc)
// 2.) archived_interval - archived_interval/3 < state_slot % archived_interval
// (e.g. archived_interval=2048, states with slots after 1365).
// This is to tolerate skip slots. Not every state lays on the boundary.
// 3.) state with current finalized root
// 4.) unfinalized States
// CleanUpDirtyStates attempts to maintain the promise to save approximately <head slot / save state interval> states.
// To do that, we save about 1 state every slotsPerArchivedPoint slots (2048 by default), calling the slot
// where the save happened the "save point". Due to skipped slots, there may not be a block at a multiple of 2048,
// in which case the saved state point will be at the slot where the last block was previously included in the interval.
// We don't want to delete the most recently finalized state, which is saved to the same database,
// and in long periods of non-finality, stategen may also write a state every 128 slots to aid in recovery.
// So we preserve:
// 1. any state where the slot number is a multiple of 2048 (slot % 2048 == 0)
// 2. any state with a slot number within 682 slots (2048/3) of such a save point,
// 3. most recently finalized state
// 4. non-finalized states used by stategen
func (s *Store) CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint types.Slot) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB. CleanUpDirtyStates")
defer span.End()
@@ -776,24 +641,61 @@ func (s *Store) CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint ty
if err != nil {
return err
}
deletedRoots := make([][32]byte, 0)
finalizedRoot := bytesutil.ToBytes32(f.Root)
// We usually archive a state every 2048 slots. If a slot with value % 2048 == 0 is skipped,
// we will store the last un-skipped state instead. We don't know exactly how far back that state could be
// from the skipped one, but a fudge factor of roughly 1/3 of the interval was chosen based on looking
// at chain history for guidance. 1/3 of the default interval (2048) comes out to about 682 slots (or ~21 epochs).
intervalTopThird := slotsPerArchivedPoint - slotsPerArchivedPoint/3
seen := 0
toDelete := make([][32]byte, 0)
err = s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(stateSlotIndicesBucket)
return bkt.ForEach(func(k, v []byte) error {
bkt := tx.Bucket(stateBucket)
bbkt := tx.Bucket(blocksBucket)
return bkt.ForEach(func(k, _ []byte) error {
if ctx.Err() != nil {
return ctx.Err()
}
finalizedChkpt := bytesutil.ToBytes32(f.Root) == bytesutil.ToBytes32(v)
slot := bytesutil.BytesToSlotBigEndian(k)
mod := slot % slotsPerArchivedPoint
nonFinalized := slot > finalizedSlot
// The following conditions cover 1, 2, 3 and 4 above.
if mod != 0 && mod <= slotsPerArchivedPoint-slotsPerArchivedPoint/3 && !finalizedChkpt && !nonFinalized {
deletedRoots = append(deletedRoots, bytesutil.ToBytes32(v))
seen += 1
// If we could cheaply and easily read the first 50 or so bytes of the state,
// we could pull the slot from the ssz-encoded bytes. But the state is very large (> 50MB) and
// we need to read the entire thing to snappy.Decode it, so this code is betting that it's cheaper
// to grab the corresponding block and decode that instead.
enc := bbkt.Get(k[:32])
if enc == nil {
// the database is in an unexpected state, we should error out to prevent anything destructive.
log.WithField("root", hexutil.Encode(k)).Error("Could not find block corresponding to saved state")
return errors.Wrapf(errSavedStateMissingBlock, "root=%#x", k)
}
enc, err = snappy.Decode(nil, enc)
if err != nil {
return errors.Wrapf(err, "unable to snappy.Decode block with root=%#x", k)
}
slot, err := detect.SlotFromBlock(enc)
if err != nil {
return errors.Wrapf(err, "unable to extract slot from block with root=%#x", k)
}
mod := slot % slotsPerArchivedPoint
// state is on an archive point, or within the final 1/3 of the interval (case 1 & 2)
if mod == 0 || mod > intervalTopThird {
return nil
}
// don't delete the state integrating the latest finalized block (case 3)
if bytesutil.ToBytes32(k) == finalizedRoot {
return nil
}
// don't delete states that haven't finalized yet - they may be in-use by the hot state cache (case 4)
if slot > finalizedSlot {
return nil
}
// delete everything else!
toDelete = append(toDelete, bytesutil.ToBytes32(k))
return nil
})
})
@@ -801,13 +703,13 @@ func (s *Store) CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint ty
return err
}
// Length of to be deleted roots is 0. Nothing to do.
if len(deletedRoots) == 0 {
if len(toDelete) == 0 {
log.WithField("db_total", seen).Info("No dirty states to clean up")
return nil
}
log.WithField("count", len(deletedRoots)).Info("Cleaning up dirty states")
if err := s.DeleteStates(ctx, deletedRoots); err != nil {
log.WithField("db_total", seen).WithField("dirty", len(toDelete)).Info("Cleaning up dirty states")
if err := s.DeleteStates(ctx, toDelete); err != nil {
return err
}
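To make the retention rules above concrete, here is a minimal standalone sketch of the keep-or-delete decision. The helper name and signature are hypothetical (the real code above also has to recover each state's slot by decoding its block); with the default slotsPerArchivedPoint of 2048, intervalTopThird is 2048 - 2048/3 = 1366.

package kv

import types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"

// shouldKeepState restates cases 1-4 from the CleanUpDirtyStates comment above.
func shouldKeepState(slot, slotsPerArchivedPoint, finalizedSlot types.Slot, root, finalizedRoot [32]byte) bool {
	intervalTopThird := slotsPerArchivedPoint - slotsPerArchivedPoint/3
	mod := slot % slotsPerArchivedPoint
	switch {
	case mod == 0: // case 1: the state sits exactly on an archive point
		return true
	case mod > intervalTopThird: // case 2: within the final third of the interval
		return true
	case root == finalizedRoot: // case 3: the most recently finalized state
		return true
	case slot > finalizedSlot: // case 4: not yet finalized; may still be needed by stategen
		return true
	default:
		return false
	}
}

For example, with the default interval a state at slot 4096 is kept (case 1), a state at slot 3500 is kept because 3500 % 2048 = 1452 > 1366 (case 2), and a state at slot 2500 (2500 % 2048 = 452) is deleted unless it matches the finalized root or has not yet finalized.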

View File

@@ -3,6 +3,7 @@ package kv
import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"testing"
"time"
@@ -468,102 +469,13 @@ func TestStore_DeleteHeadState(t *testing.T) {
require.NoError(t, db.DeleteState(ctx, headBlockRoot)) // Ok to delete head state if it's optimistic.
}
func TestStore_SaveDeleteState_CanGetHighestBelow(t *testing.T) {
db := setupDB(t)
b := util.NewBeaconBlock()
b.Block.Slot = 1
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
wsb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, db.SaveBlock(context.Background(), wsb))
st, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, st.SetSlot(1))
s0 := st.InnerStateUnsafe()
require.NoError(t, db.SaveState(context.Background(), st, r))
b.Block.Slot = 100
r1, err := b.Block.HashTreeRoot()
require.NoError(t, err)
wsb, err = blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, db.SaveBlock(context.Background(), wsb))
st, err = util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, st.SetSlot(100))
s1 := st.InnerStateUnsafe()
require.NoError(t, db.SaveState(context.Background(), st, r1))
b.Block.Slot = 1000
r2, err := b.Block.HashTreeRoot()
require.NoError(t, err)
wsb, err = blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, db.SaveBlock(context.Background(), wsb))
st, err = util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, st.SetSlot(1000))
s2 := st.InnerStateUnsafe()
require.NoError(t, db.SaveState(context.Background(), st, r2))
highest, err := db.HighestSlotStatesBelow(context.Background(), 2)
require.NoError(t, err)
assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), s0)
highest, err = db.HighestSlotStatesBelow(context.Background(), 101)
require.NoError(t, err)
assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), s1)
highest, err = db.HighestSlotStatesBelow(context.Background(), 1001)
require.NoError(t, err)
assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), s2)
}
func TestStore_GenesisState_CanGetHighestBelow(t *testing.T) {
db := setupDB(t)
genesisState, err := util.NewBeaconState()
require.NoError(t, err)
genesisRoot := [32]byte{'a'}
require.NoError(t, db.SaveGenesisBlockRoot(context.Background(), genesisRoot))
require.NoError(t, db.SaveState(context.Background(), genesisState, genesisRoot))
b := util.NewBeaconBlock()
b.Block.Slot = 1
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
wsb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, db.SaveBlock(context.Background(), wsb))
st, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, st.SetSlot(1))
require.NoError(t, db.SaveState(context.Background(), st, r))
highest, err := db.HighestSlotStatesBelow(context.Background(), 2)
require.NoError(t, err)
assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), st.InnerStateUnsafe())
highest, err = db.HighestSlotStatesBelow(context.Background(), 1)
require.NoError(t, err)
assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), genesisState.InnerStateUnsafe())
highest, err = db.HighestSlotStatesBelow(context.Background(), 0)
require.NoError(t, err)
assert.DeepSSZEqual(t, highest[0].InnerStateUnsafe(), genesisState.InnerStateUnsafe())
}
func TestStore_CleanUpDirtyStates_AboveThreshold(t *testing.T) {
db := setupDB(t)
genesisState, err := util.NewBeaconState()
require.NoError(t, err)
genesisRoot := [32]byte{'a'}
require.NoError(t, db.SaveGenesisBlockRoot(context.Background(), genesisRoot))
require.NoError(t, db.SaveState(context.Background(), genesisState, genesisRoot))
require.NoError(t, db.SaveGenesisData(context.Background(), genesisState))
bRoots := make([][32]byte, 0)
slotsPerArchivedPoint := types.Slot(128)
@@ -592,11 +504,12 @@ func TestStore_CleanUpDirtyStates_AboveThreshold(t *testing.T) {
}))
require.NoError(t, db.CleanUpDirtyStates(context.Background(), slotsPerArchivedPoint))
threshold := slotsPerArchivedPoint.SubSlot(slotsPerArchivedPoint.Div(3))
for i, root := range bRoots {
if types.Slot(i) >= slotsPerArchivedPoint.SubSlot(slotsPerArchivedPoint.Div(3)) {
if types.Slot(i) >= threshold {
require.Equal(t, true, db.HasState(context.Background(), root))
} else {
require.Equal(t, false, db.HasState(context.Background(), root))
require.Equal(t, false, db.HasState(context.Background(), root), fmt.Sprintf("slot=%d, threshold=%d", i, threshold))
}
}
}
@@ -606,9 +519,9 @@ func TestStore_CleanUpDirtyStates_Finalized(t *testing.T) {
genesisState, err := util.NewBeaconState()
require.NoError(t, err)
genesisRoot := [32]byte{'a'}
require.NoError(t, db.SaveGenesisBlockRoot(context.Background(), genesisRoot))
require.NoError(t, db.SaveState(context.Background(), genesisState, genesisRoot))
require.NoError(t, db.SaveGenesisData(context.Background(), genesisState))
genesisRoot, err := db.GenesisBlockRoot(context.Background())
require.NoError(t, err)
for i := types.Slot(1); i <= params.BeaconConfig().SlotsPerEpoch; i++ {
b := util.NewBeaconBlock()
@@ -635,9 +548,9 @@ func TestStore_CleanUpDirtyStates_DontDeleteNonFinalized(t *testing.T) {
genesisState, err := util.NewBeaconState()
require.NoError(t, err)
genesisRoot := [32]byte{'a'}
require.NoError(t, db.SaveGenesisBlockRoot(context.Background(), genesisRoot))
require.NoError(t, db.SaveState(context.Background(), genesisState, genesisRoot))
require.NoError(t, db.SaveGenesisData(context.Background(), genesisState))
genesisRoot, err := db.GenesisBlockRoot(context.Background())
require.NoError(t, err)
var unfinalizedRoots [][32]byte
for i := types.Slot(1); i <= params.BeaconConfig().SlotsPerEpoch; i++ {

View File

@@ -202,12 +202,13 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
}
log.Debugln("Starting State Gen")
if err := beacon.startStateGen(ctx, bfs); err != nil {
stg, err := beacon.startStateGen(ctx, bfs)
if err != nil {
return nil, err
}
log.Debugln("Registering P2P Service")
if err := beacon.registerP2P(cliCtx); err != nil {
if err := beacon.registerP2P(cliCtx, stg); err != nil {
return nil, err
}
@@ -481,13 +482,13 @@ func (b *BeaconNode) startSlasherDB(cliCtx *cli.Context) error {
return nil
}
func (b *BeaconNode) startStateGen(ctx context.Context, bfs *backfill.Status) error {
func (b *BeaconNode) startStateGen(ctx context.Context, bfs *backfill.Status) (stategen.StateManager, error) {
opts := []stategen.StateGenOption{stategen.WithBackfillStatus(bfs)}
sg := stategen.New(b.db, opts...)
cp, err := b.db.FinalizedCheckpoint(ctx)
if err != nil {
return err
return nil, err
}
r := bytesutil.ToBytes32(cp.Root)
@@ -495,31 +496,32 @@ func (b *BeaconNode) startStateGen(ctx context.Context, bfs *backfill.Status) er
if r == params.BeaconConfig().ZeroHash {
genesisBlock, err := b.db.GenesisBlock(ctx)
if err != nil {
return err
return nil, err
}
if genesisBlock != nil && !genesisBlock.IsNil() {
r, err = genesisBlock.Block().HashTreeRoot()
if err != nil {
return err
return nil, err
}
}
}
b.finalizedStateAtStartUp, err = sg.StateByRoot(ctx, r)
if err != nil {
return err
return nil, err
}
b.stateGen = sg
return nil
return sg, err
}
func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
func (b *BeaconNode) registerP2P(cliCtx *cli.Context, sg stategen.StateManager) error {
bootstrapNodeAddrs, dataDir, err := registration.P2PPreregistration(cliCtx)
if err != nil {
return err
}
vc := stategen.NewLastFinalizedValidatorCounter(0, b.db, sg)
svc, err := p2p.NewService(b.ctx, &p2p.Config{
NoDiscovery: cliCtx.Bool(cmd.NoDiscovery.Name),
StaticPeers: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.StaticPeers.Name)),
@@ -538,7 +540,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
DenyListCIDR: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)),
EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name),
StateNotifier: b,
DB: b.db,
ValCounter: vc,
})
if err != nil {
return err

View File

@@ -46,13 +46,12 @@ go_library(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
@@ -138,7 +137,6 @@ go_test(
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",

View File

@@ -2,7 +2,7 @@ package p2p
import (
statefeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state/stategen"
)
// Config for the p2p service. These parameters are set from application level flags
@@ -26,5 +26,5 @@ type Config struct {
AllowListCIDR string
DenyListCIDR []string
StateNotifier statefeed.Notifier
DB db.ReadOnlyDatabase
ValCounter stategen.ActiveValidatorCounter
}

View File

@@ -1,7 +1,6 @@
package p2p
import (
"context"
"math"
"reflect"
"strings"
@@ -11,7 +10,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/sirupsen/logrus"
)
@@ -95,19 +93,19 @@ func peerScoringParams() (*pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds)
}
func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, error) {
activeValidators, err := s.retrieveActiveValidators()
c, err := s.cfg.ValCounter.ActiveValidatorCount(s.ctx)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "could not compute active validator count")
}
switch {
case strings.Contains(topic, GossipBlockMessage):
return defaultBlockTopicParams(), nil
case strings.Contains(topic, GossipAggregateAndProofMessage):
return defaultAggregateTopicParams(activeValidators), nil
return defaultAggregateTopicParams(c), nil
case strings.Contains(topic, GossipAttestationMessage):
return defaultAggregateSubnetTopicParams(activeValidators), nil
return defaultAggregateSubnetTopicParams(c), nil
case strings.Contains(topic, GossipSyncCommitteeMessage):
return defaultSyncSubnetTopicParams(activeValidators), nil
return defaultSyncSubnetTopicParams(c), nil
case strings.Contains(topic, GossipContributionAndProofMessage):
return defaultSyncContributionTopicParams(), nil
case strings.Contains(topic, GossipExitMessage):
@@ -121,43 +119,6 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro
}
}
func (s *Service) retrieveActiveValidators() (uint64, error) {
if s.activeValidatorCount != 0 {
return s.activeValidatorCount, nil
}
rt := s.cfg.DB.LastArchivedRoot(s.ctx)
if rt == params.BeaconConfig().ZeroHash {
genState, err := s.cfg.DB.GenesisState(s.ctx)
if err != nil {
return 0, err
}
if genState == nil || genState.IsNil() {
return 0, errors.New("no genesis state exists")
}
activeVals, err := helpers.ActiveValidatorCount(context.Background(), genState, coreTime.CurrentEpoch(genState))
if err != nil {
return 0, err
}
// Cache active validator count
s.activeValidatorCount = activeVals
return activeVals, nil
}
bState, err := s.cfg.DB.State(s.ctx, rt)
if err != nil {
return 0, err
}
if bState == nil || bState.IsNil() {
return 0, errors.Errorf("no state with root %#x exists", rt)
}
activeVals, err := helpers.ActiveValidatorCount(context.Background(), bState, coreTime.CurrentEpoch(bState))
if err != nil {
return 0, err
}
// Cache active validator count
s.activeValidatorCount = activeVals
return activeVals, nil
}
// Based on the lighthouse parameters.
// https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
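Since topicScoreParams now depends only on the injected counter rather than on the database, a test or caller can supply a trivial implementation of the stategen.ActiveValidatorCounter interface. A minimal sketch, with fixedValCounter as a hypothetical name not present in this diff:

package p2p

import "context"

// fixedValCounter returns a constant active validator count, which is enough
// to exercise topicScoreParams without a database or state manager.
type fixedValCounter struct{ n uint64 }

func (f fixedValCounter) ActiveValidatorCount(_ context.Context) (uint64, error) {
	return f.n, nil
}

It could then be wired in as &Config{ValCounter: fixedValCounter{n: 16384}}.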

View File

@@ -1,68 +1,11 @@
package p2p
import (
"context"
"testing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
dbutil "github.com/prysmaticlabs/prysm/v3/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/v3/config/params"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/assert"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
)
func TestCorrect_ActiveValidatorsCount(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.MainnetConfig().Copy()
cfg.ConfigName = "test"
params.OverrideBeaconConfig(cfg)
db := dbutil.SetupDB(t)
s := &Service{
ctx: context.Background(),
cfg: &Config{DB: db},
}
bState, err := util.NewBeaconState(func(state *ethpb.BeaconState) error {
validators := make([]*ethpb.Validator, params.BeaconConfig().MinGenesisActiveValidatorCount)
for i := 0; i < len(validators); i++ {
validators[i] = &ethpb.Validator{
PublicKey: make([]byte, 48),
WithdrawalCredentials: make([]byte, 32),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Slashed: false,
}
}
state.Validators = validators
return nil
})
require.NoError(t, err)
require.NoError(t, db.SaveGenesisData(s.ctx, bState))
vals, err := s.retrieveActiveValidators()
assert.NoError(t, err, "genesis state not retrieved")
assert.Equal(t, int(params.BeaconConfig().MinGenesisActiveValidatorCount), int(vals), "mainnet genesis active count isn't accurate")
for i := 0; i < 100; i++ {
require.NoError(t, bState.AppendValidator(&ethpb.Validator{
PublicKey: make([]byte, 48),
WithdrawalCredentials: make([]byte, 32),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Slashed: false,
}))
}
require.NoError(t, bState.SetSlot(10000))
require.NoError(t, db.SaveState(s.ctx, bState, [32]byte{'a'}))
// Reset count
s.activeValidatorCount = 0
// Retrieve last archived state.
vals, err = s.retrieveActiveValidators()
assert.NoError(t, err, "genesis state not retrieved")
assert.Equal(t, int(params.BeaconConfig().MinGenesisActiveValidatorCount)+100, int(vals), "mainnet genesis active count isn't accurate")
}
func TestLoggingParameters(_ *testing.T) {
logGossipParameters("testing", nil)
logGossipParameters("testing", &pubsub.TopicScoreParams{})

View File

@@ -84,7 +84,6 @@ type Service struct {
host host.Host
genesisTime time.Time
genesisValidatorsRoot []byte
activeValidatorCount uint64
}
// NewService initializes a new p2p service compatible with shared.Service interface. No

View File

@@ -16,12 +16,14 @@ go_library(
"replayer.go",
"service.go",
"setter.go",
"valcounter.go",
],
importpath = "github.com/prysmaticlabs/prysm/v3/beacon-chain/state/stategen",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/execution:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/db:go_default_library",
@@ -62,6 +64,7 @@ go_test(
"replayer_test.go",
"service_test.go",
"setter_test.go",
"valcounter_test.go",
],
embed = [":go_default_library"],
deps = [

View File

@@ -52,11 +52,6 @@ func TestMigrateToCold_HappyPath(t *testing.T) {
gotState, err := service.beaconDB.State(ctx, fRoot)
require.NoError(t, err)
assert.DeepSSZEqual(t, beaconState.InnerStateUnsafe(), gotState.InnerStateUnsafe(), "Did not save state")
gotRoot := service.beaconDB.ArchivedPointRoot(ctx, stateSlot/service.slotsPerArchivedPoint)
assert.Equal(t, fRoot, gotRoot, "Did not save archived root")
lastIndex, err := service.beaconDB.LastArchivedSlot(ctx)
require.NoError(t, err)
assert.Equal(t, types.Slot(1), lastIndex, "Did not save last archived index")
require.LogsContain(t, hook, "Saved state in DB")
}
@@ -102,11 +97,6 @@ func TestMigrateToCold_RegeneratePath(t *testing.T) {
s1, err := service.beaconDB.State(ctx, r1)
require.NoError(t, err)
assert.Equal(t, s1.Slot(), types.Slot(1), "Did not save state")
gotRoot := service.beaconDB.ArchivedPointRoot(ctx, 1/service.slotsPerArchivedPoint)
assert.Equal(t, r1, gotRoot, "Did not save archived root")
lastIndex, err := service.beaconDB.LastArchivedSlot(ctx)
require.NoError(t, err)
assert.Equal(t, types.Slot(1), lastIndex, "Did not save last archived index")
require.LogsContain(t, hook, "Saved state in DB")
}
@@ -128,9 +118,17 @@ func TestMigrateToCold_StateExistsInDB(t *testing.T) {
util.SaveBlock(t, ctx, service.beaconDB, b)
require.NoError(t, service.epochBoundaryStateCache.put(fRoot, beaconState))
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, fRoot))
savedRoot, err := beaconState.HashTreeRoot(ctx)
require.NoError(t, err)
service.saveHotStateDB.blockRootsOfSavedStates = [][32]byte{{1}, {2}, {3}, {4}, fRoot}
require.NoError(t, service.MigrateToCold(ctx, fRoot))
assert.DeepEqual(t, [][32]byte{{1}, {2}, {3}, {4}}, service.saveHotStateDB.blockRootsOfSavedStates)
assert.LogsDoNotContain(t, hook, "Saved state in DB")
st, err := service.beaconDB.State(ctx, fRoot)
require.NoError(t, err)
foundRoot, err := st.HashTreeRoot(ctx)
require.NoError(t, err)
require.Equal(t, savedRoot, foundRoot)
require.Equal(t, false, st.IsNil())
}

View File

@@ -0,0 +1,68 @@
package stategen
import (
"context"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/time"
beacondb "github.com/prysmaticlabs/prysm/v3/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
)
type ActiveValidatorCounter interface {
ActiveValidatorCount(ctx context.Context) (uint64, error)
}
type LastFinalizedValidatorCounter struct {
count uint64
db beacondb.HeadAccessDatabase
sm StateManager
}
func (lf *LastFinalizedValidatorCounter) ActiveValidatorCount(ctx context.Context) (uint64, error) {
if lf.count != 0 {
return lf.count, nil
}
cp, err := lf.db.FinalizedCheckpoint(ctx)
if err != nil {
return 0, err
}
r := bytesutil.ToBytes32(cp.Root)
// Consider the edge case where the finalized root is zero instead of the genesis root hash.
if r == params.BeaconConfig().ZeroHash {
genesisBlock, err := lf.db.GenesisBlock(ctx)
if err != nil {
return 0, err
}
if genesisBlock != nil && !genesisBlock.IsNil() {
r, err = genesisBlock.Block().HashTreeRoot()
if err != nil {
return 0, err
}
}
}
st, err := lf.sm.StateByRoot(ctx, r)
if err != nil {
return 0, err
}
if st == nil || st.IsNil() {
return 0, errors.Wrapf(errUnknownState, "could not retrieve state with root=%#x", r)
}
vc, err := helpers.ActiveValidatorCount(context.Background(), st, coreTime.CurrentEpoch(st))
if err != nil {
return 0, err
}
lf.count = vc
return lf.count, nil
}
func NewLastFinalizedValidatorCounter(count uint64, db beacondb.HeadAccessDatabase, sg StateManager) *LastFinalizedValidatorCounter {
return &LastFinalizedValidatorCounter{
count: count,
db: db,
sm: sg,
}
}
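A short usage sketch of the new counter, mirroring the node.go wiring earlier in this diff; the wireValCounter helper and its parameter names are illustrative only:

package example

import (
	"context"

	"github.com/prysmaticlabs/prysm/v3/beacon-chain/db"
	"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
	"github.com/prysmaticlabs/prysm/v3/beacon-chain/state/stategen"
)

// wireValCounter builds the finalized-state validator counter and hands it to
// the p2p config. An initial count of 0 means the first ActiveValidatorCount
// call resolves the latest finalized state and caches its active validator count.
func wireValCounter(ctx context.Context, beaconDB db.HeadAccessDatabase, sg stategen.StateManager) (*p2p.Config, error) {
	vc := stategen.NewLastFinalizedValidatorCounter(0, beaconDB, sg)
	if _, err := vc.ActiveValidatorCount(ctx); err != nil {
		return nil, err
	}
	return &p2p.Config{ValCounter: vc}, nil
}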

View File

@@ -0,0 +1,88 @@
package stategen
import (
"context"
"testing"
testDB "github.com/prysmaticlabs/prysm/v3/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/blocks"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/testing/require"
"github.com/prysmaticlabs/prysm/v3/testing/util"
)
func TestLastFinalizedValidatorCounter(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
sg := New(beaconDB)
lf := NewLastFinalizedValidatorCounter(0, beaconDB, sg)
c, err := lf.ActiveValidatorCount(ctx)
require.Equal(t, uint64(0), c)
require.ErrorIs(t, err, errUnknownState)
var genCount uint64 = 64
gst, err := util.NewBeaconState(valPopulator(genCount))
require.NoError(t, err)
require.NoError(t, beaconDB.SaveGenesisData(ctx, gst))
c, err = lf.ActiveValidatorCount(ctx)
require.NoError(t, err)
require.Equal(t, genCount, c)
finCount := params.BeaconConfig().MinGenesisActiveValidatorCount
var expectedSlot types.Slot = 3200
st, err := util.NewBeaconState(valPopulator(finCount))
require.NoError(t, err)
require.NoError(t, st.SetSlot(expectedSlot))
// get the genesis block so we can use a valid parent root for the new final block
gb, err := beaconDB.GenesisBlock(ctx)
require.NoError(t, err)
gbr, err := gb.Block().HashTreeRoot()
require.NoError(t, err)
b := util.NewBeaconBlock()
b.Block.ParentRoot = gbr[:]
b.Block.Slot = expectedSlot
br, err := b.Block.HashTreeRoot()
require.NoError(t, err)
wsb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, beaconDB.SaveBlock(context.Background(), wsb))
require.NoError(t, beaconDB.SaveState(ctx, st, br))
require.NoError(t, beaconDB.SaveFinalizedCheckpoint(context.Background(), &ethpb.Checkpoint{
Root: br[:],
Epoch: types.Epoch(expectedSlot / params.BeaconConfig().SlotsPerEpoch),
}))
c, err = lf.ActiveValidatorCount(ctx)
require.NoError(t, err)
// genCount will still be cached
require.Equal(t, genCount, c)
// reach in and reset the cache to cause it to miss and repopulate
lf.count = 0
c, err = lf.ActiveValidatorCount(ctx)
require.NoError(t, err)
// updated value should match most recently finalized
require.Equal(t, finCount, c)
}
func valPopulator(valCount uint64) func(state *ethpb.BeaconState) error {
return func(state *ethpb.BeaconState) error {
validators := make([]*ethpb.Validator, valCount)
for i := 0; i < len(validators); i++ {
validators[i] = &ethpb.Validator{
PublicKey: make([]byte, 48),
WithdrawalCredentials: make([]byte, 32),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Slashed: false,
}
}
state.Validators = validators
return nil
}
}

View File

@@ -40,6 +40,18 @@ var beaconStateCurrentVersion = fieldSpec{
t: typeBytes4,
}
var beaconStateSlot = fieldSpec{
// 40 = 8 (genesis_time) + 32 (genesis_validators_root)
offset: 40,
t: typeUint64,
}
// SlotFromState extracts the slot of the state out of the ssz-encoded byte slice
func SlotFromState(marshaled []byte) (types.Slot, error) {
s, err := beaconStateSlot.uint64(marshaled)
return types.Slot(s), err
}
// FromState exploits the fixed-size lower-order bytes in a BeaconState as a heuristic to obtain the value of the
// state.version field without first unmarshaling the BeaconState. The Version is then internally used to lookup
// the correct ConfigVersion.
@@ -128,7 +140,7 @@ var beaconBlockSlot = fieldSpec{
t: typeUint64,
}
func slotFromBlock(marshaled []byte) (types.Slot, error) {
func SlotFromBlock(marshaled []byte) (types.Slot, error) {
slot, err := beaconBlockSlot.uint64(marshaled)
if err != nil {
return 0, err
@@ -141,7 +153,7 @@ var errBlockForkMismatch = errors.New("fork or config detected in unmarshaler is
// UnmarshalBeaconBlock uses internal knowledge in the VersionedUnmarshaler to pick the right concrete SignedBeaconBlock type,
// then Unmarshal()s the type and returns an instance of block.SignedBeaconBlock if successful.
func (cf *VersionedUnmarshaler) UnmarshalBeaconBlock(marshaled []byte) (interfaces.SignedBeaconBlock, error) {
slot, err := slotFromBlock(marshaled)
slot, err := SlotFromBlock(marshaled)
if err != nil {
return nil, err
}
@@ -172,7 +184,7 @@ func (cf *VersionedUnmarshaler) UnmarshalBeaconBlock(marshaled []byte) (interfac
// then Unmarshal()s the type and returns an instance of block.SignedBeaconBlock if successful.
// For Phase0 and Altair it works exactly like UnmarshalBeaconBlock.
func (cf *VersionedUnmarshaler) UnmarshalBlindedBeaconBlock(marshaled []byte) (interfaces.SignedBeaconBlock, error) {
slot, err := slotFromBlock(marshaled)
slot, err := SlotFromBlock(marshaled)
if err != nil {
return nil, err
}
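As a worked example of the 40-byte offset above (8 bytes of genesis_time followed by 32 bytes of genesis_validators_root, with SSZ basic types encoded little-endian), here is a hypothetical standalone equivalent of SlotFromState using only the standard library:

package detect

import (
	"encoding/binary"
	"fmt"
)

// slotFromStateBytes reads the state's slot directly from the SSZ encoding:
// fixed-size fields are laid out in order, so slot is the uint64 at byte 40.
func slotFromStateBytes(marshaled []byte) (uint64, error) {
	const offset = 40
	if len(marshaled) < offset+8 {
		return 0, fmt.Errorf("encoded state too short: %d bytes", len(marshaled))
	}
	return binary.LittleEndian.Uint64(marshaled[offset : offset+8]), nil
}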

View File

@@ -21,30 +21,31 @@ import (
)
func TestSlotFromBlock(t *testing.T) {
b := util.NewBeaconBlock()
var slot types.Slot = 3
b.Block.Slot = slot
bb, err := b.MarshalSSZ()
require.NoError(t, err)
sfb, err := slotFromBlock(bb)
require.NoError(t, err)
require.Equal(t, slot, sfb)
for _, slot := range []types.Slot{0, 1, 2, 3, 5, 7, 11, 13, 16, 32, 128, 2048, math.MaxUint64} {
b := util.NewBeaconBlock()
b.Block.Slot = slot
bb, err := b.MarshalSSZ()
require.NoError(t, err)
sfb, err := SlotFromBlock(bb)
require.NoError(t, err)
require.Equal(t, slot, sfb)
ba := util.NewBeaconBlockAltair()
ba.Block.Slot = slot
bab, err := ba.MarshalSSZ()
require.NoError(t, err)
sfba, err := slotFromBlock(bab)
require.NoError(t, err)
require.Equal(t, slot, sfba)
ba := util.NewBeaconBlockAltair()
ba.Block.Slot = slot
bab, err := ba.MarshalSSZ()
require.NoError(t, err)
sfba, err := SlotFromBlock(bab)
require.NoError(t, err)
require.Equal(t, slot, sfba)
bm := util.NewBeaconBlockBellatrix()
bm.Block.Slot = slot
bmb, err := ba.MarshalSSZ()
require.NoError(t, err)
sfbm, err := slotFromBlock(bmb)
require.NoError(t, err)
require.Equal(t, slot, sfbm)
bm := util.NewBeaconBlockBellatrix()
bm.Block.Slot = slot
bmb, err := ba.MarshalSSZ()
require.NoError(t, err)
sfbm, err := SlotFromBlock(bmb)
require.NoError(t, err)
require.Equal(t, slot, sfbm)
}
}
func TestByState(t *testing.T) {
@@ -98,6 +99,9 @@ func TestByState(t *testing.T) {
require.Equal(t, c.version, cf.Fork)
require.Equal(t, c.forkversion, cf.Version)
require.Equal(t, bc.ConfigName, cf.Config.ConfigName)
s, err := SlotFromState(m)
require.NoError(t, err)
require.Equal(t, c.slot, s)
}
}