feat(prune): prune rocksdb account and storage history indices (#21331)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
YK
2026-02-02 02:42:17 +08:00
committed by GitHub
parent 3d699ac9c6
commit 9f8c22e2c3
11 changed files with 1396 additions and 34 deletions


@@ -42,11 +42,15 @@ rayon.workspace = true
tokio.workspace = true
rustc-hash.workspace = true
[features]
rocksdb = ["reth-provider/rocksdb"]
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils", "rocksdb"] }
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
reth-storage-api.workspace = true
reth-testing-utils.workspace = true
reth-tracing.workspace = true


@@ -7,10 +7,10 @@ use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider,
DatabaseProviderFactory, NodePrimitivesProvider, PruneCheckpointReader, PruneCheckpointWriter,
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache};
use std::time::Duration;
use tokio::sync::watch;
@@ -85,6 +85,7 @@ impl PrunerBuilder {
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ RocksDBProviderFactory
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
>,
@@ -121,7 +122,8 @@ impl PrunerBuilder {
+ StorageSettingsCache
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ RocksDBProviderFactory,
{
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);


@@ -7,10 +7,11 @@ use reth_db_api::{table::Value, transaction::DbTxMut};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider,
PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
PruneCheckpointReader, PruneCheckpointWriter, RocksDBProviderFactory,
StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache};
/// Collection of [`Segment`]. Thread-safe, allocated on the heap.
#[derive(Debug)]
@@ -55,7 +56,8 @@ where
+ ChainStateBlockReader
+ StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ RocksDBProviderFactory,
{
/// Creates a [`SegmentSet`] from existing components, such as [`StaticFileProvider`] and
/// [`PruneModes`].


@@ -10,20 +10,20 @@ use alloy_primitives::BlockNumber;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
StaticFileProviderFactory, StorageSettingsCache,
RocksDBProviderFactory, StaticFileProviderFactory,
};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::ChangeSetReader;
use reth_storage_api::{ChangeSetReader, StorageSettingsCache};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
/// Number of account history tables to prune in one step.
///
/// Account History consists of two tables: [`tables::AccountChangeSets`] and
/// [`tables::AccountsHistory`]. We want to prune them to the same block number.
/// Account History consists of two tables: [`tables::AccountChangeSets`] (either in the
/// database or in static files) and [`tables::AccountsHistory`]. We want to prune them to the
/// same block number.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
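// Illustrative note (not part of this change): because both tables must advance to the same
// block, the per-run delete budget is effectively shared between them. With the defaults
// cited further down (prune_delete_limit = 3500 and up to 5 blocks since the last run), that
// works out to 3500 * 5 / ACCOUNT_HISTORY_TABLES_TO_PRUNE = 8750 changeset entries per run.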
#[derive(Debug)]
@@ -42,7 +42,8 @@ where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageSettingsCache
+ ChangeSetReader,
+ ChangeSetReader
+ RocksDBProviderFactory,
{
fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory
@@ -67,7 +68,13 @@ where
};
let range_end = *range.end();
// Check where account changesets are stored
// Check where account history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().account_history_in_rocksdb {
return self.prune_rocksdb(provider, input, range, range_end);
}
// Check where account changesets are stored (MDBX path)
if EitherWriter::account_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
@@ -94,6 +101,8 @@ impl AccountHistory {
input.limiter
};
// The limiter may already be exhausted from a previous segment in the same prune run.
// Early exit avoids unnecessary iteration when no budget remains.
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -101,11 +110,14 @@ impl AccountHistory {
))
}
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
// Deleted account changeset keys (account addresses) with the highest block number deleted
// for that key.
//
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
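// Worked size estimate (illustrative, not part of this change): 8750 entries, each a
// 20-byte (160-bit) address plus an 8-byte (64-bit) block number, come to 8750 * 28 =
// 245_000 bytes, i.e. the ~0.25MB noted above before hashmap overhead.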
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
@@ -124,8 +136,8 @@ impl AccountHistory {
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
// Delete static file jars only when fully processed
if done && let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
@@ -210,6 +222,106 @@ impl AccountHistory {
)
.map_err(Into::into)
}
/// Prunes account history when indices are stored in `RocksDB`.
///
/// Reads account changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory,
{
use reth_provider::PruneShardOutcome;
// Unlike the MDBX path, we don't divide the limit by 2 because the RocksDB path only prunes
// history shards (there is no separate changeset table to delete from). The changesets live
// in static files, which are deleted separately.
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut changesets_processed = 0usize;
let mut done = true;
// Walk account changesets from static files using a streaming iterator.
// For each changeset, track the highest block number seen for each address
// to determine which history shard entries need pruning.
let walker = StaticFileAccountChangesetWalker::new(provider, range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_number, changeset) = result?;
highest_deleted_accounts.insert(changeset.address, block_number);
last_changeset_pruned_block = Some(block_number);
changesets_processed += 1;
limiter.increment_deleted_entries_count();
}
trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files");
let last_changeset_pruned_block = last_changeset_pruned_block
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Prune RocksDB history shards for affected accounts
let mut deleted_shards = 0usize;
let mut updated_shards = 0usize;
// Sort by address for better RocksDB cache locality
let mut sorted_accounts: Vec<_> = highest_deleted_accounts.into_iter().collect();
sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr);
provider.with_rocksdb_batch(|mut batch| {
for (address, highest_block) in &sorted_accounts {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_account_history_to(*address, prune_to)? {
PruneShardOutcome::Deleted => deleted_shards += 1,
PruneShardOutcome::Updated => updated_shards += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)");
// Delete static file jars only when fully processed. During provider.commit(), the RocksDB
// batch is committed before the MDBX checkpoint. If a crash occurs after the RocksDB commit
// but before the MDBX commit, on restart the pruner checkpoint indicates the data needs
// re-pruning even though the RocksDB shards are already pruned. This is safe because
// pruning is idempotent (re-pruning already-pruned shards is a no-op).
if done {
provider.static_file_provider().delete_segment_below_block(
StaticFileSegment::AccountChangeSets,
last_changeset_pruned_block + 1,
)?;
}
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: changesets_processed + deleted_shards + updated_shards,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
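// Illustrative sketch (not part of this diff) of the idempotence argument above: a second
// prune over already-pruned shards finds no blocks <= `to_block` left and reports
// `Unchanged`, so a crash between the RocksDB commit and the MDBX checkpoint commit only
// costs a harmless re-prune on restart. The calls mirror the batch API added in this change;
// the snippet itself is hypothetical.
//
// let mut batch = rocksdb.batch();
// batch.prune_account_history_to(address, to_block)?; // first run: Deleted or Updated
// batch.commit()?;
// let mut batch = rocksdb.batch();
// let second = batch.prune_account_history_to(address, to_block)?;
// assert_eq!(second, PruneShardOutcome::Unchanged); // re-run is a no-op
// batch.commit()?;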
#[cfg(test)]
@@ -539,4 +651,272 @@ mod tests {
test_prune(998, 2, (PruneProgress::Finished, 1000));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb_path() {
use reth_db_api::models::ShardedKey;
use reth_provider::{RocksDBProviderFactory, StaticFileProviderFactory};
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=100,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
let mut account_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::new();
for (block, changeset) in changesets.iter().enumerate() {
for (address, _, _) in changeset {
account_blocks.entry(*address).or_default().push(block as u64);
}
}
let rocksdb = db.factory.rocksdb_provider();
let mut batch = rocksdb.batch();
for (address, block_numbers) in &account_blocks {
let shard = BlockNumberList::new_pre_sorted(block_numbers.iter().copied());
batch
.put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), &shard)
.unwrap();
}
batch.commit().unwrap();
for (address, expected_blocks) in &account_blocks {
let shards = rocksdb.account_history_shards(*address).unwrap();
assert_eq!(shards.len(), 1);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), *expected_blocks);
}
let to_block: BlockNumber = 50;
let prune_mode = PruneMode::Before(to_block);
let input =
PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
let segment = AccountHistory::new(prune_mode);
db.factory.set_storage_settings_cache(
StorageSettings::default()
.with_account_changesets_in_static_files(true)
.with_account_history_in_rocksdb(true),
);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
provider.commit().expect("commit");
assert_matches!(
result,
SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
if pruned > 0
);
for (address, original_blocks) in &account_blocks {
let shards = rocksdb.account_history_shards(*address).unwrap();
let expected_blocks: Vec<u64> =
original_blocks.iter().copied().filter(|b| *b > to_block).collect();
if expected_blocks.is_empty() {
assert!(
shards.is_empty(),
"Expected no shards for address {address:?} after pruning"
);
} else {
assert_eq!(shards.len(), 1, "Expected 1 shard for address {address:?}");
assert_eq!(
shards[0].1.iter().collect::<Vec<_>>(),
expected_blocks,
"Shard blocks mismatch for address {address:?}"
);
}
}
let static_file_provider = db.factory.static_file_provider();
let highest_block = static_file_provider.get_highest_static_file_block(
reth_static_file_types::StaticFileSegment::AccountChangeSets,
);
if let Some(block) = highest_block {
assert!(
block > to_block,
"Static files should only contain blocks above to_block ({to_block}), got {block}"
);
}
}
/// Tests that when a limiter stops mid-block (with multiple changes for the same block),
/// the checkpoint is set to `block_number - 1` to avoid dangling index entries.
#[test]
fn prune_partial_progress_mid_block() {
use alloy_primitives::{Address, U256};
use reth_primitives_traits::Account;
use reth_testing_utils::generators::ChangeSet;
let db = TestStageDB::default();
let mut rng = generators::rng();
// Create blocks 0..=10
let blocks = random_block_range(
&mut rng,
0..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Create specific changesets where block 5 has 4 account changes
let addr1 = Address::with_last_byte(1);
let addr2 = Address::with_last_byte(2);
let addr3 = Address::with_last_byte(3);
let addr4 = Address::with_last_byte(4);
let addr5 = Address::with_last_byte(5);
let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };
// Build changesets: blocks 0-4 have 1 change each, block 5 has 4 changes, block 6 has 1
let changesets: Vec<ChangeSet> = vec![
vec![(addr1, account, vec![])], // block 0
vec![(addr1, account, vec![])], // block 1
vec![(addr1, account, vec![])], // block 2
vec![(addr1, account, vec![])], // block 3
vec![(addr1, account, vec![])], // block 4
// block 5: 4 different account changes (sorted by address for consistency)
vec![
(addr1, account, vec![]),
(addr2, account, vec![]),
(addr3, account, vec![]),
(addr4, account, vec![]),
],
vec![(addr5, account, vec![])], // block 6
];
db.insert_changesets(changesets.clone(), None).expect("insert changesets");
db.insert_history(changesets.clone(), None).expect("insert history");
// Total changesets: 5 (blocks 0-4) + 4 (block 5) + 1 (block 6) = 10
assert_eq!(
db.table::<tables::AccountChangeSets>().unwrap().len(),
changesets.iter().flatten().count()
);
let prune_mode = PruneMode::Before(10);
// Set the limiter so we stop mid-block 5. With ACCOUNT_HISTORY_TABLES_TO_PRUNE=2, a deleted
// entries limit of 14 gives 14/2 = 7 changeset slots. Blocks 0-4 use 5 slots, leaving 2 for
// block 5 (which has 4 changes), so we stop mid-block 5.
let deleted_entries_limit = 14; // 14/2 = 7 changeset entries before limit
let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
// Should report that there's more data
assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");
// Save checkpoint and commit
segment
.save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
.unwrap();
provider.commit().expect("commit");
// Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete
let checkpoint = db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap()
.expect("checkpoint should exist");
assert_eq!(
checkpoint.block_number,
Some(4),
"Checkpoint should be block 4 (block before incomplete block 5)"
);
// Verify remaining changesets (blocks 5 and 6 should still have entries)
let remaining_changesets = db.table::<tables::AccountChangeSets>().unwrap();
// After pruning blocks 0-4, block 5 (4 entries) and block 6 (1 entry) remain. Since we
// stopped mid-block 5, part of block 5 may already be pruned, but the checkpoint is 4, so a
// re-run re-processes from block 5.
assert!(
!remaining_changesets.is_empty(),
"Should have remaining changesets for blocks 5-6"
);
// Verify no dangling history indices for blocks that weren't fully pruned
// The indices for block 5 should still reference blocks <= 5 appropriately
let history = db.table::<tables::AccountsHistory>().unwrap();
for (key, _blocks) in &history {
// All blocks in the history should be > checkpoint block number
// OR the shard's highest_block_number should be > checkpoint
assert!(
key.highest_block_number > 4,
"Found stale history shard with highest_block_number {} <= checkpoint 4",
key.highest_block_number
);
}
// Run prune again to complete - should finish processing block 5 and 6
let input2 = PruneInput {
previous_checkpoint: Some(checkpoint),
to_block: 10,
limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
};
let provider2 = db.factory.database_provider_rw().unwrap();
provider2.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
let result2 = segment.prune(&provider2, input2).unwrap();
assert!(result2.progress.is_finished(), "Second run should complete");
segment
.save_checkpoint(
&provider2,
result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider2.commit().expect("commit");
// Verify final checkpoint
let final_checkpoint = db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap()
.expect("checkpoint should exist");
// Should now be at block 6 (the last block with changesets)
assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");
// All changesets should be pruned
let final_changesets = db.table::<tables::AccountChangeSets>().unwrap();
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
}
}


@@ -12,7 +12,7 @@ use reth_db_api::{
tables,
transaction::DbTxMut,
};
use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory};
use reth_provider::{DBProvider, EitherWriter, RocksDBProviderFactory, StaticFileProviderFactory};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
@@ -43,7 +43,8 @@ where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageChangeSetReader
+ StorageSettingsCache,
+ StorageSettingsCache
+ RocksDBProviderFactory,
{
fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory
@@ -68,6 +69,13 @@ where
};
let range_end = *range.end();
// Check where storage history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
return self.prune_rocksdb(provider, input, range, range_end);
}
// Check where storage changesets are stored (MDBX path)
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
@@ -94,6 +102,8 @@ impl StorageHistory {
input.limiter
};
// The limiter may already be exhausted from a previous segment in the same prune run.
// Early exit avoids unnecessary iteration when no budget remains.
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -126,8 +136,8 @@ impl StorageHistory {
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
// Delete static file jars only when fully processed
if done && let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
@@ -216,6 +226,107 @@ impl StorageHistory {
)
.map_err(Into::into)
}
/// Prunes storage history when indices are stored in `RocksDB`.
///
/// Reads storage changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory,
{
use reth_provider::PruneShardOutcome;
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_storages: FxHashMap<_, _> = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut changesets_processed = 0usize;
let mut done = true;
// Walk storage changesets from static files using a streaming iterator.
// For each changeset, track the highest block number seen for each (address, storage_key)
// pair to determine which history shard entries need pruning.
let walker = provider.static_file_provider().walk_storage_changeset_range(range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
changesets_processed += 1;
limiter.increment_deleted_entries_count();
}
trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files");
let last_changeset_pruned_block = last_changeset_pruned_block
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Prune RocksDB history shards for affected storage slots
let mut deleted_shards = 0usize;
let mut updated_shards = 0usize;
// Sort by (address, storage_key) for better RocksDB cache locality
let mut sorted_storages: Vec<_> = highest_deleted_storages.into_iter().collect();
sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key));
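// Worked example (illustrative, not part of this change): if the walk stops partway through
// block 61, `last_changeset_pruned_block` is 60, and a slot whose highest pruned changeset
// is at block 61 gets `prune_to = min(61, 60) = 60`, so its block-61 index entry survives
// until a later run re-processes block 61.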
provider.with_rocksdb_batch(|mut batch| {
for ((address, storage_key), highest_block) in &sorted_storages {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
PruneShardOutcome::Deleted => deleted_shards += 1,
PruneShardOutcome::Updated => updated_shards += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned storage history (RocksDB indices)");
// Delete static file jars only when fully processed. During provider.commit(), the RocksDB
// batch is committed before the MDBX checkpoint. If a crash occurs after the RocksDB commit
// but before the MDBX commit, on restart the pruner checkpoint indicates the data needs
// re-pruning even though the RocksDB shards are already pruned. This is safe because
// pruning is idempotent (re-pruning already-pruned shards is a no-op).
if done {
provider.static_file_provider().delete_segment_below_block(
StaticFileSegment::StorageChangeSets,
last_changeset_pruned_block + 1,
)?;
}
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: changesets_processed + deleted_shards + updated_shards,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
#[cfg(test)]
@@ -553,4 +664,270 @@ mod tests {
test_prune(998, 2, (PruneProgress::Finished, 500));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
/// Tests that when a limiter stops mid-block (with multiple storage changes for the same
/// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries.
#[test]
fn prune_partial_progress_mid_block() {
use alloy_primitives::{Address, U256};
use reth_primitives_traits::Account;
use reth_testing_utils::generators::ChangeSet;
let db = TestStageDB::default();
let mut rng = generators::rng();
// Create blocks 0..=10
let blocks = random_block_range(
&mut rng,
0..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Create specific changesets where block 5 has 4 storage changes
let addr1 = Address::with_last_byte(1);
let addr2 = Address::with_last_byte(2);
let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };
// Create storage entries
let storage_entry = |key: u8| reth_primitives_traits::StorageEntry {
key: B256::with_last_byte(key),
value: U256::from(100),
};
// Build changesets: blocks 0-4 have 1 storage change each, block 5 has 4 changes, block 6
// has 1. Entries within each account must be sorted by key.
let changesets: Vec<ChangeSet> = vec![
vec![(addr1, account, vec![storage_entry(1)])], // block 0
vec![(addr1, account, vec![storage_entry(1)])], // block 1
vec![(addr1, account, vec![storage_entry(1)])], // block 2
vec![(addr1, account, vec![storage_entry(1)])], // block 3
vec![(addr1, account, vec![storage_entry(1)])], // block 4
// block 5: 4 different storage changes (2 addresses, each with 2 storage slots)
// Sorted by address, then by storage key within each address
vec![
(addr1, account, vec![storage_entry(1), storage_entry(2)]),
(addr2, account, vec![storage_entry(1), storage_entry(2)]),
],
vec![(addr1, account, vec![storage_entry(3)])], // block 6
];
db.insert_changesets(changesets.clone(), None).expect("insert changesets");
db.insert_history(changesets.clone(), None).expect("insert history");
// Total storage changesets
let total_storage_entries: usize =
changesets.iter().flat_map(|c| c.iter()).map(|(_, _, entries)| entries.len()).sum();
assert_eq!(db.table::<tables::StorageChangeSets>().unwrap().len(), total_storage_entries);
let prune_mode = PruneMode::Before(10);
// Set limiter to stop mid-block 5
// With STORAGE_HISTORY_TABLES_TO_PRUNE=2, limit=14 gives us 7 storage entries before limit
// Blocks 0-4 use 5 slots, leaving 2 for block 5 (which has 4), so we stop mid-block 5
let deleted_entries_limit = 14; // 14/2 = 7 storage entries before limit
let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
// Should report that there's more data
assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");
// Save checkpoint and commit
segment
.save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
.unwrap();
provider.commit().expect("commit");
// Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete
let checkpoint = db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap()
.expect("checkpoint should exist");
assert_eq!(
checkpoint.block_number,
Some(4),
"Checkpoint should be block 4 (block before incomplete block 5)"
);
// Verify remaining changesets
let remaining_changesets = db.table::<tables::StorageChangeSets>().unwrap();
assert!(
!remaining_changesets.is_empty(),
"Should have remaining changesets for blocks 5-6"
);
// Verify no dangling history indices for blocks that weren't fully pruned
let history = db.table::<tables::StoragesHistory>().unwrap();
for (key, _blocks) in &history {
assert!(
key.sharded_key.highest_block_number > 4,
"Found stale history shard with highest_block_number {} <= checkpoint 4",
key.sharded_key.highest_block_number
);
}
// Run prune again to complete - should finish processing block 5 and 6
let input2 = PruneInput {
previous_checkpoint: Some(checkpoint),
to_block: 10,
limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
};
let provider2 = db.factory.database_provider_rw().unwrap();
provider2.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
let result2 = segment.prune(&provider2, input2).unwrap();
assert!(result2.progress.is_finished(), "Second run should complete");
segment
.save_checkpoint(
&provider2,
result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider2.commit().expect("commit");
// Verify final checkpoint
let final_checkpoint = db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap()
.expect("checkpoint should exist");
// Should now be at block 6 (the last block with changesets)
assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");
// All changesets should be pruned
let final_changesets = db.table::<tables::StorageChangeSets>().unwrap();
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=100,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
1..2,
1..2,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
let mut storage_indices: BTreeMap<(alloy_primitives::Address, B256), Vec<u64>> =
BTreeMap::new();
for (block, changeset) in changesets.iter().enumerate() {
for (address, _, storage_entries) in changeset {
for entry in storage_entries {
storage_indices.entry((*address, entry.key)).or_default().push(block as u64);
}
}
}
{
let rocksdb = db.factory.rocksdb_provider();
let mut batch = rocksdb.batch();
for ((address, storage_key), block_numbers) in &storage_indices {
let shard = BlockNumberList::new_pre_sorted(block_numbers.clone());
batch
.put::<tables::StoragesHistory>(
StorageShardedKey::last(*address, *storage_key),
&shard,
)
.expect("insert storage history shard");
}
batch.commit().expect("commit rocksdb batch");
}
{
let rocksdb = db.factory.rocksdb_provider();
for (address, storage_key) in storage_indices.keys() {
let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
assert!(!shards.is_empty(), "RocksDB should contain storage history before prune");
}
}
let to_block = 50u64;
let prune_mode = PruneMode::Before(to_block);
let input =
PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default()
.with_storage_changesets_in_static_files(true)
.with_storages_history_in_rocksdb(true),
);
let result = segment.prune(&provider, input).unwrap();
provider.commit().expect("commit");
assert_matches!(
result,
SegmentOutput { progress: PruneProgress::Finished, checkpoint: Some(_), .. }
);
{
let rocksdb = db.factory.rocksdb_provider();
for ((address, storage_key), block_numbers) in &storage_indices {
let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
let remaining_blocks: Vec<u64> =
block_numbers.iter().copied().filter(|&b| b > to_block).collect();
if remaining_blocks.is_empty() {
assert!(
shards.is_empty(),
"Shard for {:?}/{:?} should be deleted when all blocks pruned",
address,
storage_key
);
} else {
assert!(!shards.is_empty(), "Shard should exist with remaining blocks");
let actual_blocks: Vec<u64> =
shards.iter().flat_map(|(_, list)| list.iter()).collect();
assert_eq!(
actual_blocks, remaining_blocks,
"RocksDB shard should only contain blocks > {}",
to_block
);
}
}
}
}
}


@@ -2,7 +2,7 @@ use reth_db_api::{table::Value, transaction::DbTxMut};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
BlockReader, ChainStateBlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
};
use reth_prune::{
PruneMode, PruneModes, PruneSegment, PrunerBuilder, SegmentOutput, SegmentOutputCheckpoint,
@@ -10,7 +10,7 @@ use reth_prune::{
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache};
use tracing::info;
/// The prune stage that runs the pruner with the provided prune modes.
@@ -49,7 +49,8 @@ where
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ RocksDBProviderFactory,
{
fn id(&self) -> StageId {
StageId::Prune
@@ -156,7 +157,8 @@ where
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ RocksDBProviderFactory,
{
fn id(&self) -> StageId {
StageId::PruneSenderRecovery


@@ -21,8 +21,8 @@ pub mod providers;
pub use providers::{
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx,
StaticFileWriter,
PruneShardOutcome, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder,
StaticFileWriteCtx, StaticFileWriter,
};
pub mod changeset_walker;


@@ -39,8 +39,8 @@ pub use consistent::ConsistentProvider;
pub(crate) mod rocksdb;
pub use rocksdb::{
RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter, RocksDBStats,
RocksDBTableStats, RocksTx,
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter,
RocksDBStats, RocksDBTableStats, RocksTx,
};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy


@@ -6,6 +6,6 @@ mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{
RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter, RocksDBStats,
RocksDBTableStats, RocksTx,
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter,
RocksDBStats, RocksDBTableStats, RocksTx,
};


@@ -1278,6 +1278,17 @@ impl RocksDBProvider {
}
}
/// Outcome of pruning a history shard in `RocksDB`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
/// Shard was deleted entirely.
Deleted,
/// Shard was updated with filtered block numbers.
Updated,
/// Shard was unchanged (no blocks <= `to_block`).
Unchanged,
}
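// Illustrative mapping (mirrors the parameterized tests below) for a single sentinel shard
// holding blocks [10, 20, 30, 40]:
// - pruning to 40 or higher removes every block and the shard itself -> Deleted
// - pruning to 25 leaves [30, 40] in place                           -> Updated
// - pruning to 5 touches nothing                                     -> Unchanged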
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
@@ -1617,6 +1628,116 @@ impl<'a> RocksDBBatch<'a> {
Ok(())
}
/// Prunes history shards, removing blocks <= `to_block`.
///
/// Generic implementation for both account and storage history pruning.
/// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
/// (if any) will have the sentinel key (`u64::MAX`).
#[allow(clippy::too_many_arguments)]
fn prune_history_shards_inner<K>(
&mut self,
shards: Vec<(K, BlockNumberList)>,
to_block: BlockNumber,
get_highest: impl Fn(&K) -> u64,
is_sentinel: impl Fn(&K) -> bool,
delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>,
put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>,
create_sentinel: impl Fn() -> K,
) -> ProviderResult<PruneShardOutcome>
where
K: Clone,
{
if shards.is_empty() {
return Ok(PruneShardOutcome::Unchanged);
}
let mut deleted = false;
let mut updated = false;
let mut last_remaining: Option<(K, BlockNumberList)> = None;
for (key, block_list) in shards {
if !is_sentinel(&key) && get_highest(&key) <= to_block {
delete_shard(self, key)?;
deleted = true;
} else {
let original_len = block_list.len();
let filtered =
BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block));
if filtered.is_empty() {
delete_shard(self, key)?;
deleted = true;
} else if filtered.len() < original_len {
put_shard(self, key.clone(), &filtered)?;
last_remaining = Some((key, filtered));
updated = true;
} else {
last_remaining = Some((key, block_list));
}
}
}
if let Some((last_key, last_value)) = last_remaining &&
!is_sentinel(&last_key)
{
delete_shard(self, last_key)?;
put_shard(self, create_sentinel(), &last_value)?;
updated = true;
}
if deleted {
Ok(PruneShardOutcome::Deleted)
} else if updated {
Ok(PruneShardOutcome::Updated)
} else {
Ok(PruneShardOutcome::Unchanged)
}
}
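// Worked example of the sentinel rule above, taken from the parameterized tests below:
// shards [(100, [50, 75, 100])] pruned to 60 become [(u64::MAX, [75, 100])], i.e. the
// surviving shard is re-keyed to the sentinel as the MDBX semantics require, while
// [(20, [10, 15, 20]), (50, [30, 40, 50]), (u64::MAX, [60, 70])] pruned to 55 collapse to
// just the sentinel shard [(u64::MAX, [60, 70])].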
/// Prunes account history for the given address, removing blocks <= `to_block`.
///
/// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
/// (if any) will have the sentinel key (`u64::MAX`).
pub fn prune_account_history_to(
&mut self,
address: Address,
to_block: BlockNumber,
) -> ProviderResult<PruneShardOutcome> {
let shards = self.provider.account_history_shards(address)?;
self.prune_history_shards_inner(
shards,
to_block,
|key| key.highest_block_number,
|key| key.highest_block_number == u64::MAX,
|batch, key| batch.delete::<tables::AccountsHistory>(key),
|batch, key, value| batch.put::<tables::AccountsHistory>(key, value),
|| ShardedKey::new(address, u64::MAX),
)
}
/// Prunes storage history for the given address and storage key, removing blocks <=
/// `to_block`.
///
/// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard
/// (if any) will have the sentinel key (`u64::MAX`).
pub fn prune_storage_history_to(
&mut self,
address: Address,
storage_key: B256,
to_block: BlockNumber,
) -> ProviderResult<PruneShardOutcome> {
let shards = self.provider.storage_history_shards(address, storage_key)?;
self.prune_history_shards_inner(
shards,
to_block,
|key| key.sharded_key.highest_block_number,
|key| key.sharded_key.highest_block_number == u64::MAX,
|batch, key| batch.delete::<tables::StoragesHistory>(key),
|batch, key, value| batch.put::<tables::StoragesHistory>(key, value),
|| StorageShardedKey::last(address, storage_key),
)
}
/// Unwinds storage history to keep only blocks `<= keep_to`.
///
/// Handles multi-shard scenarios by:
@@ -2995,4 +3116,467 @@ mod tests {
assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
}
}
// ==================== PARAMETERIZED PRUNE TESTS ====================
/// Test case for account history pruning
struct AccountPruneCase {
name: &'static str,
initial_shards: &'static [(u64, &'static [u64])],
prune_to: u64,
expected_outcome: PruneShardOutcome,
expected_shards: &'static [(u64, &'static [u64])],
}
/// Test case for storage history pruning
struct StoragePruneCase {
name: &'static str,
initial_shards: &'static [(u64, &'static [u64])],
prune_to: u64,
expected_outcome: PruneShardOutcome,
expected_shards: &'static [(u64, &'static [u64])],
}
#[test]
fn test_prune_account_history_cases() {
const MAX: u64 = u64::MAX;
const CASES: &[AccountPruneCase] = &[
AccountPruneCase {
name: "single_shard_truncate",
initial_shards: &[(MAX, &[10, 20, 30, 40])],
prune_to: 25,
expected_outcome: PruneShardOutcome::Updated,
expected_shards: &[(MAX, &[30, 40])],
},
AccountPruneCase {
name: "single_shard_delete_all",
initial_shards: &[(MAX, &[10, 20])],
prune_to: 20,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[],
},
AccountPruneCase {
name: "single_shard_noop",
initial_shards: &[(MAX, &[10, 20])],
prune_to: 5,
expected_outcome: PruneShardOutcome::Unchanged,
expected_shards: &[(MAX, &[10, 20])],
},
AccountPruneCase {
name: "no_shards",
initial_shards: &[],
prune_to: 100,
expected_outcome: PruneShardOutcome::Unchanged,
expected_shards: &[],
},
AccountPruneCase {
name: "multi_shard_truncate_first",
initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])],
prune_to: 25,
expected_outcome: PruneShardOutcome::Updated,
expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])],
},
AccountPruneCase {
name: "delete_first_shard_sentinel_unchanged",
initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])],
prune_to: 20,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[(MAX, &[30, 40])],
},
AccountPruneCase {
name: "multi_shard_delete_all_but_last",
initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])],
prune_to: 22,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[(MAX, &[25, 30])],
},
AccountPruneCase {
name: "mid_shard_preserves_key",
initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
prune_to: 25,
expected_outcome: PruneShardOutcome::Updated,
expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
},
// Equivalence tests
AccountPruneCase {
name: "equiv_delete_early_shards_keep_sentinel",
initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
prune_to: 55,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[(MAX, &[60, 70])],
},
AccountPruneCase {
name: "equiv_sentinel_becomes_empty_with_prev",
initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
prune_to: 40,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[(MAX, &[50])],
},
AccountPruneCase {
name: "equiv_all_shards_become_empty",
initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
prune_to: 51,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[],
},
AccountPruneCase {
name: "equiv_non_sentinel_last_shard_promoted",
initial_shards: &[(100, &[50, 75, 100])],
prune_to: 60,
expected_outcome: PruneShardOutcome::Updated,
expected_shards: &[(MAX, &[75, 100])],
},
AccountPruneCase {
name: "equiv_filter_within_shard",
initial_shards: &[(MAX, &[10, 20, 30, 40])],
prune_to: 25,
expected_outcome: PruneShardOutcome::Updated,
expected_shards: &[(MAX, &[30, 40])],
},
AccountPruneCase {
name: "equiv_multi_shard_partial_delete",
initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
prune_to: 35,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
},
];
let address = Address::from([0x42; 20]);
for case in CASES {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
// Setup initial shards
let mut batch = provider.batch();
for (highest, blocks) in case.initial_shards {
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
batch
.put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
.unwrap();
}
batch.commit().unwrap();
// Prune
let mut batch = provider.batch();
let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap();
batch.commit().unwrap();
// Assert outcome
assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
// Assert final shards
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(
shards.len(),
case.expected_shards.len(),
"case '{}': wrong shard count",
case.name
);
for (i, ((key, blocks), (exp_key, exp_blocks))) in
shards.iter().zip(case.expected_shards.iter()).enumerate()
{
assert_eq!(
key.highest_block_number, *exp_key,
"case '{}': shard {} wrong key",
case.name, i
);
assert_eq!(
blocks.iter().collect::<Vec<_>>(),
*exp_blocks,
"case '{}': shard {} wrong blocks",
case.name,
i
);
}
}
}
#[test]
fn test_prune_storage_history_cases() {
const MAX: u64 = u64::MAX;
const CASES: &[StoragePruneCase] = &[
StoragePruneCase {
name: "single_shard_truncate",
initial_shards: &[(MAX, &[10, 20, 30, 40])],
prune_to: 25,
expected_outcome: PruneShardOutcome::Updated,
expected_shards: &[(MAX, &[30, 40])],
},
StoragePruneCase {
name: "single_shard_delete_all",
initial_shards: &[(MAX, &[10, 20])],
prune_to: 20,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[],
},
StoragePruneCase {
name: "noop",
initial_shards: &[(MAX, &[10, 20])],
prune_to: 5,
expected_outcome: PruneShardOutcome::Unchanged,
expected_shards: &[(MAX, &[10, 20])],
},
StoragePruneCase {
name: "no_shards",
initial_shards: &[],
prune_to: 100,
expected_outcome: PruneShardOutcome::Unchanged,
expected_shards: &[],
},
StoragePruneCase {
name: "mid_shard_preserves_key",
initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])],
prune_to: 25,
expected_outcome: PruneShardOutcome::Updated,
expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])],
},
// Equivalence tests
StoragePruneCase {
name: "equiv_sentinel_promotion",
initial_shards: &[(100, &[50, 75, 100])],
prune_to: 60,
expected_outcome: PruneShardOutcome::Updated,
expected_shards: &[(MAX, &[75, 100])],
},
StoragePruneCase {
name: "equiv_delete_early_shards_keep_sentinel",
initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
prune_to: 55,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[(MAX, &[60, 70])],
},
StoragePruneCase {
name: "equiv_sentinel_becomes_empty_with_prev",
initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])],
prune_to: 40,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[(MAX, &[50])],
},
StoragePruneCase {
name: "equiv_all_shards_become_empty",
initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])],
prune_to: 51,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[],
},
StoragePruneCase {
name: "equiv_filter_within_shard",
initial_shards: &[(MAX, &[10, 20, 30, 40])],
prune_to: 25,
expected_outcome: PruneShardOutcome::Updated,
expected_shards: &[(MAX, &[30, 40])],
},
StoragePruneCase {
name: "equiv_multi_shard_partial_delete",
initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])],
prune_to: 35,
expected_outcome: PruneShardOutcome::Deleted,
expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])],
},
];
let address = Address::from([0x42; 20]);
let storage_key = B256::from([0x01; 32]);
for case in CASES {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
// Setup initial shards
let mut batch = provider.batch();
for (highest, blocks) in case.initial_shards {
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
let key = if *highest == MAX {
StorageShardedKey::last(address, storage_key)
} else {
StorageShardedKey::new(address, storage_key, *highest)
};
batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
}
batch.commit().unwrap();
// Prune
let mut batch = provider.batch();
let outcome =
batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap();
batch.commit().unwrap();
// Assert outcome
assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name);
// Assert final shards
let shards = provider.storage_history_shards(address, storage_key).unwrap();
assert_eq!(
shards.len(),
case.expected_shards.len(),
"case '{}': wrong shard count",
case.name
);
for (i, ((key, blocks), (exp_key, exp_blocks))) in
shards.iter().zip(case.expected_shards.iter()).enumerate()
{
assert_eq!(
key.sharded_key.highest_block_number, *exp_key,
"case '{}': shard {} wrong key",
case.name, i
);
assert_eq!(
blocks.iter().collect::<Vec<_>>(),
*exp_blocks,
"case '{}': shard {} wrong blocks",
case.name,
i
);
}
}
}
#[test]
fn test_prune_storage_history_does_not_affect_other_slots() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
let slot1 = B256::from([0x01; 32]);
let slot2 = B256::from([0x02; 32]);
// Two different storage slots
let mut batch = provider.batch();
batch
.put::<tables::StoragesHistory>(
StorageShardedKey::last(address, slot1),
&BlockNumberList::new_pre_sorted([10u64, 20]),
)
.unwrap();
batch
.put::<tables::StoragesHistory>(
StorageShardedKey::last(address, slot2),
&BlockNumberList::new_pre_sorted([30u64, 40]),
)
.unwrap();
batch.commit().unwrap();
// Prune slot1 to block 20 (deletes all)
let mut batch = provider.batch();
let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap();
batch.commit().unwrap();
assert_eq!(outcome, PruneShardOutcome::Deleted);
// slot1 should be empty
let shards1 = provider.storage_history_shards(address, slot1).unwrap();
assert!(shards1.is_empty());
// slot2 should be unchanged
let shards2 = provider.storage_history_shards(address, slot2).unwrap();
assert_eq!(shards2.len(), 1);
assert_eq!(shards2[0].1.iter().collect::<Vec<_>>(), vec![30, 40]);
}
#[test]
fn test_prune_invariants() {
// Test invariants: no empty shards, sentinel is always last
let address = Address::from([0x42; 20]);
let storage_key = B256::from([0x01; 32]);
// Test cases that exercise invariants
#[allow(clippy::type_complexity)]
let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[
// Account: shards where middle becomes empty
(&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20),
// Account: non-sentinel shard only, partial prune -> must become sentinel
(&[(100, &[50, 100])], 60),
];
for (initial_shards, prune_to) in invariant_cases {
// Test account history invariants
{
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let mut batch = provider.batch();
for (highest, blocks) in *initial_shards {
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
batch
.put::<tables::AccountsHistory>(ShardedKey::new(address, *highest), &shard)
.unwrap();
}
batch.commit().unwrap();
let mut batch = provider.batch();
batch.prune_account_history_to(address, *prune_to).unwrap();
batch.commit().unwrap();
let shards = provider.account_history_shards(address).unwrap();
// Invariant 1: no empty shards
for (key, blocks) in &shards {
assert!(
!blocks.is_empty(),
"Account: empty shard at key {}",
key.highest_block_number
);
}
// Invariant 2: last shard is sentinel
if !shards.is_empty() {
let last = shards.last().unwrap();
assert_eq!(
last.0.highest_block_number,
u64::MAX,
"Account: last shard must be sentinel"
);
}
}
// Test storage history invariants
{
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let mut batch = provider.batch();
for (highest, blocks) in *initial_shards {
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
let key = if *highest == u64::MAX {
StorageShardedKey::last(address, storage_key)
} else {
StorageShardedKey::new(address, storage_key, *highest)
};
batch.put::<tables::StoragesHistory>(key, &shard).unwrap();
}
batch.commit().unwrap();
let mut batch = provider.batch();
batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap();
batch.commit().unwrap();
let shards = provider.storage_history_shards(address, storage_key).unwrap();
// Invariant 1: no empty shards
for (key, blocks) in &shards {
assert!(
!blocks.is_empty(),
"Storage: empty shard at key {}",
key.sharded_key.highest_block_number
);
}
// Invariant 2: last shard is sentinel
if !shards.is_empty() {
let last = shards.last().unwrap();
assert_eq!(
last.0.sharded_key.highest_block_number,
u64::MAX,
"Storage: last shard must be sentinel"
);
}
}
}
}
}


@@ -208,3 +208,14 @@ impl<T: reth_db_api::table::Table> Iterator for RocksDBIter<T> {
None
}
}
/// Outcome of pruning a history shard in `RocksDB` (stub).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
/// Shard was deleted entirely.
Deleted,
/// Shard was updated with filtered block numbers.
Updated,
/// Shard was unchanged (no blocks <= `to_block`).
Unchanged,
}