decreasing stategen interface surface area (#11132)

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
kasey
2022-08-11 15:12:05 -05:00
committed by GitHub
parent f8ff36f534
commit 6b42a0a3a1
9 changed files with 41 additions and 90 deletions

View File

@@ -29,7 +29,7 @@ func TestService_getBlock(t *testing.T) {
// block in cache
b, err := blocks.NewSignedBeaconBlock(b1)
require.NoError(t, err)
s.saveInitSyncBlock(ctx, r1, b)
require.NoError(t, s.saveInitSyncBlock(ctx, r1, b))
got, err := s.getBlock(ctx, r1)
require.NoError(t, err)
require.DeepEqual(t, b, got)
@@ -59,7 +59,7 @@ func TestService_hasBlockInInitSyncOrDB(t *testing.T) {
// block in cache
b, err := blocks.NewSignedBeaconBlock(b1)
require.NoError(t, err)
s.saveInitSyncBlock(ctx, r1, b)
require.NoError(t, s.saveInitSyncBlock(ctx, r1, b))
require.Equal(t, true, s.hasBlockInInitSyncOrDB(ctx, r1))
// block in db

View File

@@ -17,7 +17,7 @@ var ErrNoDataForSlot = errors.New("cannot retrieve data for slot")
// HasState returns true if the state exists in cache or in DB.
func (s *State) HasState(ctx context.Context, blockRoot [32]byte) (bool, error) {
has, err := s.HasStateInCache(ctx, blockRoot)
has, err := s.hasStateInCache(ctx, blockRoot)
if err != nil {
return false, err
}
@@ -27,8 +27,8 @@ func (s *State) HasState(ctx context.Context, blockRoot [32]byte) (bool, error)
return s.beaconDB.HasState(ctx, blockRoot), nil
}
// HasStateInCache returns true if the state exists in cache.
func (s *State) HasStateInCache(_ context.Context, blockRoot [32]byte) (bool, error) {
// hasStateInCache returns true if the state exists in cache.
func (s *State) hasStateInCache(_ context.Context, blockRoot [32]byte) (bool, error) {
if s.hotStateCache.has(blockRoot) {
return true, nil
}
@@ -87,7 +87,7 @@ func (s *State) StateByRootInitialSync(ctx context.Context, blockRoot [32]byte)
return cachedInfo.state, nil
}
startState, err := s.LastAncestorState(ctx, blockRoot)
startState, err := s.latestAncestor(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get ancestor state")
}
@@ -102,11 +102,11 @@ func (s *State) StateByRootInitialSync(ctx context.Context, blockRoot [32]byte)
return startState, nil
}
blks, err := s.LoadBlocks(ctx, startState.Slot()+1, summary.Slot, bytesutil.ToBytes32(summary.Root))
blks, err := s.loadBlocks(ctx, startState.Slot()+1, summary.Slot, bytesutil.ToBytes32(summary.Root))
if err != nil {
return nil, errors.Wrap(err, "could not load blocks")
}
startState, err = s.ReplayBlocks(ctx, startState, blks, summary.Slot)
startState, err = s.replayBlocks(ctx, startState, blks, summary.Slot)
if err != nil {
return nil, errors.Wrap(err, "could not replay blocks")
}
@@ -125,13 +125,13 @@ func (s *State) stateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.St
}
if summary == nil {
return s.RecoverStateSummary(ctx, blockRoot)
return s.recoverStateSummary(ctx, blockRoot)
}
return summary, nil
}
// RecoverStateSummary recovers state summary object of a given block root by using the saved block in DB.
func (s *State) RecoverStateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.StateSummary, error) {
func (s *State) recoverStateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.StateSummary, error) {
if s.beaconDB.HasBlock(ctx, blockRoot) {
b, err := s.beaconDB.Block(ctx, blockRoot)
if err != nil {
@@ -185,7 +185,7 @@ func (s *State) loadStateByRoot(ctx context.Context, blockRoot [32]byte) (state.
// Since the requested state is not in caches or DB, start replaying using the last
// available ancestor state which is retrieved using input block's root.
startState, err := s.LastAncestorState(ctx, blockRoot)
startState, err := s.latestAncestor(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get ancestor state")
}
@@ -197,17 +197,17 @@ func (s *State) loadStateByRoot(ctx context.Context, blockRoot [32]byte) (state.
return startState, nil
}
blks, err := s.LoadBlocks(ctx, startState.Slot()+1, targetSlot, bytesutil.ToBytes32(summary.Root))
blks, err := s.loadBlocks(ctx, startState.Slot()+1, targetSlot, bytesutil.ToBytes32(summary.Root))
if err != nil {
return nil, errors.Wrap(err, "could not load blocks for hot state using root")
}
replayBlockCount.Observe(float64(len(blks)))
return s.ReplayBlocks(ctx, startState, blks, targetSlot)
return s.replayBlocks(ctx, startState, blks, targetSlot)
}
// LastAncestorState returns the highest available ancestor state of the input block root.
// latestAncestor returns the highest available ancestor state of the input block root.
// It recursively looks up block's parent until a corresponding state of the block root
// is found in the caches or DB.
//
@@ -215,8 +215,8 @@ func (s *State) loadStateByRoot(ctx context.Context, blockRoot [32]byte) (state.
// 1) block parent state is the last finalized state
// 2) block parent state is the epoch boundary state and exists in epoch boundary cache
// 3) block parent state is in DB
func (s *State) LastAncestorState(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.LastAncestorState")
func (s *State) latestAncestor(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.latestAncestor")
defer span.End()
if s.isFinalizedRoot(blockRoot) && s.finalizedState() != nil {

View File

@@ -385,7 +385,7 @@ func TestLastAncestorState_CanGetUsingDB(t *testing.T) {
util.SaveBlock(t, ctx, service.beaconDB, b3)
require.NoError(t, service.beaconDB.SaveState(ctx, b1State, r1))
lastState, err := service.LastAncestorState(ctx, r3)
lastState, err := service.latestAncestor(ctx, r3)
require.NoError(t, err)
assert.Equal(t, b1State.Slot(), lastState.Slot(), "Did not get wanted state")
}
@@ -425,7 +425,7 @@ func TestLastAncestorState_CanGetUsingCache(t *testing.T) {
util.SaveBlock(t, ctx, service.beaconDB, b3)
service.hotStateCache.put(r1, b1State)
lastState, err := service.LastAncestorState(ctx, r3)
lastState, err := service.latestAncestor(ctx, r3)
require.NoError(t, err)
assert.Equal(t, b1State.Slot(), lastState.Slot(), "Did not get wanted state")
}
@@ -483,7 +483,7 @@ func TestState_HasStateInCache(t *testing.T) {
{rMiss, false},
}
for _, tc := range tt {
got, err := service.HasStateInCache(ctx, tc.root)
got, err := service.hasStateInCache(ctx, tc.root)
require.NoError(t, err)
require.Equal(t, tc.want, got)
}

View File

@@ -12,8 +12,6 @@ go_library(
deps = [
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
],
)

View File

@@ -4,9 +4,7 @@ import (
"context"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
)
// MockStateManager is a fake implementation of StateManager.
@@ -43,35 +41,11 @@ func (_ *MockStateManager) MigrateToCold(_ context.Context, _ [32]byte) error {
panic("implement me")
}
// ReplayBlocks --
func (_ *MockStateManager) ReplayBlocks(
_ context.Context,
_ state.BeaconState,
_ []interfaces.SignedBeaconBlock,
_ types.Slot,
) (state.BeaconState, error) {
panic("implement me")
}
// LoadBlocks --
func (_ *MockStateManager) LoadBlocks(
_ context.Context,
_, _ types.Slot,
_ [32]byte,
) ([]interfaces.SignedBeaconBlock, error) {
panic("implement me")
}
// HasState --
func (_ *MockStateManager) HasState(_ context.Context, _ [32]byte) (bool, error) {
panic("implement me")
}
// HasStateInCache --
func (_ *MockStateManager) HasStateInCache(_ context.Context, _ [32]byte) (bool, error) {
panic("implement me")
}
// StateByRoot --
func (m *MockStateManager) StateByRoot(_ context.Context, blockRoot [32]byte) (state.BeaconState, error) {
return m.StatesByRoot[blockRoot], nil
@@ -87,14 +61,6 @@ func (m *MockStateManager) StateBySlot(_ context.Context, slot types.Slot) (stat
return m.StatesBySlot[slot], nil
}
// RecoverStateSummary --
func (_ *MockStateManager) RecoverStateSummary(
_ context.Context,
_ [32]byte,
) (*ethpb.StateSummary, error) {
panic("implement me")
}
// SaveState --
func (_ *MockStateManager) SaveState(_ context.Context, _ [32]byte, _ state.BeaconState) error {
panic("implement me")

View File

@@ -25,13 +25,13 @@ import (
// ReplayBlocks replays the input blocks on the input state until the target slot is reached.
//
// WARNING Blocks passed to the function must be in decreasing slots order.
func (_ *State) ReplayBlocks(
func (_ *State) replayBlocks(
ctx context.Context,
state state.BeaconState,
signed []interfaces.SignedBeaconBlock,
targetSlot types.Slot,
) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.ReplayBlocks")
ctx, span := trace.StartSpan(ctx, "stateGen.replayBlocks")
defer span.End()
var err error
@@ -77,9 +77,9 @@ func (_ *State) ReplayBlocks(
return state, nil
}
// LoadBlocks loads the blocks between start slot and end slot by recursively fetching from end block root.
// loadBlocks loads the blocks between start slot and end slot by recursively fetching from end block root.
// The Blocks are returned in slot-descending order.
func (s *State) LoadBlocks(ctx context.Context, startSlot, endSlot types.Slot, endBlockRoot [32]byte) ([]interfaces.SignedBeaconBlock, error) {
func (s *State) loadBlocks(ctx context.Context, startSlot, endSlot types.Slot, endBlockRoot [32]byte) ([]interfaces.SignedBeaconBlock, error) {
// Nothing to load for invalid range.
if endSlot < startSlot {
return nil, fmt.Errorf("start slot %d >= end slot %d", startSlot, endSlot)

View File

@@ -43,7 +43,7 @@ func TestReplayBlocks_AllSkipSlots(t *testing.T) {
service := New(beaconDB)
targetSlot := params.BeaconConfig().SlotsPerEpoch - 1
newState, err := service.ReplayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{}, targetSlot)
newState, err := service.replayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{}, targetSlot)
require.NoError(t, err)
assert.Equal(t, targetSlot, newState.Slot(), "Did not advance slots")
}
@@ -72,7 +72,7 @@ func TestReplayBlocks_SameSlot(t *testing.T) {
service := New(beaconDB)
targetSlot := beaconState.Slot()
newState, err := service.ReplayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{}, targetSlot)
newState, err := service.replayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{}, targetSlot)
require.NoError(t, err)
assert.Equal(t, targetSlot, newState.Slot(), "Did not advance slots")
}
@@ -106,7 +106,7 @@ func TestReplayBlocks_LowerSlotBlock(t *testing.T) {
b.Block.Slot = beaconState.Slot() - 1
wsb, err := consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
newState, err := service.ReplayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{wsb}, targetSlot)
newState, err := service.replayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{wsb}, targetSlot)
require.NoError(t, err)
assert.Equal(t, targetSlot, newState.Slot(), "Did not advance slots")
}
@@ -132,7 +132,7 @@ func TestReplayBlocks_ThroughForkBoundary(t *testing.T) {
service := New(testDB.SetupDB(t))
targetSlot := params.BeaconConfig().SlotsPerEpoch
newState, err := service.ReplayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{}, targetSlot)
newState, err := service.replayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{}, targetSlot)
require.NoError(t, err)
// Verify state is version Altair.
@@ -162,7 +162,7 @@ func TestReplayBlocks_ThroughBellatrixForkBoundary(t *testing.T) {
service := New(testDB.SetupDB(t))
targetSlot := params.BeaconConfig().SlotsPerEpoch * 2
newState, err := service.ReplayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{}, targetSlot)
newState, err := service.replayBlocks(context.Background(), beaconState, []interfaces.SignedBeaconBlock{}, targetSlot)
require.NoError(t, err)
// Verify state is version Altair.
@@ -179,7 +179,7 @@ func TestLoadBlocks_FirstBranch(t *testing.T) {
roots, savedBlocks, err := tree1(t, beaconDB, bytesutil.PadTo([]byte{'A'}, 32))
require.NoError(t, err)
filteredBlocks, err := s.LoadBlocks(ctx, 0, 8, roots[len(roots)-1])
filteredBlocks, err := s.loadBlocks(ctx, 0, 8, roots[len(roots)-1])
require.NoError(t, err)
wanted := []*ethpb.SignedBeaconBlock{
@@ -210,7 +210,7 @@ func TestLoadBlocks_SecondBranch(t *testing.T) {
roots, savedBlocks, err := tree1(t, beaconDB, bytesutil.PadTo([]byte{'A'}, 32))
require.NoError(t, err)
filteredBlocks, err := s.LoadBlocks(ctx, 0, 5, roots[5])
filteredBlocks, err := s.loadBlocks(ctx, 0, 5, roots[5])
require.NoError(t, err)
wanted := []*ethpb.SignedBeaconBlock{
@@ -239,7 +239,7 @@ func TestLoadBlocks_ThirdBranch(t *testing.T) {
roots, savedBlocks, err := tree1(t, beaconDB, bytesutil.PadTo([]byte{'A'}, 32))
require.NoError(t, err)
filteredBlocks, err := s.LoadBlocks(ctx, 0, 7, roots[7])
filteredBlocks, err := s.loadBlocks(ctx, 0, 7, roots[7])
require.NoError(t, err)
wanted := []*ethpb.SignedBeaconBlock{
@@ -270,7 +270,7 @@ func TestLoadBlocks_SameSlots(t *testing.T) {
roots, savedBlocks, err := tree2(t, beaconDB, bytesutil.PadTo([]byte{'A'}, 32))
require.NoError(t, err)
filteredBlocks, err := s.LoadBlocks(ctx, 0, 3, roots[6])
filteredBlocks, err := s.loadBlocks(ctx, 0, 3, roots[6])
require.NoError(t, err)
wanted := []*ethpb.SignedBeaconBlock{
@@ -299,7 +299,7 @@ func TestLoadBlocks_SameEndSlots(t *testing.T) {
roots, savedBlocks, err := tree3(t, beaconDB, bytesutil.PadTo([]byte{'A'}, 32))
require.NoError(t, err)
filteredBlocks, err := s.LoadBlocks(ctx, 0, 2, roots[2])
filteredBlocks, err := s.loadBlocks(ctx, 0, 2, roots[2])
require.NoError(t, err)
wanted := []*ethpb.SignedBeaconBlock{
@@ -327,7 +327,7 @@ func TestLoadBlocks_SameEndSlotsWith2blocks(t *testing.T) {
roots, savedBlocks, err := tree4(t, beaconDB, bytesutil.PadTo([]byte{'A'}, 32))
require.NoError(t, err)
filteredBlocks, err := s.LoadBlocks(ctx, 0, 2, roots[1])
filteredBlocks, err := s.loadBlocks(ctx, 0, 2, roots[1])
require.NoError(t, err)
wanted := []*ethpb.SignedBeaconBlock{
@@ -353,7 +353,7 @@ func TestLoadBlocks_BadStart(t *testing.T) {
roots, _, err := tree1(t, beaconDB, bytesutil.PadTo([]byte{'A'}, 32))
require.NoError(t, err)
_, err = s.LoadBlocks(ctx, 0, 5, roots[8])
_, err = s.loadBlocks(ctx, 0, 5, roots[8])
assert.ErrorContains(t, "end block roots don't match", err)
}

View File

@@ -12,10 +12,8 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/sync/backfill"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"go.opencensus.io/trace"
)
@@ -25,21 +23,17 @@ var defaultHotStateDBInterval types.Slot = 128
// logic of maintaining both hot and cold states in DB.
type StateManager interface {
Resume(ctx context.Context, fState state.BeaconState) (state.BeaconState, error)
DisableSaveHotStateToDB(ctx context.Context) error
EnableSaveHotStateToDB(_ context.Context)
HasState(ctx context.Context, blockRoot [32]byte) (bool, error)
DeleteStateFromCaches(ctx context.Context, blockRoot [32]byte) error
ForceCheckpoint(ctx context.Context, root []byte) error
SaveState(ctx context.Context, blockRoot [32]byte, st state.BeaconState) error
SaveFinalizedState(fSlot types.Slot, fRoot [32]byte, fState state.BeaconState)
MigrateToCold(ctx context.Context, fRoot [32]byte) error
ReplayBlocks(ctx context.Context, state state.BeaconState, signed []interfaces.SignedBeaconBlock, targetSlot types.Slot) (state.BeaconState, error)
LoadBlocks(ctx context.Context, startSlot, endSlot types.Slot, endBlockRoot [32]byte) ([]interfaces.SignedBeaconBlock, error)
HasState(ctx context.Context, blockRoot [32]byte) (bool, error)
HasStateInCache(ctx context.Context, blockRoot [32]byte) (bool, error)
StateByRoot(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
StateByRootIfCachedNoCopy(blockRoot [32]byte) state.BeaconState
StateByRootInitialSync(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
RecoverStateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.StateSummary, error)
SaveState(ctx context.Context, blockRoot [32]byte, st state.BeaconState) error
ForceCheckpoint(ctx context.Context, root []byte) error
EnableSaveHotStateToDB(_ context.Context)
DisableSaveHotStateToDB(ctx context.Context) error
DeleteStateFromCaches(ctx context.Context, blockRoot [32]byte) error
}
// State is a concrete implementation of StateManager.

View File

@@ -216,13 +216,6 @@ func (s *Service) validateBeaconBlock(ctx context.Context, blk interfaces.Signed
return err
}
hasStateSummaryDB := s.cfg.beaconDB.HasStateSummary(ctx, bytesutil.ToBytes32(blk.Block().ParentRoot()))
if !hasStateSummaryDB {
_, err := s.cfg.stateGen.RecoverStateSummary(ctx, bytesutil.ToBytes32(blk.Block().ParentRoot()))
if err != nil {
return err
}
}
parentState, err := s.cfg.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(blk.Block().ParentRoot()))
if err != nil {
return err