Cleanup stategen pkg (#7127)

* Clean up stategen
* Sync with master
* Fix tests
* Merge refs/heads/master into cleanup-newstate
* Merge refs/heads/master into cleanup-newstate
* Rename file
* Merge branch 'cleanup-newstate' of github.com:prysmaticlabs/prysm into cleanup-newstate
* gaz
* Forgot to add metrics page back
This commit is contained in:
terence tsao
2020-08-27 15:29:59 -07:00
committed by GitHub
parent bd489e345a
commit fea2cc9e2f
12 changed files with 456 additions and 861 deletions

View File

@@ -4,12 +4,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_library(
name = "go_default_library",
srcs = [
"cold.go",
"epoch_boundary_state_cache.go",
"errors.go",
"getter.go",
"hot.go",
"log.go",
"metrics.go",
"migrate.go",
"replay.go",
"service.go",
@@ -41,10 +40,8 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"cold_test.go",
"epoch_boundary_state_cache_test.go",
"getter_test.go",
"hot_test.go",
"migrate_test.go",
"replay_test.go",
"service_test.go",
@@ -64,6 +61,7 @@ go_test(
"//shared/testutil/require:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -1,70 +0,0 @@
package stategen
import (
"context"
"encoding/hex"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// This saves a pre finalized beacon state in the cold section of the DB. The returns an error
// and not store anything if the state does not lie on an archive point boundary.
func (s *State) saveColdState(ctx context.Context, blockRoot [32]byte, state *state.BeaconState) error {
ctx, span := trace.StartSpan(ctx, "stateGen.saveColdState")
defer span.End()
slot := state.Slot()
if slot%s.slotsPerArchivedPoint != 0 {
return nil
}
if err := s.beaconDB.SaveState(ctx, state, blockRoot); err != nil {
return err
}
log.WithFields(logrus.Fields{
"slot": slot,
"blockRoot": hex.EncodeToString(bytesutil.Trunc(blockRoot[:]))}).Info("Saved full state on archived point")
return nil
}
// This loads the cold state by block root.
func (s *State) loadColdStateByRoot(ctx context.Context, blockRoot [32]byte) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.loadColdStateByRoot")
defer span.End()
summary, err := s.stateSummary(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get state summary")
}
return s.loadColdStateBySlot(ctx, summary.Slot)
}
// This loads a cold state by slot.
func (s *State) loadColdStateBySlot(ctx context.Context, slot uint64) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.loadColdStateBySlot")
defer span.End()
if slot == 0 {
return s.beaconDB.GenesisState(ctx)
}
archivedState, err := s.archivedState(ctx, slot)
if err != nil {
return nil, err
}
if archivedState == nil {
archivedState, err = s.beaconDB.GenesisState(ctx)
if err != nil {
return nil, err
}
}
return s.processStateUpTo(ctx, archivedState, slot)
}

View File

@@ -1,89 +0,0 @@
package stategen
import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestSaveColdState_NonArchivedPoint(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 2
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(1))
assert.NoError(t, service.saveColdState(ctx, [32]byte{}, beaconState))
}
func TestSaveColdState_CanSave(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 1
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(1))
r := [32]byte{'a'}
require.NoError(t, service.saveColdState(ctx, r, beaconState))
assert.Equal(t, true, service.beaconDB.HasArchivedPoint(ctx, 1), "Did not save cold state")
assert.Equal(t, r, service.beaconDB.ArchivedPointRoot(ctx, 1), "Did not get wanted root")
}
func TestLoadColdStateByRoot_NoStateSummary(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
_, err := service.loadColdStateByRoot(ctx, [32]byte{'a'})
require.ErrorContains(t, errUnknownStateSummary.Error(), err, "Did not get correct error")
}
func TestLoadStateByRoot_CanGet(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 1
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
blk := testutil.NewBeaconBlock()
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, blkRoot))
require.NoError(t, service.beaconDB.SaveBlock(ctx, blk))
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, blkRoot))
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{Root: blkRoot[:], Slot: 100}))
loadedState, err := service.StateByRoot(ctx, blkRoot)
require.NoError(t, err)
assert.DeepEqual(t, beaconState.InnerStateUnsafe(), loadedState.InnerStateUnsafe(), "Did not correctly save state")
}
func TestLoadColdStateBySlot_CanGet(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
blk := testutil.NewBeaconBlock()
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, blkRoot))
require.NoError(t, service.beaconDB.SaveBlock(ctx, blk))
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, blkRoot))
loadedState, err := service.loadColdStateBySlot(ctx, 200)
require.NoError(t, err)
assert.Equal(t, uint64(200), loadedState.Slot(), "Did not correctly save state")
}

View File

@@ -11,9 +11,22 @@ import (
"go.opencensus.io/trace"
)
// StateByRoot retrieves the state from DB using input block root.
// It retrieves state from the hot section if the state summary slot
// is below the split point cut off.
// HasState returns true if the state exists in cache or in DB.
func (s *State) HasState(ctx context.Context, blockRoot [32]byte) (bool, error) {
if s.hotStateCache.Has(blockRoot) {
return true, nil
}
_, has, err := s.epochBoundaryStateCache.getByRoot(blockRoot)
if err != nil {
return false, err
}
if has {
return true, nil
}
return s.beaconDB.HasState(ctx, blockRoot), nil
}
// StateByRoot retrieves the state using input block root.
func (s *State) StateByRoot(ctx context.Context, blockRoot [32]byte) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.StateByRoot")
defer span.End()
@@ -28,16 +41,7 @@ func (s *State) StateByRoot(ctx context.Context, blockRoot [32]byte) (*state.Bea
return s.beaconDB.State(ctx, blockRoot)
}
summary, err := s.stateSummary(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get cachedState summary")
}
if summary.Slot < s.finalizedInfo.slot {
return s.loadColdStateByRoot(ctx, blockRoot)
}
return s.loadHotStateByRoot(ctx, blockRoot)
return s.loadStateByRoot(ctx, blockRoot)
}
// StateByRootInitialSync retrieves the state from the DB for the initial syncing phase.
@@ -83,30 +87,22 @@ func (s *State) StateByRootInitialSync(ctx context.Context, blockRoot [32]byte)
blks, err := s.LoadBlocks(ctx, startState.Slot()+1, summary.Slot, bytesutil.ToBytes32(summary.Root))
if err != nil {
return nil, errors.Wrap(err, "could not load blocks for hot state using root")
return nil, errors.Wrap(err, "could not load blocks")
}
startState, err = s.ReplayBlocks(ctx, startState, blks, summary.Slot)
if err != nil {
return nil, errors.Wrap(err, "could not replay blocks for hot state using root")
return nil, errors.Wrap(err, "could not replay blocks")
}
return startState, nil
}
// StateBySlot retrieves the state from DB using input slot.
// It retrieves state from the cold section if the input slot
// is below the split point cut off.
// Note: `StateByRoot` is preferred over this. Retrieving state
// by root `StateByRoot` is more performant than retrieving by slot.
// StateBySlot retrieves the state using input slot.
func (s *State) StateBySlot(ctx context.Context, slot uint64) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.StateBySlot")
defer span.End()
if slot < s.finalizedInfo.slot {
return s.loadColdStateBySlot(ctx, slot)
}
return s.loadHotStateBySlot(ctx, slot)
return s.loadStateBySlot(ctx, slot)
}
// StateSummaryExists returns true if the corresponding state summary of the input block root either
@@ -152,3 +148,144 @@ func (s *State) recoverStateSummary(ctx context.Context, blockRoot [32]byte) (*p
}
return nil, errUnknownStateSummary
}
// This loads a beacon state from either the cache or DB then replay blocks up the requested block root.
func (s *State) loadStateByRoot(ctx context.Context, blockRoot [32]byte) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.loadStateByRoot")
defer span.End()
// First, it checks if the state exists in hot state cache.
cachedState := s.hotStateCache.Get(blockRoot)
if cachedState != nil {
return cachedState, nil
}
// Second, it checks if the state exits in epoch boundary state cache.
cachedInfo, ok, err := s.epochBoundaryStateCache.getByRoot(blockRoot)
if err != nil {
return nil, err
}
if ok {
return cachedInfo.state, nil
}
summary, err := s.stateSummary(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get state summary")
}
targetSlot := summary.Slot
// Since the requested state is not in caches, start replaying using the last available ancestor state which is
// retrieved using input block's parent root.
startState, err := s.lastAncestorState(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get ancestor state")
}
if startState == nil {
return nil, errUnknownBoundaryState
}
blks, err := s.LoadBlocks(ctx, startState.Slot()+1, targetSlot, bytesutil.ToBytes32(summary.Root))
if err != nil {
return nil, errors.Wrap(err, "could not load blocks for hot state using root")
}
replayBlockCount.Observe(float64(len(blks)))
return s.ReplayBlocks(ctx, startState, blks, targetSlot)
}
// This loads a state by slot.
func (s *State) loadStateBySlot(ctx context.Context, slot uint64) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.loadStateBySlot")
defer span.End()
// Return genesis state if slot is 0.
if slot == 0 {
return s.beaconDB.GenesisState(ctx)
}
// Gather last saved state, that is where node starts to replay the blocks.
startState, err := s.lastSavedState(ctx, slot)
// Gather the last saved block root and the slot number.
lastValidRoot, lastValidSlot, err := s.lastSavedBlock(ctx, slot)
if err != nil {
return nil, errors.Wrap(err, "could not get last valid block for hot state using slot")
}
// Load and replay blocks to get the intermediate state.
replayBlks, err := s.LoadBlocks(ctx, startState.Slot()+1, lastValidSlot, lastValidRoot)
if err != nil {
return nil, err
}
return s.ReplayBlocks(ctx, startState, replayBlks, slot)
}
// This returns the highest available ancestor state of the input block root.
// It recursively look up block's parent until a corresponding state of the block root
// is found in the caches or DB.
//
// There's three ways to derive block parent state:
// 1.) block parent state is the last finalized state
// 2.) block parent state is the epoch boundary state and exists in epoch boundary cache.
// 3.) block parent state is in DB.
func (s *State) lastAncestorState(ctx context.Context, root [32]byte) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.lastAncestorState")
defer span.End()
if s.isFinalizedRoot(root) && s.finalizedState() != nil {
return s.finalizedState(), nil
}
b, err := s.beaconDB.Block(ctx, root)
if err != nil {
return nil, err
}
if b == nil {
return nil, errUnknownBlock
}
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Is the state a genesis state.
parentRoot := bytesutil.ToBytes32(b.Block.ParentRoot)
if parentRoot == params.BeaconConfig().ZeroHash {
return s.beaconDB.GenesisState(ctx)
}
// Does the state exist in the hot state cache.
if s.hotStateCache.Has(parentRoot) {
return s.hotStateCache.Get(parentRoot), nil
}
// Does the state exist in finalized info cache.
if s.isFinalizedRoot(parentRoot) {
return s.finalizedState(), nil
}
// Does the state exist in epoch boundary cache.
cachedInfo, ok, err := s.epochBoundaryStateCache.getByRoot(parentRoot)
if err != nil {
return nil, err
}
if ok {
return cachedInfo.state, nil
}
// Does the state exists in DB.
if s.beaconDB.HasState(ctx, parentRoot) {
return s.beaconDB.State(ctx, parentRoot)
}
b, err = s.beaconDB.Block(ctx, parentRoot)
if err != nil {
return nil, err
}
if b == nil {
return nil, errUnknownBlock
}
}
}

View File

@@ -5,9 +5,11 @@ import (
"testing"
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
@@ -32,9 +34,7 @@ func TestStateByRoot_ColdState(t *testing.T) {
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, bRoot))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b))
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, bRoot))
r := [32]byte{'a'}
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{Root: r[:], Slot: 1}))
loadedState, err := service.StateByRoot(ctx, r)
loadedState, err := service.StateByRoot(ctx, bRoot)
require.NoError(t, err)
if !proto.Equal(loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) {
t.Error("Did not correctly save state")
@@ -242,3 +242,169 @@ func TestStateSummary_CanGetFromCacheOrDB(t *testing.T) {
t.Error("Did not get wanted summary")
}
}
func TestLoadeStateByRoot_Cached(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
r := [32]byte{'A'}
service.hotStateCache.Put(r, beaconState)
// This tests where hot state was already cached.
loadedState, err := service.loadStateByRoot(ctx, r)
require.NoError(t, err)
if !proto.Equal(loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) {
t.Error("Did not correctly cache state")
}
}
func TestLoadeStateByRoot_EpochBoundaryStateCanProcess(t *testing.T) {
ctx := context.Background()
db, ssc := testDB.SetupDB(t)
service := New(db, ssc)
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
gBlk := testutil.NewBeaconBlock()
gBlkRoot, err := gBlk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.epochBoundaryStateCache.put(gBlkRoot, beaconState))
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 11
blk.Block.ProposerIndex = 8
blk.Block.ParentRoot = gBlkRoot[:]
require.NoError(t, service.beaconDB.SaveBlock(ctx, blk))
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{Slot: 10, Root: blkRoot[:]}))
// This tests where hot state was not cached and needs processing.
loadedState, err := service.loadStateByRoot(ctx, blkRoot)
require.NoError(t, err)
assert.Equal(t, uint64(10), loadedState.Slot(), "Did not correctly load state")
}
func TestLoadeStateByRoot_FromDBBoundaryCase(t *testing.T) {
ctx := context.Background()
db, ssc := testDB.SetupDB(t)
service := New(db, ssc)
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
gBlk := testutil.NewBeaconBlock()
gBlkRoot, err := gBlk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.epochBoundaryStateCache.put(gBlkRoot, beaconState))
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 11
blk.Block.ProposerIndex = 8
blk.Block.ParentRoot = gBlkRoot[:]
require.NoError(t, service.beaconDB.SaveBlock(ctx, blk))
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{Slot: 10, Root: blkRoot[:]}))
// This tests where hot state was not cached and needs processing.
loadedState, err := service.loadStateByRoot(ctx, blkRoot)
require.NoError(t, err)
assert.Equal(t, uint64(10), loadedState.Slot(), "Did not correctly load state")
}
func TestLoadeStateBySlot_CanAdvanceSlotUsingDB(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
b := testutil.NewBeaconBlock()
require.NoError(t, service.beaconDB.SaveBlock(ctx, b))
gRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, gRoot))
slot := uint64(10)
loadedState, err := service.loadStateBySlot(ctx, slot)
require.NoError(t, err)
assert.Equal(t, slot, loadedState.Slot(), "Did not correctly load state")
}
func TestLastAncestorState_CanGetUsingDB(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
b0 := testutil.NewBeaconBlock()
b0.Block.ParentRoot = bytesutil.PadTo([]byte{'a'}, 32)
r0, err := ssz.HashTreeRoot(b0.Block)
require.NoError(t, err)
b1 := testutil.NewBeaconBlock()
b1.Block.Slot = 1
b1.Block.ParentRoot = bytesutil.PadTo(r0[:], 32)
r1, err := ssz.HashTreeRoot(b1.Block)
require.NoError(t, err)
b2 := testutil.NewBeaconBlock()
b2.Block.Slot = 2
b2.Block.ParentRoot = bytesutil.PadTo(r1[:], 32)
r2, err := ssz.HashTreeRoot(b2.Block)
require.NoError(t, err)
b3 := testutil.NewBeaconBlock()
b3.Block.Slot = 3
b3.Block.ParentRoot = bytesutil.PadTo(r2[:], 32)
r3, err := ssz.HashTreeRoot(b3.Block)
require.NoError(t, err)
b1State := testutil.NewBeaconState()
require.NoError(t, b1State.SetSlot(1))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b0))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b1))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b2))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b3))
require.NoError(t, service.beaconDB.SaveState(ctx, b1State, r1))
lastState, err := service.lastAncestorState(ctx, r3)
require.NoError(t, err)
assert.Equal(t, b1State.Slot(), lastState.Slot(), "Did not get wanted state")
}
func TestLastAncestorState_CanGetUsingCache(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
b0 := testutil.NewBeaconBlock()
b0.Block.ParentRoot = bytesutil.PadTo([]byte{'a'}, 32)
r0, err := ssz.HashTreeRoot(b0.Block)
require.NoError(t, err)
b1 := testutil.NewBeaconBlock()
b1.Block.Slot = 1
b1.Block.ParentRoot = bytesutil.PadTo(r0[:], 32)
r1, err := ssz.HashTreeRoot(b1.Block)
require.NoError(t, err)
b2 := testutil.NewBeaconBlock()
b2.Block.Slot = 2
b2.Block.ParentRoot = bytesutil.PadTo(r1[:], 32)
r2, err := ssz.HashTreeRoot(b2.Block)
require.NoError(t, err)
b3 := testutil.NewBeaconBlock()
b3.Block.Slot = 3
b3.Block.ParentRoot = bytesutil.PadTo(r2[:], 32)
r3, err := ssz.HashTreeRoot(b3.Block)
require.NoError(t, err)
b1State := testutil.NewBeaconState()
require.NoError(t, b1State.SetSlot(1))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b0))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b1))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b2))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b3))
service.hotStateCache.Put(r1, b1State)
lastState, err := service.lastAncestorState(ctx, r3)
require.NoError(t, err)
assert.Equal(t, b1State.Slot(), lastState.Slot(), "Did not get wanted state")
}

View File

@@ -1,228 +0,0 @@
package stategen
import (
"context"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
var (
replayBlockCount = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "replay_blocks_count",
Help: "The number of blocks to replay to generate a state",
Buckets: []float64{64, 256, 1024, 2048, 4096},
},
)
)
// HasState returns true if the state exists in cache or in DB.
func (s *State) HasState(ctx context.Context, blockRoot [32]byte) (bool, error) {
if s.hotStateCache.Has(blockRoot) {
return true, nil
}
_, has, err := s.epochBoundaryStateCache.getByRoot(blockRoot)
if err != nil {
return false, err
}
if has {
return true, nil
}
return s.beaconDB.HasState(ctx, blockRoot), nil
}
// SaveStateSummary saves the relevant state summary for a block and its corresponding state slot in the
// state summary cache.
func (s *State) SaveStateSummary(ctx context.Context, blk *ethpb.SignedBeaconBlock, blockRoot [32]byte) {
// Save State summary
s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{
Slot: blk.Block.Slot,
Root: blockRoot[:],
})
}
// This saves a post finalized beacon state in the hot section of the DB. On the epoch boundary,
// it saves a full state. On an intermediate slot, it saves a back pointer to the
// nearest epoch boundary state.
func (s *State) saveHotState(ctx context.Context, blockRoot [32]byte, state *state.BeaconState) error {
ctx, span := trace.StartSpan(ctx, "stateGen.saveHotState")
defer span.End()
// If the hot state is already in cache, one can be sure the state was processed and in the DB.
if s.hotStateCache.Has(blockRoot) {
return nil
}
// Only on an epoch boundary slot, saves epoch boundary state in epoch boundary root state cache.
if helpers.IsEpochStart(state.Slot()) {
if err := s.epochBoundaryStateCache.put(blockRoot, state); err != nil {
return err
}
}
// On an intermediate slots, save the hot state summary.
s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{
Slot: state.Slot(),
Root: blockRoot[:],
})
// Store the copied state in the hot state cache.
s.hotStateCache.Put(blockRoot, state)
return nil
}
// This loads a post finalized beacon state from the hot section of the DB. If necessary it will
// replay blocks starting from the nearest epoch boundary state or last finalized state. It returns the beacon state that
// corresponds to the input block root.
func (s *State) loadHotStateByRoot(ctx context.Context, blockRoot [32]byte) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.loadHotStateByRoot")
defer span.End()
// First, it checks if the state exists in hot state cache.
cachedState := s.hotStateCache.Get(blockRoot)
if cachedState != nil {
return cachedState, nil
}
// Second, it checks if the state exits in epoch boundary state cache.
cachedInfo, ok, err := s.epochBoundaryStateCache.getByRoot(blockRoot)
if err != nil {
return nil, err
}
if ok {
return cachedInfo.state, nil
}
summary, err := s.stateSummary(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get state summary")
}
targetSlot := summary.Slot
// Since the requested state is not in caches, start replaying using the last available ancestor state which is
// retrieved using input block's parent root.
startState, err := s.lastAncestorState(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get ancestor state")
}
if startState == nil {
return nil, errUnknownBoundaryState
}
blks, err := s.LoadBlocks(ctx, startState.Slot()+1, targetSlot, bytesutil.ToBytes32(summary.Root))
if err != nil {
return nil, errors.Wrap(err, "could not load blocks for hot state using root")
}
replayBlockCount.Observe(float64(len(blks)))
return s.ReplayBlocks(ctx, startState, blks, targetSlot)
}
// This loads a hot state by slot where the slot lies between the epoch boundary points.
// This is a slower implementation (versus ByRoot) as slot is the only argument. It require fetching
// all the blocks between the epoch boundary points for playback.
// Use `loadHotStateByRoot` unless you really don't know the root.
func (s *State) loadHotStateBySlot(ctx context.Context, slot uint64) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.loadHotStateBySlot")
defer span.End()
// Return genesis state if slot is 0.
if slot == 0 {
return s.beaconDB.GenesisState(ctx)
}
// Gather last saved state, that is where node starts to replay the blocks.
startState, err := s.lastSavedState(ctx, slot)
// Gather the last saved block root and the slot number.
lastValidRoot, lastValidSlot, err := s.lastSavedBlock(ctx, slot)
if err != nil {
return nil, errors.Wrap(err, "could not get last valid block for hot state using slot")
}
// Load and replay blocks to get the intermediate state.
replayBlks, err := s.LoadBlocks(ctx, startState.Slot()+1, lastValidSlot, lastValidRoot)
if err != nil {
return nil, err
}
return s.ReplayBlocks(ctx, startState, replayBlks, slot)
}
// This returns the highest available ancestor state of the input block root.
// It recursively look up block's parent until a corresponding state of the block root
// is found in the caches or DB.
//
// There's three ways to derive block parent state:
// 1.) block parent state is the last finalized state
// 2.) block parent state is the epoch boundary state and exists in epoch boundary cache.
// 3.) block parent state is in DB.
func (s *State) lastAncestorState(ctx context.Context, root [32]byte) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.lastAncestorState")
defer span.End()
if s.isFinalizedRoot(root) && s.finalizedState() != nil {
return s.finalizedState(), nil
}
b, err := s.beaconDB.Block(ctx, root)
if err != nil {
return nil, err
}
if b == nil {
return nil, errUnknownBlock
}
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Is the state a genesis state.
parentRoot := bytesutil.ToBytes32(b.Block.ParentRoot)
if parentRoot == params.BeaconConfig().ZeroHash {
return s.beaconDB.GenesisState(ctx)
}
// Does the state exist in the hot state cache.
if s.hotStateCache.Has(parentRoot) {
return s.hotStateCache.Get(parentRoot), nil
}
// Does the state exist in finalized info cache.
if s.isFinalizedRoot(parentRoot) {
return s.finalizedState(), nil
}
// Does the state exist in epoch boundary cache.
cachedInfo, ok, err := s.epochBoundaryStateCache.getByRoot(parentRoot)
if err != nil {
return nil, err
}
if ok {
return cachedInfo.state, nil
}
// Does the state exists in DB.
if s.beaconDB.HasState(ctx, parentRoot) {
return s.beaconDB.State(ctx, parentRoot)
}
b, err = s.beaconDB.Block(ctx, parentRoot)
if err != nil {
return nil, err
}
if b == nil {
return nil, errUnknownBlock
}
}
}

View File

@@ -1,237 +0,0 @@
package stategen
import (
"context"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestSaveHotState_AlreadyHas(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
r := [32]byte{'A'}
// Pre cache the hot state.
service.hotStateCache.Put(r, beaconState)
require.NoError(t, service.saveHotState(ctx, r, beaconState))
// Should not save the state and state summary.
assert.Equal(t, false, service.beaconDB.HasState(ctx, r), "Should not have saved the state")
assert.Equal(t, false, service.beaconDB.HasStateSummary(ctx, r), "Should have saved the state summary")
require.LogsDoNotContain(t, hook, "Saved full state on epoch boundary")
}
func TestSaveHotState_CanSaveOnEpochBoundary(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
r := [32]byte{'A'}
require.NoError(t, service.saveHotState(ctx, r, beaconState))
// Should save both state and state summary.
_, ok, err := service.epochBoundaryStateCache.getByRoot(r)
require.NoError(t, err)
require.Equal(t, true, ok, "Did not save epoch boundary state")
assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary")
}
func TestSaveHotState_NoSaveNotEpochBoundary(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch-1))
r := [32]byte{'A'}
b := testutil.NewBeaconBlock()
require.NoError(t, db.SaveBlock(ctx, b))
gRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveGenesisBlockRoot(ctx, gRoot))
require.NoError(t, service.saveHotState(ctx, r, beaconState))
// Should only save state summary.
assert.Equal(t, false, service.beaconDB.HasState(ctx, r), "Should not have saved the state")
assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary")
require.LogsDoNotContain(t, hook, "Saved full state on epoch boundary")
}
func TestLoadHoteStateByRoot_Cached(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
r := [32]byte{'A'}
service.hotStateCache.Put(r, beaconState)
// This tests where hot state was already cached.
loadedState, err := service.loadHotStateByRoot(ctx, r)
require.NoError(t, err)
if !proto.Equal(loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) {
t.Error("Did not correctly cache state")
}
}
func TestLoadHoteStateByRoot_EpochBoundaryStateCanProcess(t *testing.T) {
ctx := context.Background()
db, ssc := testDB.SetupDB(t)
service := New(db, ssc)
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
gBlk := testutil.NewBeaconBlock()
gBlkRoot, err := gBlk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.epochBoundaryStateCache.put(gBlkRoot, beaconState))
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 11
blk.Block.ProposerIndex = 8
blk.Block.ParentRoot = gBlkRoot[:]
require.NoError(t, service.beaconDB.SaveBlock(ctx, blk))
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{Slot: 10, Root: blkRoot[:]}))
// This tests where hot state was not cached and needs processing.
loadedState, err := service.loadHotStateByRoot(ctx, blkRoot)
require.NoError(t, err)
assert.Equal(t, uint64(10), loadedState.Slot(), "Did not correctly load state")
}
func TestLoadHoteStateByRoot_FromDBBoundaryCase(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
blk := testutil.NewBeaconBlock()
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.epochBoundaryStateCache.put(blkRoot, beaconState))
targetSlot := uint64(0)
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{Slot: targetSlot, Root: blkRoot[:]}))
// This tests where hot state was not cached but doesn't need processing
// because it on the epoch boundary slot.
loadedState, err := service.loadHotStateByRoot(ctx, blkRoot)
require.NoError(t, err)
assert.Equal(t, targetSlot, loadedState.Slot(), "Did not correctly load state")
}
func TestLoadHoteStateBySlot_CanAdvanceSlotUsingDB(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
b := testutil.NewBeaconBlock()
require.NoError(t, service.beaconDB.SaveBlock(ctx, b))
gRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, gRoot))
slot := uint64(10)
loadedState, err := service.loadHotStateBySlot(ctx, slot)
require.NoError(t, err)
assert.Equal(t, slot, loadedState.Slot(), "Did not correctly load state")
}
func TestLastAncestorState_CanGetUsingDB(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
b0 := testutil.NewBeaconBlock()
b0.Block.ParentRoot = bytesutil.PadTo([]byte{'a'}, 32)
r0, err := b0.Block.HashTreeRoot()
require.NoError(t, err)
b1 := testutil.NewBeaconBlock()
b1.Block.Slot = 1
b1.Block.ParentRoot = bytesutil.PadTo(r0[:], 32)
r1, err := b1.Block.HashTreeRoot()
require.NoError(t, err)
b2 := testutil.NewBeaconBlock()
b2.Block.Slot = 2
b2.Block.ParentRoot = bytesutil.PadTo(r1[:], 32)
r2, err := b2.Block.HashTreeRoot()
require.NoError(t, err)
b3 := testutil.NewBeaconBlock()
b3.Block.Slot = 3
b3.Block.ParentRoot = bytesutil.PadTo(r2[:], 32)
r3, err := b3.Block.HashTreeRoot()
require.NoError(t, err)
b1State := testutil.NewBeaconState()
require.NoError(t, b1State.SetSlot(1))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b0))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b1))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b2))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b3))
require.NoError(t, service.beaconDB.SaveState(ctx, b1State, r1))
lastState, err := service.lastAncestorState(ctx, r3)
require.NoError(t, err)
assert.Equal(t, b1State.Slot(), lastState.Slot(), "Did not get wanted state")
}
func TestLastAncestorState_CanGetUsingHotCache(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
b0 := testutil.NewBeaconBlock()
b0.Block.ParentRoot = bytesutil.PadTo([]byte{'a'}, 32)
r0, err := b0.Block.HashTreeRoot()
require.NoError(t, err)
b1 := testutil.NewBeaconBlock()
b1.Block.Slot = 1
b1.Block.ParentRoot = bytesutil.PadTo(r0[:], 32)
r1, err := b1.Block.HashTreeRoot()
require.NoError(t, err)
b2 := testutil.NewBeaconBlock()
b2.Block.Slot = 2
b2.Block.ParentRoot = bytesutil.PadTo(r1[:], 32)
r2, err := b2.Block.HashTreeRoot()
require.NoError(t, err)
b3 := testutil.NewBeaconBlock()
b3.Block.Slot = 3
b3.Block.ParentRoot = bytesutil.PadTo(r2[:], 32)
r3, err := b3.Block.HashTreeRoot()
require.NoError(t, err)
b1State := testutil.NewBeaconState()
require.NoError(t, b1State.SetSlot(1))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b0))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b1))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b2))
require.NoError(t, service.beaconDB.SaveBlock(ctx, b3))
service.hotStateCache.Put(r1, b1State)
lastState, err := service.lastAncestorState(ctx, r3)
require.NoError(t, err)
assert.Equal(t, b1State.Slot(), lastState.Slot(), "Did not get wanted state")
}

View File

@@ -0,0 +1,16 @@
package stategen
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
replayBlockCount = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "replay_blocks_count",
Help: "The number of blocks to replay to generate a state",
Buckets: []float64{64, 256, 1024, 2048, 4096},
},
)
)

View File

@@ -15,53 +15,6 @@ import (
"go.opencensus.io/trace"
)
// ComputeStateUpToSlot returns a processed state up to input target slot.
// If the last processed block is at slot 32, given input target slot at 40, this
// returns processed state up to slot 40 via empty slots.
// If there's duplicated blocks in a single slot, the canonical block will be returned.
func (s *State) ComputeStateUpToSlot(ctx context.Context, targetSlot uint64) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.ComputeStateUpToSlot")
defer span.End()
// Return genesis state if target slot is 0.
if targetSlot == 0 {
return s.beaconDB.GenesisState(ctx)
}
lastBlockRoot, lastBlockSlot, err := s.lastSavedBlock(ctx, targetSlot)
if err != nil {
return nil, errors.Wrap(err, "could not get last saved block")
}
lastState, err := s.lastSavedState(ctx, targetSlot)
if err != nil {
return nil, errors.Wrap(err, "could not get last valid state")
}
if lastState == nil {
return nil, errUnknownState
}
// Short circuit if no block was saved, replay using slots only.
if lastBlockSlot == 0 {
return s.ReplayBlocks(ctx, lastState, []*ethpb.SignedBeaconBlock{}, targetSlot)
}
// Return if the last valid state's slot is higher than the target slot.
if lastState.Slot() >= targetSlot {
return lastState, nil
}
blks, err := s.LoadBlocks(ctx, lastState.Slot()+1, lastBlockSlot, lastBlockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not load blocks")
}
lastState, err = s.ReplayBlocks(ctx, lastState, blks, targetSlot)
if err != nil {
return nil, errors.Wrap(err, "could not replay blocks")
}
return lastState, nil
}
// ReplayBlocks replays the input blocks on the input state until the target slot is reached.
func (s *State) ReplayBlocks(ctx context.Context, state *state.BeaconState, signed []*ethpb.SignedBeaconBlock, targetSlot uint64) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.ReplayBlocks")
@@ -308,48 +261,6 @@ func (s *State) genesisRoot(ctx context.Context) ([32]byte, error) {
return b.Block.HashTreeRoot()
}
// This retrieves the archived state in the DB.
func (s *State) archivedState(ctx context.Context, slot uint64) (*state.BeaconState, error) {
var st *state.BeaconState
sts, err := s.beaconDB.HighestSlotStatesBelow(ctx, slot+1)
if len(sts) > 0 {
st = sts[0]
}
return st, err
}
// This processes a state up to input slot.
func (s *State) processStateUpTo(ctx context.Context, state *state.BeaconState, slot uint64) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.processStateUpTo")
defer span.End()
// Short circuit if the slot is already less than pre state.
if state.Slot() >= slot {
return state, nil
}
_, lastBlockSlot, err := s.lastSavedBlock(ctx, slot)
if err != nil {
return nil, errors.Wrap(err, "could not get last saved block")
}
// Short circuit if no block was saved, replay using slots only.
if lastBlockSlot == 0 {
return s.ReplayBlocks(ctx, state, []*ethpb.SignedBeaconBlock{}, slot)
}
blks, err := s.loadFinalizedBlocks(ctx, state.Slot()+1, lastBlockSlot)
if err != nil {
return nil, errors.Wrap(err, "could not load blocks")
}
state, err = s.ReplayBlocks(ctx, state, blks, slot)
if err != nil {
return nil, errors.Wrap(err, "could not replay blocks")
}
return state, nil
}
// Given the start slot and the end slot, this returns the finalized beacon blocks in between.
// Since hot states don't have finalized blocks, this should ONLY be used for replaying cold state.
func (s *State) loadFinalizedBlocks(ctx context.Context, startSlot uint64, endSlot uint64) ([]*ethpb.SignedBeaconBlock, error) {

View File

@@ -18,48 +18,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestComputeStateUpToSlot_GenesisState(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
gBlk := testutil.NewBeaconBlock()
gRoot, err := gBlk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveBlock(ctx, gBlk))
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, gRoot))
s, err := service.ComputeStateUpToSlot(ctx, 0)
require.NoError(t, err)
if !proto.Equal(s.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) {
t.Error("Did not receive correct genesis state")
}
}
func TestComputeStateUpToSlot_CanProcessUpTo(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
gBlk := testutil.NewBeaconBlock()
gRoot, err := gBlk.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.beaconDB.SaveBlock(ctx, gBlk))
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, gRoot))
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, gRoot))
s, err := service.ComputeStateUpToSlot(ctx, params.BeaconConfig().SlotsPerEpoch+1)
require.NoError(t, err)
assert.Equal(t, params.BeaconConfig().SlotsPerEpoch+1, s.Slot(), "Did not receive correct processed state")
}
func TestReplayBlocks_AllSkipSlots(t *testing.T) {
db, _ := testDB.SetupDB(t)
@@ -423,48 +381,6 @@ func TestLastSavedState_NoSavedBlockState(t *testing.T) {
assert.ErrorContains(t, errUnknownState.Error(), err)
}
func TestArchivedState_CanGetSpecificIndex(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
r := [32]byte{'a'}
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, db.SaveState(ctx, beaconState, r))
got, err := service.archivedState(ctx, params.BeaconConfig().SlotsPerArchivedPoint)
require.NoError(t, err)
assert.DeepEqual(t, beaconState.InnerStateUnsafe(), got.InnerStateUnsafe(), "Did not get wanted state")
got, err = service.archivedState(ctx, params.BeaconConfig().SlotsPerArchivedPoint*2)
require.NoError(t, err)
assert.DeepEqual(t, beaconState.InnerStateUnsafe(), got.InnerStateUnsafe(), "Did not get wanted state")
}
func TestProcessStateUpToSlot_CanExitEarly(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch+1))
s, err := service.processStateUpTo(ctx, beaconState, params.BeaconConfig().SlotsPerEpoch)
require.NoError(t, err)
assert.Equal(t, params.BeaconConfig().SlotsPerEpoch+1, s.Slot(), "Did not receive correct processed state")
}
func TestProcessStateUpToSlot_CanProcess(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
s, err := service.processStateUpTo(ctx, beaconState, params.BeaconConfig().SlotsPerEpoch+1)
require.NoError(t, err)
assert.Equal(t, params.BeaconConfig().SlotsPerEpoch+1, s.Slot(), "Did not receive correct processed state")
}
// tree1 constructs the following tree:
// B0 - B1 - - B3 -- B5
// \- B2 -- B4 -- B6 ----- B8

View File

@@ -3,24 +3,21 @@ package stategen
import (
"context"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
// SaveState saves the state in the DB.
// It knows which cold and hot state section the input state should belong to.
// SaveState saves the state in the cache and/or DB.
func (s *State) SaveState(ctx context.Context, root [32]byte, state *state.BeaconState) error {
ctx, span := trace.StartSpan(ctx, "stateGen.SaveState")
defer span.End()
// The state belongs to the cold section if it's below the split slot threshold.
if state.Slot() < s.finalizedInfo.slot {
return s.saveColdState(ctx, root, state)
}
return s.saveHotState(ctx, root, state)
return s.saveStateByRoot(ctx, root, state)
}
// ForceCheckpoint initiates a cold state save of the given state. This method does not update the
@@ -36,7 +33,7 @@ func (s *State) ForceCheckpoint(ctx context.Context, root []byte) error {
return nil
}
fs, err := s.loadHotStateByRoot(ctx, root32)
fs, err := s.loadStateByRoot(ctx, root32)
if err != nil {
return err
}
@@ -46,3 +43,44 @@ func (s *State) ForceCheckpoint(ctx context.Context, root []byte) error {
return nil
}
// SaveStateSummary saves the relevant state summary for a block and its corresponding state slot in the
// state summary cache.
func (s *State) SaveStateSummary(ctx context.Context, blk *ethpb.SignedBeaconBlock, blockRoot [32]byte) {
// Save State summary
s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{
Slot: blk.Block.Slot,
Root: blockRoot[:],
})
}
// This saves a post beacon state. On the epoch boundary,
// it saves a full state. On an intermediate slot, it saves a back pointer to the
// nearest epoch boundary state.
func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, state *state.BeaconState) error {
ctx, span := trace.StartSpan(ctx, "stateGen.saveStateByRoot")
defer span.End()
// If the hot state is already in cache, one can be sure the state was processed and in the DB.
if s.hotStateCache.Has(blockRoot) {
return nil
}
// Only on an epoch boundary slot, saves epoch boundary state in epoch boundary root state cache.
if helpers.IsEpochStart(state.Slot()) {
if err := s.epochBoundaryStateCache.put(blockRoot, state); err != nil {
return err
}
}
// On an intermediate slots, save the hot state summary.
s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{
Slot: state.Slot(),
Root: blockRoot[:],
})
// Store the copied state in the hot state cache.
s.hotStateCache.Put(blockRoot, state)
return nil
}

View File

@@ -13,29 +13,6 @@ import (
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestSaveState_ColdStateCanBeSaved(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 1
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
// This goes to cold section.
slot := uint64(1)
require.NoError(t, beaconState.SetSlot(slot))
service.finalizedInfo.slot = slot + 1
r := [32]byte{'a'}
require.NoError(t, service.SaveState(ctx, r, beaconState))
assert.Equal(t, true, service.beaconDB.HasArchivedPoint(ctx, 1), "Did not save cold state")
assert.Equal(t, r, service.beaconDB.ArchivedPointRoot(ctx, 1), "Did not get wanted root")
require.LogsContain(t, hook, "Saved full state on archived point")
}
func TestSaveState_HotStateCanBeSaved(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
@@ -95,3 +72,63 @@ func TestState_ForceCheckpoint_SavesStateToDatabase(t *testing.T) {
// Should not panic with genesis finalized root.
assert.NoError(t, svc.ForceCheckpoint(ctx, params.BeaconConfig().ZeroHash[:]))
}
func TestSaveState_AlreadyHas(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
r := [32]byte{'A'}
// Pre cache the hot state.
service.hotStateCache.Put(r, beaconState)
require.NoError(t, service.saveStateByRoot(ctx, r, beaconState))
// Should not save the state and state summary.
assert.Equal(t, false, service.beaconDB.HasState(ctx, r), "Should not have saved the state")
assert.Equal(t, false, service.beaconDB.HasStateSummary(ctx, r), "Should have saved the state summary")
require.LogsDoNotContain(t, hook, "Saved full state on epoch boundary")
}
func TestSaveState_CanSaveOnEpochBoundary(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch))
r := [32]byte{'A'}
require.NoError(t, service.saveStateByRoot(ctx, r, beaconState))
// Should save both state and state summary.
_, ok, err := service.epochBoundaryStateCache.getByRoot(r)
require.NoError(t, err)
require.Equal(t, true, ok, "Did not save epoch boundary state")
assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary")
}
func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
require.NoError(t, beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch-1))
r := [32]byte{'A'}
b := testutil.NewBeaconBlock()
require.NoError(t, db.SaveBlock(ctx, b))
gRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, db.SaveGenesisBlockRoot(ctx, gRoot))
require.NoError(t, service.SaveState(ctx, r, beaconState))
// Should only save state summary.
assert.Equal(t, false, service.beaconDB.HasState(ctx, r), "Should not have saved the state")
assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary")
require.LogsDoNotContain(t, hook, "Saved full state on epoch boundary")
}