diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go index 81ab5fef8d..1afb0ed98a 100644 --- a/beacon-chain/db/iface/interface.go +++ b/beacon-chain/db/iface/interface.go @@ -101,7 +101,7 @@ type NoHeadAccessDatabase interface { SaveLightClientBootstrap(ctx context.Context, blockRoot []byte, bootstrap interfaces.LightClientBootstrap) error CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint primitives.Slot) error - DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot) error + DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot, batchSize int) (int, error) } // HeadAccessDatabase defines a struct with access to reading chain head data. diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index 4ce3144fc9..e99f5ed91d 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -245,77 +245,82 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error { // - blockRootValidatorHashesBucket // - blockSlotIndicesBucket // - stateSlotIndicesBucket -func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot) error { +func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot, batchSize int) (int, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteHistoricalDataBeforeSlot") defer span.End() // Collect slot/root pairs to perform deletions in a separate read only transaction. - var ( - roots [][]byte - slts []primitives.Slot - ) - err := s.db.View(func(tx *bolt.Tx) error { - var err error - roots, slts, err = blockRootsBySlotRange(ctx, tx.Bucket(blockSlotIndicesBucket), primitives.Slot(0), cutoffSlot, nil, nil, nil) - if err != nil { - return errors.Wrap(err, "could not retrieve block roots") - } - return nil - }) + slotRoots, err := s.slotRootsInRange(ctx, primitives.Slot(0), cutoffSlot, batchSize) if err != nil { - return errors.Wrap(err, "could not retrieve block roots and slots") + return 0, err + } + + // Return early if there's nothing to delete. + if len(slotRoots) == 0 { + return 0, nil } // Perform all deletions in a single transaction for atomicity - return s.db.Update(func(tx *bolt.Tx) error { - for _, root := range roots { + var numSlotsDeleted int + err = s.db.Update(func(tx *bolt.Tx) error { + for _, sr := range slotRoots { + // Return if context is cancelled or deadline is exceeded. 
+ if ctx.Err() != nil { + //nolint:nilerr + return nil + } + // Delete block - if err = s.deleteBlock(tx, root); err != nil { + if err = s.deleteBlock(tx, sr.root[:]); err != nil { return err } // Delete finalized block roots index - if err = tx.Bucket(finalizedBlockRootsIndexBucket).Delete(root); err != nil { + if err = tx.Bucket(finalizedBlockRootsIndexBucket).Delete(sr.root[:]); err != nil { return errors.Wrap(err, "could not delete finalized block root index") } // Delete state - if err = tx.Bucket(stateBucket).Delete(root); err != nil { + if err = tx.Bucket(stateBucket).Delete(sr.root[:]); err != nil { return errors.Wrap(err, "could not delete state") } // Delete state summary - if err = tx.Bucket(stateSummaryBucket).Delete(root); err != nil { + if err = tx.Bucket(stateSummaryBucket).Delete(sr.root[:]); err != nil { return errors.Wrap(err, "could not delete state summary") } // Delete validator entries - if err = s.deleteValidatorHashes(tx, root); err != nil { + if err = s.deleteValidatorHashes(tx, sr.root[:]); err != nil { return errors.Wrap(err, "could not delete validators") } + + numSlotsDeleted++ } - for _, slot := range slts { + for _, sr := range slotRoots { // Delete slot indices - if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil { + if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(sr.slot)); err != nil { return errors.Wrap(err, "could not delete block slot index") } - if err = tx.Bucket(stateSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil { + if err = tx.Bucket(stateSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(sr.slot)); err != nil { return errors.Wrap(err, "could not delete state slot index") } } // Delete all caches after we have deleted everything from buckets. // This is done after the buckets are deleted to avoid any issues in case of transaction rollback. - for _, root := range roots { + for _, sr := range slotRoots { // Delete block from cache - s.blockCache.Del(string(root)) + s.blockCache.Del(string(sr.root[:])) // Delete state summary from cache - s.stateSummaryCache.delete([32]byte(root)) + s.stateSummaryCache.delete(sr.root) } return nil }) + + return numSlotsDeleted, err } // SaveBlock to the db. @@ -336,7 +341,7 @@ func (s *Store) SaveBlock(ctx context.Context, signed interfaces.ReadOnlySignedB // if a `saveBlindedBeaconBlocks` key exists in the database. Otherwise, we check if the last // blocked stored to check if it is blinded, and then write that `saveBlindedBeaconBlocks` key // to the DB for future checks. 
-func (s *Store) shouldSaveBlinded(ctx context.Context) (bool, error) { +func (s *Store) shouldSaveBlinded() (bool, error) { var saveBlinded bool if err := s.db.View(func(tx *bolt.Tx) error { metadataBkt := tx.Bucket(chainMetadataBucket) @@ -398,7 +403,7 @@ func prepareBlockBatch(blks []blocks.ROBlock, shouldBlind bool) ([]blockBatchEnt } func (s *Store) SaveROBlocks(ctx context.Context, blks []blocks.ROBlock, cache bool) error { - shouldBlind, err := s.shouldSaveBlinded(ctx) + shouldBlind, err := s.shouldSaveBlinded() if err != nil { return err } @@ -669,6 +674,49 @@ func (s *Store) SaveRegistrationsByValidatorIDs(ctx context.Context, ids []primi }) } +type slotRoot struct { + slot primitives.Slot + root [32]byte +} + +// slotRootsInRange returns slot and block root pairs of length min(batchSize, end-slot) +func (s *Store) slotRootsInRange(ctx context.Context, start, end primitives.Slot, batchSize int) ([]slotRoot, error) { + _, span := trace.StartSpan(ctx, "BeaconDB.slotRootsInRange") + defer span.End() + if end < start { + return nil, errInvalidSlotRange + } + + var pairs []slotRoot + key := bytesutil.SlotToBytesBigEndian(end) + err := s.db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(blockSlotIndicesBucket) + c := bkt.Cursor() + for k, v := c.Seek(key); k != nil; k, v = c.Prev() { + slot := bytesutil.BytesToSlotBigEndian(k) + if slot > end { + continue // Seek will seek to the next key *after* the given one if not present + } + if slot < start { + return nil + } + roots, err := splitRoots(v) + if err != nil { + return errors.Wrapf(err, "corrupt value %v in block slot index for slot=%d", v, slot) + } + for _, r := range roots { + pairs = append(pairs, slotRoot{slot: slot, root: r}) + } + if len(pairs) >= batchSize { + return nil // allows code to easily cap the number of items pruned + } + } + return nil + }) + + return pairs, err +} + // blockRootsByFilter retrieves the block roots given the filter criteria. func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter) ([][]byte, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.blockRootsByFilter") @@ -689,7 +737,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter // We retrieve block roots that match a filter criteria of slot ranges, if specified. 
filtersMap := f.Filters() - rootsBySlotRange, _, err := blockRootsBySlotRange( + rootsBySlotRange, err := blockRootsBySlotRange( ctx, tx.Bucket(blockSlotIndicesBucket), filtersMap[filters.StartSlot], @@ -734,13 +782,13 @@ func blockRootsBySlotRange( ctx context.Context, bkt *bolt.Bucket, startSlotEncoded, endSlotEncoded, startEpochEncoded, endEpochEncoded, slotStepEncoded interface{}, -) ([][]byte, []primitives.Slot, error) { +) ([][]byte, error) { _, span := trace.StartSpan(ctx, "BeaconDB.blockRootsBySlotRange") defer span.End() // Return nothing when all slot parameters are missing if startSlotEncoded == nil && endSlotEncoded == nil && startEpochEncoded == nil && endEpochEncoded == nil { - return [][]byte{}, nil, nil + return [][]byte{}, nil } var startSlot, endSlot primitives.Slot @@ -761,11 +809,11 @@ func blockRootsBySlotRange( if startEpochOk && endEpochOk { startSlot, err = slots.EpochStart(startEpoch) if err != nil { - return nil, nil, err + return nil, err } endSlot, err = slots.EpochStart(endEpoch) if err != nil { - return nil, nil, err + return nil, err } endSlot = endSlot + params.BeaconConfig().SlotsPerEpoch - 1 } @@ -776,11 +824,10 @@ func blockRootsBySlotRange( return key != nil && bytes.Compare(key, max) <= 0 } if endSlot < startSlot { - return nil, nil, errInvalidSlotRange + return nil, errInvalidSlotRange } rootsRange := endSlot.SubSlot(startSlot).Div(step) roots := make([][]byte, 0, rootsRange) - var slts []primitives.Slot c := bkt.Cursor() for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() { slot := bytesutil.BytesToSlotBigEndian(k) @@ -795,9 +842,8 @@ func blockRootsBySlotRange( splitRoots = append(splitRoots, v[i:i+32]) } roots = append(roots, splitRoots...) - slts = append(slts, slot) } - return roots, slts, nil + return roots, nil } // blockRootsBySlot retrieves the block roots by slot diff --git a/beacon-chain/db/kv/blocks_test.go b/beacon-chain/db/kv/blocks_test.go index a964dd08ad..80362ffd96 100644 --- a/beacon-chain/db/kv/blocks_test.go +++ b/beacon-chain/db/kv/blocks_test.go @@ -359,184 +359,221 @@ func TestStore_DeleteFinalizedBlock(t *testing.T) { func TestStore_HistoricalDataBeforeSlot(t *testing.T) { slotsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch) - db := setupDB(t) ctx := context.Background() - // Save genesis block root - require.NoError(t, db.SaveGenesisBlockRoot(ctx, genesisBlockRoot)) + tests := []struct { + name string + batchSize int + numOfEpochs uint64 + deleteBeforeSlot uint64 + }{ + { + name: "batchSize less than delete range", + batchSize: 10, + numOfEpochs: 4, + deleteBeforeSlot: 25, + }, + { + name: "batchSize greater than delete range", + batchSize: 30, + numOfEpochs: 4, + deleteBeforeSlot: 15, + }, + } - // Create and save blocks for 4 epochs - blks := makeBlocks(t, 0, slotsPerEpoch*4, genesisBlockRoot) - require.NoError(t, db.SaveBlocks(ctx, blks)) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db := setupDB(t) + // Save genesis block root + require.NoError(t, db.SaveGenesisBlockRoot(ctx, genesisBlockRoot)) - // Mark state validator migration as complete - err := db.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(migrationsBucket).Put(migrationStateValidatorsKey, migrationCompleted) - }) - require.NoError(t, err) + // Create and save blocks for given epochs + blks := makeBlocks(t, 0, slotsPerEpoch*tt.numOfEpochs, genesisBlockRoot) + require.NoError(t, db.SaveBlocks(ctx, blks)) - migrated, err := db.isStateValidatorMigrationOver() - require.NoError(t, err) - require.Equal(t, true, 
migrated) + // Mark state validator migration as complete + err := db.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(migrationsBucket).Put(migrationStateValidatorsKey, migrationCompleted) + }) + require.NoError(t, err) - // Create state summaries and states for each block - ss := make([]*ethpb.StateSummary, len(blks)) - states := make([]state.BeaconState, len(blks)) + migrated, err := db.isStateValidatorMigrationOver() + require.NoError(t, err) + require.Equal(t, true, migrated) - for i, blk := range blks { - slot := blk.Block().Slot() - r, err := blk.Block().HashTreeRoot() - require.NoError(t, err) + // Create state summaries and states for each block + ss := make([]*ethpb.StateSummary, len(blks)) + states := make([]state.BeaconState, len(blks)) - // Create and save state summary - ss[i] = ðpb.StateSummary{ - Slot: slot, - Root: r[:], - } + for i, blk := range blks { + slot := blk.Block().Slot() + r, err := blk.Block().HashTreeRoot() + require.NoError(t, err) - // Create and save state with validator entries - vals := make([]*ethpb.Validator, 2) - for j := range vals { - vals[j] = ðpb.Validator{ - PublicKey: bytesutil.PadTo([]byte{byte(i*j + 1)}, 48), - WithdrawalCredentials: bytesutil.PadTo([]byte{byte(i*j + 2)}, 32), + // Create and save state summary + ss[i] = ðpb.StateSummary{ + Slot: slot, + Root: r[:], + } + + // Create and save state with validator entries + vals := make([]*ethpb.Validator, 2) + for j := range vals { + vals[j] = ðpb.Validator{ + PublicKey: bytesutil.PadTo([]byte{byte(i*j + 1)}, 48), + WithdrawalCredentials: bytesutil.PadTo([]byte{byte(i*j + 2)}, 32), + } + } + + st, err := util.NewBeaconState(func(state *ethpb.BeaconState) error { + state.Validators = vals + state.Slot = slot + return nil + }) + require.NoError(t, err) + require.NoError(t, db.SaveState(ctx, st, r)) + states[i] = st + + // Verify validator entries are saved to db + valsActual, err := db.validatorEntries(ctx, r) + require.NoError(t, err) + for j, val := range valsActual { + require.DeepEqual(t, vals[j], val) + } } - } + require.NoError(t, db.SaveStateSummaries(ctx, ss)) - st, err := util.NewBeaconState(func(state *ethpb.BeaconState) error { - state.Validators = vals - state.Slot = slot - return nil - }) - require.NoError(t, err) - require.NoError(t, db.SaveState(ctx, st, r)) - states[i] = st - - // Verify validator entries are saved to db - valsActual, err := db.validatorEntries(ctx, r) - require.NoError(t, err) - for j, val := range valsActual { - require.DeepEqual(t, vals[j], val) - } - } - require.NoError(t, db.SaveStateSummaries(ctx, ss)) - - // Verify slot indices exist before deletion - err = db.db.View(func(tx *bolt.Tx) error { - blockSlotBkt := tx.Bucket(blockSlotIndicesBucket) - stateSlotBkt := tx.Bucket(stateSlotIndicesBucket) - - for i := uint64(0); i < slotsPerEpoch; i++ { - slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1)) - assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist") - assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist", i) - } - return nil - }) - require.NoError(t, err) - - // Delete data before slot at epoch 1 - require.NoError(t, db.DeleteHistoricalDataBeforeSlot(ctx, primitives.Slot(slotsPerEpoch))) - - // Verify blocks from epoch 0 are deleted - for i := uint64(0); i < slotsPerEpoch; i++ { - root, err := blks[i].Block().HashTreeRoot() - require.NoError(t, err) - - // Check block is deleted - retrievedBlocks, err := db.BlocksBySlot(ctx, primitives.Slot(i)) - require.NoError(t, err) - assert.Equal(t, 0, 
len(retrievedBlocks)) - - // Verify block does not exist - assert.Equal(t, false, db.HasBlock(ctx, root)) - - // Verify block parent root does not exist - err = db.db.View(func(tx *bolt.Tx) error { - require.Equal(t, 0, len(tx.Bucket(blockParentRootIndicesBucket).Get(root[:]))) - return nil - }) - require.NoError(t, err) - - // Verify state is deleted - hasState := db.HasState(ctx, root) - assert.Equal(t, false, hasState) - - // Verify state summary is deleted - hasSummary := db.HasStateSummary(ctx, root) - assert.Equal(t, false, hasSummary) - - // Verify validator hashes for block roots are deleted - err = db.db.View(func(tx *bolt.Tx) error { - assert.Equal(t, 0, len(tx.Bucket(blockRootValidatorHashesBucket).Get(root[:]))) - return nil - }) - require.NoError(t, err) - } - - // Verify slot indices are deleted - err = db.db.View(func(tx *bolt.Tx) error { - blockSlotBkt := tx.Bucket(blockSlotIndicesBucket) - stateSlotBkt := tx.Bucket(stateSlotIndicesBucket) - - for i := uint64(0); i < slotsPerEpoch; i++ { - slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1)) - assert.Equal(t, 0, len(blockSlotBkt.Get(slot)), fmt.Sprintf("Expected block slot index to be deleted, slot: %d", slot)) - assert.Equal(t, 0, len(stateSlotBkt.Get(slot)), fmt.Sprintf("Expected state slot index to be deleted, slot: %d", slot)) - } - return nil - }) - require.NoError(t, err) - - // Verify blocks from epochs 1-3 still exist - for i := slotsPerEpoch; i < slotsPerEpoch*4; i++ { - root, err := blks[i].Block().HashTreeRoot() - require.NoError(t, err) - - // Verify block exists - assert.Equal(t, true, db.HasBlock(ctx, root)) - - // Verify remaining block parent root exists, except last slot since we store parent roots of each block. - if i < slotsPerEpoch*4-1 { + // Verify slot indices exist before deletion err = db.db.View(func(tx *bolt.Tx) error { - require.NotNil(t, tx.Bucket(blockParentRootIndicesBucket).Get(root[:]), fmt.Sprintf("Expected block parent index to be deleted, slot: %d", i)) + blockSlotBkt := tx.Bucket(blockSlotIndicesBucket) + stateSlotBkt := tx.Bucket(stateSlotIndicesBucket) + + for i := uint64(0); i < uint64(tt.deleteBeforeSlot); i++ { + slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1)) + assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist") + assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist", i) + } return nil }) require.NoError(t, err) - } - // Verify state exists - hasState := db.HasState(ctx, root) - assert.Equal(t, true, hasState) + // Delete data before slot + slotsDeleted, err := db.DeleteHistoricalDataBeforeSlot(ctx, primitives.Slot(tt.deleteBeforeSlot), tt.batchSize) + require.NoError(t, err) - // Verify state summary exists - hasSummary := db.HasStateSummary(ctx, root) - assert.Equal(t, true, hasSummary) + var startSlotDeleted, endSlotDeleted uint64 + if tt.batchSize >= int(tt.deleteBeforeSlot) { + startSlotDeleted = 1 + endSlotDeleted = tt.deleteBeforeSlot + } else { + startSlotDeleted = tt.deleteBeforeSlot - uint64(tt.batchSize) + 1 + endSlotDeleted = tt.deleteBeforeSlot + } - // Verify slot indices still exist - err = db.db.View(func(tx *bolt.Tx) error { - blockSlotBkt := tx.Bucket(blockSlotIndicesBucket) - stateSlotBkt := tx.Bucket(stateSlotIndicesBucket) + require.Equal(t, endSlotDeleted-startSlotDeleted+1, uint64(slotsDeleted)) - slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1)) - assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist") - assert.NotNil(t, stateSlotBkt.Get(slot), 
"Expected state slot index to exist") - return nil + // Verify blocks before given slot/batch are deleted + for i := startSlotDeleted; i < endSlotDeleted; i++ { + root, err := blks[i].Block().HashTreeRoot() + require.NoError(t, err) + + // Check block is deleted + retrievedBlocks, err := db.BlocksBySlot(ctx, primitives.Slot(i)) + require.NoError(t, err) + assert.Equal(t, 0, len(retrievedBlocks), fmt.Sprintf("Expected %d blocks, got %d for slot %d", 0, len(retrievedBlocks), i)) + + // Verify block does not exist + assert.Equal(t, false, db.HasBlock(ctx, root), fmt.Sprintf("Expected block index to not exist for slot %d", i)) + + // Verify block parent root does not exist + err = db.db.View(func(tx *bolt.Tx) error { + require.Equal(t, 0, len(tx.Bucket(blockParentRootIndicesBucket).Get(root[:]))) + return nil + }) + require.NoError(t, err) + + // Verify state is deleted + hasState := db.HasState(ctx, root) + assert.Equal(t, false, hasState) + + // Verify state summary is deleted + hasSummary := db.HasStateSummary(ctx, root) + assert.Equal(t, false, hasSummary) + + // Verify validator hashes for block roots are deleted + err = db.db.View(func(tx *bolt.Tx) error { + assert.Equal(t, 0, len(tx.Bucket(blockRootValidatorHashesBucket).Get(root[:]))) + return nil + }) + require.NoError(t, err) + } + + // Verify slot indices are deleted + err = db.db.View(func(tx *bolt.Tx) error { + blockSlotBkt := tx.Bucket(blockSlotIndicesBucket) + stateSlotBkt := tx.Bucket(stateSlotIndicesBucket) + + for i := startSlotDeleted; i < endSlotDeleted; i++ { + slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1)) + assert.Equal(t, 0, len(blockSlotBkt.Get(slot)), fmt.Sprintf("Expected block slot index to be deleted, slot: %d", slot)) + assert.Equal(t, 0, len(stateSlotBkt.Get(slot)), fmt.Sprintf("Expected state slot index to be deleted, slot: %d", slot)) + } + return nil + }) + require.NoError(t, err) + + // Verify blocks from expectedLastDeletedSlot till numEpochs still exist + for i := endSlotDeleted; i < slotsPerEpoch*tt.numOfEpochs; i++ { + root, err := blks[i].Block().HashTreeRoot() + require.NoError(t, err) + + // Verify block exists + assert.Equal(t, true, db.HasBlock(ctx, root)) + + // Verify remaining block parent root exists, except last slot since we store parent roots of each block. 
+ if i < slotsPerEpoch*tt.numOfEpochs-1 { + err = db.db.View(func(tx *bolt.Tx) error { + require.NotNil(t, tx.Bucket(blockParentRootIndicesBucket).Get(root[:]), fmt.Sprintf("Expected block parent index to be deleted, slot: %d", i)) + return nil + }) + require.NoError(t, err) + } + + // Verify state exists + hasState := db.HasState(ctx, root) + assert.Equal(t, true, hasState) + + // Verify state summary exists + hasSummary := db.HasStateSummary(ctx, root) + assert.Equal(t, true, hasSummary) + + // Verify slot indices still exist + err = db.db.View(func(tx *bolt.Tx) error { + blockSlotBkt := tx.Bucket(blockSlotIndicesBucket) + stateSlotBkt := tx.Bucket(stateSlotIndicesBucket) + + slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1)) + assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist") + assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist") + return nil + }) + require.NoError(t, err) + + // Verify validator entries still exist + valsActual, err := db.validatorEntries(ctx, root) + require.NoError(t, err) + assert.NotNil(t, valsActual) + + // Verify remaining validator hashes for block roots exists + err = db.db.View(func(tx *bolt.Tx) error { + assert.NotNil(t, tx.Bucket(blockRootValidatorHashesBucket).Get(root[:])) + return nil + }) + require.NoError(t, err) + } }) - require.NoError(t, err) - - // Verify validator entries still exist - valsActual, err := db.validatorEntries(ctx, root) - require.NoError(t, err) - assert.NotNil(t, valsActual) - - // Verify remaining validator hashes for block roots exists - err = db.db.View(func(tx *bolt.Tx) error { - assert.NotNil(t, tx.Bucket(blockRootValidatorHashesBucket).Get(root[:])) - return nil - }) - require.NoError(t, err) } + } func TestStore_GenesisBlock(t *testing.T) { diff --git a/beacon-chain/db/pruner/pruner.go b/beacon-chain/db/pruner/pruner.go index 5bdb101571..e70f704143 100644 --- a/beacon-chain/db/pruner/pruner.go +++ b/beacon-chain/db/pruner/pruner.go @@ -16,6 +16,15 @@ import ( var log = logrus.WithField("prefix", "db-pruner") +const ( + // defaultPrunableBatchSize is the number of slots that can be pruned at once. + defaultPrunableBatchSize = 32 + // defaultPruningWindow is the duration of one pruning window. + defaultPruningWindow = time.Second * 3 + // defaultNumBatchesToPrune is the number of batches to prune in one pruning window. + defaultNumBatchesToPrune = 15 +) + type ServiceOption func(*Service) // WithRetentionPeriod allows the user to specify a different data retention period than the spec default. @@ -143,14 +152,17 @@ func (p *Service) prune(slot primitives.Slot) error { }).Debug("Pruning chain data") tt := time.Now() - if err := p.db.DeleteHistoricalDataBeforeSlot(p.ctx, pruneUpto); err != nil { - return errors.Wrapf(err, "could not delete upto slot %d", pruneUpto) + numBatches, err := p.pruneBatches(pruneUpto) + if err != nil { + return errors.Wrap(err, "failed to prune batches") } log.WithFields(logrus.Fields{ "prunedUpto": pruneUpto, "duration": time.Since(tt), "currentSlot": slot, + "batchSize": defaultPrunableBatchSize, + "numBatches": numBatches, }).Debug("Successfully pruned chain data") // Update pruning checkpoint. 
@@ -159,6 +171,33 @@ func (p *Service) prune(slot primitives.Slot) error {
 	return nil
 }
 
+func (p *Service) pruneBatches(pruneUpto primitives.Slot) (int, error) {
+	ctx, cancel := context.WithTimeout(p.ctx, defaultPruningWindow)
+	defer cancel()
+
+	numBatches := 0
+	for {
+		select {
+		case <-ctx.Done():
+			return numBatches, nil
+		default:
+			for i := 0; i < defaultNumBatchesToPrune; i++ {
+				slotsDeleted, err := p.db.DeleteHistoricalDataBeforeSlot(ctx, pruneUpto, defaultPrunableBatchSize)
+				if err != nil {
+					return 0, errors.Wrapf(err, "could not delete up to slot %d", pruneUpto)
+				}
+
+				// Return if there's nothing left to delete.
+				if slotsDeleted == 0 {
+					return numBatches, nil
+				}
+
+				numBatches++
+			}
+		}
+	}
+}
+
 // pruneStartSlotFunc returns the function to determine the start slot to start pruning.
 func pruneStartSlotFunc(retentionEpochs primitives.Epoch) func(primitives.Slot) primitives.Slot {
 	return func(current primitives.Slot) primitives.Slot {
diff --git a/changelog/dB2510_fixpruner.md b/changelog/dB2510_fixpruner.md
new file mode 100644
index 0000000000..720da20e2b
--- /dev/null
+++ b/changelog/dB2510_fixpruner.md
@@ -0,0 +1,3 @@
+### Fixed
+
+- Fixed the pruner so it no longer blocks while pruning a large database, by deleting historical data in bounded batches (batchSize).
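Reviewer note (not part of the patch): the new `DeleteHistoricalDataBeforeSlot` signature is meant to be called repeatedly until a batch deletes nothing, which is what `pruneBatches` does above. Below is a minimal sketch of that calling pattern; the `pruneAll` helper, its `batchSize` constant, and the v5 import paths are illustrative assumptions rather than code from this PR.

```go
package example

import (
	"context"

	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/iface"
	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)

// pruneAll is a hypothetical helper showing the intended calling pattern for the
// batched API: keep deleting until a batch removes zero slots.
func pruneAll(ctx context.Context, db iface.NoHeadAccessDatabase, cutoff primitives.Slot) (int, error) {
	const batchSize = 32 // mirrors defaultPrunableBatchSize in the pruner
	total := 0
	for {
		n, err := db.DeleteHistoricalDataBeforeSlot(ctx, cutoff, batchSize)
		if err != nil {
			return total, err
		}
		total += n
		// A zero-sized batch means nothing older than cutoff remains
		// (or the context ended and the deletion transaction exited early).
		if n == 0 {
			return total, nil
		}
	}
}
```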