From 840ffc84ac3a91aa45f5bcaa70334dcc11dcb656 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Thu, 22 Oct 2020 17:35:30 -0700 Subject: [PATCH] Save state to DB during long non-finality (#7597) * Starting saving state during hot * Add a log * Add helpers to turn on/off mode * Add locks * Add missing return * Clean up * Add logic to migration to handle db roots * Add tests for on and off * Add more tests * Add test for migrate * @prestonvanloon's feedback Co-authored-by: Raul Jordan Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/blockchain/receive_block.go | 27 ++++++++ beacon-chain/blockchain/receive_block_test.go | 40 +++++++++++ beacon-chain/state/stategen/migrate.go | 12 ++++ beacon-chain/state/stategen/migrate_test.go | 24 +++++++ beacon-chain/state/stategen/service.go | 16 +++++ beacon-chain/state/stategen/setter.go | 60 +++++++++++++++++ beacon-chain/state/stategen/setter_test.go | 66 +++++++++++++++++++ 7 files changed, 245 insertions(+) diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index a6751cd8f6..efbe9e38a8 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -7,12 +7,16 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" "github.com/prysmaticlabs/prysm/shared/traceutil" "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) +// This defines how many epochs since finality the run time will begin to save hot state on to the DB. +var epochsSinceFinalitySaveHotStateDB = 100 + // BlockReceiver interface defines the methods of chain service receive and processing new blocks. 
type BlockReceiver interface { ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error @@ -58,6 +62,11 @@ func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlo return err } + // Have we been finalizing? Should we start saving hot states to db? + if err := s.checkSaveHotStateDB(ctx); err != nil { + return err + } + // Reports on block and fork choice metrics. reportSlotMetrics(blockCopy.Block.Slot, s.HeadSlot(), s.CurrentSlot(), s.finalizedCheckpt) @@ -179,3 +188,21 @@ func (s *Service) handlePostBlockOperations(b *ethpb.BeaconBlock) error { } return nil } + +// This checks whether it's time to start saving hot state to DB. +// It's time when there are `epochsSinceFinalitySaveHotStateDB` epochs of non-finality. +func (s *Service) checkSaveHotStateDB(ctx context.Context) error { + currentEpoch := helpers.SlotToEpoch(s.CurrentSlot()) + // Prevent `sinceFinality` from underflowing. + var sinceFinality uint64 + if currentEpoch > s.finalizedCheckpt.Epoch { + sinceFinality = currentEpoch - s.finalizedCheckpt.Epoch + } + + if sinceFinality >= uint64(epochsSinceFinalitySaveHotStateDB) { + s.stateGen.EnableSaveHotStateToDB(ctx) + return nil + } + + return s.stateGen.DisableSaveHotStateToDB(ctx) +} diff --git a/beacon-chain/blockchain/receive_block_test.go b/beacon-chain/blockchain/receive_block_test.go index f6dd4008de..da8bdc9314 100644 --- a/beacon-chain/blockchain/receive_block_test.go +++ b/beacon-chain/blockchain/receive_block_test.go @@ -4,6 +4,7 @@ import ( "context" "sync" "testing" + "time" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" blockchainTesting "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" @@ -17,6 +18,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/testutil" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" + logTest "github.com/sirupsen/logrus/hooks/test" ) func TestService_ReceiveBlock(t 
*testing.T) { @@ -373,3 +375,41 @@ func TestService_HasInitSyncBlock(t *testing.T) { t.Error("Should have block") } } + +func TestCheckSaveHotStateDB_Enabling(t *testing.T) { + db, stateSummaryCache := testDB.SetupDB(t) + hook := logTest.NewGlobal() + s, err := NewService(context.Background(), &Config{StateGen: stategen.New(db, stateSummaryCache)}) + require.NoError(t, err) + st := params.BeaconConfig().SlotsPerEpoch * uint64(epochsSinceFinalitySaveHotStateDB) + s.genesisTime = time.Now().Add(time.Duration(-1*int64(st)*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second) + s.finalizedCheckpt = ðpb.Checkpoint{} + + require.NoError(t, s.checkSaveHotStateDB(context.Background())) + assert.LogsContain(t, hook, "Entering mode to save hot states in DB") +} + +func TestCheckSaveHotStateDB_Disabling(t *testing.T) { + db, stateSummaryCache := testDB.SetupDB(t) + hook := logTest.NewGlobal() + s, err := NewService(context.Background(), &Config{StateGen: stategen.New(db, stateSummaryCache)}) + require.NoError(t, err) + s.finalizedCheckpt = ðpb.Checkpoint{} + require.NoError(t, s.checkSaveHotStateDB(context.Background())) + s.genesisTime = time.Now() + + require.NoError(t, s.checkSaveHotStateDB(context.Background())) + assert.LogsContain(t, hook, "Exiting mode to save hot states in DB") +} + +func TestCheckSaveHotStateDB_Overflow(t *testing.T) { + db, stateSummaryCache := testDB.SetupDB(t) + hook := logTest.NewGlobal() + s, err := NewService(context.Background(), &Config{StateGen: stategen.New(db, stateSummaryCache)}) + require.NoError(t, err) + s.finalizedCheckpt = ðpb.Checkpoint{Epoch: 10000000} + s.genesisTime = time.Now() + + require.NoError(t, s.checkSaveHotStateDB(context.Background())) + assert.LogsDoNotContain(t, hook, "Entering mode to save hot states in DB") +} diff --git a/beacon-chain/state/stategen/migrate.go b/beacon-chain/state/stategen/migrate.go index 27d1877bc1..8d521db727 100644 --- a/beacon-chain/state/stategen/migrate.go +++ 
b/beacon-chain/state/stategen/migrate.go @@ -76,6 +76,18 @@ func (s *State) MigrateToCold(ctx context.Context, fRoot [32]byte) error { aState = missingState } if s.beaconDB.HasState(ctx, aRoot) { + // Remove hot state DB root to prevent it from being deleted later when we turn hot state save DB mode off. + s.saveHotStateDB.lock.Lock() + roots := s.saveHotStateDB.savedStateRoots + for i := 0; i < len(roots); i++ { + if aRoot == roots[i] { + s.saveHotStateDB.savedStateRoots = append(roots[:i], roots[i+1:]...) + // There shouldn't be duplicated roots in `savedStateRoots`. + // Breaking here is ok. + break + } + } + s.saveHotStateDB.lock.Unlock() continue } diff --git a/beacon-chain/state/stategen/migrate_test.go b/beacon-chain/state/stategen/migrate_test.go index 08708a28ad..a6f24b7ae4 100644 --- a/beacon-chain/state/stategen/migrate_test.go +++ b/beacon-chain/state/stategen/migrate_test.go @@ -96,3 +96,27 @@ func TestMigrateToCold_RegeneratePath(t *testing.T) { require.LogsContain(t, hook, "Saved state in DB") } + +func TestMigrateToCold_StateExistsInDB(t *testing.T) { + hook := logTest.NewGlobal() + ctx := context.Background() + db, _ := testDB.SetupDB(t) + + service := New(db, cache.NewStateSummaryCache()) + service.slotsPerArchivedPoint = 1 + beaconState, _ := testutil.DeterministicGenesisState(t, 32) + stateSlot := uint64(1) + require.NoError(t, beaconState.SetSlot(stateSlot)) + b := testutil.NewBeaconBlock() + b.Block.Slot = 2 + fRoot, err := b.Block.HashTreeRoot() + require.NoError(t, err) + require.NoError(t, service.beaconDB.SaveBlock(ctx, b)) + require.NoError(t, service.epochBoundaryStateCache.put(fRoot, beaconState)) + require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, fRoot)) + + service.saveHotStateDB.savedStateRoots = [][32]byte{{1}, {2}, {3}, {4}, fRoot} + require.NoError(t, service.MigrateToCold(ctx, fRoot)) + assert.DeepEqual(t, [][32]byte{{1}, {2}, {3}, {4}}, service.saveHotStateDB.savedStateRoots) + assert.LogsDoNotContain(t, hook, "Saved state 
in DB") +} diff --git a/beacon-chain/state/stategen/service.go b/beacon-chain/state/stategen/service.go index b4aba6c8f0..f8473cea18 100644 --- a/beacon-chain/state/stategen/service.go +++ b/beacon-chain/state/stategen/service.go @@ -14,6 +14,8 @@ import ( "go.opencensus.io/trace" ) +var defaultHotStateDBInterval uint64 = 128 // slots + // State represents a management object that handles the internal // logic of maintaining both hot and cold states in DB. type State struct { @@ -23,6 +25,17 @@ type State struct { finalizedInfo *finalizedInfo stateSummaryCache *cache.StateSummaryCache epochBoundaryStateCache *epochBoundaryState + saveHotStateDB *saveHotStateDbConfig +} + +// This tracks the config in the event of long non-finality, +// how often does the node save hot states to db? what are +// the saved hot states in db?... etc +type saveHotStateDbConfig struct { + enabled bool + lock sync.Mutex + duration uint64 + savedStateRoots [][32]byte } // This tracks the finalized point. It's also the point where slot and the block root of @@ -43,6 +56,9 @@ func New(db db.NoHeadAccessDatabase, stateSummaryCache *cache.StateSummaryCache) slotsPerArchivedPoint: params.BeaconConfig().SlotsPerArchivedPoint, stateSummaryCache: stateSummaryCache, epochBoundaryStateCache: newBoundaryStateCache(), + saveHotStateDB: &saveHotStateDbConfig{ + duration: defaultHotStateDBInterval, + }, } } diff --git a/beacon-chain/state/stategen/setter.go b/beacon-chain/state/stategen/setter.go index 722239761b..a26fd2a7e9 100644 --- a/beacon-chain/state/stategen/setter.go +++ b/beacon-chain/state/stategen/setter.go @@ -2,6 +2,7 @@ package stategen import ( "context" + "math" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -9,6 +10,7 @@ import ( pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/params" + "github.com/sirupsen/logrus" 
"go.opencensus.io/trace" ) @@ -58,6 +60,23 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, state * ctx, span := trace.StartSpan(ctx, "stateGen.saveStateByRoot") defer span.End() + // Duration can't be 0, to prevent a division-by-zero panic below. + duration := uint64(math.Max(float64(s.saveHotStateDB.duration), 1)) + + s.saveHotStateDB.lock.Lock() + if s.saveHotStateDB.enabled && state.Slot()%duration == 0 { + if err := s.beaconDB.SaveState(ctx, state, blockRoot); err != nil { + return err + } + s.saveHotStateDB.savedStateRoots = append(s.saveHotStateDB.savedStateRoots, blockRoot) + + log.WithFields(logrus.Fields{ + "slot": state.Slot(), + "totalHotStateSavedInDB": len(s.saveHotStateDB.savedStateRoots), + }).Info("Saving hot state to DB") + } + s.saveHotStateDB.lock.Unlock() + // If the hot state is already in cache, one can be sure the state was processed and in the DB. if s.hotStateCache.Has(blockRoot) { return nil @@ -81,3 +100,44 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, state * return nil } + +// EnableSaveHotStateToDB enters the mode that saves hot beacon state to the DB. +// This usually gets triggered when there's long duration since finality. +func (s *State) EnableSaveHotStateToDB(_ context.Context) { + s.saveHotStateDB.lock.Lock() + defer s.saveHotStateDB.lock.Unlock() + if s.saveHotStateDB.enabled { + return + } + + s.saveHotStateDB.enabled = true + + log.WithFields(logrus.Fields{ + "enabled": s.saveHotStateDB.enabled, + "slotsInterval": s.saveHotStateDB.duration, + }).Warn("Entering mode to save hot states in DB") +} + +// DisableSaveHotStateToDB exits the mode that saves beacon state to DB for the hot states. +// This usually gets triggered once there's finality after long duration since finality. 
+func (s *State) DisableSaveHotStateToDB(ctx context.Context) error { + s.saveHotStateDB.lock.Lock() + defer s.saveHotStateDB.lock.Unlock() + if !s.saveHotStateDB.enabled { + return nil + } + + log.WithFields(logrus.Fields{ + "enabled": s.saveHotStateDB.enabled, + "deletedHotStates": len(s.saveHotStateDB.savedStateRoots), + }).Warn("Exiting mode to save hot states in DB") + + // Delete previous saved states in DB as we are turning this mode off. + s.saveHotStateDB.enabled = false + if err := s.beaconDB.DeleteStates(ctx, s.saveHotStateDB.savedStateRoots); err != nil { + return err + } + s.saveHotStateDB.savedStateRoots = nil + + return nil +} diff --git a/beacon-chain/state/stategen/setter_test.go b/beacon-chain/state/stategen/setter_test.go index 75c15bae52..9173f7ca79 100644 --- a/beacon-chain/state/stategen/setter_test.go +++ b/beacon-chain/state/stategen/setter_test.go @@ -109,6 +109,8 @@ func TestSaveState_CanSaveOnEpochBoundary(t *testing.T) { require.NoError(t, err) require.Equal(t, true, ok, "Did not save epoch boundary state") assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary") + // Should have not been saved in DB. + require.Equal(t, false, db.HasState(ctx, r)) } func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) { @@ -131,4 +133,68 @@ func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) { assert.Equal(t, false, service.beaconDB.HasState(ctx, r), "Should not have saved the state") assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary") require.LogsDoNotContain(t, hook, "Saved full state on epoch boundary") + // Should have not been saved in DB. 
+ require.Equal(t, false, db.HasState(ctx, r)) +} + +func TestSaveState_CanSaveHotStateToDB(t *testing.T) { + hook := logTest.NewGlobal() + ctx := context.Background() + db, _ := testDB.SetupDB(t) + service := New(db, cache.NewStateSummaryCache()) + service.EnableSaveHotStateToDB(ctx) + beaconState, _ := testutil.DeterministicGenesisState(t, 32) + require.NoError(t, beaconState.SetSlot(defaultHotStateDBInterval)) + + r := [32]byte{'A'} + require.NoError(t, service.saveStateByRoot(ctx, r, beaconState)) + + require.LogsContain(t, hook, "Saving hot state to DB") + // Should have saved in DB. + require.Equal(t, true, db.HasState(ctx, r)) +} + +func TestEnableSaveHotStateToDB_Enabled(t *testing.T) { + hook := logTest.NewGlobal() + ctx := context.Background() + db, _ := testDB.SetupDB(t) + service := New(db, cache.NewStateSummaryCache()) + + service.EnableSaveHotStateToDB(ctx) + require.LogsContain(t, hook, "Entering mode to save hot states in DB") + require.Equal(t, true, service.saveHotStateDB.enabled) +} + +func TestEnableSaveHotStateToDB_AlreadyEnabled(t *testing.T) { + hook := logTest.NewGlobal() + ctx := context.Background() + db, _ := testDB.SetupDB(t) + service := New(db, cache.NewStateSummaryCache()) + service.saveHotStateDB.enabled = true + service.EnableSaveHotStateToDB(ctx) + require.LogsDoNotContain(t, hook, "Entering mode to save hot states in DB") + require.Equal(t, true, service.saveHotStateDB.enabled) +} + +func TestEnableSaveHotStateToDB_Disabled(t *testing.T) { + hook := logTest.NewGlobal() + ctx := context.Background() + db, _ := testDB.SetupDB(t) + service := New(db, cache.NewStateSummaryCache()) + service.saveHotStateDB.enabled = true + service.saveHotStateDB.savedStateRoots = [][32]byte{{'a'}} + require.NoError(t, service.DisableSaveHotStateToDB(ctx)) + require.LogsContain(t, hook, "Exiting mode to save hot states in DB") + require.Equal(t, false, service.saveHotStateDB.enabled) + require.Equal(t, 0, len(service.saveHotStateDB.savedStateRoots)) +} 
+ +func TestEnableSaveHotStateToDB_AlreadyDisabled(t *testing.T) { + hook := logTest.NewGlobal() + ctx := context.Background() + db, _ := testDB.SetupDB(t) + service := New(db, cache.NewStateSummaryCache()) + require.NoError(t, service.DisableSaveHotStateToDB(ctx)) + require.LogsDoNotContain(t, hook, "Exiting mode to save hot states in DB") + require.Equal(t, false, service.saveHotStateDB.enabled) }