mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-05-02 03:02:54 -04:00
Save state to DB during long non-finality (#7597)
* Starting saving state during hot * Add a log * Add helpers to turn on/off mode * Add locks * Add missing return * Clean up * Add logic to migration to handle db roots * Add tests for on and off * Add more tests * Add test for migrate * @prestonvanloon's feedback Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
@@ -7,12 +7,16 @@ import (
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// epochsSinceFinalitySaveHotStateDB is the number of epochs without finality after which the runtime begins saving hot states to the DB.
|
||||
var epochsSinceFinalitySaveHotStateDB = 100
|
||||
|
||||
// BlockReceiver interface defines the methods of chain service receive and processing new blocks.
|
||||
type BlockReceiver interface {
|
||||
ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
|
||||
@@ -58,6 +62,11 @@ func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlo
|
||||
return err
|
||||
}
|
||||
|
||||
// Have we been finalizing? Should we start saving hot states to db?
|
||||
if err := s.checkSaveHotStateDB(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Reports on block and fork choice metrics.
|
||||
reportSlotMetrics(blockCopy.Block.Slot, s.HeadSlot(), s.CurrentSlot(), s.finalizedCheckpt)
|
||||
|
||||
@@ -179,3 +188,21 @@ func (s *Service) handlePostBlockOperations(b *ethpb.BeaconBlock) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This checks whether it's time to start saving hot state to DB.
|
||||
// It's time when there's `epochsSinceFinalitySaveHotStateDB` epochs of non-finality.
|
||||
func (s *Service) checkSaveHotStateDB(ctx context.Context) error {
|
||||
currentEpoch := helpers.SlotToEpoch(s.CurrentSlot())
|
||||
// Prevent `sinceFinality` going underflow.
|
||||
var sinceFinality uint64
|
||||
if currentEpoch > s.finalizedCheckpt.Epoch {
|
||||
sinceFinality = currentEpoch - s.finalizedCheckpt.Epoch
|
||||
}
|
||||
|
||||
if sinceFinality >= uint64(epochsSinceFinalitySaveHotStateDB) {
|
||||
s.stateGen.EnableSaveHotStateToDB(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.stateGen.DisableSaveHotStateToDB(ctx)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
blockchainTesting "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
@@ -17,6 +18,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func TestService_ReceiveBlock(t *testing.T) {
|
||||
@@ -373,3 +375,41 @@ func TestService_HasInitSyncBlock(t *testing.T) {
|
||||
t.Error("Should have block")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckSaveHotStateDB_Enabling(t *testing.T) {
|
||||
db, stateSummaryCache := testDB.SetupDB(t)
|
||||
hook := logTest.NewGlobal()
|
||||
s, err := NewService(context.Background(), &Config{StateGen: stategen.New(db, stateSummaryCache)})
|
||||
require.NoError(t, err)
|
||||
st := params.BeaconConfig().SlotsPerEpoch * uint64(epochsSinceFinalitySaveHotStateDB)
|
||||
s.genesisTime = time.Now().Add(time.Duration(-1*int64(st)*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second)
|
||||
s.finalizedCheckpt = ðpb.Checkpoint{}
|
||||
|
||||
require.NoError(t, s.checkSaveHotStateDB(context.Background()))
|
||||
assert.LogsContain(t, hook, "Entering mode to save hot states in DB")
|
||||
}
|
||||
|
||||
func TestCheckSaveHotStateDB_Disabling(t *testing.T) {
|
||||
db, stateSummaryCache := testDB.SetupDB(t)
|
||||
hook := logTest.NewGlobal()
|
||||
s, err := NewService(context.Background(), &Config{StateGen: stategen.New(db, stateSummaryCache)})
|
||||
require.NoError(t, err)
|
||||
s.finalizedCheckpt = ðpb.Checkpoint{}
|
||||
require.NoError(t, s.checkSaveHotStateDB(context.Background()))
|
||||
s.genesisTime = time.Now()
|
||||
|
||||
require.NoError(t, s.checkSaveHotStateDB(context.Background()))
|
||||
assert.LogsContain(t, hook, "Exiting mode to save hot states in DB")
|
||||
}
|
||||
|
||||
func TestCheckSaveHotStateDB_Overflow(t *testing.T) {
|
||||
db, stateSummaryCache := testDB.SetupDB(t)
|
||||
hook := logTest.NewGlobal()
|
||||
s, err := NewService(context.Background(), &Config{StateGen: stategen.New(db, stateSummaryCache)})
|
||||
require.NoError(t, err)
|
||||
s.finalizedCheckpt = ðpb.Checkpoint{Epoch: 10000000}
|
||||
s.genesisTime = time.Now()
|
||||
|
||||
require.NoError(t, s.checkSaveHotStateDB(context.Background()))
|
||||
assert.LogsDoNotContain(t, hook, "Entering mode to save hot states in DB")
|
||||
}
|
||||
|
||||
@@ -76,6 +76,18 @@ func (s *State) MigrateToCold(ctx context.Context, fRoot [32]byte) error {
|
||||
aState = missingState
|
||||
}
|
||||
if s.beaconDB.HasState(ctx, aRoot) {
|
||||
// Remove the root from the saved hot state roots so that the state is not deleted later when save-hot-state-to-DB mode is turned off.
|
||||
s.saveHotStateDB.lock.Lock()
|
||||
roots := s.saveHotStateDB.savedStateRoots
|
||||
for i := 0; i < len(roots); i++ {
|
||||
if aRoot == roots[i] {
|
||||
s.saveHotStateDB.savedStateRoots = append(roots[:i], roots[i+1:]...)
|
||||
// There shouldn't be duplicated roots in `savedStateRoots`.
|
||||
// Break here is ok.
|
||||
break
|
||||
}
|
||||
}
|
||||
s.saveHotStateDB.lock.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -96,3 +96,27 @@ func TestMigrateToCold_RegeneratePath(t *testing.T) {
|
||||
|
||||
require.LogsContain(t, hook, "Saved state in DB")
|
||||
}
|
||||
|
||||
func TestMigrateToCold_StateExistsInDB(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
db, _ := testDB.SetupDB(t)
|
||||
|
||||
service := New(db, cache.NewStateSummaryCache())
|
||||
service.slotsPerArchivedPoint = 1
|
||||
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
|
||||
stateSlot := uint64(1)
|
||||
require.NoError(t, beaconState.SetSlot(stateSlot))
|
||||
b := testutil.NewBeaconBlock()
|
||||
b.Block.Slot = 2
|
||||
fRoot, err := b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.beaconDB.SaveBlock(ctx, b))
|
||||
require.NoError(t, service.epochBoundaryStateCache.put(fRoot, beaconState))
|
||||
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, fRoot))
|
||||
|
||||
service.saveHotStateDB.savedStateRoots = [][32]byte{{1}, {2}, {3}, {4}, fRoot}
|
||||
require.NoError(t, service.MigrateToCold(ctx, fRoot))
|
||||
assert.DeepEqual(t, [][32]byte{{1}, {2}, {3}, {4}}, service.saveHotStateDB.savedStateRoots)
|
||||
assert.LogsDoNotContain(t, hook, "Saved state in DB")
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@ import (
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
var defaultHotStateDBInterval uint64 = 128 // slots
|
||||
|
||||
// State represents a management object that handles the internal
|
||||
// logic of maintaining both hot and cold states in DB.
|
||||
type State struct {
|
||||
@@ -23,6 +25,17 @@ type State struct {
|
||||
finalizedInfo *finalizedInfo
|
||||
stateSummaryCache *cache.StateSummaryCache
|
||||
epochBoundaryStateCache *epochBoundaryState
|
||||
saveHotStateDB *saveHotStateDbConfig
|
||||
}
|
||||
|
||||
// saveHotStateDbConfig holds the bookkeeping for saving hot states to the DB
// during long non-finality: whether the mode is on, how often a hot state is
// saved, and which saved hot state roots are currently tracked.
type saveHotStateDbConfig struct {
	// lock is held by the code that reads or writes enabled and savedStateRoots.
	lock sync.Mutex
	// enabled reports whether hot states are currently being saved to the DB.
	enabled bool
	// savedStateRoots lists the block roots of the hot states saved to the DB
	// while the mode is on.
	savedStateRoots [][32]byte
	// duration is the save interval in slots.
	duration uint64
}
|
||||
|
||||
// This tracks the finalized point. It's also the point where slot and the block root of
|
||||
@@ -43,6 +56,9 @@ func New(db db.NoHeadAccessDatabase, stateSummaryCache *cache.StateSummaryCache)
|
||||
slotsPerArchivedPoint: params.BeaconConfig().SlotsPerArchivedPoint,
|
||||
stateSummaryCache: stateSummaryCache,
|
||||
epochBoundaryStateCache: newBoundaryStateCache(),
|
||||
saveHotStateDB: &saveHotStateDbConfig{
|
||||
duration: defaultHotStateDBInterval,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package stategen
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
@@ -9,6 +10,7 @@ import (
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
@@ -58,6 +60,23 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, state *
|
||||
ctx, span := trace.StartSpan(ctx, "stateGen.saveStateByRoot")
|
||||
defer span.End()
|
||||
|
||||
// Duration can't be 0 to prevent panic for division.
|
||||
duration := uint64(math.Max(float64(s.saveHotStateDB.duration), 1))
|
||||
|
||||
s.saveHotStateDB.lock.Lock()
|
||||
if s.saveHotStateDB.enabled && state.Slot()%duration == 0 {
|
||||
if err := s.beaconDB.SaveState(ctx, state, blockRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
s.saveHotStateDB.savedStateRoots = append(s.saveHotStateDB.savedStateRoots, blockRoot)
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": state.Slot(),
|
||||
"totalHotStateSavedInDB": len(s.saveHotStateDB.savedStateRoots),
|
||||
}).Info("Saving hot state to DB")
|
||||
}
|
||||
s.saveHotStateDB.lock.Unlock()
|
||||
|
||||
// If the hot state is already in cache, one can be sure the state was processed and in the DB.
|
||||
if s.hotStateCache.Has(blockRoot) {
|
||||
return nil
|
||||
@@ -81,3 +100,44 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, state *
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnableSaveHotStateToDB enters the mode that saves hot beacon state to the DB.
|
||||
// This usually gets triggered when there's long duration since finality.
|
||||
func (s *State) EnableSaveHotStateToDB(_ context.Context) {
|
||||
s.saveHotStateDB.lock.Lock()
|
||||
defer s.saveHotStateDB.lock.Unlock()
|
||||
if s.saveHotStateDB.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
s.saveHotStateDB.enabled = true
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"enabled": s.saveHotStateDB.enabled,
|
||||
"slotsInterval": s.saveHotStateDB.duration,
|
||||
}).Warn("Entering mode to save hot states in DB")
|
||||
}
|
||||
|
||||
// DisableSaveHotStateToDB exits the mode that saves beacon state to DB for the hot states.
|
||||
// This usually gets triggered once there's finality after long duration since finality.
|
||||
func (s *State) DisableSaveHotStateToDB(ctx context.Context) error {
|
||||
s.saveHotStateDB.lock.Lock()
|
||||
defer s.saveHotStateDB.lock.Unlock()
|
||||
if !s.saveHotStateDB.enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"enabled": s.saveHotStateDB.enabled,
|
||||
"deletedHotStates": len(s.saveHotStateDB.savedStateRoots),
|
||||
}).Warn("Exiting mode to save hot states in DB")
|
||||
|
||||
// Delete previous saved states in DB as we are turning this mode off.
|
||||
s.saveHotStateDB.enabled = false
|
||||
if err := s.beaconDB.DeleteStates(ctx, s.saveHotStateDB.savedStateRoots); err != nil {
|
||||
return err
|
||||
}
|
||||
s.saveHotStateDB.savedStateRoots = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -109,6 +109,8 @@ func TestSaveState_CanSaveOnEpochBoundary(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, ok, "Did not save epoch boundary state")
|
||||
assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary")
|
||||
// Should not have been saved in DB.
|
||||
require.Equal(t, false, db.HasState(ctx, r))
|
||||
}
|
||||
|
||||
func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) {
|
||||
@@ -131,4 +133,68 @@ func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) {
|
||||
assert.Equal(t, false, service.beaconDB.HasState(ctx, r), "Should not have saved the state")
|
||||
assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary")
|
||||
require.LogsDoNotContain(t, hook, "Saved full state on epoch boundary")
|
||||
// Should not have been saved in DB.
|
||||
require.Equal(t, false, db.HasState(ctx, r))
|
||||
}
|
||||
|
||||
func TestSaveState_CanSaveHotStateToDB(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
db, _ := testDB.SetupDB(t)
|
||||
service := New(db, cache.NewStateSummaryCache())
|
||||
service.EnableSaveHotStateToDB(ctx)
|
||||
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
|
||||
require.NoError(t, beaconState.SetSlot(defaultHotStateDBInterval))
|
||||
|
||||
r := [32]byte{'A'}
|
||||
require.NoError(t, service.saveStateByRoot(ctx, r, beaconState))
|
||||
|
||||
require.LogsContain(t, hook, "Saving hot state to DB")
|
||||
// Should have saved in DB.
|
||||
require.Equal(t, true, db.HasState(ctx, r))
|
||||
}
|
||||
|
||||
func TestEnableSaveHotStateToDB_Enabled(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
db, _ := testDB.SetupDB(t)
|
||||
service := New(db, cache.NewStateSummaryCache())
|
||||
|
||||
service.EnableSaveHotStateToDB(ctx)
|
||||
require.LogsContain(t, hook, "Entering mode to save hot states in DB")
|
||||
require.Equal(t, true, service.saveHotStateDB.enabled)
|
||||
}
|
||||
|
||||
func TestEnableSaveHotStateToDB_AlreadyEnabled(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
db, _ := testDB.SetupDB(t)
|
||||
service := New(db, cache.NewStateSummaryCache())
|
||||
service.saveHotStateDB.enabled = true
|
||||
service.EnableSaveHotStateToDB(ctx)
|
||||
require.LogsDoNotContain(t, hook, "Entering mode to save hot states in DB")
|
||||
require.Equal(t, true, service.saveHotStateDB.enabled)
|
||||
}
|
||||
|
||||
func TestEnableSaveHotStateToDB_Disabled(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
db, _ := testDB.SetupDB(t)
|
||||
service := New(db, cache.NewStateSummaryCache())
|
||||
service.saveHotStateDB.enabled = true
|
||||
service.saveHotStateDB.savedStateRoots = [][32]byte{{'a'}}
|
||||
require.NoError(t, service.DisableSaveHotStateToDB(ctx))
|
||||
require.LogsContain(t, hook, "Exiting mode to save hot states in DB")
|
||||
require.Equal(t, false, service.saveHotStateDB.enabled)
|
||||
require.Equal(t, 0, len(service.saveHotStateDB.savedStateRoots))
|
||||
}
|
||||
|
||||
func TestEnableSaveHotStateToDB_AlreadyDisabled(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
db, _ := testDB.SetupDB(t)
|
||||
service := New(db, cache.NewStateSummaryCache())
|
||||
require.NoError(t, service.DisableSaveHotStateToDB(ctx))
|
||||
require.LogsDoNotContain(t, hook, "Exiting mode to save hot states in DB")
|
||||
require.Equal(t, false, service.saveHotStateDB.enabled)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user