Implement beacon db pruner (#14687)
* implement weak subjectivity pruner
* fix goimports
* add delete before slot method to database
* add method to interface
* update changelog
* add flags
* wire pruner
* align pruner with backfill service
* rename db method
* fix imports
* delete block slot indices
* check for backfill and initial sync
* add tests
* fix imports
* Update beacon-chain/db/kv/blocks.go
  Co-authored-by: Radosław Kapka <rkapka@wp.pl>
* Update cmd/beacon-chain/flags/base.go
  Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
* Update beacon-chain/db/pruner/pruner.go
  Co-authored-by: Radosław Kapka <rkapka@wp.pl>
* cleanup
* fix buildkite
* initialise atomic bool
* delete data from remaining buckets
* fix build
* fix build
* address review comments
* add test for blockParentRootIndicesBucket
* fix changelog
* fix build
* address kasey's comments
* fix build
* add trace span to blockRootsBySlotRange

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
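For orientation before the diff: by default the service retains MIN_EPOCHS_FOR_BLOCK_REQUESTS + 1 epochs of history and prunes everything older (see `pruner.go` below). Here is a self-contained sketch of the cutoff arithmetic, assuming mainnet preset values; the helper names are illustrative, not Prysm's:

```go
package main

import "fmt"

// Mainnet preset values, assumed here for illustration.
const (
	slotsPerEpoch                    = 32
	minValidatorWithdrawabilityDelay = 256 // epochs
	churnLimitQuotient               = 65536
)

// minEpochsForBlockRequests mirrors the spec formula behind
// helpers.MinEpochsForBlockRequests() used by the pruner:
// MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT / 2.
func minEpochsForBlockRequests() uint64 {
	return minValidatorWithdrawabilityDelay + churnLimitQuotient/2 // 33024 epochs on mainnet, roughly five months
}

// pruneCutoff reproduces the shape of pruneStartSlotFunc in the diff:
// everything strictly before the returned slot becomes prunable.
func pruneCutoff(currentSlot, retentionEpochs uint64) uint64 {
	offset := retentionEpochs * slotsPerEpoch
	if offset >= currentSlot {
		return 0 // nothing to prune; can't go past genesis
	}
	return currentSlot - offset
}

func main() {
	retention := minEpochsForBlockRequests() + 1 // the service's default retention
	fmt.Println(pruneCutoff(10_000_000, retention))
}
```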
CHANGELOG.md (80 lines changed)
@@ -60,7 +60,7 @@ Notable features:
- Updated the default `scrape-interval` in `Client-stats` to 2 minutes to accommodate Beaconcha.in API rate limits.
- Switch to compounding when consolidating with source==target.
- Revert block db save when saving state fails.
- Return false from HasBlock if the block is being synced.
- Cleanup forkchoice on failed insertions.
- Use read only validator for core processing to avoid unnecessary copying.
- Use ROBlock across block processing pipeline.
@@ -73,7 +73,7 @@ Notable features:
- Simplified `EjectedValidatorIndices`.
- `engine_newPayloadV4`, `engine_getPayloadV4` are changed due to new execution request serialization decisions, [PR](https://github.com/prysmaticlabs/prysm/pull/14580)
- Fixed various small things in state-native code.
- Use ROBlock earlier in block syncing pipeline.
- Changed the signature of `ProcessPayload`.
- Only Build the Protobuf state once during serialization.
- Capella blocks are execution.
@@ -139,9 +139,9 @@ Notable features:

### Security

## [v5.1.2](https://github.com/prysmaticlabs/prysm/compare/v5.1.1...v5.1.2) - 2024-10-16

This is a hotfix release with one change.

Prysm v5.1.1 contains an updated implementation of the beacon api streaming events endpoint. This
new implementation contains a bug that can cause a panic in certain conditions. The issue is
@@ -153,20 +153,20 @@ prysm REST mode validator (a feature which requires the validator to be configured to use the beacon
api instead of prysm's stock grpc endpoints) or accessory software that connects to the events api,
like https://github.com/ethpandaops/ethereum-metrics-exporter

### Fixed

- Recover from panics when writing the event stream [#14545](https://github.com/prysmaticlabs/prysm/pull/14545)

## [v5.1.1](https://github.com/prysmaticlabs/prysm/compare/v5.1.0...v5.1.1) - 2024-10-15

This release has a number of features and improvements. Most notably, the feature flag
`--enable-experimental-state` has been flipped to "opt out" via `--disable-experimental-state`.
The experimental state management design has shown significant improvements in memory usage at
runtime. Updates to libp2p's gossipsub have some bandwidth stability improvements with support for
IDONTWANT control messages.

The gRPC gateway has been deprecated from Prysm in this release. If you need JSON data, consider the
standardized beacon-APIs.

Updating to this release is recommended at your convenience.
@@ -208,7 +208,7 @@ Updating to this release is recommended at your convenience.
- `grpc-gateway-corsdomain` is renamed to `http-cors-domain`. The old name can still be used as an alias.
- `api-timeout` is changed from int flag to duration flag, default value updated.
- Light client support: abstracted out the light client headers with different versions.
- `ApplyToEveryValidator` has been changed to prevent misuse bugs; it takes a closure that takes a `ReadOnlyValidator` and returns a raw pointer to a `Validator`.
- Removed gorilla mux library and replaced it with net/http updates in go 1.22.
- Clean up `ProposeBlock` for validator client to reduce cognitive scoring and enable further changes.
- Updated k8s-io/client-go to v0.30.4 and k8s-io/apimachinery to v0.30.4
@@ -219,7 +219,7 @@ Updating to this release is recommended at your convenience.
- Updated Sepolia bootnodes.
- Make committee aware packing the default by deprecating `--enable-committee-aware-packing`.
- Moved `ConvertKzgCommitmentToVersionedHash` to the `primitives` package.
- Updated correlation penalty for EIP-7251.

### Deprecated
- `--disable-grpc-gateway` flag is deprecated due to grpc gateway removal.
@@ -693,34 +693,34 @@ AVX support (eg Celeron) after the Deneb fork. This is not an issue for mainnet.

- Linter: Wastedassign linter enabled to improve code quality.
- API Enhancements:
  - Added payload return in Wei for /eth/v3/validator/blocks.
  - Added Holesky Deneb Epoch for better epoch management.
- Testing Enhancements:
  - Clear cache in tests of core helpers to ensure test reliability.
  - Added Debug State Transition Method for improved debugging.
  - Backfilling test: Enabled backfill in E2E tests for more comprehensive coverage.
- API Updates: Re-enabled jwt on keymanager API for enhanced security.
- Logging Improvements: Enhanced block by root log for better traceability.
- Validator Client Improvements:
  - Added Spans to Core Validator Methods for enhanced monitoring.
  - Improved readability in validator client code for better maintenance (various commits).

### Changed

- Optimizations and Refinements:
  - Lowered resource usage in certain processes for efficiency.
  - Moved blob rpc validation closer to peer read for optimized processing.
  - Cleaned up validate beacon block code for clarity and efficiency.
  - Updated Sepolia Deneb fork epoch for alignment with network changes.
  - Changed blob latency metrics to milliseconds for more precise measurement.
  - Altered getLegacyDatabaseLocation message for better clarity.
  - Improved wait for activation method for enhanced performance.
  - Capitalized Aggregated Unaggregated Attestations Log for consistency.
  - Modified HistoricalRoots usage for accuracy.
  - Adjusted checking of attribute emptiness for efficiency.
- Database Management:
  - Moved --db-backup-output-dir as a deprecated flag for database management simplification.
  - Added the Ability to Defragment the Beacon State for improved database performance.
- Dependency Update: Bumped quic-go version from 0.39.3 to 0.39.4 for up-to-date dependencies.

### Removed
@@ -731,12 +731,12 @@ AVX support (eg Celeron) after the Deneb fork. This is not an issue for mainnet.
### Fixed

- Bug Fixes:
  - Fixed off by one error for improved accuracy.
  - Resolved small typo in error messages for clarity.
  - Addressed minor issue in blsToExecChange validator for better validation.
  - Corrected blobsidecar json tag for commitment inclusion proof.
  - Fixed ssz post-requests content type check.
  - Resolved issue with port logging in bootnode.
- Test Fixes: Re-enabled Slasher E2E Test for more comprehensive testing.

### Security
@@ -1163,9 +1163,9 @@ No security issues in this release.
now features runtime detection, automatically enabling optimized code paths if your CPU supports it.
- **Multiarch Containers Preview Available**: multiarch (:wave: arm64 support :wave:) containers will be offered for
  preview at the following locations:
  - Beacon Chain: [gcr.io/prylabs-dev/prysm/beacon-chain:v4.1.0](gcr.io/prylabs-dev/prysm/beacon-chain:v4.1.0)
  - Validator: [gcr.io/prylabs-dev/prysm/validator:v4.1.0](gcr.io/prylabs-dev/prysm/validator:v4.1.0)
- Please note that in the next cycle, we will exclusively use these containers at the canonical URLs.

### Added

@@ -2987,4 +2987,4 @@ There are no security updates in this release.

# Older than v2.0.0

For changelog history for releases older than v2.0.0, please refer to https://github.com/prysmaticlabs/prysm/releases
@@ -101,6 +101,7 @@ type NoHeadAccessDatabase interface {
	SaveLightClientBootstrap(ctx context.Context, blockRoot []byte, bootstrap interfaces.LightClientBootstrap) error

	CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint primitives.Slot) error
+	DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot) error
}

// HeadAccessDatabase defines a struct with access to reading chain head data.
@@ -227,10 +227,7 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error {
			return ErrDeleteJustifiedAndFinalized
		}

-		if err := tx.Bucket(blocksBucket).Delete(root[:]); err != nil {
-			return err
-		}
-		if err := tx.Bucket(blockParentRootIndicesBucket).Delete(root[:]); err != nil {
+		if err := s.deleteBlock(tx, root[:]); err != nil {
			return err
		}
		s.blockCache.Del(string(root[:]))
@@ -238,6 +235,89 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error {
	})
}

// DeleteHistoricalDataBeforeSlot deletes all blocks and states before the given slot.
// This function deletes data from the following buckets:
// - blocksBucket
// - blockParentRootIndicesBucket
// - finalizedBlockRootsIndexBucket
// - stateBucket
// - stateSummaryBucket
// - blockRootValidatorHashesBucket
// - blockSlotIndicesBucket
// - stateSlotIndicesBucket
func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot) error {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteHistoricalDataBeforeSlot")
	defer span.End()

	// Collect slot/root pairs to perform deletions in a separate read only transaction.
	var (
		roots [][]byte
		slts  []primitives.Slot
	)
	err := s.db.View(func(tx *bolt.Tx) error {
		var err error
		roots, slts, err = blockRootsBySlotRange(ctx, tx.Bucket(blockSlotIndicesBucket), primitives.Slot(0), cutoffSlot, nil, nil, nil)
		if err != nil {
			return errors.Wrap(err, "could not retrieve block roots")
		}
		return nil
	})
	if err != nil {
		return errors.Wrap(err, "could not retrieve block roots and slots")
	}

	// Perform all deletions in a single transaction for atomicity
	return s.db.Update(func(tx *bolt.Tx) error {
		for _, root := range roots {
			// Delete block
			if err = s.deleteBlock(tx, root); err != nil {
				return err
			}

			// Delete finalized block roots index
			if err = tx.Bucket(finalizedBlockRootsIndexBucket).Delete(root); err != nil {
				return errors.Wrap(err, "could not delete finalized block root index")
			}

			// Delete state
			if err = tx.Bucket(stateBucket).Delete(root); err != nil {
				return errors.Wrap(err, "could not delete state")
			}

			// Delete state summary
			if err = tx.Bucket(stateSummaryBucket).Delete(root); err != nil {
				return errors.Wrap(err, "could not delete state summary")
			}

			// Delete validator entries
			if err = s.deleteValidatorHashes(tx, root); err != nil {
				return errors.Wrap(err, "could not delete validators")
			}
		}

		for _, slot := range slts {
			// Delete slot indices
			if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil {
				return errors.Wrap(err, "could not delete block slot index")
			}
			if err = tx.Bucket(stateSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil {
				return errors.Wrap(err, "could not delete state slot index")
			}
		}

		// Delete all caches after we have deleted everything from buckets.
		// This is done after the buckets are deleted to avoid any issues in case of transaction rollback.
		for _, root := range roots {
			// Delete block from cache
			s.blockCache.Del(string(root))
			// Delete state summary from cache
			s.stateSummaryCache.delete([32]byte(root))
		}

		return nil
	})
}

// SaveBlock to the db.
func (s *Store) SaveBlock(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock) error {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBlock")
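`DeleteHistoricalDataBeforeSlot` first collects the affected roots and slots in a read-only `View` transaction, then performs every deletion inside one `Update`. bbolt permits many readers but only a single writer, so doing the scan outside the write transaction keeps the exclusive lock short, while the single `Update` keeps the whole deletion atomic. A minimal standalone sketch of that pattern (hypothetical bucket name and layout, not Prysm's schema):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"log"

	bolt "go.etcd.io/bbolt"
)

var bucketName = []byte("slot-index") // hypothetical bucket

// pruneBefore deletes every key < cutoff: collect under a read lock,
// then delete in one atomic write transaction.
func pruneBefore(db *bolt.DB, cutoff []byte) error {
	var keys [][]byte
	if err := db.View(func(tx *bolt.Tx) error {
		c := tx.Bucket(bucketName).Cursor()
		for k, _ := c.First(); k != nil && bytes.Compare(k, cutoff) < 0; k, _ = c.Next() {
			keys = append(keys, append([]byte(nil), k...)) // copy; k is invalid outside the tx
		}
		return nil
	}); err != nil {
		return err
	}
	return db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucketName)
		for _, k := range keys {
			if err := b.Delete(k); err != nil {
				return err
			}
		}
		return nil
	})
}

func main() {
	db, err := bolt.Open("prune-sketch.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Seed ten big-endian slot keys so there is something to prune.
	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(bucketName)
		if err != nil {
			return err
		}
		for i := uint64(0); i < 10; i++ {
			k := make([]byte, 8)
			binary.BigEndian.PutUint64(k, i)
			if err := b.Put(k, []byte("root")); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		log.Fatal(err)
	}
	cut := make([]byte, 8)
	binary.BigEndian.PutUint64(cut, 5)
	if err := pruneBefore(db, cut); err != nil { // keys 0..4 removed
		log.Fatal(err)
	}
}
```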
@@ -609,7 +689,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter

	// We retrieve block roots that match a filter criteria of slot ranges, if specified.
	filtersMap := f.Filters()
-	rootsBySlotRange, err := blockRootsBySlotRange(
+	rootsBySlotRange, _, err := blockRootsBySlotRange(
		ctx,
		tx.Bucket(blockSlotIndicesBucket),
		filtersMap[filters.StartSlot],
@@ -627,6 +707,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter
	// that list of roots to lookup the block. These block will
	// meet the filter criteria.
	indices := lookupValuesForIndices(ctx, indicesByBucket, tx)
+
	keys := rootsBySlotRange
	if len(indices) > 0 {
		// If we have found indices that meet the filter criteria, and there are also
@@ -653,13 +734,13 @@ func blockRootsBySlotRange(
	ctx context.Context,
	bkt *bolt.Bucket,
	startSlotEncoded, endSlotEncoded, startEpochEncoded, endEpochEncoded, slotStepEncoded interface{},
-) ([][]byte, error) {
+) ([][]byte, []primitives.Slot, error) {
	_, span := trace.StartSpan(ctx, "BeaconDB.blockRootsBySlotRange")
	defer span.End()

	// Return nothing when all slot parameters are missing
	if startSlotEncoded == nil && endSlotEncoded == nil && startEpochEncoded == nil && endEpochEncoded == nil {
-		return [][]byte{}, nil
+		return [][]byte{}, nil, nil
	}

	var startSlot, endSlot primitives.Slot
@@ -680,11 +761,11 @@ func blockRootsBySlotRange(
	if startEpochOk && endEpochOk {
		startSlot, err = slots.EpochStart(startEpoch)
		if err != nil {
-			return nil, err
+			return nil, nil, err
		}
		endSlot, err = slots.EpochStart(endEpoch)
		if err != nil {
-			return nil, err
+			return nil, nil, err
		}
		endSlot = endSlot + params.BeaconConfig().SlotsPerEpoch - 1
	}
@@ -695,14 +776,15 @@ func blockRootsBySlotRange(
		return key != nil && bytes.Compare(key, max) <= 0
	}
	if endSlot < startSlot {
-		return nil, errInvalidSlotRange
+		return nil, nil, errInvalidSlotRange
	}
	rootsRange := endSlot.SubSlot(startSlot).Div(step)
	roots := make([][]byte, 0, rootsRange)
+	var slts []primitives.Slot
	c := bkt.Cursor()
	for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() {
+		slot := bytesutil.BytesToSlotBigEndian(k)
		if step > 1 {
-			slot := bytesutil.BytesToSlotBigEndian(k)
			if slot.SubSlot(startSlot).Mod(step) != 0 {
				continue
			}
@@ -713,8 +795,9 @@ func blockRootsBySlotRange(
			splitRoots = append(splitRoots, v[i:i+32])
		}
		roots = append(roots, splitRoots...)
+		slts = append(slts, slot)
	}
-	return roots, nil
+	return roots, slts, nil
}

// blockRootsBySlot retrieves the block roots by slot
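One detail worth calling out from the cursor logic above: `c.Seek(min)` with a byte-wise upper bound only works because `bytesutil.SlotToBytesBigEndian` encodes slots big-endian, making lexicographic key order coincide with numeric slot order. A small self-contained illustration (plain `encoding/binary`, not Prysm's helper):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// bigEndianKey encodes a slot the way the slot-index buckets do, so that
// bolt's byte-ordered cursors iterate slots in numeric order.
func bigEndianKey(slot uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, slot)
	return b
}

func littleEndianKey(slot uint64) []byte {
	b := make([]byte, 8)
	binary.LittleEndian.PutUint64(b, slot)
	return b
}

func main() {
	// Big-endian keys preserve numeric order under byte comparison...
	fmt.Println(bytes.Compare(bigEndianKey(256), bigEndianKey(1))) // 1: 256 sorts after 1
	// ...little-endian keys would not, which would break range scans.
	fmt.Println(bytes.Compare(littleEndianKey(256), littleEndianKey(1))) // -1: 256 would sort before 1
}
```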
@@ -908,3 +991,32 @@ func keyForBlock(blk interfaces.ReadOnlySignedBeaconBlock) ([]byte, error) {

	return nil, fmt.Errorf("unsupported block version: %v", blk.Version())
}

func (s *Store) deleteBlock(tx *bolt.Tx, root []byte) error {
	if err := tx.Bucket(blocksBucket).Delete(root); err != nil {
		return errors.Wrap(err, "could not delete block")
	}

	if err := tx.Bucket(blockParentRootIndicesBucket).Delete(root); err != nil {
		return errors.Wrap(err, "could not delete block parent indices")
	}

	return nil
}

func (s *Store) deleteValidatorHashes(tx *bolt.Tx, root []byte) error {
	ok, err := s.isStateValidatorMigrationOver()
	if err != nil {
		return err
	}
	if !ok {
		return nil
	}

	// Delete the validator hash index
	if err = tx.Bucket(blockRootValidatorHashesBucket).Delete(root); err != nil {
		return errors.Wrap(err, "could not delete validator index")
	}

	return nil
}
@@ -2,9 +2,13 @@ package kv

import (
	"context"
+	"fmt"
+	bolt "go.etcd.io/bbolt"
	"testing"
	"time"

+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
+
	"github.com/ethereum/go-ethereum/common"
	"github.com/pkg/errors"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filters"
@@ -353,6 +357,189 @@ func TestStore_DeleteFinalizedBlock(t *testing.T) {
	require.NoError(t, db.SaveFinalizedCheckpoint(ctx, cp))
	require.ErrorIs(t, db.DeleteBlock(ctx, root), ErrDeleteJustifiedAndFinalized)
}

func TestStore_HistoricalDataBeforeSlot(t *testing.T) {
	slotsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch)
	db := setupDB(t)
	ctx := context.Background()

	// Save genesis block root
	require.NoError(t, db.SaveGenesisBlockRoot(ctx, genesisBlockRoot))

	// Create and save blocks for 4 epochs
	blks := makeBlocks(t, 0, slotsPerEpoch*4, genesisBlockRoot)
	require.NoError(t, db.SaveBlocks(ctx, blks))

	// Mark state validator migration as complete
	err := db.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(migrationsBucket).Put(migrationStateValidatorsKey, migrationCompleted)
	})
	require.NoError(t, err)

	migrated, err := db.isStateValidatorMigrationOver()
	require.NoError(t, err)
	require.Equal(t, true, migrated)

	// Create state summaries and states for each block
	ss := make([]*ethpb.StateSummary, len(blks))
	states := make([]state.BeaconState, len(blks))

	for i, blk := range blks {
		slot := blk.Block().Slot()
		r, err := blk.Block().HashTreeRoot()
		require.NoError(t, err)

		// Create and save state summary
		ss[i] = &ethpb.StateSummary{
			Slot: slot,
			Root: r[:],
		}

		// Create and save state with validator entries
		vals := make([]*ethpb.Validator, 2)
		for j := range vals {
			vals[j] = &ethpb.Validator{
				PublicKey:             bytesutil.PadTo([]byte{byte(i*j + 1)}, 48),
				WithdrawalCredentials: bytesutil.PadTo([]byte{byte(i*j + 2)}, 32),
			}
		}

		st, err := util.NewBeaconState(func(state *ethpb.BeaconState) error {
			state.Validators = vals
			state.Slot = slot
			return nil
		})
		require.NoError(t, err)
		require.NoError(t, db.SaveState(ctx, st, r))
		states[i] = st

		// Verify validator entries are saved to db
		valsActual, err := db.validatorEntries(ctx, r)
		require.NoError(t, err)
		for j, val := range valsActual {
			require.DeepEqual(t, vals[j], val)
		}
	}
	require.NoError(t, db.SaveStateSummaries(ctx, ss))

	// Verify slot indices exist before deletion
	err = db.db.View(func(tx *bolt.Tx) error {
		blockSlotBkt := tx.Bucket(blockSlotIndicesBucket)
		stateSlotBkt := tx.Bucket(stateSlotIndicesBucket)

		for i := uint64(0); i < slotsPerEpoch; i++ {
			slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1))
			assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist")
			assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist", i)
		}
		return nil
	})
	require.NoError(t, err)

	// Delete data before slot at epoch 1
	require.NoError(t, db.DeleteHistoricalDataBeforeSlot(ctx, primitives.Slot(slotsPerEpoch)))

	// Verify blocks from epoch 0 are deleted
	for i := uint64(0); i < slotsPerEpoch; i++ {
		root, err := blks[i].Block().HashTreeRoot()
		require.NoError(t, err)

		// Check block is deleted
		retrievedBlocks, err := db.BlocksBySlot(ctx, primitives.Slot(i))
		require.NoError(t, err)
		assert.Equal(t, 0, len(retrievedBlocks))

		// Verify block does not exist
		assert.Equal(t, false, db.HasBlock(ctx, root))

		// Verify block parent root does not exist
		err = db.db.View(func(tx *bolt.Tx) error {
			require.Equal(t, 0, len(tx.Bucket(blockParentRootIndicesBucket).Get(root[:])))
			return nil
		})
		require.NoError(t, err)

		// Verify state is deleted
		hasState := db.HasState(ctx, root)
		assert.Equal(t, false, hasState)

		// Verify state summary is deleted
		hasSummary := db.HasStateSummary(ctx, root)
		assert.Equal(t, false, hasSummary)

		// Verify validator hashes for block roots are deleted
		err = db.db.View(func(tx *bolt.Tx) error {
			assert.Equal(t, 0, len(tx.Bucket(blockRootValidatorHashesBucket).Get(root[:])))
			return nil
		})
		require.NoError(t, err)
	}

	// Verify slot indices are deleted
	err = db.db.View(func(tx *bolt.Tx) error {
		blockSlotBkt := tx.Bucket(blockSlotIndicesBucket)
		stateSlotBkt := tx.Bucket(stateSlotIndicesBucket)

		for i := uint64(0); i < slotsPerEpoch; i++ {
			slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1))
			assert.Equal(t, 0, len(blockSlotBkt.Get(slot)), fmt.Sprintf("Expected block slot index to be deleted, slot: %d", slot))
			assert.Equal(t, 0, len(stateSlotBkt.Get(slot)), fmt.Sprintf("Expected state slot index to be deleted, slot: %d", slot))
		}
		return nil
	})
	require.NoError(t, err)

	// Verify blocks from epochs 1-3 still exist
	for i := slotsPerEpoch; i < slotsPerEpoch*4; i++ {
		root, err := blks[i].Block().HashTreeRoot()
		require.NoError(t, err)

		// Verify block exists
		assert.Equal(t, true, db.HasBlock(ctx, root))

		// Verify remaining block parent root exists, except last slot since we store parent roots of each block.
		if i < slotsPerEpoch*4-1 {
			err = db.db.View(func(tx *bolt.Tx) error {
				require.NotNil(t, tx.Bucket(blockParentRootIndicesBucket).Get(root[:]), fmt.Sprintf("Expected block parent index to be deleted, slot: %d", i))
				return nil
			})
			require.NoError(t, err)
		}

		// Verify state exists
		hasState := db.HasState(ctx, root)
		assert.Equal(t, true, hasState)

		// Verify state summary exists
		hasSummary := db.HasStateSummary(ctx, root)
		assert.Equal(t, true, hasSummary)

		// Verify slot indices still exist
		err = db.db.View(func(tx *bolt.Tx) error {
			blockSlotBkt := tx.Bucket(blockSlotIndicesBucket)
			stateSlotBkt := tx.Bucket(stateSlotIndicesBucket)

			slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1))
			assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist")
			assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist")
			return nil
		})
		require.NoError(t, err)

		// Verify validator entries still exist
		valsActual, err := db.validatorEntries(ctx, root)
		require.NoError(t, err)
		assert.NotNil(t, valsActual)

		// Verify remaining validator hashes for block roots exists
		err = db.db.View(func(tx *bolt.Tx) error {
			assert.NotNil(t, tx.Bucket(blockRootValidatorHashesBucket).Get(root[:]))
			return nil
		})
		require.NoError(t, err)
	}
}

func TestStore_GenesisBlock(t *testing.T) {
	db := setupDB(t)
	ctx := context.Background()
@@ -725,7 +725,7 @@ func (s *Store) validatorEntries(ctx context.Context, blockRoot [32]byte) ([]*et
		idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
		valKey := idxBkt.Get(blockRoot[:])
		if len(valKey) == 0 {
-			return errors.Errorf("invalid compressed validator keys length")
+			return errors.Errorf("validator keys not found for given block root: %x", blockRoot)
		}

		// decompress the keys and check if they are of proper length.
beacon-chain/db/pruner/BUILD.bazel (new file, 38 lines)
@@ -0,0 +1,38 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")

go_library(
    name = "go_default_library",
    srcs = ["pruner.go"],
    importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/pruner",
    visibility = [
        "//beacon-chain:__subpackages__",
    ],
    deps = [
        "//beacon-chain/core/helpers:go_default_library",
        "//beacon-chain/db:go_default_library",
        "//beacon-chain/db/iface:go_default_library",
        "//config/params:go_default_library",
        "//consensus-types/primitives:go_default_library",
        "//time/slots:go_default_library",
        "@com_github_pkg_errors//:go_default_library",
        "@com_github_sirupsen_logrus//:go_default_library",
    ],
)

go_test(
    name = "go_default_test",
    srcs = ["pruner_test.go"],
    embed = [":go_default_library"],
    deps = [
        "//beacon-chain/db/testing:go_default_library",
        "//config/params:go_default_library",
        "//consensus-types/blocks:go_default_library",
        "//consensus-types/primitives:go_default_library",
        "//proto/prysm/v1alpha1:go_default_library",
        "//testing/require:go_default_library",
        "//testing/util:go_default_library",
        "//time/slots/testing:go_default_library",
        "@com_github_sirupsen_logrus//:go_default_library",
        "@com_github_sirupsen_logrus//hooks/test:go_default_library",
    ],
)
beacon-chain/db/pruner/pruner.go (new file, 174 lines)
@@ -0,0 +1,174 @@
package pruner

import (
	"context"
	"time"

	"github.com/pkg/errors"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/iface"
	"github.com/prysmaticlabs/prysm/v5/config/params"
	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
	"github.com/prysmaticlabs/prysm/v5/time/slots"
	"github.com/sirupsen/logrus"
)

var log = logrus.WithField("prefix", "db-pruner")

type ServiceOption func(*Service)

// WithRetentionPeriod allows the user to specify a different data retention period than the spec default.
// The retention period is specified in epochs, and must be >= MIN_EPOCHS_FOR_BLOCK_REQUESTS.
func WithRetentionPeriod(retentionEpochs primitives.Epoch) ServiceOption {
	return func(s *Service) {
		defaultRetentionEpochs := helpers.MinEpochsForBlockRequests() + 1
		if retentionEpochs < defaultRetentionEpochs {
			log.WithField("userEpochs", retentionEpochs).
				WithField("minRequired", defaultRetentionEpochs).
				Warn("Retention period too low, using minimum required value")
		}

		s.ps = pruneStartSlotFunc(retentionEpochs)
	}
}

func WithSlotTicker(slotTicker slots.Ticker) ServiceOption {
	return func(s *Service) {
		s.slotTicker = slotTicker
	}
}

// Service defines a service that prunes beacon chain DB based on MIN_EPOCHS_FOR_BLOCK_REQUESTS.
type Service struct {
	ctx            context.Context
	db             db.Database
	ps             func(current primitives.Slot) primitives.Slot
	prunedUpto     primitives.Slot
	done           chan struct{}
	slotTicker     slots.Ticker
	backfillWaiter func() error
	initSyncWaiter func() error
}

func New(ctx context.Context, db iface.Database, genesisTime uint64, initSyncWaiter, backfillWaiter func() error, opts ...ServiceOption) (*Service, error) {
	p := &Service{
		ctx:            ctx,
		db:             db,
		ps:             pruneStartSlotFunc(helpers.MinEpochsForBlockRequests() + 1), // Default retention epochs is MIN_EPOCHS_FOR_BLOCK_REQUESTS + 1 from the current slot.
		done:           make(chan struct{}),
		slotTicker:     slots.NewSlotTicker(slots.StartTime(genesisTime, 0), params.BeaconConfig().SecondsPerSlot),
		initSyncWaiter: initSyncWaiter,
		backfillWaiter: backfillWaiter,
	}

	for _, o := range opts {
		o(p)
	}

	return p, nil
}

func (p *Service) Start() {
	log.Info("Starting Beacon DB pruner service")
	p.run()
}

func (p *Service) Stop() error {
	log.Info("Stopping Beacon DB pruner service")
	close(p.done)
	return nil
}

func (p *Service) Status() error {
	return nil
}

func (p *Service) run() {
	if p.initSyncWaiter != nil {
		log.Info("Waiting for initial sync service to complete before starting pruner")
		if err := p.initSyncWaiter(); err != nil {
			log.WithError(err).Error("Failed to start database pruner, error waiting for initial sync completion")
			return
		}
	}
	if p.backfillWaiter != nil {
		log.Info("Waiting for backfill service to complete before starting pruner")
		if err := p.backfillWaiter(); err != nil {
			log.WithError(err).Error("Failed to start database pruner, error waiting for backfill completion")
			return
		}
	}

	defer p.slotTicker.Done()

	for {
		select {
		case <-p.ctx.Done():
			log.Debug("Stopping Beacon DB pruner service", "prunedUpto", p.prunedUpto)
			return
		case <-p.done:
			log.Debug("Stopping Beacon DB pruner service", "prunedUpto", p.prunedUpto)
			return
		case slot := <-p.slotTicker.C():
			// Prune at the middle of every epoch since we do a lot of things around epoch boundaries.
			if slots.SinceEpochStarts(slot) != (params.BeaconConfig().SlotsPerEpoch / 2) {
				continue
			}

			if err := p.prune(slot); err != nil {
				log.WithError(err).Error("Failed to prune database")
			}
		}
	}
}

// prune deletes historical chain data beyond the pruneSlot.
func (p *Service) prune(slot primitives.Slot) error {
	// Prune everything up to this slot (inclusive).
	pruneUpto := p.ps(slot)

	// Can't prune beyond genesis.
	if pruneUpto == 0 {
		return nil
	}

	// Skip if already pruned up to this slot.
	if pruneUpto <= p.prunedUpto {
		return nil
	}

	log.WithFields(logrus.Fields{
		"pruneUpto": pruneUpto,
	}).Debug("Pruning chain data")

	tt := time.Now()
	if err := p.db.DeleteHistoricalDataBeforeSlot(p.ctx, pruneUpto); err != nil {
		return errors.Wrapf(err, "could not delete upto slot %d", pruneUpto)
	}

	log.WithFields(logrus.Fields{
		"prunedUpto":  pruneUpto,
		"duration":    time.Since(tt),
		"currentSlot": slot,
	}).Debug("Successfully pruned chain data")

	// Update pruning checkpoint.
	p.prunedUpto = pruneUpto

	return nil
}

// pruneStartSlotFunc returns the function to determine the start slot to start pruning.
func pruneStartSlotFunc(retentionEpochs primitives.Epoch) func(primitives.Slot) primitives.Slot {
	return func(current primitives.Slot) primitives.Slot {
		if retentionEpochs > slots.MaxSafeEpoch() {
			retentionEpochs = slots.MaxSafeEpoch()
		}
		offset := slots.UnsafeEpochStart(retentionEpochs)
		if offset >= current {
			return 0
		}
		return current - offset
	}
}
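The run loop wakes on every slot tick but only prunes when the slot sits exactly mid-epoch, i.e. `SlotsPerEpoch / 2` slots past the boundary, keeping the work away from epoch transitions. A toy sketch of that schedule, assuming the mainnet 32-slot epoch (slot 80 is the same trigger slot the test below uses):

```go
package main

import "fmt"

const slotsPerEpoch = 32 // mainnet assumption

// sinceEpochStarts stands in for slots.SinceEpochStarts in the diff.
func sinceEpochStarts(slot uint64) uint64 { return slot % slotsPerEpoch }

func main() {
	// Scan a few epochs: the pruner only wakes at slots 16, 48, 80, ...
	for slot := uint64(0); slot < 96; slot++ {
		if sinceEpochStarts(slot) == slotsPerEpoch/2 {
			fmt.Println("prune check at slot", slot)
		}
	}
}
```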
beacon-chain/db/pruner/pruner_test.go (new file, 135 lines)
@@ -0,0 +1,135 @@
package pruner

import (
	"context"
	"testing"
	"time"

	"github.com/prysmaticlabs/prysm/v5/config/params"
	"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
	eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"

	"github.com/prysmaticlabs/prysm/v5/testing/util"
	slottest "github.com/prysmaticlabs/prysm/v5/time/slots/testing"
	"github.com/sirupsen/logrus"

	dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
	"github.com/prysmaticlabs/prysm/v5/testing/require"
	logTest "github.com/sirupsen/logrus/hooks/test"
)

func TestPruner_PruningConditions(t *testing.T) {
	tests := []struct {
		name              string
		synced            bool
		backfillCompleted bool
		expectedLog       string
	}{
		{
			name:              "Not synced",
			synced:            false,
			backfillCompleted: true,
			expectedLog:       "Waiting for initial sync service to complete before starting pruner",
		},
		{
			name:              "Backfill incomplete",
			synced:            true,
			backfillCompleted: false,
			expectedLog:       "Waiting for backfill service to complete before starting pruner",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			logrus.SetLevel(logrus.DebugLevel)
			hook := logTest.NewGlobal()
			ctx, cancel := context.WithCancel(context.Background())
			beaconDB := dbtest.SetupDB(t)

			slotTicker := &slottest.MockTicker{Channel: make(chan primitives.Slot)}

			waitChan := make(chan struct{})
			waiter := func() error {
				close(waitChan)
				return nil
			}

			var initSyncWaiter, backfillWaiter func() error
			if !tt.synced {
				initSyncWaiter = waiter
			}
			if !tt.backfillCompleted {
				backfillWaiter = waiter
			}
			p, err := New(ctx, beaconDB, uint64(time.Now().Unix()), initSyncWaiter, backfillWaiter, WithSlotTicker(slotTicker))
			require.NoError(t, err)

			go p.Start()
			<-waitChan
			cancel()

			if tt.expectedLog != "" {
				require.LogsContain(t, hook, tt.expectedLog)
			}

			require.NoError(t, p.Stop())
		})
	}
}

func TestPruner_PruneSuccess(t *testing.T) {
	ctx := context.Background()
	beaconDB := dbtest.SetupDB(t)

	// Create and save some blocks at different slots
	var blks []*eth.SignedBeaconBlock
	for slot := primitives.Slot(1); slot <= 32; slot++ {
		blk := util.NewBeaconBlock()
		blk.Block.Slot = slot
		wsb, err := blocks.NewSignedBeaconBlock(blk)
		require.NoError(t, err)
		require.NoError(t, beaconDB.SaveBlock(ctx, wsb))
		blks = append(blks, blk)
	}

	// Create pruner with retention of 2 epochs (64 slots)
	retentionEpochs := primitives.Epoch(2)
	slotTicker := &slottest.MockTicker{Channel: make(chan primitives.Slot)}

	p, err := New(
		ctx,
		beaconDB,
		uint64(time.Now().Unix()),
		nil,
		nil,
		WithSlotTicker(slotTicker),
	)
	require.NoError(t, err)

	p.ps = func(current primitives.Slot) primitives.Slot {
		return current - primitives.Slot(retentionEpochs)*params.BeaconConfig().SlotsPerEpoch
	}

	// Start pruner and trigger at middle of 3rd epoch (slot 80)
	go p.Start()
	currentSlot := primitives.Slot(80) // Middle of 3rd epoch
	slotTicker.Channel <- currentSlot
	// Send the same slot again to ensure the pruning operation completes
	slotTicker.Channel <- currentSlot

	for slot := primitives.Slot(1); slot <= 32; slot++ {
		root, err := blks[slot-1].Block.HashTreeRoot()
		require.NoError(t, err)
		present := beaconDB.HasBlock(ctx, root)
		if slot <= 16 { // These should be pruned
			require.NoError(t, err)
			require.Equal(t, false, present, "Expected present at slot %d to be pruned", slot)
		} else { // These should remain
			require.NoError(t, err)
			require.Equal(t, true, present, "Expected present at slot %d to exist", slot)
		}
	}

	require.NoError(t, p.Stop())
}
@@ -25,6 +25,7 @@ go_library(
        "//beacon-chain/db:go_default_library",
        "//beacon-chain/db/filesystem:go_default_library",
        "//beacon-chain/db/kv:go_default_library",
+       "//beacon-chain/db/pruner:go_default_library",
        "//beacon-chain/db/slasherkv:go_default_library",
        "//beacon-chain/execution:go_default_library",
        "//beacon-chain/forkchoice:go_default_library",
@@ -29,6 +29,7 @@ import (
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/pruner"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/slasherkv"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/execution"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice"
@@ -369,6 +370,13 @@ func registerServices(cliCtx *cli.Context, beacon *BeaconNode, synchronizer *sta
		}
	}

+	if cliCtx.Bool(flags.BeaconDBPruning.Name) {
+		log.Debugln("Registering Pruner Service")
+		if err := beacon.registerPrunerService(cliCtx); err != nil {
+			return errors.Wrap(err, "could not register pruner service")
+		}
+	}
+
	return nil
}
@@ -1089,6 +1097,34 @@ func (b *BeaconNode) registerBuilderService(cliCtx *cli.Context) error {
	return b.services.RegisterService(svc)
}

func (b *BeaconNode) registerPrunerService(cliCtx *cli.Context) error {
	genesisTimeUnix := params.BeaconConfig().MinGenesisTime + params.BeaconConfig().GenesisDelay
	var backfillService *backfill.Service
	if err := b.services.FetchService(&backfillService); err != nil {
		return err
	}

	var opts []pruner.ServiceOption
	if cliCtx.IsSet(flags.PrunerRetentionEpochs.Name) {
		uv := cliCtx.Uint64(flags.PrunerRetentionEpochs.Name)
		opts = append(opts, pruner.WithRetentionPeriod(primitives.Epoch(uv)))
	}

	p, err := pruner.New(
		cliCtx.Context,
		b.db,
		genesisTimeUnix,
		initSyncWaiter(cliCtx.Context, b.initialSyncComplete),
		backfillService.WaitForCompletion,
		opts...,
	)
	if err != nil {
		return err
	}

	return b.services.RegisterService(p)
}

func (b *BeaconNode) RegisterBackfillService(cliCtx *cli.Context, bfs *backfill.Store) error {
	pa := peers.NewAssigner(b.fetchP2P().Peers(), b.forkChoicer)
	bf, err := backfill.NewService(cliCtx.Context, bfs, b.BlobStorage, b.clockWaiter, b.fetchP2P(), pa, b.BackfillOpts...)
@@ -39,6 +39,7 @@ type Service struct {
	batchImporter  batchImporter
	blobStore      *filesystem.BlobStorage
	initSyncWaiter func() error
+	complete       chan struct{}
}

var _ runtime.Service = (*Service)(nil)
@@ -148,6 +149,7 @@ func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage,
		p2p:           p,
		pa:            pa,
		batchImporter: defaultBatchImporter,
+		complete:      make(chan struct{}),
	}
	for _, o := range opts {
		if err := o(s); err != nil {
@@ -250,6 +252,7 @@ func (s *Service) scheduleTodos() {
func (s *Service) Start() {
	if !s.enabled {
		log.Info("Backfill service not enabled")
+		s.markComplete()
		return
	}
	ctx, cancel := context.WithCancel(s.ctx)
@@ -273,6 +276,7 @@ func (s *Service) Start() {

	if s.store.isGenesisSync() {
		log.Info("Backfill short-circuit; node synced from genesis")
+		s.markComplete()
		return
	}
	status := s.store.status()
@@ -281,6 +285,7 @@ func (s *Service) Start() {
		log.WithField("minimumRequiredSlot", s.ms(s.clock.CurrentSlot())).
			WithField("backfillLowestSlot", status.LowSlot).
			Info("Exiting backfill service; minimum block retention slot > lowest backfilled block")
+		s.markComplete()
		return
	}
	s.verifier, s.ctxMap, err = s.initVerifier(ctx)
@@ -308,6 +313,7 @@ func (s *Service) Start() {
		return
	}
	if s.updateComplete() {
+		s.markComplete()
		return
	}
	s.importBatches(ctx)
@@ -363,3 +369,17 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.
		return ini.NewBlobVerifier(b, reqs)
	}
}

func (s *Service) markComplete() {
	close(s.complete)
	log.Info("Backfill service marked as complete")
}

func (s *Service) WaitForCompletion() error {
	select {
	case <-s.ctx.Done():
		return s.ctx.Err()
	case <-s.complete:
		return nil
	}
}
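The `markComplete`/`WaitForCompletion` pair is the standard Go completion-broadcast idiom: closing `complete` releases every current and future waiter at once, and selecting on `ctx.Done()` keeps waiters from hanging on shutdown. A minimal standalone rendering of the pattern (illustrative type, not the backfill service itself):

```go
package main

import (
	"context"
	"fmt"
)

type completer struct {
	ctx      context.Context
	complete chan struct{}
}

// markComplete must run at most once; a second close would panic.
func (c *completer) markComplete() { close(c.complete) }

// waitForCompletion blocks until completion is broadcast or the context ends.
func (c *completer) waitForCompletion() error {
	select {
	case <-c.ctx.Done():
		return c.ctx.Err()
	case <-c.complete:
		return nil
	}
}

func main() {
	c := &completer{ctx: context.Background(), complete: make(chan struct{})}
	go c.markComplete()
	fmt.Println(c.waitForCompletion()) // <nil>: every waiter is released by the close
}
```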
changelog/dB2510_beacondbpruning.md (new file, 3 lines)
@@ -0,0 +1,3 @@
### Added

- Add Beacon DB pruning service to prune historical data older than MIN_EPOCHS_FOR_BLOCK_REQUESTS (roughly equivalent to the weak subjectivity period)
@@ -296,4 +296,16 @@ var (
		Usage: "Directory for the slasher database",
		Value: cmd.DefaultDataDir(),
	}
+	// BeaconDBPruning enables the pruning of beacon db.
+	BeaconDBPruning = &cli.BoolFlag{
+		Name: "beacon-db-pruning",
+		Usage: "Enables pruning of beacon db beyond MIN_EPOCHS_FOR_BLOCK_REQUESTS duration. This is an opt-in feature," +
+			" and should only be enabled if operators don't require historical data.",
+	}
+	// PrunerRetentionEpochs defines the retention period for the pruner service in terms of epochs.
+	PrunerRetentionEpochs = &cli.Uint64Flag{
+		Name: "pruner-retention-epochs",
+		Usage: "Specifies the retention period for the pruner service in terms of epochs. " +
+			"If this value is less than MIN_EPOCHS_FOR_BLOCK_REQUESTS, it will be ignored.",
+	}
)
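Taken together with the wiring in node.go above, pruning stays off unless an operator passes `--beacon-db-pruning`; `--pruner-retention-epochs=<N>` can then adjust the window, and per the flag's usage string, values below MIN_EPOCHS_FOR_BLOCK_REQUESTS are ignored in favor of the minimum.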
@@ -82,6 +82,8 @@ var appFlags = []cli.Flag{
	flags.LocalBlockValueBoost,
	flags.MinBuilderBid,
	flags.MinBuilderDiff,
+	flags.BeaconDBPruning,
+	flags.PrunerRetentionEpochs,
	cmd.BackupWebhookOutputDir,
	cmd.MinimalConfigFlag,
	cmd.E2EConfigFlag,
@@ -133,6 +133,8 @@ var appHelpFlagGroups = []flagGroup{
	flags.MinBuilderBid,
	flags.MinBuilderDiff,
	flags.JwtId,
+	flags.BeaconDBPruning,
+	flags.PrunerRetentionEpochs,
	checkpoint.BlockPath,
	checkpoint.StatePath,
	checkpoint.RemoteURL,