From 4a9c60f75f06eb1f9636b6fb19fba75f952db5d6 Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Tue, 28 Jan 2025 22:25:50 +0530 Subject: [PATCH] Implement beacon db pruner (#14687) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * implement weak subjectivity pruner * fix goimports * add delete before slot method to database * add method to interface * update changelog * add flags * wire pruner * align pruner with backfill service * rename db method * fix imports * delete block slot indices * check for backfill and initial sync * add tests * fix imports * Update beacon-chain/db/kv/blocks.go Co-authored-by: Radosław Kapka * Update cmd/beacon-chain/flags/base.go Co-authored-by: Preston Van Loon * Update beacon-chain/db/pruner/pruner.go Co-authored-by: Radosław Kapka * cleanup * fix buildkite * initialise atomic bool * delete data from remaining buckets * fix build * fix build * address review comments * add test for blockParentRootIndicesBucket * fix changelog * fix build * address kasey's comments * fix build * add trace span to blockRootsBySlotRange --------- Co-authored-by: Radosław Kapka Co-authored-by: Preston Van Loon --- CHANGELOG.md | 80 +++++------ beacon-chain/db/iface/interface.go | 1 + beacon-chain/db/kv/blocks.go | 136 +++++++++++++++++-- beacon-chain/db/kv/blocks_test.go | 187 ++++++++++++++++++++++++++ beacon-chain/db/kv/state.go | 2 +- beacon-chain/db/pruner/BUILD.bazel | 38 ++++++ beacon-chain/db/pruner/pruner.go | 174 ++++++++++++++++++++++++ beacon-chain/db/pruner/pruner_test.go | 135 +++++++++++++++++++ beacon-chain/node/BUILD.bazel | 1 + beacon-chain/node/node.go | 36 +++++ beacon-chain/sync/backfill/service.go | 20 +++ changelog/dB2510_beacondbpruning.md | 3 + cmd/beacon-chain/flags/base.go | 12 ++ cmd/beacon-chain/main.go | 2 + cmd/beacon-chain/usage.go | 2 + 15 files changed, 776 insertions(+), 53 deletions(-) create mode 100644 beacon-chain/db/pruner/BUILD.bazel create mode 100644 beacon-chain/db/pruner/pruner.go create mode 100644 beacon-chain/db/pruner/pruner_test.go create mode 100644 changelog/dB2510_beacondbpruning.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 18d61c61fd..83b6ae558b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,7 +60,7 @@ Notable features: - Updated the default `scrape-interval` in `Client-stats` to 2 minutes to accommodate Beaconcha.in API rate limits. - Switch to compounding when consolidating with source==target. - Revert block db save when saving state fails. -- Return false from HasBlock if the block is being synced. +- Return false from HasBlock if the block is being synced. - Cleanup forkchoice on failed insertions. - Use read only validator for core processing to avoid unnecessary copying. - Use ROBlock across block processing pipeline. @@ -73,7 +73,7 @@ Notable features: - Simplified `EjectedValidatorIndices`. - `engine_newPayloadV4`,`engine_getPayloadV4` are changes due to new execution request serialization decisions, [PR](https://github.com/prysmaticlabs/prysm/pull/14580) - Fixed various small things in state-native code. -- Use ROBlock earlier in block syncing pipeline. +- Use ROBlock earlier in block syncing pipeline. - Changed the signature of `ProcessPayload`. - Only Build the Protobuf state once during serialization. - Capella blocks are execution. 
@@ -139,9 +139,9 @@ Notable features: ### Security -## [v5.1.2](https://github.com/prysmaticlabs/prysm/compare/v5.1.1...v5.1.2) - 2024-10-16 +## [v5.1.2](https://github.com/prysmaticlabs/prysm/compare/v5.1.1...v5.1.2) - 2024-10-16 -This is a hotfix release with one change. +This is a hotfix release with one change. Prysm v5.1.1 contains an updated implementation of the beacon api streaming events endpoint. This new implementation contains a bug that can cause a panic in certain conditions. The issue is @@ -153,20 +153,20 @@ prysm REST mode validator (a feature which requires the validator to be configur api instead of prysm's stock grpc endpoints) or accessory software that connects to the events api, like https://github.com/ethpandaops/ethereum-metrics-exporter -### Fixed +### Fixed - Recover from panics when writing the event stream [#14545](https://github.com/prysmaticlabs/prysm/pull/14545) ## [v5.1.1](https://github.com/prysmaticlabs/prysm/compare/v5.1.0...v5.1.1) - 2024-10-15 -This release has a number of features and improvements. Most notably, the feature flag -`--enable-experimental-state` has been flipped to "opt out" via `--disable-experimental-state`. +This release has a number of features and improvements. Most notably, the feature flag +`--enable-experimental-state` has been flipped to "opt out" via `--disable-experimental-state`. The experimental state management design has shown significant improvements in memory usage at runtime. Updates to libp2p's gossipsub have some bandwidith stability improvements with support for -IDONTWANT control messages. +IDONTWANT control messages. The gRPC gateway has been deprecated from Prysm in this release. If you need JSON data, consider the -standardized beacon-APIs. +standardized beacon-APIs. Updating to this release is recommended at your convenience. @@ -208,7 +208,7 @@ Updating to this release is recommended at your convenience. - `grpc-gateway-corsdomain` is renamed to http-cors-domain. The old name can still be used as an alias. - `api-timeout` is changed from int flag to duration flag, default value updated. - Light client support: abstracted out the light client headers with different versions. -- `ApplyToEveryValidator` has been changed to prevent misuse bugs, it takes a closure that takes a `ReadOnlyValidator` and returns a raw pointer to a `Validator`. +- `ApplyToEveryValidator` has been changed to prevent misuse bugs, it takes a closure that takes a `ReadOnlyValidator` and returns a raw pointer to a `Validator`. - Removed gorilla mux library and replaced it with net/http updates in go 1.22. - Clean up `ProposeBlock` for validator client to reduce cognitive scoring and enable further changes. - Updated k8s-io/client-go to v0.30.4 and k8s-io/apimachinery to v0.30.4 @@ -219,7 +219,7 @@ Updating to this release is recommended at your convenience. - Updated Sepolia bootnodes. - Make committee aware packing the default by deprecating `--enable-committee-aware-packing`. - Moved `ConvertKzgCommitmentToVersionedHash` to the `primitives` package. -- Updated correlation penalty for EIP-7251. +- Updated correlation penalty for EIP-7251. ### Deprecated - `--disable-grpc-gateway` flag is deprecated due to grpc gateway removal. @@ -693,34 +693,34 @@ AVX support (eg Celeron) after the Deneb fork. This is not an issue for mainnet. - Linter: Wastedassign linter enabled to improve code quality. - API Enhancements: - - Added payload return in Wei for /eth/v3/validator/blocks. - - Added Holesky Deneb Epoch for better epoch management. 
+ - Added payload return in Wei for /eth/v3/validator/blocks. + - Added Holesky Deneb Epoch for better epoch management. - Testing Enhancements: - - Clear cache in tests of core helpers to ensure test reliability. - - Added Debug State Transition Method for improved debugging. - - Backfilling test: Enabled backfill in E2E tests for more comprehensive coverage. + - Clear cache in tests of core helpers to ensure test reliability. + - Added Debug State Transition Method for improved debugging. + - Backfilling test: Enabled backfill in E2E tests for more comprehensive coverage. - API Updates: Re-enabled jwt on keymanager API for enhanced security. - Logging Improvements: Enhanced block by root log for better traceability. - Validator Client Improvements: - - Added Spans to Core Validator Methods for enhanced monitoring. - - Improved readability in validator client code for better maintenance (various commits). + - Added Spans to Core Validator Methods for enhanced monitoring. + - Improved readability in validator client code for better maintenance (various commits). ### Changed - Optimizations and Refinements: - - Lowered resource usage in certain processes for efficiency. - - Moved blob rpc validation closer to peer read for optimized processing. - - Cleaned up validate beacon block code for clarity and efficiency. - - Updated Sepolia Deneb fork epoch for alignment with network changes. - - Changed blob latency metrics to milliseconds for more precise measurement. - - Altered getLegacyDatabaseLocation message for better clarity. - - Improved wait for activation method for enhanced performance. - - Capitalized Aggregated Unaggregated Attestations Log for consistency. - - Modified HistoricalRoots usage for accuracy. - - Adjusted checking of attribute emptiness for efficiency. + - Lowered resource usage in certain processes for efficiency. + - Moved blob rpc validation closer to peer read for optimized processing. + - Cleaned up validate beacon block code for clarity and efficiency. + - Updated Sepolia Deneb fork epoch for alignment with network changes. + - Changed blob latency metrics to milliseconds for more precise measurement. + - Altered getLegacyDatabaseLocation message for better clarity. + - Improved wait for activation method for enhanced performance. + - Capitalized Aggregated Unaggregated Attestations Log for consistency. + - Modified HistoricalRoots usage for accuracy. + - Adjusted checking of attribute emptiness for efficiency. - Database Management: - - Moved --db-backup-output-dir as a deprecated flag for database management simplification. - - Added the Ability to Defragment the Beacon State for improved database performance. + - Moved --db-backup-output-dir as a deprecated flag for database management simplification. + - Added the Ability to Defragment the Beacon State for improved database performance. - Dependency Update: Bumped quic-go version from 0.39.3 to 0.39.4 for up-to-date dependencies. ### Removed @@ -731,12 +731,12 @@ AVX support (eg Celeron) after the Deneb fork. This is not an issue for mainnet. ### Fixed - Bug Fixes: - - Fixed off by one error for improved accuracy. - - Resolved small typo in error messages for clarity. - - Addressed minor issue in blsToExecChange validator for better validation. - - Corrected blobsidecar json tag for commitment inclusion proof. - - Fixed ssz post-requests content type check. - - Resolved issue with port logging in bootnode. + - Fixed off by one error for improved accuracy. + - Resolved small typo in error messages for clarity. 
+ - Addressed minor issue in blsToExecChange validator for better validation. + - Corrected blobsidecar json tag for commitment inclusion proof. + - Fixed ssz post-requests content type check. + - Resolved issue with port logging in bootnode. - Test Fixes: Re-enabled Slasher E2E Test for more comprehensive testing. ### Security @@ -1163,9 +1163,9 @@ No security issues in this release. now features runtime detection, automatically enabling optimized code paths if your CPU supports it. - **Multiarch Containers Preview Available**: multiarch (:wave: arm64 support :wave:) containers will be offered for preview at the following locations: - - Beacon Chain: [gcr.io/prylabs-dev/prysm/beacon-chain:v4.1.0](gcr.io/prylabs-dev/prysm/beacon-chain:v4.1.0) - - Validator: [gcr.io/prylabs-dev/prysm/validator:v4.1.0](gcr.io/prylabs-dev/prysm/validator:v4.1.0) - - Please note that in the next cycle, we will exclusively use these containers at the canonical URLs. + - Beacon Chain: [gcr.io/prylabs-dev/prysm/beacon-chain:v4.1.0](gcr.io/prylabs-dev/prysm/beacon-chain:v4.1.0) + - Validator: [gcr.io/prylabs-dev/prysm/validator:v4.1.0](gcr.io/prylabs-dev/prysm/validator:v4.1.0) + - Please note that in the next cycle, we will exclusively use these containers at the canonical URLs. ### Added @@ -2987,4 +2987,4 @@ There are no security updates in this release. # Older than v2.0.0 -For changelog history for releases older than v2.0.0, please refer to https://github.com/prysmaticlabs/prysm/releases +For changelog history for releases older than v2.0.0, please refer to https://github.com/prysmaticlabs/prysm/releases \ No newline at end of file diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go index 082a3816b9..81ab5fef8d 100644 --- a/beacon-chain/db/iface/interface.go +++ b/beacon-chain/db/iface/interface.go @@ -101,6 +101,7 @@ type NoHeadAccessDatabase interface { SaveLightClientBootstrap(ctx context.Context, blockRoot []byte, bootstrap interfaces.LightClientBootstrap) error CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint primitives.Slot) error + DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot) error } // HeadAccessDatabase defines a struct with access to reading chain head data. diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index 6539d772de..4ce3144fc9 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -227,10 +227,7 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error { return ErrDeleteJustifiedAndFinalized } - if err := tx.Bucket(blocksBucket).Delete(root[:]); err != nil { - return err - } - if err := tx.Bucket(blockParentRootIndicesBucket).Delete(root[:]); err != nil { + if err := s.deleteBlock(tx, root[:]); err != nil { return err } s.blockCache.Del(string(root[:])) @@ -238,6 +235,89 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error { }) } +// DeleteHistoricalDataBeforeSlot deletes all blocks and states before the given slot. 
+// This function deletes data from the following buckets: +// - blocksBucket +// - blockParentRootIndicesBucket +// - finalizedBlockRootsIndexBucket +// - stateBucket +// - stateSummaryBucket +// - blockRootValidatorHashesBucket +// - blockSlotIndicesBucket +// - stateSlotIndicesBucket +func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot) error { + ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteHistoricalDataBeforeSlot") + defer span.End() + + // Collect slot/root pairs to perform deletions in a separate read only transaction. + var ( + roots [][]byte + slts []primitives.Slot + ) + err := s.db.View(func(tx *bolt.Tx) error { + var err error + roots, slts, err = blockRootsBySlotRange(ctx, tx.Bucket(blockSlotIndicesBucket), primitives.Slot(0), cutoffSlot, nil, nil, nil) + if err != nil { + return errors.Wrap(err, "could not retrieve block roots") + } + return nil + }) + if err != nil { + return errors.Wrap(err, "could not retrieve block roots and slots") + } + + // Perform all deletions in a single transaction for atomicity + return s.db.Update(func(tx *bolt.Tx) error { + for _, root := range roots { + // Delete block + if err = s.deleteBlock(tx, root); err != nil { + return err + } + + // Delete finalized block roots index + if err = tx.Bucket(finalizedBlockRootsIndexBucket).Delete(root); err != nil { + return errors.Wrap(err, "could not delete finalized block root index") + } + + // Delete state + if err = tx.Bucket(stateBucket).Delete(root); err != nil { + return errors.Wrap(err, "could not delete state") + } + + // Delete state summary + if err = tx.Bucket(stateSummaryBucket).Delete(root); err != nil { + return errors.Wrap(err, "could not delete state summary") + } + + // Delete validator entries + if err = s.deleteValidatorHashes(tx, root); err != nil { + return errors.Wrap(err, "could not delete validators") + } + } + + for _, slot := range slts { + // Delete slot indices + if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil { + return errors.Wrap(err, "could not delete block slot index") + } + if err = tx.Bucket(stateSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil { + return errors.Wrap(err, "could not delete state slot index") + } + } + + // Delete all caches after we have deleted everything from buckets. + // This is done after the buckets are deleted to avoid any issues in case of transaction rollback. + for _, root := range roots { + // Delete block from cache + s.blockCache.Del(string(root)) + // Delete state summary from cache + s.stateSummaryCache.delete([32]byte(root)) + } + + return nil + }) +} + // SaveBlock to the db. func (s *Store) SaveBlock(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock) error { ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlock") @@ -609,7 +689,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter // We retrieve block roots that match a filter criteria of slot ranges, if specified. filtersMap := f.Filters() - rootsBySlotRange, err := blockRootsBySlotRange( + rootsBySlotRange, _, err := blockRootsBySlotRange( ctx, tx.Bucket(blockSlotIndicesBucket), filtersMap[filters.StartSlot], @@ -627,6 +707,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter // that list of roots to lookup the block. These block will // meet the filter criteria. 
indices := lookupValuesForIndices(ctx, indicesByBucket, tx) + keys := rootsBySlotRange if len(indices) > 0 { // If we have found indices that meet the filter criteria, and there are also @@ -653,13 +734,13 @@ func blockRootsBySlotRange( ctx context.Context, bkt *bolt.Bucket, startSlotEncoded, endSlotEncoded, startEpochEncoded, endEpochEncoded, slotStepEncoded interface{}, -) ([][]byte, error) { +) ([][]byte, []primitives.Slot, error) { _, span := trace.StartSpan(ctx, "BeaconDB.blockRootsBySlotRange") defer span.End() // Return nothing when all slot parameters are missing if startSlotEncoded == nil && endSlotEncoded == nil && startEpochEncoded == nil && endEpochEncoded == nil { - return [][]byte{}, nil + return [][]byte{}, nil, nil } var startSlot, endSlot primitives.Slot @@ -680,11 +761,11 @@ func blockRootsBySlotRange( if startEpochOk && endEpochOk { startSlot, err = slots.EpochStart(startEpoch) if err != nil { - return nil, err + return nil, nil, err } endSlot, err = slots.EpochStart(endEpoch) if err != nil { - return nil, err + return nil, nil, err } endSlot = endSlot + params.BeaconConfig().SlotsPerEpoch - 1 } @@ -695,14 +776,15 @@ func blockRootsBySlotRange( return key != nil && bytes.Compare(key, max) <= 0 } if endSlot < startSlot { - return nil, errInvalidSlotRange + return nil, nil, errInvalidSlotRange } rootsRange := endSlot.SubSlot(startSlot).Div(step) roots := make([][]byte, 0, rootsRange) + var slts []primitives.Slot c := bkt.Cursor() for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() { + slot := bytesutil.BytesToSlotBigEndian(k) if step > 1 { - slot := bytesutil.BytesToSlotBigEndian(k) if slot.SubSlot(startSlot).Mod(step) != 0 { continue } @@ -713,8 +795,9 @@ func blockRootsBySlotRange( splitRoots = append(splitRoots, v[i:i+32]) } roots = append(roots, splitRoots...) 
+ slts = append(slts, slot) } - return roots, nil + return roots, slts, nil } // blockRootsBySlot retrieves the block roots by slot @@ -908,3 +991,32 @@ func keyForBlock(blk interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) { return nil, fmt.Errorf("unsupported block version: %v", blk.Version()) } + +func (s *Store) deleteBlock(tx *bolt.Tx, root []byte) error { + if err := tx.Bucket(blocksBucket).Delete(root); err != nil { + return errors.Wrap(err, "could not delete block") + } + + if err := tx.Bucket(blockParentRootIndicesBucket).Delete(root); err != nil { + return errors.Wrap(err, "could not delete block parent indices") + } + + return nil +} + +func (s *Store) deleteValidatorHashes(tx *bolt.Tx, root []byte) error { + ok, err := s.isStateValidatorMigrationOver() + if err != nil { + return err + } + if !ok { + return nil + } + + // Delete the validator hash index + if err = tx.Bucket(blockRootValidatorHashesBucket).Delete(root); err != nil { + return errors.Wrap(err, "could not delete validator index") + } + + return nil +} diff --git a/beacon-chain/db/kv/blocks_test.go b/beacon-chain/db/kv/blocks_test.go index cc6805f1fb..0de3133182 100644 --- a/beacon-chain/db/kv/blocks_test.go +++ b/beacon-chain/db/kv/blocks_test.go @@ -2,9 +2,13 @@ package kv import ( "context" + "fmt" + bolt "go.etcd.io/bbolt" "testing" "time" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" + "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filters" @@ -353,6 +357,189 @@ func TestStore_DeleteFinalizedBlock(t *testing.T) { require.NoError(t, db.SaveFinalizedCheckpoint(ctx, cp)) require.ErrorIs(t, db.DeleteBlock(ctx, root), ErrDeleteJustifiedAndFinalized) } + +func TestStore_HistoricalDataBeforeSlot(t *testing.T) { + slotsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch) + db := setupDB(t) + ctx := context.Background() + + // Save genesis block root + require.NoError(t, db.SaveGenesisBlockRoot(ctx, genesisBlockRoot)) + + // Create and save blocks for 4 epochs + blks := makeBlocks(t, 0, slotsPerEpoch*4, genesisBlockRoot) + require.NoError(t, db.SaveBlocks(ctx, blks)) + + // Mark state validator migration as complete + err := db.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(migrationsBucket).Put(migrationStateValidatorsKey, migrationCompleted) + }) + require.NoError(t, err) + + migrated, err := db.isStateValidatorMigrationOver() + require.NoError(t, err) + require.Equal(t, true, migrated) + + // Create state summaries and states for each block + ss := make([]*ethpb.StateSummary, len(blks)) + states := make([]state.BeaconState, len(blks)) + + for i, blk := range blks { + slot := blk.Block().Slot() + r, err := blk.Block().HashTreeRoot() + require.NoError(t, err) + + // Create and save state summary + ss[i] = ðpb.StateSummary{ + Slot: slot, + Root: r[:], + } + + // Create and save state with validator entries + vals := make([]*ethpb.Validator, 2) + for j := range vals { + vals[j] = ðpb.Validator{ + PublicKey: bytesutil.PadTo([]byte{byte(i*j + 1)}, 48), + WithdrawalCredentials: bytesutil.PadTo([]byte{byte(i*j + 2)}, 32), + } + } + + st, err := util.NewBeaconState(func(state *ethpb.BeaconState) error { + state.Validators = vals + state.Slot = slot + return nil + }) + require.NoError(t, err) + require.NoError(t, db.SaveState(ctx, st, r)) + states[i] = st + + // Verify validator entries are saved to db + valsActual, err := db.validatorEntries(ctx, r) + require.NoError(t, err) + for j, val := range valsActual { + require.DeepEqual(t, 
vals[j], val) + } + } + require.NoError(t, db.SaveStateSummaries(ctx, ss)) + + // Verify slot indices exist before deletion + err = db.db.View(func(tx *bolt.Tx) error { + blockSlotBkt := tx.Bucket(blockSlotIndicesBucket) + stateSlotBkt := tx.Bucket(stateSlotIndicesBucket) + + for i := uint64(0); i < slotsPerEpoch; i++ { + slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1)) + assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist") + assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist", i) + } + return nil + }) + require.NoError(t, err) + + // Delete data before slot at epoch 1 + require.NoError(t, db.DeleteHistoricalDataBeforeSlot(ctx, primitives.Slot(slotsPerEpoch))) + + // Verify blocks from epoch 0 are deleted + for i := uint64(0); i < slotsPerEpoch; i++ { + root, err := blks[i].Block().HashTreeRoot() + require.NoError(t, err) + + // Check block is deleted + retrievedBlocks, err := db.BlocksBySlot(ctx, primitives.Slot(i)) + require.NoError(t, err) + assert.Equal(t, 0, len(retrievedBlocks)) + + // Verify block does not exist + assert.Equal(t, false, db.HasBlock(ctx, root)) + + // Verify block parent root does not exist + err = db.db.View(func(tx *bolt.Tx) error { + require.Equal(t, 0, len(tx.Bucket(blockParentRootIndicesBucket).Get(root[:]))) + return nil + }) + require.NoError(t, err) + + // Verify state is deleted + hasState := db.HasState(ctx, root) + assert.Equal(t, false, hasState) + + // Verify state summary is deleted + hasSummary := db.HasStateSummary(ctx, root) + assert.Equal(t, false, hasSummary) + + // Verify validator hashes for block roots are deleted + err = db.db.View(func(tx *bolt.Tx) error { + assert.Equal(t, 0, len(tx.Bucket(blockRootValidatorHashesBucket).Get(root[:]))) + return nil + }) + require.NoError(t, err) + } + + // Verify slot indices are deleted + err = db.db.View(func(tx *bolt.Tx) error { + blockSlotBkt := tx.Bucket(blockSlotIndicesBucket) + stateSlotBkt := tx.Bucket(stateSlotIndicesBucket) + + for i := uint64(0); i < slotsPerEpoch; i++ { + slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1)) + assert.Equal(t, 0, len(blockSlotBkt.Get(slot)), fmt.Sprintf("Expected block slot index to be deleted, slot: %d", slot)) + assert.Equal(t, 0, len(stateSlotBkt.Get(slot)), fmt.Sprintf("Expected state slot index to be deleted, slot: %d", slot)) + } + return nil + }) + require.NoError(t, err) + + // Verify blocks from epochs 1-3 still exist + for i := slotsPerEpoch; i < slotsPerEpoch*4; i++ { + root, err := blks[i].Block().HashTreeRoot() + require.NoError(t, err) + + // Verify block exists + assert.Equal(t, true, db.HasBlock(ctx, root)) + + // Verify remaining block parent root exists, except last slot since we store parent roots of each block. 
+ if i < slotsPerEpoch*4-1 { + err = db.db.View(func(tx *bolt.Tx) error { + require.NotNil(t, tx.Bucket(blockParentRootIndicesBucket).Get(root[:]), fmt.Sprintf("Expected block parent index to be deleted, slot: %d", i)) + return nil + }) + require.NoError(t, err) + } + + // Verify state exists + hasState := db.HasState(ctx, root) + assert.Equal(t, true, hasState) + + // Verify state summary exists + hasSummary := db.HasStateSummary(ctx, root) + assert.Equal(t, true, hasSummary) + + // Verify slot indices still exist + err = db.db.View(func(tx *bolt.Tx) error { + blockSlotBkt := tx.Bucket(blockSlotIndicesBucket) + stateSlotBkt := tx.Bucket(stateSlotIndicesBucket) + + slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1)) + assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist") + assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist") + return nil + }) + require.NoError(t, err) + + // Verify validator entries still exist + valsActual, err := db.validatorEntries(ctx, root) + require.NoError(t, err) + assert.NotNil(t, valsActual) + + // Verify remaining validator hashes for block roots exists + err = db.db.View(func(tx *bolt.Tx) error { + assert.NotNil(t, tx.Bucket(blockRootValidatorHashesBucket).Get(root[:])) + return nil + }) + require.NoError(t, err) + } +} + func TestStore_GenesisBlock(t *testing.T) { db := setupDB(t) ctx := context.Background() diff --git a/beacon-chain/db/kv/state.go b/beacon-chain/db/kv/state.go index 365ef557ce..032dd96395 100644 --- a/beacon-chain/db/kv/state.go +++ b/beacon-chain/db/kv/state.go @@ -725,7 +725,7 @@ func (s *Store) validatorEntries(ctx context.Context, blockRoot [32]byte) ([]*et idxBkt := tx.Bucket(blockRootValidatorHashesBucket) valKey := idxBkt.Get(blockRoot[:]) if len(valKey) == 0 { - return errors.Errorf("invalid compressed validator keys length") + return errors.Errorf("validator keys not found for given block root: %x", blockRoot) } // decompress the keys and check if they are of proper length. 
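The DeleteHistoricalDataBeforeSlot method added above is the storage primitive that the new pruner package below drives on a schedule, but it can also be exercised directly against an opened store. A minimal sketch, assuming kv.NewKVStore keeps its (ctx, dirPath, opts...) signature; the data directory and cutoff slot are placeholder values:

    package main

    import (
        "context"
        "log"

        "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
        "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
    )

    func main() {
        ctx := context.Background()
        // Open the beacon node's key-value store; the directory is a placeholder.
        store, err := kv.NewKVStore(ctx, "/tmp/beacondata")
        if err != nil {
            log.Fatal(err)
        }
        defer func() {
            if err := store.Close(); err != nil {
                log.Println(err)
            }
        }()
        // Delete blocks, states, state summaries, validator hashes and the
        // block/state slot indices recorded before the cutoff slot, all in a
        // single transaction, as implemented above.
        if err := store.DeleteHistoricalDataBeforeSlot(ctx, primitives.Slot(1_000_000)); err != nil {
            log.Fatal(err)
        }
    }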
diff --git a/beacon-chain/db/pruner/BUILD.bazel b/beacon-chain/db/pruner/BUILD.bazel new file mode 100644 index 0000000000..d6a73162bb --- /dev/null +++ b/beacon-chain/db/pruner/BUILD.bazel @@ -0,0 +1,38 @@ +load("@prysm//tools/go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["pruner.go"], + importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/pruner", + visibility = [ + "//beacon-chain:__subpackages__", + ], + deps = [ + "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/db:go_default_library", + "//beacon-chain/db/iface:go_default_library", + "//config/params:go_default_library", + "//consensus-types/primitives:go_default_library", + "//time/slots:go_default_library", + "@com_github_pkg_errors//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["pruner_test.go"], + embed = [":go_default_library"], + deps = [ + "//beacon-chain/db/testing:go_default_library", + "//config/params:go_default_library", + "//consensus-types/blocks:go_default_library", + "//consensus-types/primitives:go_default_library", + "//proto/prysm/v1alpha1:go_default_library", + "//testing/require:go_default_library", + "//testing/util:go_default_library", + "//time/slots/testing:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@com_github_sirupsen_logrus//hooks/test:go_default_library", + ], +) diff --git a/beacon-chain/db/pruner/pruner.go b/beacon-chain/db/pruner/pruner.go new file mode 100644 index 0000000000..5bdb101571 --- /dev/null +++ b/beacon-chain/db/pruner/pruner.go @@ -0,0 +1,174 @@ +package pruner + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/iface" + "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/time/slots" + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("prefix", "db-pruner") + +type ServiceOption func(*Service) + +// WithRetentionPeriod allows the user to specify a different data retention period than the spec default. +// The retention period is specified in epochs, and must be >= MIN_EPOCHS_FOR_BLOCK_REQUESTS. +func WithRetentionPeriod(retentionEpochs primitives.Epoch) ServiceOption { + return func(s *Service) { + defaultRetentionEpochs := helpers.MinEpochsForBlockRequests() + 1 + if retentionEpochs < defaultRetentionEpochs { + log.WithField("userEpochs", retentionEpochs). + WithField("minRequired", defaultRetentionEpochs). + Warn("Retention period too low, using minimum required value") + } + + s.ps = pruneStartSlotFunc(retentionEpochs) + } +} + +func WithSlotTicker(slotTicker slots.Ticker) ServiceOption { + return func(s *Service) { + s.slotTicker = slotTicker + } +} + +// Service defines a service that prunes beacon chain DB based on MIN_EPOCHS_FOR_BLOCK_REQUESTS. 
+type Service struct { + ctx context.Context + db db.Database + ps func(current primitives.Slot) primitives.Slot + prunedUpto primitives.Slot + done chan struct{} + slotTicker slots.Ticker + backfillWaiter func() error + initSyncWaiter func() error +} + +func New(ctx context.Context, db iface.Database, genesisTime uint64, initSyncWaiter, backfillWaiter func() error, opts ...ServiceOption) (*Service, error) { + p := &Service{ + ctx: ctx, + db: db, + ps: pruneStartSlotFunc(helpers.MinEpochsForBlockRequests() + 1), // Default retention epochs is MIN_EPOCHS_FOR_BLOCK_REQUESTS + 1 from the current slot. + done: make(chan struct{}), + slotTicker: slots.NewSlotTicker(slots.StartTime(genesisTime, 0), params.BeaconConfig().SecondsPerSlot), + initSyncWaiter: initSyncWaiter, + backfillWaiter: backfillWaiter, + } + + for _, o := range opts { + o(p) + } + + return p, nil +} + +func (p *Service) Start() { + log.Info("Starting Beacon DB pruner service") + p.run() +} + +func (p *Service) Stop() error { + log.Info("Stopping Beacon DB pruner service") + close(p.done) + return nil +} + +func (p *Service) Status() error { + return nil +} + +func (p *Service) run() { + if p.initSyncWaiter != nil { + log.Info("Waiting for initial sync service to complete before starting pruner") + if err := p.initSyncWaiter(); err != nil { + log.WithError(err).Error("Failed to start database pruner, error waiting for initial sync completion") + return + } + } + if p.backfillWaiter != nil { + log.Info("Waiting for backfill service to complete before starting pruner") + if err := p.backfillWaiter(); err != nil { + log.WithError(err).Error("Failed to start database pruner, error waiting for backfill completion") + return + } + } + + defer p.slotTicker.Done() + + for { + select { + case <-p.ctx.Done(): + log.Debug("Stopping Beacon DB pruner service", "prunedUpto", p.prunedUpto) + return + case <-p.done: + log.Debug("Stopping Beacon DB pruner service", "prunedUpto", p.prunedUpto) + return + case slot := <-p.slotTicker.C(): + // Prune at the middle of every epoch since we do a lot of things around epoch boundaries. + if slots.SinceEpochStarts(slot) != (params.BeaconConfig().SlotsPerEpoch / 2) { + continue + } + + if err := p.prune(slot); err != nil { + log.WithError(err).Error("Failed to prune database") + } + } + } +} + +// prune deletes historical chain data beyond the pruneSlot. +func (p *Service) prune(slot primitives.Slot) error { + // Prune everything up to this slot (inclusive). + pruneUpto := p.ps(slot) + + // Can't prune beyond genesis. + if pruneUpto == 0 { + return nil + } + + // Skip if already pruned up to this slot. + if pruneUpto <= p.prunedUpto { + return nil + } + + log.WithFields(logrus.Fields{ + "pruneUpto": pruneUpto, + }).Debug("Pruning chain data") + + tt := time.Now() + if err := p.db.DeleteHistoricalDataBeforeSlot(p.ctx, pruneUpto); err != nil { + return errors.Wrapf(err, "could not delete upto slot %d", pruneUpto) + } + + log.WithFields(logrus.Fields{ + "prunedUpto": pruneUpto, + "duration": time.Since(tt), + "currentSlot": slot, + }).Debug("Successfully pruned chain data") + + // Update pruning checkpoint. + p.prunedUpto = pruneUpto + + return nil +} + +// pruneStartSlotFunc returns the function to determine the start slot to start pruning. 
+func pruneStartSlotFunc(retentionEpochs primitives.Epoch) func(primitives.Slot) primitives.Slot { + return func(current primitives.Slot) primitives.Slot { + if retentionEpochs > slots.MaxSafeEpoch() { + retentionEpochs = slots.MaxSafeEpoch() + } + offset := slots.UnsafeEpochStart(retentionEpochs) + if offset >= current { + return 0 + } + return current - offset + } +} diff --git a/beacon-chain/db/pruner/pruner_test.go b/beacon-chain/db/pruner/pruner_test.go new file mode 100644 index 0000000000..542ac0c710 --- /dev/null +++ b/beacon-chain/db/pruner/pruner_test.go @@ -0,0 +1,135 @@ +package pruner + +import ( + "context" + "testing" + "time" + + "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" + eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + + "github.com/prysmaticlabs/prysm/v5/testing/util" + slottest "github.com/prysmaticlabs/prysm/v5/time/slots/testing" + "github.com/sirupsen/logrus" + + dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/testing/require" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func TestPruner_PruningConditions(t *testing.T) { + tests := []struct { + name string + synced bool + backfillCompleted bool + expectedLog string + }{ + { + name: "Not synced", + synced: false, + backfillCompleted: true, + expectedLog: "Waiting for initial sync service to complete before starting pruner", + }, + { + name: "Backfill incomplete", + synced: true, + backfillCompleted: false, + expectedLog: "Waiting for backfill service to complete before starting pruner", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logrus.SetLevel(logrus.DebugLevel) + hook := logTest.NewGlobal() + ctx, cancel := context.WithCancel(context.Background()) + beaconDB := dbtest.SetupDB(t) + + slotTicker := &slottest.MockTicker{Channel: make(chan primitives.Slot)} + + waitChan := make(chan struct{}) + waiter := func() error { + close(waitChan) + return nil + } + + var initSyncWaiter, backfillWaiter func() error + if !tt.synced { + initSyncWaiter = waiter + } + if !tt.backfillCompleted { + backfillWaiter = waiter + } + p, err := New(ctx, beaconDB, uint64(time.Now().Unix()), initSyncWaiter, backfillWaiter, WithSlotTicker(slotTicker)) + require.NoError(t, err) + + go p.Start() + <-waitChan + cancel() + + if tt.expectedLog != "" { + require.LogsContain(t, hook, tt.expectedLog) + } + + require.NoError(t, p.Stop()) + }) + } +} + +func TestPruner_PruneSuccess(t *testing.T) { + ctx := context.Background() + beaconDB := dbtest.SetupDB(t) + + // Create and save some blocks at different slots + var blks []*eth.SignedBeaconBlock + for slot := primitives.Slot(1); slot <= 32; slot++ { + blk := util.NewBeaconBlock() + blk.Block.Slot = slot + wsb, err := blocks.NewSignedBeaconBlock(blk) + require.NoError(t, err) + require.NoError(t, beaconDB.SaveBlock(ctx, wsb)) + blks = append(blks, blk) + } + + // Create pruner with retention of 2 epochs (64 slots) + retentionEpochs := primitives.Epoch(2) + slotTicker := &slottest.MockTicker{Channel: make(chan primitives.Slot)} + + p, err := New( + ctx, + beaconDB, + uint64(time.Now().Unix()), + nil, + nil, + WithSlotTicker(slotTicker), + ) + require.NoError(t, err) + + p.ps = func(current primitives.Slot) primitives.Slot { + return current - primitives.Slot(retentionEpochs)*params.BeaconConfig().SlotsPerEpoch + } + + // Start pruner and trigger at middle of 
3rd epoch (slot 80) + go p.Start() + currentSlot := primitives.Slot(80) // Middle of 3rd epoch + slotTicker.Channel <- currentSlot + // Send the same slot again to ensure the pruning operation completes + slotTicker.Channel <- currentSlot + + for slot := primitives.Slot(1); slot <= 32; slot++ { + root, err := blks[slot-1].Block.HashTreeRoot() + require.NoError(t, err) + present := beaconDB.HasBlock(ctx, root) + if slot <= 16 { // These should be pruned + require.NoError(t, err) + require.Equal(t, false, present, "Expected present at slot %d to be pruned", slot) + } else { // These should remain + require.NoError(t, err) + require.Equal(t, true, present, "Expected present at slot %d to exist", slot) + } + } + + require.NoError(t, p.Stop()) +} diff --git a/beacon-chain/node/BUILD.bazel b/beacon-chain/node/BUILD.bazel index ab3eb9cd56..d09e1df4dc 100644 --- a/beacon-chain/node/BUILD.bazel +++ b/beacon-chain/node/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//beacon-chain/db:go_default_library", "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/db/kv:go_default_library", + "//beacon-chain/db/pruner:go_default_library", "//beacon-chain/db/slasherkv:go_default_library", "//beacon-chain/execution:go_default_library", "//beacon-chain/forkchoice:go_default_library", diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 65c447f086..d1e4dabd15 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -29,6 +29,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/pruner" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/slasherkv" "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution" "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice" @@ -369,6 +370,13 @@ func registerServices(cliCtx *cli.Context, beacon *BeaconNode, synchronizer *sta } } + if cliCtx.Bool(flags.BeaconDBPruning.Name) { + log.Debugln("Registering Pruner Service") + if err := beacon.registerPrunerService(cliCtx); err != nil { + return errors.Wrap(err, "could not register pruner service") + } + } + return nil } @@ -1089,6 +1097,34 @@ func (b *BeaconNode) registerBuilderService(cliCtx *cli.Context) error { return b.services.RegisterService(svc) } +func (b *BeaconNode) registerPrunerService(cliCtx *cli.Context) error { + genesisTimeUnix := params.BeaconConfig().MinGenesisTime + params.BeaconConfig().GenesisDelay + var backfillService *backfill.Service + if err := b.services.FetchService(&backfillService); err != nil { + return err + } + + var opts []pruner.ServiceOption + if cliCtx.IsSet(flags.PrunerRetentionEpochs.Name) { + uv := cliCtx.Uint64(flags.PrunerRetentionEpochs.Name) + opts = append(opts, pruner.WithRetentionPeriod(primitives.Epoch(uv))) + } + + p, err := pruner.New( + cliCtx.Context, + b.db, + genesisTimeUnix, + initSyncWaiter(cliCtx.Context, b.initialSyncComplete), + backfillService.WaitForCompletion, + opts..., + ) + if err != nil { + return err + } + + return b.services.RegisterService(p) +} + func (b *BeaconNode) RegisterBackfillService(cliCtx *cli.Context, bfs *backfill.Store) error { pa := peers.NewAssigner(b.fetchP2P().Peers(), b.forkChoicer) bf, err := backfill.NewService(cliCtx.Context, bfs, b.BlobStorage, b.clockWaiter, b.fetchP2P(), pa, b.BackfillOpts...) 
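registerPrunerService above only overrides the retention window when --pruner-retention-epochs is set; otherwise the pruner defaults to MIN_EPOCHS_FOR_BLOCK_REQUESTS + 1 epochs. As a back-of-the-envelope check on what that default means in practice, assuming current mainnet spec values (the constants below are assumptions, not values taken from this patch):

    package main

    import "fmt"

    func main() {
        const (
            minEpochsForBlockRequests = 33024 // spec: MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT/2
            slotsPerEpoch             = 32
            secondsPerSlot            = 12
        )
        retainedSlots := (minEpochsForBlockRequests + 1) * slotsPerEpoch
        retainedDays := retainedSlots * secondsPerSlot / 86400
        fmt.Printf("default retention: %d slots (~%d days)\n", retainedSlots, retainedDays)
    }

So a mainnet node with pruning enabled would keep roughly the last five months of blocks and states and delete everything older, once initial sync and backfill have completed.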
diff --git a/beacon-chain/sync/backfill/service.go b/beacon-chain/sync/backfill/service.go index 0dc0df6ddd..414ed20827 100644 --- a/beacon-chain/sync/backfill/service.go +++ b/beacon-chain/sync/backfill/service.go @@ -39,6 +39,7 @@ type Service struct { batchImporter batchImporter blobStore *filesystem.BlobStorage initSyncWaiter func() error + complete chan struct{} } var _ runtime.Service = (*Service)(nil) @@ -148,6 +149,7 @@ func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, p2p: p, pa: pa, batchImporter: defaultBatchImporter, + complete: make(chan struct{}), } for _, o := range opts { if err := o(s); err != nil { @@ -250,6 +252,7 @@ func (s *Service) scheduleTodos() { func (s *Service) Start() { if !s.enabled { log.Info("Backfill service not enabled") + s.markComplete() return } ctx, cancel := context.WithCancel(s.ctx) @@ -273,6 +276,7 @@ func (s *Service) Start() { if s.store.isGenesisSync() { log.Info("Backfill short-circuit; node synced from genesis") + s.markComplete() return } status := s.store.status() @@ -281,6 +285,7 @@ func (s *Service) Start() { log.WithField("minimumRequiredSlot", s.ms(s.clock.CurrentSlot())). WithField("backfillLowestSlot", status.LowSlot). Info("Exiting backfill service; minimum block retention slot > lowest backfilled block") + s.markComplete() return } s.verifier, s.ctxMap, err = s.initVerifier(ctx) @@ -308,6 +313,7 @@ func (s *Service) Start() { return } if s.updateComplete() { + s.markComplete() return } s.importBatches(ctx) @@ -363,3 +369,17 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification. return ini.NewBlobVerifier(b, reqs) } } + +func (s *Service) markComplete() { + close(s.complete) + log.Info("Backfill service marked as complete") +} + +func (s *Service) WaitForCompletion() error { + select { + case <-s.ctx.Done(): + return s.ctx.Err() + case <-s.complete: + return nil + } +} diff --git a/changelog/dB2510_beacondbpruning.md b/changelog/dB2510_beacondbpruning.md new file mode 100644 index 0000000000..6f21c2506b --- /dev/null +++ b/changelog/dB2510_beacondbpruning.md @@ -0,0 +1,3 @@ +### Added + +- Add Beacon DB pruning service to prune historical data older than MIN_EPOCHS_FOR_BLOCK_REQUESTS (roughly equivalent to the weak subjectivity period) \ No newline at end of file diff --git a/cmd/beacon-chain/flags/base.go b/cmd/beacon-chain/flags/base.go index 70a851fc96..2772d467e8 100644 --- a/cmd/beacon-chain/flags/base.go +++ b/cmd/beacon-chain/flags/base.go @@ -296,4 +296,16 @@ var ( Usage: "Directory for the slasher database", Value: cmd.DefaultDataDir(), } + // BeaconDBPruning enables the pruning of beacon db. + BeaconDBPruning = &cli.BoolFlag{ + Name: "beacon-db-pruning", + Usage: "Enables pruning of beacon db beyond MIN_EPOCHS_FOR_BLOCK_REQUESTS duration. This is an opt-in feature," + + " and should only be enabled if operators doesn't require historical data.", + } + // PrunerRetentionEpochs defines the retention period for the pruner service in terms of epochs. + PrunerRetentionEpochs = &cli.Uint64Flag{ + Name: "pruner-retention-epochs", + Usage: "Specifies the retention period for the pruner service in terms of epochs. 
" + + "If this value is less than MIN_EPOCHS_FOR_BLOCK_REQUESTS, it will be ignored.", + } ) diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index 43d0aa8af7..666af96c5e 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -82,6 +82,8 @@ var appFlags = []cli.Flag{ flags.LocalBlockValueBoost, flags.MinBuilderBid, flags.MinBuilderDiff, + flags.BeaconDBPruning, + flags.PrunerRetentionEpochs, cmd.BackupWebhookOutputDir, cmd.MinimalConfigFlag, cmd.E2EConfigFlag, diff --git a/cmd/beacon-chain/usage.go b/cmd/beacon-chain/usage.go index 4ea3fe95dd..00096f3edf 100644 --- a/cmd/beacon-chain/usage.go +++ b/cmd/beacon-chain/usage.go @@ -133,6 +133,8 @@ var appHelpFlagGroups = []flagGroup{ flags.MinBuilderBid, flags.MinBuilderDiff, flags.JwtId, + flags.BeaconDBPruning, + flags.PrunerRetentionEpochs, checkpoint.BlockPath, checkpoint.StatePath, checkpoint.RemoteURL,