Improved cold state checkpoints: migrate database index and usage (#6461)

* Add database migrations, still need to update the API usage...
* gofmt goimports
* progress
* Merge branch 'master' of github.com:prysmaticlabs/prysm into index-migration
* use slot instead of index
* rename LastArchivedIndex to LastArchivedSlot
* rename LastArchivedIndexRoot to LastArchivedRoot
* remove unused HighestSlotStates method
* deprecate old key, include in migration
* deprecate old key, include in migration
* remove blocks index in migration
* rename bucket variable
* fix code to pass tests
* Merge branch 'master' of github.com:prysmaticlabs/prysm into index-migration
* gofmt, goimports
* fix
* Add state slot index
* progress
* lint
* fix build
* Merge branch 'master' of github.com:prysmaticlabs/prysm into index-migration
* kafka
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* remove SaveArchivedPointRoot, a few other big changes
* Merge branch 'index-migration' of github.com:prysmaticlabs/prysm into index-migration
* fix tests and lint
* lint again
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* block migration, some renaming
* gaz, gofmt
* add tests
* change index to uint bytes
* Merge branch 'index-migration' of github.com:prysmaticlabs/prysm into index-migration
* rm method notes
* stop if the bucket doesn't exist
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* @rauljordan pr feedback
* Simplify
* Merge refs/heads/master into index-migration
* Remove unused method, add roundtrip test
* gofmt
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge refs/heads/master into index-migration
* Merge branch 'master' of github.com:prysmaticlabs/prysm into index-migration
This commit is contained in:
Preston Van Loon
2020-07-18 11:05:04 -07:00
committed by GitHub
parent d53539499c
commit c419e4ed8f
36 changed files with 601 additions and 792 deletions

View File

@@ -424,7 +424,7 @@ func (s *Service) initializeChainInfo(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "could not get finalized state from db")
}
finalizedRoot = s.beaconDB.LastArchivedIndexRoot(ctx)
finalizedRoot = s.beaconDB.LastArchivedRoot(ctx)
if finalizedRoot == params.BeaconConfig().ZeroHash {
finalizedRoot = bytesutil.ToBytes32(finalized.Root)
}

View File

@@ -29,7 +29,6 @@ type ReadOnlyDatabase interface {
HasBlock(ctx context.Context, blockRoot [32]byte) bool
GenesisBlock(ctx context.Context) (*ethpb.SignedBeaconBlock, error)
IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool
HighestSlotBlocks(ctx context.Context) ([]*ethpb.SignedBeaconBlock, error)
HighestSlotBlocksBelow(ctx context.Context, slot uint64) ([]*ethpb.SignedBeaconBlock, error)
// State related methods.
State(ctx context.Context, blockRoot [32]byte) (*state.BeaconState, error)
@@ -37,7 +36,6 @@ type ReadOnlyDatabase interface {
HasState(ctx context.Context, blockRoot [32]byte) bool
StateSummary(ctx context.Context, blockRoot [32]byte) (*ethereum_beacon_p2p_v1.StateSummary, error)
HasStateSummary(ctx context.Context, blockRoot [32]byte) bool
HighestSlotStates(ctx context.Context) ([]*state.BeaconState, error)
HighestSlotStatesBelow(ctx context.Context, slot uint64) ([]*state.BeaconState, error)
// Slashing operations.
ProposerSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.ProposerSlashing, error)
@@ -55,10 +53,10 @@ type ReadOnlyDatabase interface {
ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*ethereum_beacon_p2p_v1.ArchivedCommitteeInfo, error)
ArchivedBalances(ctx context.Context, epoch uint64) ([]uint64, error)
ArchivedValidatorParticipation(ctx context.Context, epoch uint64) (*eth.ValidatorParticipation, error)
ArchivedPointRoot(ctx context.Context, index uint64) [32]byte
HasArchivedPoint(ctx context.Context, index uint64) bool
LastArchivedIndexRoot(ctx context.Context) [32]byte
LastArchivedIndex(ctx context.Context) (uint64, error)
ArchivedPointRoot(ctx context.Context, slot uint64) [32]byte
HasArchivedPoint(ctx context.Context, slot uint64) bool
LastArchivedRoot(ctx context.Context) [32]byte
LastArchivedSlot(ctx context.Context) (uint64, error)
// Deposit contract related handlers.
DepositContractAddress(ctx context.Context) ([]byte, error)
// Powchain operations.
@@ -98,12 +96,13 @@ type NoHeadAccessDatabase interface {
SaveArchivedCommitteeInfo(ctx context.Context, epoch uint64, info *ethereum_beacon_p2p_v1.ArchivedCommitteeInfo) error
SaveArchivedBalances(ctx context.Context, epoch uint64, balances []uint64) error
SaveArchivedValidatorParticipation(ctx context.Context, epoch uint64, part *eth.ValidatorParticipation) error
SaveArchivedPointRoot(ctx context.Context, blockRoot [32]byte, index uint64) error
SaveLastArchivedIndex(ctx context.Context, index uint64) error
// Deposit contract related handlers.
SaveDepositContractAddress(ctx context.Context, addr common.Address) error
// Powchain operations.
SavePowchainData(ctx context.Context, data *db.ETH1ChainData) error
// Run any required database migrations.
RunMigrations(ctx context.Context) error
}
// HeadAccessDatabase defines a struct with access to reading chain head data.

View File

@@ -283,11 +283,6 @@ func (e Exporter) SavePowchainData(ctx context.Context, data *db.ETH1ChainData)
return e.db.SavePowchainData(ctx, data)
}
// SaveArchivedPointRoot -- passthrough
func (e Exporter) SaveArchivedPointRoot(ctx context.Context, blockRoot [32]byte, index uint64) error {
return e.db.SaveArchivedPointRoot(ctx, blockRoot, index)
}
// ArchivedPointRoot -- passthrough
func (e Exporter) ArchivedPointRoot(ctx context.Context, index uint64) [32]byte {
return e.db.ArchivedPointRoot(ctx, index)
@@ -298,14 +293,9 @@ func (e Exporter) HasArchivedPoint(ctx context.Context, index uint64) bool {
return e.db.HasArchivedPoint(ctx, index)
}
// LastArchivedIndexRoot -- passthrough
func (e Exporter) LastArchivedIndexRoot(ctx context.Context) [32]byte {
return e.db.LastArchivedIndexRoot(ctx)
}
// HighestSlotBlocks -- passthrough
func (e Exporter) HighestSlotBlocks(ctx context.Context) ([]*ethpb.SignedBeaconBlock, error) {
return e.db.HighestSlotBlocks(ctx)
// LastArchivedRoot -- passthrough
func (e Exporter) LastArchivedRoot(ctx context.Context) [32]byte {
return e.db.LastArchivedRoot(ctx)
}
// HighestSlotBlocksBelow -- passthrough
@@ -313,27 +303,22 @@ func (e Exporter) HighestSlotBlocksBelow(ctx context.Context, slot uint64) ([]*e
return e.db.HighestSlotBlocksBelow(ctx, slot)
}
// HighestSlotStates -- passthrough
func (e Exporter) HighestSlotStates(ctx context.Context) ([]*state.BeaconState, error) {
return e.db.HighestSlotStates(ctx)
}
// HighestSlotStatesBelow -- passthrough
func (e Exporter) HighestSlotStatesBelow(ctx context.Context, slot uint64) ([]*state.BeaconState, error) {
return e.db.HighestSlotStatesBelow(ctx, slot)
}
// SaveLastArchivedIndex -- passthrough
func (e Exporter) SaveLastArchivedIndex(ctx context.Context, index uint64) error {
return e.db.SaveLastArchivedIndex(ctx, index)
}
// LastArchivedIndex -- passthrough
func (e Exporter) LastArchivedIndex(ctx context.Context) (uint64, error) {
return e.db.LastArchivedIndex(ctx)
// LastArchivedSlot -- passthrough
func (e Exporter) LastArchivedSlot(ctx context.Context) (uint64, error) {
return e.db.LastArchivedSlot(ctx)
}
// HistoricalStatesDeleted -- passthrough
func (e Exporter) HistoricalStatesDeleted(ctx context.Context) error {
return e.db.HistoricalStatesDeleted(ctx)
}
// RunMigrations -- passthrough
func (e Exporter) RunMigrations(ctx context.Context) error {
return e.db.RunMigrations(ctx)
}

View File

@@ -15,6 +15,9 @@ go_library(
"encoding.go",
"finalized_block_roots.go",
"kv.go",
"migration.go",
"migration_archived_index.go",
"migration_block_slot_index.go",
"operations.go",
"powchain.go",
"regen_historical_states.go",
@@ -71,6 +74,8 @@ go_test(
"encoding_test.go",
"finalized_block_roots_test.go",
"kv_test.go",
"migration_archived_index_test.go",
"migration_block_slot_index_test.go",
"operations_test.go",
"slashings_test.go",
"state_summary_test.go",

View File

@@ -16,7 +16,7 @@ func (kv *Store) ArchivedActiveValidatorChanges(ctx context.Context, epoch uint6
ctx, span := trace.StartSpan(ctx, "BeaconDB.ArchivedActiveValidatorChanges")
defer span.End()
buf := bytesutil.Uint64ToBytes(epoch)
buf := bytesutil.Uint64ToBytesLittleEndian(epoch)
var target *pb.ArchivedActiveSetChanges
err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(archivedValidatorSetChangesBucket)
@@ -34,7 +34,7 @@ func (kv *Store) ArchivedActiveValidatorChanges(ctx context.Context, epoch uint6
func (kv *Store) SaveArchivedActiveValidatorChanges(ctx context.Context, epoch uint64, changes *pb.ArchivedActiveSetChanges) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveArchivedActiveValidatorChanges")
defer span.End()
buf := bytesutil.Uint64ToBytes(epoch)
buf := bytesutil.Uint64ToBytesLittleEndian(epoch)
enc, err := encode(ctx, changes)
if err != nil {
return err
@@ -50,7 +50,7 @@ func (kv *Store) ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*pb.A
ctx, span := trace.StartSpan(ctx, "BeaconDB.ArchivedCommitteeInfo")
defer span.End()
buf := bytesutil.Uint64ToBytes(epoch)
buf := bytesutil.Uint64ToBytesLittleEndian(epoch)
var target *pb.ArchivedCommitteeInfo
err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(archivedCommitteeInfoBucket)
@@ -68,7 +68,7 @@ func (kv *Store) ArchivedCommitteeInfo(ctx context.Context, epoch uint64) (*pb.A
func (kv *Store) SaveArchivedCommitteeInfo(ctx context.Context, epoch uint64, info *pb.ArchivedCommitteeInfo) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveArchivedCommitteeInfo")
defer span.End()
buf := bytesutil.Uint64ToBytes(epoch)
buf := bytesutil.Uint64ToBytesLittleEndian(epoch)
enc, err := encode(ctx, info)
if err != nil {
return err
@@ -84,7 +84,7 @@ func (kv *Store) ArchivedBalances(ctx context.Context, epoch uint64) ([]uint64,
ctx, span := trace.StartSpan(ctx, "BeaconDB.ArchivedBalances")
defer span.End()
buf := bytesutil.Uint64ToBytes(epoch)
buf := bytesutil.Uint64ToBytesLittleEndian(epoch)
var target []uint64
err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(archivedBalancesBucket)
@@ -102,7 +102,7 @@ func (kv *Store) ArchivedBalances(ctx context.Context, epoch uint64) ([]uint64,
func (kv *Store) SaveArchivedBalances(ctx context.Context, epoch uint64, balances []uint64) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveArchivedBalances")
defer span.End()
buf := bytesutil.Uint64ToBytes(epoch)
buf := bytesutil.Uint64ToBytesLittleEndian(epoch)
enc := marshalBalances(ctx, balances)
return kv.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(archivedBalancesBucket)
@@ -115,7 +115,7 @@ func (kv *Store) ArchivedValidatorParticipation(ctx context.Context, epoch uint6
ctx, span := trace.StartSpan(ctx, "BeaconDB.ArchivedValidatorParticipation")
defer span.End()
buf := bytesutil.Uint64ToBytes(epoch)
buf := bytesutil.Uint64ToBytesLittleEndian(epoch)
var target *ethpb.ValidatorParticipation
err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(archivedValidatorParticipationBucket)
@@ -133,7 +133,7 @@ func (kv *Store) ArchivedValidatorParticipation(ctx context.Context, epoch uint6
func (kv *Store) SaveArchivedValidatorParticipation(ctx context.Context, epoch uint64, part *ethpb.ValidatorParticipation) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveArchivedValidatorParticipation")
defer span.End()
buf := bytesutil.Uint64ToBytes(epoch)
buf := bytesutil.Uint64ToBytesLittleEndian(epoch)
enc, err := encode(ctx, part)
if err != nil {
return err

View File

@@ -2,65 +2,36 @@ package kv
import (
"context"
"encoding/binary"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
// SaveArchivedPointRoot saves an archived point root to the DB. This is used for cold state management.
func (kv *Store) SaveArchivedPointRoot(ctx context.Context, blockRoot [32]byte, index uint64) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveArchivedPointRoot")
defer span.End()
return kv.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(archivedIndexRootBucket)
return bucket.Put(bytesutil.Uint64ToBytes(index), blockRoot[:])
})
}
// SaveLastArchivedIndex to the db.
func (kv *Store) SaveLastArchivedIndex(ctx context.Context, index uint64) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveLastArchivedIndex")
defer span.End()
return kv.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(archivedIndexRootBucket)
return bucket.Put(lastArchivedIndexKey, bytesutil.Uint64ToBytes(index))
})
}
// LastArchivedIndex from the db.
func (kv *Store) LastArchivedIndex(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndex")
// LastArchivedSlot from the db.
func (kv *Store) LastArchivedSlot(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedSlot")
defer span.End()
var index uint64
err := kv.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(archivedIndexRootBucket)
b := bucket.Get(lastArchivedIndexKey)
if b == nil {
return nil
}
index = binary.LittleEndian.Uint64(b)
bkt := tx.Bucket(stateSlotIndicesBucket)
b, _ := bkt.Cursor().Last()
index = bytesutil.BytesToUint64BigEndian(b)
return nil
})
return index, err
}
// LastArchivedIndexRoot from the db.
func (kv *Store) LastArchivedIndexRoot(ctx context.Context) [32]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndexRoot")
// LastArchivedRoot from the db.
func (kv *Store) LastArchivedRoot(ctx context.Context) [32]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedRoot")
defer span.End()
var blockRoot []byte
if err := kv.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(archivedIndexRootBucket)
lastArchivedIndex := bucket.Get(lastArchivedIndexKey)
if lastArchivedIndex == nil {
return nil
}
blockRoot = bucket.Get(lastArchivedIndex)
bkt := tx.Bucket(stateSlotIndicesBucket)
_, blockRoot = bkt.Cursor().Last()
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err)
@@ -71,14 +42,14 @@ func (kv *Store) LastArchivedIndexRoot(ctx context.Context) [32]byte {
// ArchivedPointRoot returns the block root of an archived point from the DB.
// This is essential for cold state management and to restore a cold state.
func (kv *Store) ArchivedPointRoot(ctx context.Context, index uint64) [32]byte {
func (kv *Store) ArchivedPointRoot(ctx context.Context, slot uint64) [32]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.ArchivedPointRoot")
defer span.End()
var blockRoot []byte
if err := kv.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(archivedIndexRootBucket)
blockRoot = bucket.Get(bytesutil.Uint64ToBytes(index))
bucket := tx.Bucket(stateSlotIndicesBucket)
blockRoot = bucket.Get(bytesutil.Uint64ToBytesBigEndian(slot))
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err)
@@ -88,13 +59,13 @@ func (kv *Store) ArchivedPointRoot(ctx context.Context, index uint64) [32]byte {
}
// HasArchivedPoint returns true if an archived point exists in DB.
func (kv *Store) HasArchivedPoint(ctx context.Context, index uint64) bool {
func (kv *Store) HasArchivedPoint(ctx context.Context, slot uint64) bool {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HasArchivedPoint")
defer span.End()
var exists bool
if err := kv.db.View(func(tx *bolt.Tx) error {
iBucket := tx.Bucket(archivedIndexRootBucket)
exists = iBucket.Get(bytesutil.Uint64ToBytes(index)) != nil
iBucket := tx.Bucket(stateSlotIndicesBucket)
exists = iBucket.Get(bytesutil.Uint64ToBytesBigEndian(slot)) != nil
return nil
}); err != nil { // This view never returns an error, but we'll handle anyway for sanity.
panic(err)

View File

@@ -3,6 +3,8 @@ package kv
import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
)
func TestArchivedPointIndexRoot_CanSaveRetrieve(t *testing.T) {
@@ -15,8 +17,11 @@ func TestArchivedPointIndexRoot_CanSaveRetrieve(t *testing.T) {
if r1 == received {
t.Fatal("Should not have been saved")
}
if err := db.SaveArchivedPointRoot(ctx, r1, i1); err != nil {
st := testutil.NewBeaconState()
if err := st.SetSlot(i1); err != nil {
t.Fatal(err)
}
if err := db.SaveState(ctx, st, r1); err != nil {
t.Fatal(err)
}
received = db.ArchivedPointRoot(ctx, i1)
@@ -28,7 +33,7 @@ func TestArchivedPointIndexRoot_CanSaveRetrieve(t *testing.T) {
func TestLastArchivedPoint_CanRetrieve(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
i, err := db.LastArchivedIndex(ctx)
i, err := db.LastArchivedSlot(ctx)
if err != nil {
t.Fatal(err)
}
@@ -36,29 +41,36 @@ func TestLastArchivedPoint_CanRetrieve(t *testing.T) {
t.Error("Did not get correct index")
}
if err := db.SaveArchivedPointRoot(ctx, [32]byte{'A'}, 1); err != nil {
t.Fatal(err)
st := testutil.NewBeaconState()
if err := db.SaveState(ctx, st, [32]byte{'A'}); err != nil {
t.Error(err)
}
if err := db.SaveArchivedPointRoot(ctx, [32]byte{'B'}, 3); err != nil {
t.Fatal(err)
}
if err := db.SaveLastArchivedIndex(ctx, 1); err != nil {
t.Fatal(err)
}
if db.LastArchivedIndexRoot(ctx) != [32]byte{'A'} {
if db.LastArchivedRoot(ctx) != [32]byte{'A'} {
t.Error("Did not get wanted root")
}
if err := db.SaveLastArchivedIndex(ctx, 3); err != nil {
t.Fatal(err)
if err := st.SetSlot(2); err != nil {
t.Error(err)
}
if db.LastArchivedIndexRoot(ctx) != [32]byte{'B'} {
if err := db.SaveState(ctx, st, [32]byte{'B'}); err != nil {
t.Error(err)
}
if db.LastArchivedRoot(ctx) != [32]byte{'B'} {
t.Error("Did not get wanted root")
}
i, err = db.LastArchivedIndex(ctx)
if err := st.SetSlot(3); err != nil {
t.Error(err)
}
if err := db.SaveState(ctx, st, [32]byte{'C'}); err != nil {
t.Fatal(err)
}
i, err = db.LastArchivedSlot(ctx)
if err != nil {
t.Fatal(err)
}

View File

@@ -203,7 +203,7 @@ func createAttestationIndicesFromData(ctx context.Context, attData *ethpb.Attest
indices := make([][]byte, 0)
if attData.Source != nil {
buckets = append(buckets, attestationSourceEpochIndicesBucket)
indices = append(indices, bytesutil.Uint64ToBytes(attData.Source.Epoch))
indices = append(indices, bytesutil.Uint64ToBytesLittleEndian(attData.Source.Epoch))
if attData.Source.Root != nil && len(attData.Source.Root) > 0 {
buckets = append(buckets, attestationSourceRootIndicesBucket)
indices = append(indices, attData.Source.Root)
@@ -211,7 +211,7 @@ func createAttestationIndicesFromData(ctx context.Context, attData *ethpb.Attest
}
if attData.Target != nil {
buckets = append(buckets, attestationTargetEpochIndicesBucket)
indices = append(indices, bytesutil.Uint64ToBytes(attData.Target.Epoch))
indices = append(indices, bytesutil.Uint64ToBytesLittleEndian(attData.Target.Epoch))
if attData.Target.Root != nil && len(attData.Target.Root) > 0 {
buckets = append(buckets, attestationTargetRootIndicesBucket)
indices = append(indices, attData.Target.Root)
@@ -258,13 +258,13 @@ func createAttestationIndicesFromFilters(ctx context.Context, f *filters.QueryFi
if !ok {
return nil, errors.New("sourceEpoch is not type uint64")
}
indicesByBucket[string(attestationSourceEpochIndicesBucket)] = bytesutil.Uint64ToBytes(sourceEpoch)
indicesByBucket[string(attestationSourceEpochIndicesBucket)] = bytesutil.Uint64ToBytesLittleEndian(sourceEpoch)
case filters.TargetEpoch:
targetEpoch, ok := v.(uint64)
if !ok {
return nil, errors.New("targetEpoch is not type uint64")
}
indicesByBucket[string(attestationTargetEpochIndicesBucket)] = bytesutil.Uint64ToBytes(targetEpoch)
indicesByBucket[string(attestationTargetEpochIndicesBucket)] = bytesutil.Uint64ToBytesLittleEndian(targetEpoch)
case filters.TargetRoot:
targetRoot, ok := v.([]byte)
if !ok {

View File

@@ -4,18 +4,15 @@ import (
"bytes"
"context"
"fmt"
"strconv"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
log "github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
@@ -147,9 +144,6 @@ func (kv *Store) deleteBlock(ctx context.Context, blockRoot [32]byte) error {
return errors.Wrap(err, "could not delete root for DB indices")
}
kv.blockCache.Del(string(blockRoot[:]))
if err := kv.clearBlockSlotBitField(ctx, tx, block.Block.Slot); err != nil {
return err
}
return bkt.Delete(blockRoot[:])
})
}
@@ -175,9 +169,6 @@ func (kv *Store) deleteBlocks(ctx context.Context, blockRoots [][32]byte) error
return errors.Wrap(err, "could not delete root for DB indices")
}
kv.blockCache.Del(string(blockRoot[:]))
if err := kv.clearBlockSlotBitField(ctx, tx, block.Block.Slot); err != nil {
return err
}
if err := bkt.Delete(blockRoot[:]); err != nil {
return err
}
@@ -209,9 +200,6 @@ func (kv *Store) SaveBlocks(ctx context.Context, blocks []*ethpb.SignedBeaconBlo
return kv.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blocksBucket)
for _, block := range blocks {
if err := kv.setBlockSlotBitField(ctx, tx, block.Block.Slot); err != nil {
return err
}
blockRoot, err := stateutil.BlockRoot(block.Block)
if err != nil {
return err
@@ -283,136 +271,47 @@ func (kv *Store) SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) e
})
}
// HighestSlotBlocks returns the blocks with the highest slot from the db.
func (kv *Store) HighestSlotBlocks(ctx context.Context) ([]*ethpb.SignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HighestSlotBlocks")
defer span.End()
blocks := make([]*ethpb.SignedBeaconBlock, 0)
err := kv.db.View(func(tx *bolt.Tx) error {
sBkt := tx.Bucket(slotsHasObjectBucket)
savedSlots := sBkt.Get(savedBlockSlotsKey)
highestIndex, err := bytesutil.HighestBitIndex(savedSlots)
if err != nil {
return err
}
blocks, err = kv.blocksAtSlotBitfieldIndex(ctx, tx, highestIndex)
if err != nil {
return err
}
return nil
})
return blocks, err
}
// HighestSlotBlocksBelow returns the block with the highest slot below the input slot from the db.
func (kv *Store) HighestSlotBlocksBelow(ctx context.Context, slot uint64) ([]*ethpb.SignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HighestSlotBlocksBelow")
defer span.End()
blocks := make([]*ethpb.SignedBeaconBlock, 0)
err := kv.db.View(func(tx *bolt.Tx) error {
sBkt := tx.Bucket(slotsHasObjectBucket)
savedSlots := sBkt.Get(savedBlockSlotsKey)
if len(savedSlots) == 0 {
savedSlots = bytesutil.MakeEmptyBitlists(int(slot))
}
highestIndex, err := bytesutil.HighestBitIndexAt(savedSlots, int(slot))
if err != nil {
return err
}
blocks, err = kv.blocksAtSlotBitfieldIndex(ctx, tx, highestIndex)
if err != nil {
return err
var best []byte
if err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blockSlotIndicesBucket)
// Iterate through the index, which is in byte sorted order.
c := bkt.Cursor()
for s, root := c.First(); s != nil; s, root = c.Next() {
key := bytesutil.BytesToUint64BigEndian(s)
if root == nil {
continue
}
if key >= slot {
break
}
best = root
}
return nil
})
return blocks, err
}
// blocksAtSlotBitfieldIndex retrieves the blocks in DB given the input index. The index represents
// the position of the slot bitfield the saved block maps to.
func (kv *Store) blocksAtSlotBitfieldIndex(ctx context.Context, tx *bolt.Tx, index int) ([]*ethpb.SignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.blocksAtSlotBitfieldIndex")
defer span.End()
highestSlot := uint64(0)
if uint64(index) > highestSlot+1 {
highestSlot = uint64(index - 1)
}
if highestSlot == 0 {
gBlock, err := kv.GenesisBlock(ctx)
if err != nil {
return nil, err
}
return []*ethpb.SignedBeaconBlock{gBlock}, nil
}
f := filters.NewFilter().SetStartSlot(highestSlot).SetEndSlot(highestSlot)
keys, err := getBlockRootsByFilter(ctx, tx, f)
if err != nil {
}); err != nil {
return nil, err
}
blocks := make([]*ethpb.SignedBeaconBlock, 0, len(keys))
bBkt := tx.Bucket(blocksBucket)
for i := 0; i < len(keys); i++ {
encoded := bBkt.Get(keys[i])
block := &ethpb.SignedBeaconBlock{}
if err := decode(ctx, encoded, block); err != nil {
var blk *ethpb.SignedBeaconBlock
var err error
if best != nil {
blk, err = kv.Block(ctx, bytesutil.ToBytes32(best))
if err != nil {
return nil, err
}
}
if blk == nil {
blk, err = kv.GenesisBlock(ctx)
if err != nil {
return nil, err
}
blocks = append(blocks, block)
}
return blocks, err
}
// setBlockSlotBitField sets the block slot bit in DB.
// This helps to track which slot has a saved block in db.
func (kv *Store) setBlockSlotBitField(ctx context.Context, tx *bolt.Tx, slot uint64) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.setBlockSlotBitField")
defer span.End()
kv.blockSlotBitLock.Lock()
defer kv.blockSlotBitLock.Unlock()
bucket := tx.Bucket(slotsHasObjectBucket)
slotBitfields := bucket.Get(savedBlockSlotsKey)
// Copy is needed to avoid unsafe pointer conversions.
// See: https://github.com/etcd-io/bbolt/pull/201
tmp := make([]byte, len(slotBitfields))
copy(tmp, slotBitfields)
slotBitfields = bytesutil.SetBit(tmp, int(slot))
return bucket.Put(savedBlockSlotsKey, slotBitfields)
}
// clearBlockSlotBitField clears the block slot bit in DB.
// This helps to track which slot has a saved block in db.
func (kv *Store) clearBlockSlotBitField(ctx context.Context, tx *bolt.Tx, slot uint64) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.clearBlockSlotBitField")
defer span.End()
kv.blockSlotBitLock.Lock()
defer kv.blockSlotBitLock.Unlock()
bucket := tx.Bucket(slotsHasObjectBucket)
slotBitfields := bucket.Get(savedBlockSlotsKey)
// Copy is needed to avoid unsafe pointer conversions.
// See: https://github.com/etcd-io/bbolt/pull/201
tmp := make([]byte, len(slotBitfields))
copy(tmp, slotBitfields)
slotBitfields = bytesutil.ClearBit(tmp, int(slot))
return bucket.Put(savedBlockSlotsKey, slotBitfields)
return []*ethpb.SignedBeaconBlock{blk}, nil
}
// getBlockRootsByFilter retrieves the block roots given the filter criteria.
@@ -501,8 +400,8 @@ func fetchBlockRootsBySlotRange(
startSlot = helpers.StartSlot(startEpoch)
endSlot = helpers.StartSlot(endEpoch) + params.BeaconConfig().SlotsPerEpoch - 1
}
min := []byte(fmt.Sprintf("%07d", startSlot))
max := []byte(fmt.Sprintf("%07d", endSlot))
min := bytesutil.Uint64ToBytesBigEndian(startSlot)
max := bytesutil.Uint64ToBytesBigEndian(endSlot)
var conditional func(key, max []byte) bool
if endSlot == 0 {
conditional = func(key, max []byte) bool {
@@ -521,11 +420,7 @@ func fetchBlockRootsBySlotRange(
c := bkt.Cursor()
for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() {
if step > 1 {
slot, err := strconv.ParseUint(string(k), 10, 64)
if err != nil {
log.WithError(err).Error("Cannot parse key to uint")
continue
}
slot := bytesutil.BytesToUint64BigEndian(k)
if (slot-startSlot)%step != 0 {
continue
}
@@ -553,7 +448,7 @@ func createBlockIndicesFromBlock(ctx context.Context, block *ethpb.BeaconBlock)
blockSlotIndicesBucket,
}
indices := [][]byte{
[]byte(fmt.Sprintf("%07d", block.Slot)),
bytesutil.Uint64ToBytesBigEndian(block.Slot),
}
if block.ParentRoot != nil && len(block.ParentRoot) > 0 {
buckets = append(buckets, blockParentRootIndicesBucket)

View File

@@ -382,48 +382,6 @@ func TestStore_Blocks_Retrieve_SlotRangeWithStep(t *testing.T) {
}
}
func TestStore_SaveBlock_CanGetHighest(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
block := testutil.NewBeaconBlock()
block.Block.Slot = 1
if err := db.SaveBlock(ctx, block); err != nil {
t.Fatal(err)
}
highestSavedBlock, err := db.HighestSlotBlocks(ctx)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(block, highestSavedBlock[0]) {
t.Errorf("Wanted %v, received %v", block, highestSavedBlock)
}
block.Block.Slot = 999
if err := db.SaveBlock(ctx, block); err != nil {
t.Fatal(err)
}
highestSavedBlock, err = db.HighestSlotBlocks(ctx)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(block, highestSavedBlock[0]) {
t.Errorf("Wanted %v, received %v", block, highestSavedBlock)
}
block.Block.Slot = 300000000
if err := db.SaveBlock(ctx, block); err != nil {
t.Fatal(err)
}
highestSavedBlock, err = db.HighestSlotBlocks(ctx)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(block, highestSavedBlock[0]) {
t.Errorf("Wanted %v, received %v", block, highestSavedBlock)
}
}
func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
@@ -536,30 +494,6 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) {
}
}
func TestStore_SaveBlocks_CanGetHighest(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
totalBlocks := make([]*ethpb.SignedBeaconBlock, 500)
for i := 0; i < 500; i++ {
b := testutil.NewBeaconBlock()
b.Block.Slot = uint64(i)
b.Block.ParentRoot = bytesutil.PadTo([]byte("parent"), 32)
totalBlocks[i] = b
}
if err := db.SaveBlocks(ctx, totalBlocks); err != nil {
t.Fatal(err)
}
highestSavedBlock, err := db.HighestSlotBlocks(ctx)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(totalBlocks[len(totalBlocks)-1], highestSavedBlock[0]) {
t.Errorf("Wanted %v, received %v", totalBlocks[len(totalBlocks)-1], highestSavedBlock)
}
}
func TestStore_SaveBlocks_HasCachedBlocks(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
@@ -591,83 +525,3 @@ func TestStore_SaveBlocks_HasCachedBlocks(t *testing.T) {
t.Error("Did not get wanted blocks")
}
}
func TestStore_DeleteBlock_CanGetHighest(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
b50 := testutil.NewBeaconBlock()
b50.Block.Slot = 50
if err := db.SaveBlock(ctx, b50); err != nil {
t.Fatal(err)
}
highestSavedBlock, err := db.HighestSlotBlocks(ctx)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(b50, highestSavedBlock[0]) {
t.Errorf("Wanted %v, received %v", b50, highestSavedBlock)
}
b51 := testutil.NewBeaconBlock()
b51.Block.Slot = 51
r51, err := stateutil.BlockRoot(b51.Block)
if err != nil {
t.Fatal(err)
}
if err := db.SaveBlock(ctx, b51); err != nil {
t.Fatal(err)
}
highestSavedBlock, err = db.HighestSlotBlocks(ctx)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(b51, highestSavedBlock[0]) {
t.Errorf("Wanted %v, received %v", b51, highestSavedBlock)
}
if err := db.deleteBlock(ctx, r51); err != nil {
t.Fatal(err)
}
highestSavedBlock, err = db.HighestSlotBlocks(ctx)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(b50, highestSavedBlock[0]) {
t.Errorf("Wanted %v, received %v", b50, highestSavedBlock)
}
}
func TestStore_DeleteBlocks_CanGetHighest(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
var err error
totalBlocks := make([]*ethpb.SignedBeaconBlock, 100)
r := make([][32]byte, 100)
for i := 0; i < 100; i++ {
b := testutil.NewBeaconBlock()
b.Block.Slot = uint64(i)
b.Block.ParentRoot = bytesutil.PadTo([]byte("parent"), 32)
totalBlocks[i] = b
r[i], err = stateutil.BlockRoot(totalBlocks[i].Block)
if err != nil {
t.Error(err)
}
}
if err := db.SaveBlocks(ctx, totalBlocks); err != nil {
t.Fatal(err)
}
if err := db.deleteBlocks(ctx, [][32]byte{r[99], r[98], r[97]}); err != nil {
t.Fatal(err)
}
highestSavedBlock, err := db.HighestSlotBlocks(ctx)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(totalBlocks[96], highestSavedBlock[0]) {
t.Errorf("Wanted %v, received %v", totalBlocks[len(totalBlocks)-1], highestSavedBlock)
}
}

View File

@@ -104,8 +104,6 @@ func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*St
archivedValidatorParticipationBucket,
powchainBucket,
stateSummaryBucket,
archivedIndexRootBucket,
slotsHasObjectBucket,
// Indices buckets.
attestationHeadBlockRootBucket,
attestationSourceRootIndicesBucket,
@@ -113,10 +111,13 @@ func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*St
attestationTargetRootIndicesBucket,
attestationTargetEpochIndicesBucket,
blockSlotIndicesBucket,
stateSlotIndicesBucket,
blockParentRootIndicesBucket,
finalizedBlockRootsIndexBucket,
// New State Management service bucket.
newStateServiceCompatibleBucket,
// Migrations
migrationsBucket,
)
}); err != nil {
return nil, err

View File

@@ -0,0 +1,30 @@
package kv
import (
"context"
bolt "go.etcd.io/bbolt"
)
var migrationCompleted = []byte("done")
type migration func(*bolt.Tx) error
var migrations = []migration{
migrateArchivedIndex,
migrateBlockSlotIndex,
}
// RunMigrations defined in the migrations array.
func (s *Store) RunMigrations(ctx context.Context) error {
for _, m := range migrations {
if ctx.Err() != nil {
return ctx.Err()
}
if err := s.db.Update(m); err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,61 @@
package kv
import (
"bytes"
"context"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
bolt "go.etcd.io/bbolt"
)
var migrationArchivedIndex0Key = []byte("archive_index_0")
func migrateArchivedIndex(tx *bolt.Tx) error {
mb := tx.Bucket(migrationsBucket)
if b := mb.Get(migrationArchivedIndex0Key); bytes.Equal(b, migrationCompleted) {
return nil // Migration already completed.
}
bkt := tx.Bucket(archivedRootBucket)
if bkt == nil {
return nil
}
// Remove "last archived index" key before iterating over all keys.
if err := bkt.Delete(lastArchivedIndexKey); err != nil {
return err
}
var highest uint64
c := bkt.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
// Look up actual slot from block
b := tx.Bucket(blocksBucket).Get(v)
// Skip this key if there is no block for whatever reason.
if b == nil {
continue
}
blk := &ethpb.SignedBeaconBlock{}
if err := decode(context.Background(), b, blk); err != nil {
return err
}
if err := tx.Bucket(stateSlotIndicesBucket).Put(bytesutil.Uint64ToBytesBigEndian(blk.Block.Slot), v); err != nil {
return err
}
if blk.Block.Slot > highest {
highest = blk.Block.Slot
}
}
// Delete deprecated buckets.
for _, bkt := range [][]byte{slotsHasObjectBucket, archivedRootBucket} {
if tx.Bucket(bkt) != nil {
if err := tx.DeleteBucket(bkt); err != nil {
return err
}
}
}
// Mark migration complete.
return mb.Put(migrationArchivedIndex0Key, migrationCompleted)
}

View File

@@ -0,0 +1,122 @@
package kv
import (
"bytes"
"context"
"fmt"
"testing"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"go.etcd.io/bbolt"
)
func Test_migrateArchivedIndex(t *testing.T) {
tests := []struct {
name string
setup func(t *testing.T, db *bbolt.DB)
eval func(t *testing.T, db *bbolt.DB)
}{
{
name: "only runs once",
setup: func(t *testing.T, db *bbolt.DB) {
if err := db.Update(func(tx *bbolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists(archivedRootBucket); err != nil {
t.Error(err)
}
if err := tx.Bucket(archivedRootBucket).Put(bytesutil.Uint64ToBytesLittleEndian(2048), []byte("foo")); err != nil {
return err
}
return tx.Bucket(migrationsBucket).Put(migrationArchivedIndex0Key, migrationCompleted)
}); err != nil {
t.Error(err)
}
},
eval: func(t *testing.T, db *bbolt.DB) {
if err := db.View(func(tx *bbolt.Tx) error {
v := tx.Bucket(archivedRootBucket).Get(bytesutil.Uint64ToBytesLittleEndian(2048))
if !bytes.Equal(v, []byte("foo")) {
return fmt.Errorf("did not receive correct data for key 2048, wanted 'foo' got %s", v)
}
return nil
}); err != nil {
t.Error(err)
}
},
},
{
name: "migrates and deletes entries",
setup: func(t *testing.T, db *bbolt.DB) {
if err := db.Update(func(tx *bbolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists(archivedRootBucket); err != nil {
t.Error(err)
}
if _, err := tx.CreateBucketIfNotExists(slotsHasObjectBucket); err != nil {
t.Error(err)
}
if err := tx.Bucket(archivedRootBucket).Put(bytesutil.Uint64ToBytesLittleEndian(2048), []byte("foo")); err != nil {
return err
}
b, err := encode(context.Background(), &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 2048}})
if err != nil {
return err
}
return tx.Bucket(blocksBucket).Put([]byte("foo"), b)
}); err != nil {
t.Error(err)
}
},
eval: func(t *testing.T, db *bbolt.DB) {
if err := db.View(func(tx *bbolt.Tx) error {
k := uint64(2048)
if v := tx.Bucket(stateSlotIndicesBucket).Get(bytesutil.Uint64ToBytesBigEndian(k)); !bytes.Equal(v, []byte("foo")) {
return fmt.Errorf("did not receive correct data for key %d, wanted 'foo' got %v", k, v)
}
return nil
}); err != nil {
t.Error(err)
}
},
},
{
name: "deletes old buckets",
setup: func(t *testing.T, db *bbolt.DB) {
if err := db.Update(func(tx *bbolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists(archivedRootBucket); err != nil {
t.Error(err)
}
if _, err := tx.CreateBucketIfNotExists(slotsHasObjectBucket); err != nil {
t.Error(err)
}
return tx.Bucket(slotsHasObjectBucket).Put(savedStateSlotsKey, []byte("foo"))
}); err != nil {
t.Error(err)
}
},
eval: func(t *testing.T, db *bbolt.DB) {
if err := db.View(func(tx *bbolt.Tx) error {
if tx.Bucket(slotsHasObjectBucket) != nil {
t.Errorf("Expected %v to be deleted", savedStateSlotsKey)
}
if tx.Bucket(archivedRootBucket) != nil {
t.Errorf("Expected %v to be deleted", savedStateSlotsKey)
}
return nil
}); err != nil {
t.Error(err)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := setupDB(t).db
tt.setup(t, db)
if err := db.Update(migrateArchivedIndex); err != nil {
t.Errorf("migrateArchivedIndex(tx) error = %v", err)
}
tt.eval(t, db)
})
}
}

View File

@@ -0,0 +1,36 @@
package kv
import (
"bytes"
"strconv"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
bolt "go.etcd.io/bbolt"
)
var migrationBlockSlotIndex0Key = []byte("block_slot_index_0")
func migrateBlockSlotIndex(tx *bolt.Tx) error {
mb := tx.Bucket(migrationsBucket)
if b := mb.Get(migrationBlockSlotIndex0Key); bytes.Equal(b, migrationCompleted) {
return nil // Migration already completed.
}
bkt := tx.Bucket(blockSlotIndicesBucket)
// Convert indices from strings to big endian integers.
if err := bkt.ForEach(func(k, v []byte) error {
key, err := strconv.ParseUint(string(k), 10, 64)
if err != nil {
return err
}
if err := bkt.Delete(k); err != nil {
return err
}
return bkt.Put(bytesutil.Uint64ToBytesBigEndian(key), v)
}); err != nil {
return err
}
return mb.Put(migrationBlockSlotIndex0Key, migrationCompleted)
}

View File

@@ -0,0 +1,74 @@
package kv
import (
"bytes"
"fmt"
"testing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"go.etcd.io/bbolt"
)
func Test_migrateBlockSlotIndex(t *testing.T) {
tests := []struct {
name string
setup func(t *testing.T, db *bbolt.DB)
eval func(t *testing.T, db *bbolt.DB)
}{
{
name: "only runs once",
setup: func(t *testing.T, db *bbolt.DB) {
if err := db.Update(func(tx *bbolt.Tx) error {
if err := tx.Bucket(blockSlotIndicesBucket).Put([]byte("2048"), []byte("foo")); err != nil {
return err
}
return tx.Bucket(migrationsBucket).Put(migrationBlockSlotIndex0Key, migrationCompleted)
}); err != nil {
t.Error(err)
}
},
eval: func(t *testing.T, db *bbolt.DB) {
if err := db.View(func(tx *bbolt.Tx) error {
v := tx.Bucket(blockSlotIndicesBucket).Get([]byte("2048"))
if !bytes.Equal(v, []byte("foo")) {
return fmt.Errorf("did not receive correct data for key 2048, wanted 'foo' got %s", v)
}
return nil
}); err != nil {
t.Error(err)
}
},
},
{
name: "migrates and deletes entries",
setup: func(t *testing.T, db *bbolt.DB) {
if err := db.Update(func(tx *bbolt.Tx) error {
return tx.Bucket(blockSlotIndicesBucket).Put([]byte("2048"), []byte("foo"))
}); err != nil {
t.Error(err)
}
},
eval: func(t *testing.T, db *bbolt.DB) {
if err := db.View(func(tx *bbolt.Tx) error {
k := uint64(2048)
if v := tx.Bucket(blockSlotIndicesBucket).Get(bytesutil.Uint64ToBytesBigEndian(k)); !bytes.Equal(v, []byte("foo")) {
return fmt.Errorf("did not receive correct data for key %d, wanted 'foo' got %v", k, v)
}
return nil
}); err != nil {
t.Error(err)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := setupDB(t).db
tt.setup(t, db)
if err := db.Update(migrateBlockSlotIndex); err != nil {
t.Errorf("migrateBlockSlotIndex(tx) error = %v", err)
}
tt.eval(t, db)
})
}
}

View File

@@ -8,7 +8,6 @@ import (
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
transition "github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
@@ -16,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
log "github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
@@ -37,12 +37,12 @@ func (kv *Store) regenHistoricalStates(ctx context.Context) error {
// Restore from last archived point if this process was previously interrupted.
slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
lastArchivedIndex, err := kv.LastArchivedIndex(ctx)
lastArchivedSlot, err := kv.LastArchivedSlot(ctx)
if err != nil {
return err
}
if lastArchivedIndex > 0 {
archivedIndexStart := lastArchivedIndex - 1
if lastArchivedSlot > 0 {
archivedIndexStart := lastArchivedSlot - 1
archivedRoot := kv.ArchivedPointRoot(ctx, archivedIndexStart)
currentState, err := kv.State(ctx, archivedRoot)
if err != nil {
@@ -51,7 +51,7 @@ func (kv *Store) regenHistoricalStates(ctx context.Context) error {
startSlot = currentState.Slot()
}
lastSavedBlockArchivedIndex, err := kv.lastSavedBlockArchivedIndex(ctx)
lastSavedBlockArchivedSlot, err := kv.lastSavedBlockArchivedSlot(ctx)
if err != nil {
return err
}
@@ -60,7 +60,7 @@ func (kv *Store) regenHistoricalStates(ctx context.Context) error {
if err != nil {
return err
}
for i := lastArchivedIndex; i <= lastSavedBlockArchivedIndex; i++ {
for slot := lastArchivedSlot; slot <= lastSavedBlockArchivedSlot; slot++ {
// This is an expensive operation, so we check if the context was canceled
// at any point in the iteration.
if err := ctx.Err(); err != nil {
@@ -115,12 +115,12 @@ func (kv *Store) regenHistoricalStates(ctx context.Context) error {
if len(blocks) > 0 {
// Save the historical root, state and highest index to the DB.
if helpers.IsEpochStart(currentState.Slot()) && currentState.Slot()%slotsPerArchivedPoint == 0 {
if err := kv.saveArchivedInfo(ctx, currentState.Copy(), blocks, i); err != nil {
if currentState.Slot()%slotsPerArchivedPoint == 0 {
if err := kv.saveArchivedInfo(ctx, currentState, blocks); err != nil {
return err
}
log.WithFields(log.Fields{
"currentArchivedIndex/totalArchivedIndices": fmt.Sprintf("%d/%d", i, lastSavedBlockArchivedIndex),
"currentArchivedIndex/totalArchivedIndices": fmt.Sprintf("%d/%d", slot, lastSavedBlockArchivedSlot),
"archivedStateSlot": currentState.Slot()}).Info("Saved historical state")
}
}
@@ -194,35 +194,39 @@ func regenHistoricalStateProcessSlots(ctx context.Context, state *stateTrie.Beac
return state, nil
}
// This retrieves the last saved block's archived index.
func (kv *Store) lastSavedBlockArchivedIndex(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.lastSavedBlockArchivedIndex")
// This retrieves the last saved block's archived slot.
func (kv *Store) lastSavedBlockArchivedSlot(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.lastSavedBlockArchivedSlot")
defer span.End()
b, err := kv.HighestSlotBlocks(ctx)
if err != nil {
var slot uint64
if err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blockSlotIndicesBucket)
// This index is sorted in byte order so accessing the last value would represent the
// highest slot stored in this index bucket.
s, _ := bkt.Cursor().Last()
slot = bytesutil.BytesToUint64BigEndian(s)
return nil
}); err != nil {
return 0, err
}
if len(b) == 0 {
return 0, errors.New("blocks can't be empty")
}
if b[0] == nil {
return 0, errors.New("nil last block")
}
lastSavedBlockSlot := b[0].Block.Slot
slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
lastSavedBlockArchivedIndex := lastSavedBlockSlot/slotsPerArchivedPoint - 1
return lastSavedBlockArchivedIndex, nil
return slot, nil
}
// This saved archived info (state, root, index) into the db.
// This saved archived info (state, root) into the db.
func (kv *Store) saveArchivedInfo(ctx context.Context,
currentState *stateTrie.BeaconState,
blocks []*ethpb.SignedBeaconBlock,
archivedIndex uint64,
) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.saveArchivedInfo")
defer span.End()
span.AddAttributes(trace.Int64Attribute("slot", int64(currentState.Slot())))
if len(blocks) == 0 {
return errors.New("no blocks provided")
}
lastBlocksRoot, err := stateutil.BlockRoot(blocks[len(blocks)-1].Block)
if err != nil {
return nil
@@ -230,11 +234,5 @@ func (kv *Store) saveArchivedInfo(ctx context.Context,
if err := kv.SaveState(ctx, currentState, lastBlocksRoot); err != nil {
return err
}
if err := kv.SaveArchivedPointRoot(ctx, lastBlocksRoot, archivedIndex); err != nil {
return err
}
if err := kv.SaveLastArchivedIndex(ctx, archivedIndex); err != nil {
return err
}
return nil
}

View File

@@ -21,12 +21,16 @@ var (
archivedBalancesBucket = []byte("archived-balances")
archivedValidatorParticipationBucket = []byte("archived-validator-participation")
powchainBucket = []byte("powchain")
archivedIndexRootBucket = []byte("archived-index-root")
slotsHasObjectBucket = []byte("slots-has-objects")
// Deprecated: This bucket was migrated in PR 6461. Do not use, except for migrations.
slotsHasObjectBucket = []byte("slots-has-objects")
// Deprecated: This bucket was migrated in PR 6461. Do not use, except for migrations.
archivedRootBucket = []byte("archived-index-root")
// Key indices buckets.
blockParentRootIndicesBucket = []byte("block-parent-root-indices")
blockSlotIndicesBucket = []byte("block-slot-indices")
stateSlotIndicesBucket = []byte("state-slot-indices")
attestationHeadBlockRootBucket = []byte("attestation-head-block-root-indices")
attestationSourceRootIndicesBucket = []byte("attestation-source-root-indices")
attestationSourceEpochIndicesBucket = []byte("attestation-source-epoch-indices")
@@ -41,10 +45,17 @@ var (
justifiedCheckpointKey = []byte("justified-checkpoint")
finalizedCheckpointKey = []byte("finalized-checkpoint")
powchainDataKey = []byte("powchain-data")
lastArchivedIndexKey = []byte("last-archived")
savedBlockSlotsKey = []byte("saved-block-slots")
savedStateSlotsKey = []byte("saved-state-slots")
// Deprecated: This index key was migrated in PR 6461. Do not use, except for migrations.
lastArchivedIndexKey = []byte("last-archived")
// Deprecated: This index key was migrated in PR 6461. Do not use, except for migrations.
savedStateSlotsKey = []byte("saved-state-slots")
// Deprecated: This index key was migrated in PR 6461. Do not use, except for migrations.
savedBlockSlotsKey = []byte("saved-block-slots")
// New state management service compatibility bucket.
newStateServiceCompatibleBucket = []byte("new-state-compatible")
// Migrations
migrationsBucket = []byte("migrations")
)

View File

@@ -6,7 +6,6 @@ import (
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -127,11 +126,11 @@ func (kv *Store) SaveStates(ctx context.Context, states []*state.BeaconState, bl
return kv.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateBucket)
for i, rt := range blockRoots {
if err := kv.setStateSlotBitField(ctx, tx, states[i].Slot()); err != nil {
return err
indicesByBucket := createStateIndicesFromStateSlot(ctx, states[i].Slot())
if err := updateValueForIndices(ctx, indicesByBucket, rt[:], tx); err != nil {
return errors.Wrap(err, "could not update DB indices")
}
err = bucket.Put(rt[:], multipleEncs[i])
if err != nil {
if err := bucket.Put(rt[:], multipleEncs[i]); err != nil {
return err
}
}
@@ -203,8 +202,9 @@ func (kv *Store) DeleteStates(ctx context.Context, blockRoots [][32]byte) error
if err != nil {
return err
}
if err := kv.clearStateSlotBitField(ctx, tx, slot); err != nil {
return err
indicesByBucket := createStateIndicesFromStateSlot(ctx, slot)
if err := deleteValueForIndices(ctx, indicesByBucket, blockRoot[:], tx); err != nil {
return errors.Wrap(err, "could not delete root for DB indices")
}
if err := c.Delete(); err != nil {
@@ -283,157 +283,69 @@ func slotByBlockRoot(ctx context.Context, tx *bolt.Tx, blockRoot []byte) (uint64
return stateSummary.Slot, nil
}
// HighestSlotStates returns the states with the highest slot from the db.
// Ideally there should just be one state per slot, but given validator
// can double propose, a single slot could have multiple block roots and
// reuslts states. This returns a list of states.
func (kv *Store) HighestSlotStates(ctx context.Context) ([]*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HighestSlotState")
defer span.End()
var states []*state.BeaconState
err := kv.db.View(func(tx *bolt.Tx) error {
slotBkt := tx.Bucket(slotsHasObjectBucket)
savedSlots := slotBkt.Get(savedStateSlotsKey)
highestIndex, err := bytesutil.HighestBitIndex(savedSlots)
if err != nil {
return err
}
states, err = kv.statesAtSlotBitfieldIndex(ctx, tx, highestIndex)
return err
})
if err != nil {
return nil, err
}
if len(states) == 0 {
return nil, errors.New("could not get one state")
}
return states, nil
}
// HighestSlotStatesBelow returns the states with the highest slot below the input slot
// from the db. Ideally there should just be one state per slot, but given validator
// can double propose, a single slot could have multiple block roots and
// reuslts states. This returns a list of states.
// results states. This returns a list of states.
func (kv *Store) HighestSlotStatesBelow(ctx context.Context, slot uint64) ([]*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HighestSlotStatesBelow")
defer span.End()
var states []*state.BeaconState
err := kv.db.View(func(tx *bolt.Tx) error {
slotBkt := tx.Bucket(slotsHasObjectBucket)
savedSlots := slotBkt.Get(savedStateSlotsKey)
if len(savedSlots) == 0 {
savedSlots = bytesutil.MakeEmptyBitlists(int(slot))
}
highestIndex, err := bytesutil.HighestBitIndexAt(savedSlots, int(slot))
if err != nil {
return err
}
states, err = kv.statesAtSlotBitfieldIndex(ctx, tx, highestIndex)
return err
})
if err != nil {
var best []byte
if err := kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(stateSlotIndicesBucket)
c := bkt.Cursor()
for s, root := c.First(); s != nil; s, root = c.Next() {
key := bytesutil.BytesToUint64BigEndian(s)
if root == nil {
continue
}
if key >= slot {
break
}
best = root
}
return nil
}); err != nil {
return nil, err
}
if len(states) == 0 {
return nil, errors.New("could not get one state")
}
return states, nil
}
// statesAtSlotBitfieldIndex retrieves the states in DB given the input index. The index represents
// the position of the slot bitfield the saved state maps to.
func (kv *Store) statesAtSlotBitfieldIndex(ctx context.Context, tx *bolt.Tx, index int) ([]*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.statesAtSlotBitfieldIndex")
defer span.End()
highestSlot := uint64(0)
if uint64(index) > highestSlot+1 {
highestSlot = uint64(index - 1)
}
if highestSlot == 0 {
gState, err := kv.GenesisState(ctx)
var st *state.BeaconState
var err error
if best != nil {
st, err = kv.State(ctx, bytesutil.ToBytes32(best))
if err != nil {
return nil, err
}
return []*state.BeaconState{gState}, nil
}
f := filters.NewFilter().SetStartSlot(highestSlot).SetEndSlot(highestSlot)
keys, err := getBlockRootsByFilter(ctx, tx, f)
if err != nil {
return nil, err
}
if len(keys) == 0 {
return nil, errors.New("could not get one block root to get state")
}
stateBkt := tx.Bucket(stateBucket)
states := make([]*state.BeaconState, 0, len(keys))
for i := range keys {
enc := stateBkt.Get(keys[i][:])
if enc == nil {
continue
}
pbState, err := createState(ctx, enc)
if st == nil {
st, err = kv.GenesisState(ctx)
if err != nil {
return nil, err
}
s, err := state.InitializeFromProtoUnsafe(pbState)
if err != nil {
return nil, err
}
states = append(states, s)
}
return states, err
return []*state.BeaconState{st}, nil
}
// setStateSlotBitField sets the state slot bit in DB.
// This helps to track which slot has a saved state in db.
func (kv *Store) setStateSlotBitField(ctx context.Context, tx *bolt.Tx, slot uint64) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.setStateSlotBitField")
// createBlockIndicesFromBlock takes in a beacon block and returns
// a map of bolt DB index buckets corresponding to each particular key for indices for
// data, such as (shard indices bucket -> shard 5).
func createStateIndicesFromStateSlot(ctx context.Context, slot uint64) map[string][]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.createStateIndicesFromState")
defer span.End()
indicesByBucket := make(map[string][]byte)
// Every index has a unique bucket for fast, binary-search
// range scans for filtering across keys.
buckets := [][]byte{
stateSlotIndicesBucket,
}
kv.stateSlotBitLock.Lock()
defer kv.stateSlotBitLock.Unlock()
bucket := tx.Bucket(slotsHasObjectBucket)
slotBitfields := bucket.Get(savedStateSlotsKey)
// Copy is needed to avoid unsafe pointer conversions.
// See: https://github.com/etcd-io/bbolt/pull/201
tmp := make([]byte, len(slotBitfields))
copy(tmp, slotBitfields)
slotBitfields = bytesutil.SetBit(tmp, int(slot))
return bucket.Put(savedStateSlotsKey, slotBitfields)
}
// clearStateSlotBitField clears the state slot bit in DB.
// This helps to track which slot has a saved state in db.
func (kv *Store) clearStateSlotBitField(ctx context.Context, tx *bolt.Tx, slot uint64) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.clearStateSlotBitField")
defer span.End()
kv.stateSlotBitLock.Lock()
defer kv.stateSlotBitLock.Unlock()
bucket := tx.Bucket(slotsHasObjectBucket)
slotBitfields := bucket.Get(savedStateSlotsKey)
// Copy is needed to avoid unsafe pointer conversions.
// See: https://github.com/etcd-io/bbolt/pull/201
tmp := make([]byte, len(slotBitfields))
copy(tmp, slotBitfields)
slotBitfields = bytesutil.ClearBit(tmp, int(slot))
return bucket.Put(savedStateSlotsKey, slotBitfields)
indices := [][]byte{
bytesutil.Uint64ToBytesBigEndian(slot),
}
for i := 0; i < len(buckets); i++ {
indicesByBucket[string(buckets[i])] = indices[i]
}
return indicesByBucket
}

View File

@@ -275,103 +275,6 @@ func TestStore_DeleteHeadState(t *testing.T) {
}
}
func TestStore_SaveDeleteState_CanGetHighest(t *testing.T) {
db := setupDB(t)
b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1}}
r, err := stateutil.BlockRoot(b.Block)
if err != nil {
t.Fatal(err)
}
if err := db.SaveBlock(context.Background(), b); err != nil {
t.Fatal(err)
}
st := testutil.NewBeaconState()
if err := db.SaveState(context.Background(), st, r); err != nil {
t.Fatal(err)
}
if err := db.SaveGenesisBlockRoot(context.Background(), r); err != nil {
t.Error(err)
}
s0 := st.InnerStateUnsafe()
b = &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 999}}
r1, err := stateutil.BlockRoot(b.Block)
if err != nil {
t.Fatal(err)
}
if err := db.SaveBlock(context.Background(), b); err != nil {
t.Fatal(err)
}
st = testutil.NewBeaconState()
if err := st.SetSlot(999); err != nil {
t.Fatal(err)
}
s1 := st.InnerStateUnsafe()
if err := db.SaveState(context.Background(), st, r1); err != nil {
t.Fatal(err)
}
highest, err := db.HighestSlotStates(context.Background())
if err != nil {
t.Fatal(err)
}
if !proto.Equal(highest[0].InnerStateUnsafe(), s1) {
t.Errorf("Did not retrieve saved state: %v != %v", highest, s1)
}
b = &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1000}}
r2, err := stateutil.BlockRoot(b.Block)
if err != nil {
t.Fatal(err)
}
if err := db.SaveBlock(context.Background(), b); err != nil {
t.Fatal(err)
}
st = testutil.NewBeaconState()
if err := st.SetSlot(1000); err != nil {
t.Fatal(err)
}
s2 := st.InnerStateUnsafe()
if err := db.SaveState(context.Background(), st, r2); err != nil {
t.Fatal(err)
}
highest, err = db.HighestSlotStates(context.Background())
if err != nil {
t.Fatal(err)
}
if !proto.Equal(highest[0].InnerStateUnsafe(), s2) {
t.Errorf("Did not retrieve saved state: %v != %v", highest, s2)
}
if err := db.DeleteState(context.Background(), r2); err != nil {
t.Fatal(err)
}
highest, err = db.HighestSlotStates(context.Background())
if err != nil {
t.Fatal(err)
}
if !proto.Equal(highest[0].InnerStateUnsafe(), s1) {
t.Errorf("Did not retrieve saved state: %v != %v", highest, s1)
}
if err := db.DeleteState(context.Background(), r1); err != nil {
t.Fatal(err)
}
highest, err = db.HighestSlotStates(context.Background())
if err != nil {
t.Fatal(err)
}
if highest[0] == nil {
t.Fatal("returned nil state ")
}
if !proto.Equal(highest[0].InnerStateUnsafe(), s0) {
diff, _ := messagediff.PrettyDiff(highest[0].InnerStateUnsafe(), s0)
t.Errorf("Did not retrieve saved state: %v", diff)
}
}
func TestStore_SaveDeleteState_CanGetHighestBelow(t *testing.T) {
db := setupDB(t)
@@ -409,14 +312,6 @@ func TestStore_SaveDeleteState_CanGetHighestBelow(t *testing.T) {
t.Fatal(err)
}
highest, err := db.HighestSlotStates(context.Background())
if err != nil {
t.Fatal(err)
}
if !proto.Equal(highest[0].InnerStateUnsafe(), s1) {
t.Errorf("Did not retrieve saved state: %v != %v", highest, s1)
}
b = &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1000}}
r2, err := stateutil.BlockRoot(b.Block)
if err != nil {
@@ -435,7 +330,7 @@ func TestStore_SaveDeleteState_CanGetHighestBelow(t *testing.T) {
t.Fatal(err)
}
highest, err = db.HighestSlotStatesBelow(context.Background(), 2)
highest, err := db.HighestSlotStatesBelow(context.Background(), 2)
if err != nil {
t.Fatal(err)
}

View File

@@ -84,6 +84,14 @@ func deleteValueForIndices(ctx context.Context, indicesByBucket map[string][]byt
copy(valuesEnd, valuesAtIndex[start+len(root):])
valuesAtIndex = append(valuesStart, valuesEnd...)
// If this removes the last value, delete the whole key/value entry.
if len(valuesAtIndex) == 0 {
if err := bkt.Delete(idx); err != nil {
return err
}
continue
}
if err := bkt.Put(idx, valuesAtIndex); err != nil {
return err
}

View File

@@ -93,7 +93,7 @@ func Test_deleteValueForIndices(t *testing.T) {
outputIndices: map[string][]byte{
"blocks": {0xff, 0x32, 0x45, 0x25, 0x24},
"state": {0x01, 0x02, 0x03, 0x04},
"check-point": {},
"check-point": nil,
"powchain": {0xba, 0xad, 0xb0, 0x00, 0xff},
},
wantErr: false,

View File

@@ -268,6 +268,8 @@ func (b *BeaconNode) startDB(cliCtx *cli.Context) error {
clearDB := cliCtx.Bool(cmd.ClearDB.Name)
forceClearDB := cliCtx.Bool(cmd.ForceClearDB.Name)
log.WithField("database-path", dbPath).Info("Checking DB")
d, err := db.NewDB(dbPath, b.stateSummaryCache)
if err != nil {
return err
@@ -299,7 +301,10 @@ func (b *BeaconNode) startDB(cliCtx *cli.Context) error {
}
}
log.WithField("database-path", dbPath).Info("Checking DB")
if err := d.RunMigrations(b.ctx); err != nil {
return err
}
b.db = d
depositCache, err := depositcache.NewDepositCache()

View File

@@ -106,7 +106,6 @@ go_test(
],
embed = [":go_default_library"],
flaky = True,
tags = ["block-network"],
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/cache:go_default_library",

View File

@@ -17,20 +17,17 @@ func (s *State) saveColdState(ctx context.Context, blockRoot [32]byte, state *st
ctx, span := trace.StartSpan(ctx, "stateGen.saveColdState")
defer span.End()
if state.Slot()%s.slotsPerArchivedPoint != 0 {
slot := state.Slot()
if slot%s.slotsPerArchivedPoint != 0 {
return nil
}
if err := s.beaconDB.SaveState(ctx, state, blockRoot); err != nil {
return err
}
archivedIndex := state.Slot() / s.slotsPerArchivedPoint
if err := s.beaconDB.SaveArchivedPointRoot(ctx, blockRoot, archivedIndex); err != nil {
return err
}
log.WithFields(logrus.Fields{
"slot": state.Slot(),
"slot": slot,
"blockRoot": hex.EncodeToString(bytesutil.Trunc(blockRoot[:]))}).Info("Saved full state on archived point")
return nil
@@ -63,17 +60,10 @@ func (s *State) loadColdStateBySlot(ctx context.Context, slot uint64) (*state.Be
return nil, err
}
if archivedState == nil {
archivedRoot, err := s.archivedRoot(ctx, slot)
archivedState, err = s.beaconDB.GenesisState(ctx)
if err != nil {
return nil, err
}
archivedState, err = s.recoverStateByRoot(ctx, archivedRoot)
if err != nil {
return nil, err
}
if archivedState == nil {
return nil, errUnknownState
}
}
return s.processStateUpTo(ctx, archivedState, slot)

View File

@@ -61,7 +61,6 @@ func TestLoadStateByRoot_CanGet(t *testing.T) {
blk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
blkRoot, err := stateutil.BlockRoot(blk.Block)
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveArchivedPointRoot(ctx, blkRoot, 0))
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, blkRoot))
require.NoError(t, service.beaconDB.SaveBlock(ctx, blk))
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, blkRoot))
@@ -88,7 +87,6 @@ func TestLoadColdStateBySlot_CanGet(t *testing.T) {
blk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
blkRoot, err := stateutil.BlockRoot(blk.Block)
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveArchivedPointRoot(ctx, blkRoot, 0))
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, blkRoot))
require.NoError(t, service.beaconDB.SaveBlock(ctx, blk))
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, blkRoot))

View File

@@ -30,7 +30,6 @@ func TestStateByRoot_ColdState(t *testing.T) {
require.NoError(t, err)
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(1))
require.NoError(t, service.beaconDB.SaveArchivedPointRoot(ctx, bRoot, 0))
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, bRoot))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b))
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, bRoot))
@@ -189,11 +188,9 @@ func TestStateBySlot_ColdState(t *testing.T) {
bRoot, err := stateutil.BlockRoot(b.Block)
require.NoError(t, err)
require.NoError(t, db.SaveState(ctx, beaconState, bRoot))
require.NoError(t, service.beaconDB.SaveArchivedPointRoot(ctx, bRoot, 0))
require.NoError(t, db.SaveGenesisBlockRoot(ctx, bRoot))
r := [32]byte{}
require.NoError(t, service.beaconDB.SaveArchivedPointRoot(ctx, r, 1))
if err := service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{
Slot: service.slotsPerArchivedPoint,
Root: r[:],

View File

@@ -34,18 +34,17 @@ func (s *State) MigrateToCold(ctx context.Context, fRoot [32]byte) error {
// Start at previous finalized slot, stop at current finalized slot.
// If the slot is on archived point, save the state of that slot to the DB.
for i := oldFSlot; i < fSlot; i++ {
for slot := oldFSlot; slot < fSlot; slot++ {
if ctx.Err() != nil {
return ctx.Err()
}
if i%s.slotsPerArchivedPoint == 0 && i != 0 {
cached, exists, err := s.epochBoundaryStateCache.getBySlot(i)
if slot%s.slotsPerArchivedPoint == 0 && slot != 0 {
cached, exists, err := s.epochBoundaryStateCache.getBySlot(slot)
if err != nil {
return fmt.Errorf("could not get epoch boundary state for slot %d", i)
return fmt.Errorf("could not get epoch boundary state for slot %d", slot)
}
aIndex := i / s.slotsPerArchivedPoint
var aRoot [32]byte
var aState *stateTrie.BeaconState
@@ -57,7 +56,7 @@ func (s *State) MigrateToCold(ctx context.Context, fRoot [32]byte) error {
aRoot = cached.root
aState = cached.state
} else {
blks, err := s.beaconDB.HighestSlotBlocksBelow(ctx, i)
blks, err := s.beaconDB.HighestSlotBlocksBelow(ctx, slot)
if err != nil {
return err
}
@@ -84,17 +83,10 @@ func (s *State) MigrateToCold(ctx context.Context, fRoot [32]byte) error {
if err := s.beaconDB.SaveState(ctx, aState, aRoot); err != nil {
return err
}
if err := s.beaconDB.SaveArchivedPointRoot(ctx, aRoot, aIndex); err != nil {
return err
}
if err := s.beaconDB.SaveLastArchivedIndex(ctx, aIndex); err != nil {
return err
}
log.WithFields(
logrus.Fields{
"slot": aState.Slot(),
"archivedIndex": aIndex,
"root": hex.EncodeToString(bytesutil.Trunc(aRoot[:])),
"slot": aState.Slot(),
"root": hex.EncodeToString(bytesutil.Trunc(aRoot[:])),
}).Info("Saved state in DB")
}
}

View File

@@ -53,7 +53,7 @@ func TestMigrateToCold_HappyPath(t *testing.T) {
assert.DeepEqual(t, beaconState.InnerStateUnsafe(), gotState.InnerStateUnsafe(), "Did not save state")
gotRoot := service.beaconDB.ArchivedPointRoot(ctx, stateSlot/service.slotsPerArchivedPoint)
assert.Equal(t, fRoot, gotRoot, "Did not save archived root")
lastIndex, err := service.beaconDB.LastArchivedIndex(ctx)
lastIndex, err := service.beaconDB.LastArchivedSlot(ctx)
require.NoError(t, err)
assert.Equal(t, uint64(1), lastIndex, "Did not save last archived index")
@@ -94,7 +94,7 @@ func TestMigrateToCold_RegeneratePath(t *testing.T) {
assert.DeepEqual(t, beaconState.InnerStateUnsafe(), gotState.InnerStateUnsafe(), "Did not save state")
gotRoot := service.beaconDB.ArchivedPointRoot(ctx, stateSlot/service.slotsPerArchivedPoint)
assert.Equal(t, fRoot, gotRoot, "Did not save archived root")
lastIndex, err := service.beaconDB.LastArchivedIndex(ctx)
lastIndex, err := service.beaconDB.LastArchivedSlot(ctx)
require.NoError(t, err)
assert.Equal(t, uint64(1), lastIndex, "Did not save last archived index")

View File

@@ -13,7 +13,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
@@ -309,35 +308,14 @@ func (s *State) genesisRoot(ctx context.Context) ([32]byte, error) {
return stateutil.BlockRoot(b.Block)
}
// This returns the highest archived root based on input slot in the DB.
// If the archived root is not available at that exact input slot due to an event of skip block,
// this will look back and return the last available archived root (ie. the one with the highest slot below input slot).
func (s *State) archivedRoot(ctx context.Context, slot uint64) ([32]byte, error) {
archivedIndex := slot / params.BeaconConfig().SlotsPerArchivedPoint
for archivedIndex > 0 {
if ctx.Err() != nil {
return [32]byte{}, ctx.Err()
}
if s.beaconDB.HasArchivedPoint(ctx, archivedIndex) {
return s.beaconDB.ArchivedPointRoot(ctx, archivedIndex), nil
}
archivedIndex--
}
if archivedIndex == 0 {
return s.genesisRoot(ctx)
}
return [32]byte{}, errUnknownArchivedState
}
// This retrieves the archived state in the DB.
func (s *State) archivedState(ctx context.Context, slot uint64) (*state.BeaconState, error) {
archivedRoot, err := s.archivedRoot(ctx, slot)
if err != nil {
return nil, err
var st *state.BeaconState
sts, err := s.beaconDB.HighestSlotStatesBelow(ctx, slot+1)
if len(sts) > 0 {
st = sts[0]
}
return s.beaconDB.State(ctx, archivedRoot)
return st, err
}
// This recomputes a state given the block root.

View File

@@ -418,54 +418,12 @@ func TestLastSavedState_NoSavedBlockState(t *testing.T) {
assert.ErrorContains(t, errUnknownState.Error(), err)
}
func TestArchivedRoot_CanGetSpecificIndex(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
r := [32]byte{'a'}
require.NoError(t, db.SaveArchivedPointRoot(ctx, r, 1))
got, err := service.archivedRoot(ctx, params.BeaconConfig().SlotsPerArchivedPoint*2)
require.NoError(t, err)
assert.Equal(t, r, got, "Did not get wanted root")
}
func TestArchivedRoot_CanGetOlderOlder(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
r := [32]byte{'a'}
require.NoError(t, db.SaveArchivedPointRoot(ctx, r, 10))
r = [32]byte{'b'}
require.NoError(t, db.SaveArchivedPointRoot(ctx, r, 11))
got, err := service.archivedRoot(ctx, 100000)
require.NoError(t, err)
assert.Equal(t, r, got, "Did not get wanted root")
}
func TestArchivedRoot_CanGetGenesisIndex(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
gBlock := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
gRoot, err := stateutil.BlockRoot(gBlock.Block)
require.NoError(t, err)
require.NoError(t, db.SaveBlock(ctx, gBlock))
require.NoError(t, db.SaveGenesisBlockRoot(ctx, gRoot))
got, err := service.archivedRoot(ctx, 100000)
require.NoError(t, err)
assert.Equal(t, gRoot, got, "Did not get wanted root")
}
func TestArchivedState_CanGetSpecificIndex(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
r := [32]byte{'a'}
require.NoError(t, db.SaveArchivedPointRoot(ctx, r, 1))
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, db.SaveState(ctx, beaconState, r))
got, err := service.archivedState(ctx, params.BeaconConfig().SlotsPerArchivedPoint)

View File

@@ -51,7 +51,7 @@ func (s *State) Resume(ctx context.Context) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.Resume")
defer span.End()
lastArchivedRoot := s.beaconDB.LastArchivedIndexRoot(ctx)
lastArchivedRoot := s.beaconDB.LastArchivedRoot(ctx)
lastArchivedState, err := s.beaconDB.State(ctx, lastArchivedRoot)
if err != nil {
return nil, err

View File

@@ -22,8 +22,6 @@ func TestResume(t *testing.T) {
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, root))
require.NoError(t, service.beaconDB.SaveArchivedPointRoot(ctx, root, 1))
require.NoError(t, service.beaconDB.SaveLastArchivedIndex(ctx, 1))
resumeState, err := service.Resume(ctx)
require.NoError(t, err)

View File

@@ -308,9 +308,25 @@ func HighestBitIndexAt(b []byte, index int) (int, error) {
return 0, nil
}
// Uint64ToBytes little endian conversion.
func Uint64ToBytes(i uint64) []byte {
// Uint64ToBytesLittleEndian conversion.
func Uint64ToBytesLittleEndian(i uint64) []byte {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, i)
return buf
}
// Uint64ToBytesBigEndian conversion.
func Uint64ToBytesBigEndian(i uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, i)
return buf
}
// BytesToUint64BigEndian conversion. Returns 0 if empty bytes or byte slice with length less
// than 8.
func BytesToUint64BigEndian(b []byte) uint64 {
if len(b) < 8 { // This will panic otherwise.
return 0
}
return binary.BigEndian.Uint64(b)
}

View File

@@ -369,3 +369,12 @@ func TestHighestBitIndexBelow(t *testing.T) {
}
}
}
func TestUint64ToBytes_RoundTrip(t *testing.T) {
for i := uint64(0); i < 10000; i++ {
b := bytesutil.Uint64ToBytesBigEndian(i)
if got := bytesutil.BytesToUint64BigEndian(b); got != i {
t.Error("Round trip did not match original value")
}
}
}

View File

@@ -938,10 +938,10 @@ func TestCheckAndLogValidatorStatus_OK(t *testing.T) {
active bool
}
pubKeys := [][]byte{
bytesutil.Uint64ToBytes(0),
bytesutil.Uint64ToBytes(1),
bytesutil.Uint64ToBytes(2),
bytesutil.Uint64ToBytes(3),
bytesutil.Uint64ToBytesLittleEndian(0),
bytesutil.Uint64ToBytesLittleEndian(1),
bytesutil.Uint64ToBytesLittleEndian(2),
bytesutil.Uint64ToBytesLittleEndian(3),
}
tests := []statusTest{
{