From 9f8c22e2c33a7b0fd07cd2e5afcd2280e64dcb2c Mon Sep 17 00:00:00 2001 From: YK Date: Mon, 2 Feb 2026 02:42:17 +0800 Subject: [PATCH] feat(prune): prune rocksdb account and storage history indices (#21331) Co-authored-by: Georgios Konstantopoulos Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com> --- crates/prune/prune/Cargo.toml | 6 +- crates/prune/prune/src/builder.rs | 8 +- crates/prune/prune/src/segments/set.rs | 8 +- .../src/segments/user/account_history.rs | 406 +++++++++++- .../src/segments/user/storage_history.rs | 385 +++++++++++- crates/stages/stages/src/stages/prune.rs | 10 +- crates/storage/provider/src/lib.rs | 4 +- crates/storage/provider/src/providers/mod.rs | 4 +- .../provider/src/providers/rocksdb/mod.rs | 4 +- .../src/providers/rocksdb/provider.rs | 584 ++++++++++++++++++ .../provider/src/providers/rocksdb_stub.rs | 11 + 11 files changed, 1396 insertions(+), 34 deletions(-) diff --git a/crates/prune/prune/Cargo.toml b/crates/prune/prune/Cargo.toml index d86a35eaf1..a9889f7739 100644 --- a/crates/prune/prune/Cargo.toml +++ b/crates/prune/prune/Cargo.toml @@ -42,11 +42,15 @@ rayon.workspace = true tokio.workspace = true rustc-hash.workspace = true +[features] +rocksdb = ["reth-provider/rocksdb"] + [dev-dependencies] # reth reth-db = { workspace = true, features = ["test-utils"] } -reth-stages = { workspace = true, features = ["test-utils"] } +reth-stages = { workspace = true, features = ["test-utils", "rocksdb"] } reth-primitives-traits = { workspace = true, features = ["arbitrary"] } +reth-storage-api.workspace = true reth-testing-utils.workspace = true reth-tracing.workspace = true diff --git a/crates/prune/prune/src/builder.rs b/crates/prune/prune/src/builder.rs index 52b175c66a..bc96c8710c 100644 --- a/crates/prune/prune/src/builder.rs +++ b/crates/prune/prune/src/builder.rs @@ -7,10 +7,10 @@ use reth_primitives_traits::NodePrimitives; use reth_provider::{ providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider, PruneCheckpointReader, PruneCheckpointWriter, - StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache, + RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderFactory, }; use reth_prune_types::PruneModes; -use reth_storage_api::{ChangeSetReader, StorageChangeSetReader}; +use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache}; use std::time::Duration; use tokio::sync::watch; @@ -85,6 +85,7 @@ impl PrunerBuilder { + StageCheckpointReader + ChangeSetReader + StorageChangeSetReader + + RocksDBProviderFactory + StaticFileProviderFactory< Primitives: NodePrimitives, >, @@ -121,7 +122,8 @@ impl PrunerBuilder { + StorageSettingsCache + StageCheckpointReader + ChangeSetReader - + StorageChangeSetReader, + + StorageChangeSetReader + + RocksDBProviderFactory, { let segments = SegmentSet::::from_components(static_file_provider, self.segments); diff --git a/crates/prune/prune/src/segments/set.rs b/crates/prune/prune/src/segments/set.rs index 3e56664f26..1af7902ffc 100644 --- a/crates/prune/prune/src/segments/set.rs +++ b/crates/prune/prune/src/segments/set.rs @@ -7,10 +7,11 @@ use reth_db_api::{table::Value, transaction::DbTxMut}; use reth_primitives_traits::NodePrimitives; use reth_provider::{ providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider, - PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, 
StorageSettingsCache, + PruneCheckpointReader, PruneCheckpointWriter, RocksDBProviderFactory, + StaticFileProviderFactory, }; use reth_prune_types::PruneModes; -use reth_storage_api::{ChangeSetReader, StorageChangeSetReader}; +use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache}; /// Collection of [`Segment`]. Thread-safe, allocated on the heap. #[derive(Debug)] @@ -55,7 +56,8 @@ where + ChainStateBlockReader + StorageSettingsCache + ChangeSetReader - + StorageChangeSetReader, + + StorageChangeSetReader + + RocksDBProviderFactory, { /// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and /// [`PruneModes`]. diff --git a/crates/prune/prune/src/segments/user/account_history.rs b/crates/prune/prune/src/segments/user/account_history.rs index 9bdd26d111..216f0d6c0c 100644 --- a/crates/prune/prune/src/segments/user/account_history.rs +++ b/crates/prune/prune/src/segments/user/account_history.rs @@ -10,20 +10,20 @@ use alloy_primitives::BlockNumber; use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut}; use reth_provider::{ changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter, - StaticFileProviderFactory, StorageSettingsCache, + RocksDBProviderFactory, StaticFileProviderFactory, }; use reth_prune_types::{ PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, }; use reth_static_file_types::StaticFileSegment; -use reth_storage_api::ChangeSetReader; +use reth_storage_api::{ChangeSetReader, StorageSettingsCache}; use rustc_hash::FxHashMap; use tracing::{instrument, trace}; /// Number of account history tables to prune in one step. /// -/// Account History consists of two tables: [`tables::AccountChangeSets`] and -/// [`tables::AccountsHistory`]. We want to prune them to the same block number. +/// Account History consists of two tables: [`tables::AccountChangeSets`] (either in database or +/// static files) and [`tables::AccountsHistory`]. We want to prune them to the same block number. const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2; #[derive(Debug)] @@ -42,7 +42,8 @@ where Provider: DBProvider + StaticFileProviderFactory + StorageSettingsCache - + ChangeSetReader, + + ChangeSetReader + + RocksDBProviderFactory, { fn segment(&self) -> PruneSegment { PruneSegment::AccountHistory @@ -67,7 +68,13 @@ where }; let range_end = *range.end(); - // Check where account changesets are stored + // Check where account history indices are stored + #[cfg(all(unix, feature = "rocksdb"))] + if provider.cached_storage_settings().account_history_in_rocksdb { + return self.prune_rocksdb(provider, input, range, range_end); + } + + // Check where account changesets are stored (MDBX path) if EitherWriter::account_changesets_destination(provider).is_static_file() { self.prune_static_files(provider, input, range, range_end) } else { @@ -94,6 +101,8 @@ impl AccountHistory { input.limiter }; + // The limiter may already be exhausted from a previous segment in the same prune run. + // Early exit avoids unnecessary iteration when no budget remains. if limiter.is_limit_reached() { return Ok(SegmentOutput::not_done( limiter.interrupt_reason(), @@ -101,11 +110,14 @@ impl AccountHistory { )) } - // The size of this map it's limited by `prune_delete_limit * blocks_since_last_run / - // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 / - // 2`, so 8750 entries. 
Each entry is `160 bit + 64 bit`, so the total size should be up to - // ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the - // `max_reorg_depth`, so no OOM is expected here. + // Deleted account changeset keys (account addresses) with the highest block number deleted + // for that key. + // + // The size of this map is limited by `prune_delete_limit * blocks_since_last_run / + // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5 + // / 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total + // size should be up to ~0.25MB + some hashmap overhead. `blocks_since_last_run` is + // additionally limited by the `max_reorg_depth`, so no OOM is expected here. let mut highest_deleted_accounts = FxHashMap::default(); let mut last_changeset_pruned_block = None; let mut pruned_changesets = 0; @@ -124,8 +136,8 @@ impl AccountHistory { limiter.increment_deleted_entries_count(); } - // Delete static file jars below the pruned block - if let Some(last_block) = last_changeset_pruned_block { + // Delete static file jars only when fully processed + if done && let Some(last_block) = last_changeset_pruned_block { provider .static_file_provider() .delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?; @@ -210,6 +222,106 @@ impl AccountHistory { ) .map_err(Into::into) } + + /// Prunes account history when indices are stored in `RocksDB`. + /// + /// Reads account changesets from static files and prunes the corresponding + /// `RocksDB` history shards. + #[cfg(all(unix, feature = "rocksdb"))] + fn prune_rocksdb( + &self, + provider: &Provider, + input: PruneInput, + range: std::ops::RangeInclusive, + range_end: BlockNumber, + ) -> Result + where + Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory, + { + use reth_provider::PruneShardOutcome; + + // Unlike MDBX path, we don't divide the limit by 2 because RocksDB path only prunes + // history shards (no separate changeset table to delete from). The changesets are in + // static files which are deleted separately. + let mut limiter = input.limiter; + + if limiter.is_limit_reached() { + return Ok(SegmentOutput::not_done( + limiter.interrupt_reason(), + input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint), + )) + } + + let mut highest_deleted_accounts = FxHashMap::default(); + let mut last_changeset_pruned_block = None; + let mut changesets_processed = 0usize; + let mut done = true; + + // Walk account changesets from static files using a streaming iterator. + // For each changeset, track the highest block number seen for each address + // to determine which history shard entries need pruning. 
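+        // For example, if blocks 10 and 12 both change account A, the map ends up holding
+        // `A => 12`, and A's shard is later pruned up to `min(12, last_changeset_pruned_block)`.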
+ let walker = StaticFileAccountChangesetWalker::new(provider, range); + for result in walker { + if limiter.is_limit_reached() { + done = false; + break; + } + let (block_number, changeset) = result?; + highest_deleted_accounts.insert(changeset.address, block_number); + last_changeset_pruned_block = Some(block_number); + changesets_processed += 1; + limiter.increment_deleted_entries_count(); + } + trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files"); + + let last_changeset_pruned_block = last_changeset_pruned_block + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); + + // Prune RocksDB history shards for affected accounts + let mut deleted_shards = 0usize; + let mut updated_shards = 0usize; + + // Sort by address for better RocksDB cache locality + let mut sorted_accounts: Vec<_> = highest_deleted_accounts.into_iter().collect(); + sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr); + + provider.with_rocksdb_batch(|mut batch| { + for (address, highest_block) in &sorted_accounts { + let prune_to = (*highest_block).min(last_changeset_pruned_block); + match batch.prune_account_history_to(*address, prune_to)? { + PruneShardOutcome::Deleted => deleted_shards += 1, + PruneShardOutcome::Updated => updated_shards += 1, + PruneShardOutcome::Unchanged => {} + } + } + Ok(((), Some(batch.into_inner()))) + })?; + trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)"); + + // Delete static file jars only when fully processed. During provider.commit(), RocksDB + // batch is committed before the MDBX checkpoint. If crash occurs after RocksDB commit + // but before MDBX commit, on restart the pruner checkpoint indicates data needs + // re-pruning, but the RocksDB shards are already pruned - this is safe because pruning + // is idempotent (re-pruning already-pruned shards is a no-op). 
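+        // (If already-pruned shards do get revisited, `prune_account_history_to` finds no
+        // blocks <= `prune_to` left and reports `PruneShardOutcome::Unchanged`.)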
+ if done { + provider.static_file_provider().delete_segment_below_block( + StaticFileSegment::AccountChangeSets, + last_changeset_pruned_block + 1, + )?; + } + + let progress = limiter.progress(done); + + Ok(SegmentOutput { + progress, + pruned: changesets_processed + deleted_shards + updated_shards, + checkpoint: Some(SegmentOutputCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + }), + }) + } } #[cfg(test)] @@ -539,4 +651,272 @@ mod tests { test_prune(998, 2, (PruneProgress::Finished, 1000)); test_prune(1400, 3, (PruneProgress::Finished, 804)); } + + #[cfg(all(unix, feature = "rocksdb"))] + #[test] + fn prune_rocksdb_path() { + use reth_db_api::models::ShardedKey; + use reth_provider::{RocksDBProviderFactory, StaticFileProviderFactory}; + + let db = TestStageDB::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range( + &mut rng, + 0..=100, + BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() }, + ); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); + + let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::>(); + + let (changesets, _) = random_changeset_range( + &mut rng, + blocks.iter(), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + 0..0, + 0..0, + ); + + db.insert_changesets_to_static_files(changesets.clone(), None) + .expect("insert changesets to static files"); + + let mut account_blocks: BTreeMap<_, Vec> = BTreeMap::new(); + for (block, changeset) in changesets.iter().enumerate() { + for (address, _, _) in changeset { + account_blocks.entry(*address).or_default().push(block as u64); + } + } + + let rocksdb = db.factory.rocksdb_provider(); + let mut batch = rocksdb.batch(); + for (address, block_numbers) in &account_blocks { + let shard = BlockNumberList::new_pre_sorted(block_numbers.iter().copied()); + batch + .put::(ShardedKey::new(*address, u64::MAX), &shard) + .unwrap(); + } + batch.commit().unwrap(); + + for (address, expected_blocks) in &account_blocks { + let shards = rocksdb.account_history_shards(*address).unwrap(); + assert_eq!(shards.len(), 1); + assert_eq!(shards[0].1.iter().collect::>(), *expected_blocks); + } + + let to_block: BlockNumber = 50; + let prune_mode = PruneMode::Before(to_block); + let input = + PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() }; + let segment = AccountHistory::new(prune_mode); + + db.factory.set_storage_settings_cache( + StorageSettings::default() + .with_account_changesets_in_static_files(true) + .with_account_history_in_rocksdb(true), + ); + + let provider = db.factory.database_provider_rw().unwrap(); + let result = segment.prune(&provider, input).unwrap(); + provider.commit().expect("commit"); + + assert_matches!( + result, + SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) } + if pruned > 0 + ); + + for (address, original_blocks) in &account_blocks { + let shards = rocksdb.account_history_shards(*address).unwrap(); + + let expected_blocks: Vec = + original_blocks.iter().copied().filter(|b| *b > to_block).collect(); + + if expected_blocks.is_empty() { + assert!( + shards.is_empty(), + "Expected no shards for address {address:?} after pruning" + ); + } else { + assert_eq!(shards.len(), 1, "Expected 1 shard for address {address:?}"); + assert_eq!( + shards[0].1.iter().collect::>(), + expected_blocks, + "Shard blocks mismatch for address {address:?}" + ); + } + } + + let static_file_provider = 
db.factory.static_file_provider(); + let highest_block = static_file_provider.get_highest_static_file_block( + reth_static_file_types::StaticFileSegment::AccountChangeSets, + ); + if let Some(block) = highest_block { + assert!( + block > to_block, + "Static files should only contain blocks above to_block ({to_block}), got {block}" + ); + } + } + + /// Tests that when a limiter stops mid-block (with multiple changes for the same block), + /// the checkpoint is set to `block_number - 1` to avoid dangling index entries. + #[test] + fn prune_partial_progress_mid_block() { + use alloy_primitives::{Address, U256}; + use reth_primitives_traits::Account; + use reth_testing_utils::generators::ChangeSet; + + let db = TestStageDB::default(); + let mut rng = generators::rng(); + + // Create blocks 0..=10 + let blocks = random_block_range( + &mut rng, + 0..=10, + BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() }, + ); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); + + // Create specific changesets where block 5 has 4 account changes + let addr1 = Address::with_last_byte(1); + let addr2 = Address::with_last_byte(2); + let addr3 = Address::with_last_byte(3); + let addr4 = Address::with_last_byte(4); + let addr5 = Address::with_last_byte(5); + + let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None }; + + // Build changesets: blocks 0-4 have 1 change each, block 5 has 4 changes, block 6 has 1 + let changesets: Vec = vec![ + vec![(addr1, account, vec![])], // block 0 + vec![(addr1, account, vec![])], // block 1 + vec![(addr1, account, vec![])], // block 2 + vec![(addr1, account, vec![])], // block 3 + vec![(addr1, account, vec![])], // block 4 + // block 5: 4 different account changes (sorted by address for consistency) + vec![ + (addr1, account, vec![]), + (addr2, account, vec![]), + (addr3, account, vec![]), + (addr4, account, vec![]), + ], + vec![(addr5, account, vec![])], // block 6 + ]; + + db.insert_changesets(changesets.clone(), None).expect("insert changesets"); + db.insert_history(changesets.clone(), None).expect("insert history"); + + // Total changesets: 5 (blocks 0-4) + 4 (block 5) + 1 (block 6) = 10 + assert_eq!( + db.table::().unwrap().len(), + changesets.iter().flatten().count() + ); + + let prune_mode = PruneMode::Before(10); + + // Set limiter to stop after 7 entries (mid-block 5: 5 from blocks 0-4, then 2 of 4 from + // block 5). Due to ACCOUNT_HISTORY_TABLES_TO_PRUNE=2, actual limit is 7/2=3 + // changesets. So we'll process blocks 0, 1, 2 (3 changesets), stopping before block + // 3. Actually, let's use a higher limit to reach block 5. With limit=14, we get 7 + // changeset slots. Blocks 0-4 use 5 slots, leaving 2 for block 5 (which has 4), so + // we stop mid-block 5. 
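+        // In short: limit 14 / ACCOUNT_HISTORY_TABLES_TO_PRUNE (2) = 7 changeset deletions;
+        // blocks 0-4 consume 5 of them, leaving only 2 for block 5's 4 changes, so the limiter
+        // trips mid-block 5 and the checkpoint must land on block 4.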
+ let deleted_entries_limit = 14; // 14/2 = 7 changeset entries before limit + let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit); + + let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter }; + let segment = AccountHistory::new(prune_mode); + + let provider = db.factory.database_provider_rw().unwrap(); + provider.set_storage_settings_cache( + StorageSettings::default().with_account_changesets_in_static_files(false), + ); + let result = segment.prune(&provider, input).unwrap(); + + // Should report that there's more data + assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block"); + + // Save checkpoint and commit + segment + .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode)) + .unwrap(); + provider.commit().expect("commit"); + + // Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete + let checkpoint = db + .factory + .provider() + .unwrap() + .get_prune_checkpoint(PruneSegment::AccountHistory) + .unwrap() + .expect("checkpoint should exist"); + + assert_eq!( + checkpoint.block_number, + Some(4), + "Checkpoint should be block 4 (block before incomplete block 5)" + ); + + // Verify remaining changesets (block 5 and 6 should still have entries) + let remaining_changesets = db.table::().unwrap(); + // After pruning blocks 0-4, remaining should be block 5 (4 entries) + block 6 (1 entry) = 5 + // But since we stopped mid-block 5, some of block 5 might be pruned + // However, checkpoint is 4, so on re-run we should re-process from block 5 + assert!( + !remaining_changesets.is_empty(), + "Should have remaining changesets for blocks 5-6" + ); + + // Verify no dangling history indices for blocks that weren't fully pruned + // The indices for block 5 should still reference blocks <= 5 appropriately + let history = db.table::().unwrap(); + for (key, _blocks) in &history { + // All blocks in the history should be > checkpoint block number + // OR the shard's highest_block_number should be > checkpoint + assert!( + key.highest_block_number > 4, + "Found stale history shard with highest_block_number {} <= checkpoint 4", + key.highest_block_number + ); + } + + // Run prune again to complete - should finish processing block 5 and 6 + let input2 = PruneInput { + previous_checkpoint: Some(checkpoint), + to_block: 10, + limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit + }; + + let provider2 = db.factory.database_provider_rw().unwrap(); + provider2.set_storage_settings_cache( + StorageSettings::default().with_account_changesets_in_static_files(false), + ); + let result2 = segment.prune(&provider2, input2).unwrap(); + + assert!(result2.progress.is_finished(), "Second run should complete"); + + segment + .save_checkpoint( + &provider2, + result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode), + ) + .unwrap(); + provider2.commit().expect("commit"); + + // Verify final checkpoint + let final_checkpoint = db + .factory + .provider() + .unwrap() + .get_prune_checkpoint(PruneSegment::AccountHistory) + .unwrap() + .expect("checkpoint should exist"); + + // Should now be at block 6 (the last block with changesets) + assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6"); + + // All changesets should be pruned + let final_changesets = db.table::().unwrap(); + assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned"); + } } diff --git 
a/crates/prune/prune/src/segments/user/storage_history.rs b/crates/prune/prune/src/segments/user/storage_history.rs index 7abe709e11..47c5ab1364 100644 --- a/crates/prune/prune/src/segments/user/storage_history.rs +++ b/crates/prune/prune/src/segments/user/storage_history.rs @@ -12,7 +12,7 @@ use reth_db_api::{ tables, transaction::DbTxMut, }; -use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory}; +use reth_provider::{DBProvider, EitherWriter, RocksDBProviderFactory, StaticFileProviderFactory}; use reth_prune_types::{ PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, }; @@ -43,7 +43,8 @@ where Provider: DBProvider + StaticFileProviderFactory + StorageChangeSetReader - + StorageSettingsCache, + + StorageSettingsCache + + RocksDBProviderFactory, { fn segment(&self) -> PruneSegment { PruneSegment::StorageHistory @@ -68,6 +69,13 @@ where }; let range_end = *range.end(); + // Check where storage history indices are stored + #[cfg(all(unix, feature = "rocksdb"))] + if provider.cached_storage_settings().storages_history_in_rocksdb { + return self.prune_rocksdb(provider, input, range, range_end); + } + + // Check where storage changesets are stored (MDBX path) if EitherWriter::storage_changesets_destination(provider).is_static_file() { self.prune_static_files(provider, input, range, range_end) } else { @@ -94,6 +102,8 @@ impl StorageHistory { input.limiter }; + // The limiter may already be exhausted from a previous segment in the same prune run. + // Early exit avoids unnecessary iteration when no budget remains. if limiter.is_limit_reached() { return Ok(SegmentOutput::not_done( limiter.interrupt_reason(), @@ -126,8 +136,8 @@ impl StorageHistory { limiter.increment_deleted_entries_count(); } - // Delete static file jars below the pruned block - if let Some(last_block) = last_changeset_pruned_block { + // Delete static file jars only when fully processed + if done && let Some(last_block) = last_changeset_pruned_block { provider .static_file_provider() .delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?; @@ -216,6 +226,107 @@ impl StorageHistory { ) .map_err(Into::into) } + + /// Prunes storage history when indices are stored in `RocksDB`. + /// + /// Reads storage changesets from static files and prunes the corresponding + /// `RocksDB` history shards. + #[cfg(all(unix, feature = "rocksdb"))] + fn prune_rocksdb( + &self, + provider: &Provider, + input: PruneInput, + range: std::ops::RangeInclusive, + range_end: BlockNumber, + ) -> Result + where + Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory, + { + use reth_provider::PruneShardOutcome; + + let mut limiter = input.limiter; + + if limiter.is_limit_reached() { + return Ok(SegmentOutput::not_done( + limiter.interrupt_reason(), + input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint), + )) + } + + let mut highest_deleted_storages: FxHashMap<_, _> = FxHashMap::default(); + let mut last_changeset_pruned_block = None; + let mut changesets_processed = 0usize; + let mut done = true; + + // Walk storage changesets from static files using a streaming iterator. + // For each changeset, track the highest block number seen for each (address, storage_key) + // pair to determine which history shard entries need pruning. 
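+        // For example, if blocks 10 and 12 both write slot `0x01` of account A, the map keeps
+        // `(A, 0x01) => 12`, and that slot's shard is later pruned up to
+        // `min(12, last_changeset_pruned_block)`.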
+ let walker = provider.static_file_provider().walk_storage_changeset_range(range); + for result in walker { + if limiter.is_limit_reached() { + done = false; + break; + } + let (block_address, entry) = result?; + let block_number = block_address.block_number(); + let address = block_address.address(); + highest_deleted_storages.insert((address, entry.key), block_number); + last_changeset_pruned_block = Some(block_number); + changesets_processed += 1; + limiter.increment_deleted_entries_count(); + } + + trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files"); + + let last_changeset_pruned_block = last_changeset_pruned_block + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); + + // Prune RocksDB history shards for affected storage slots + let mut deleted_shards = 0usize; + let mut updated_shards = 0usize; + + // Sort by (address, storage_key) for better RocksDB cache locality + let mut sorted_storages: Vec<_> = highest_deleted_storages.into_iter().collect(); + sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key)); + + provider.with_rocksdb_batch(|mut batch| { + for ((address, storage_key), highest_block) in &sorted_storages { + let prune_to = (*highest_block).min(last_changeset_pruned_block); + match batch.prune_storage_history_to(*address, *storage_key, prune_to)? { + PruneShardOutcome::Deleted => deleted_shards += 1, + PruneShardOutcome::Updated => updated_shards += 1, + PruneShardOutcome::Unchanged => {} + } + } + Ok(((), Some(batch.into_inner()))) + })?; + + trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned storage history (RocksDB indices)"); + + // Delete static file jars only when fully processed. During provider.commit(), RocksDB + // batch is committed before the MDBX checkpoint. If crash occurs after RocksDB commit + // but before MDBX commit, on restart the pruner checkpoint indicates data needs + // re-pruning, but the RocksDB shards are already pruned - this is safe because pruning + // is idempotent (re-pruning already-pruned shards is a no-op). + if done { + provider.static_file_provider().delete_segment_below_block( + StaticFileSegment::StorageChangeSets, + last_changeset_pruned_block + 1, + )?; + } + + let progress = limiter.progress(done); + + Ok(SegmentOutput { + progress, + pruned: changesets_processed + deleted_shards + updated_shards, + checkpoint: Some(SegmentOutputCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + }), + }) + } } #[cfg(test)] @@ -553,4 +664,270 @@ mod tests { test_prune(998, 2, (PruneProgress::Finished, 500)); test_prune(1200, 3, (PruneProgress::Finished, 202)); } + + /// Tests that when a limiter stops mid-block (with multiple storage changes for the same + /// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries. 
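+    ///
+    /// Mirrors the account-history `prune_partial_progress_mid_block` test above; here the
+    /// entries counted against the limiter are storage-slot changes rather than account changes.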
+ #[test] + fn prune_partial_progress_mid_block() { + use alloy_primitives::{Address, U256}; + use reth_primitives_traits::Account; + use reth_testing_utils::generators::ChangeSet; + + let db = TestStageDB::default(); + let mut rng = generators::rng(); + + // Create blocks 0..=10 + let blocks = random_block_range( + &mut rng, + 0..=10, + BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() }, + ); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); + + // Create specific changesets where block 5 has 4 storage changes + let addr1 = Address::with_last_byte(1); + let addr2 = Address::with_last_byte(2); + + let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None }; + + // Create storage entries + let storage_entry = |key: u8| reth_primitives_traits::StorageEntry { + key: B256::with_last_byte(key), + value: U256::from(100), + }; + + // Build changesets: blocks 0-4 have 1 storage change each, block 5 has 4 changes, block 6 + // has 1. Entries within each account must be sorted by key. + let changesets: Vec = vec![ + vec![(addr1, account, vec![storage_entry(1)])], // block 0 + vec![(addr1, account, vec![storage_entry(1)])], // block 1 + vec![(addr1, account, vec![storage_entry(1)])], // block 2 + vec![(addr1, account, vec![storage_entry(1)])], // block 3 + vec![(addr1, account, vec![storage_entry(1)])], // block 4 + // block 5: 4 different storage changes (2 addresses, each with 2 storage slots) + // Sorted by address, then by storage key within each address + vec![ + (addr1, account, vec![storage_entry(1), storage_entry(2)]), + (addr2, account, vec![storage_entry(1), storage_entry(2)]), + ], + vec![(addr1, account, vec![storage_entry(3)])], // block 6 + ]; + + db.insert_changesets(changesets.clone(), None).expect("insert changesets"); + db.insert_history(changesets.clone(), None).expect("insert history"); + + // Total storage changesets + let total_storage_entries: usize = + changesets.iter().flat_map(|c| c.iter()).map(|(_, _, entries)| entries.len()).sum(); + assert_eq!(db.table::().unwrap().len(), total_storage_entries); + + let prune_mode = PruneMode::Before(10); + + // Set limiter to stop mid-block 5 + // With STORAGE_HISTORY_TABLES_TO_PRUNE=2, limit=14 gives us 7 storage entries before limit + // Blocks 0-4 use 5 slots, leaving 2 for block 5 (which has 4), so we stop mid-block 5 + let deleted_entries_limit = 14; // 14/2 = 7 storage entries before limit + let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit); + + let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter }; + let segment = StorageHistory::new(prune_mode); + + let provider = db.factory.database_provider_rw().unwrap(); + provider.set_storage_settings_cache( + StorageSettings::default().with_storage_changesets_in_static_files(false), + ); + let result = segment.prune(&provider, input).unwrap(); + + // Should report that there's more data + assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block"); + + // Save checkpoint and commit + segment + .save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode)) + .unwrap(); + provider.commit().expect("commit"); + + // Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete + let checkpoint = db + .factory + .provider() + .unwrap() + .get_prune_checkpoint(PruneSegment::StorageHistory) + .unwrap() + .expect("checkpoint should exist"); + + assert_eq!( + checkpoint.block_number, + 
Some(4), + "Checkpoint should be block 4 (block before incomplete block 5)" + ); + + // Verify remaining changesets + let remaining_changesets = db.table::().unwrap(); + assert!( + !remaining_changesets.is_empty(), + "Should have remaining changesets for blocks 5-6" + ); + + // Verify no dangling history indices for blocks that weren't fully pruned + let history = db.table::().unwrap(); + for (key, _blocks) in &history { + assert!( + key.sharded_key.highest_block_number > 4, + "Found stale history shard with highest_block_number {} <= checkpoint 4", + key.sharded_key.highest_block_number + ); + } + + // Run prune again to complete - should finish processing block 5 and 6 + let input2 = PruneInput { + previous_checkpoint: Some(checkpoint), + to_block: 10, + limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit + }; + + let provider2 = db.factory.database_provider_rw().unwrap(); + provider2.set_storage_settings_cache( + StorageSettings::default().with_storage_changesets_in_static_files(false), + ); + let result2 = segment.prune(&provider2, input2).unwrap(); + + assert!(result2.progress.is_finished(), "Second run should complete"); + + segment + .save_checkpoint( + &provider2, + result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode), + ) + .unwrap(); + provider2.commit().expect("commit"); + + // Verify final checkpoint + let final_checkpoint = db + .factory + .provider() + .unwrap() + .get_prune_checkpoint(PruneSegment::StorageHistory) + .unwrap() + .expect("checkpoint should exist"); + + // Should now be at block 6 (the last block with changesets) + assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6"); + + // All changesets should be pruned + let final_changesets = db.table::().unwrap(); + assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned"); + } + + #[cfg(all(unix, feature = "rocksdb"))] + #[test] + fn prune_rocksdb() { + use reth_db_api::models::storage_sharded_key::StorageShardedKey; + use reth_provider::RocksDBProviderFactory; + use reth_storage_api::StorageSettings; + + let db = TestStageDB::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range( + &mut rng, + 0..=100, + BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() }, + ); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); + + let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::>(); + + let (changesets, _) = random_changeset_range( + &mut rng, + blocks.iter(), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + 1..2, + 1..2, + ); + + db.insert_changesets_to_static_files(changesets.clone(), None) + .expect("insert changesets to static files"); + + let mut storage_indices: BTreeMap<(alloy_primitives::Address, B256), Vec> = + BTreeMap::new(); + for (block, changeset) in changesets.iter().enumerate() { + for (address, _, storage_entries) in changeset { + for entry in storage_entries { + storage_indices.entry((*address, entry.key)).or_default().push(block as u64); + } + } + } + + { + let rocksdb = db.factory.rocksdb_provider(); + let mut batch = rocksdb.batch(); + for ((address, storage_key), block_numbers) in &storage_indices { + let shard = BlockNumberList::new_pre_sorted(block_numbers.clone()); + batch + .put::( + StorageShardedKey::last(*address, *storage_key), + &shard, + ) + .expect("insert storage history shard"); + } + batch.commit().expect("commit rocksdb batch"); + } + + { + let rocksdb = 
db.factory.rocksdb_provider(); + for (address, storage_key) in storage_indices.keys() { + let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap(); + assert!(!shards.is_empty(), "RocksDB should contain storage history before prune"); + } + } + + let to_block = 50u64; + let prune_mode = PruneMode::Before(to_block); + let input = + PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() }; + let segment = StorageHistory::new(prune_mode); + + let provider = db.factory.database_provider_rw().unwrap(); + provider.set_storage_settings_cache( + StorageSettings::default() + .with_storage_changesets_in_static_files(true) + .with_storages_history_in_rocksdb(true), + ); + let result = segment.prune(&provider, input).unwrap(); + provider.commit().expect("commit"); + + assert_matches!( + result, + SegmentOutput { progress: PruneProgress::Finished, checkpoint: Some(_), .. } + ); + + { + let rocksdb = db.factory.rocksdb_provider(); + for ((address, storage_key), block_numbers) in &storage_indices { + let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap(); + + let remaining_blocks: Vec = + block_numbers.iter().copied().filter(|&b| b > to_block).collect(); + + if remaining_blocks.is_empty() { + assert!( + shards.is_empty(), + "Shard for {:?}/{:?} should be deleted when all blocks pruned", + address, + storage_key + ); + } else { + assert!(!shards.is_empty(), "Shard should exist with remaining blocks"); + let actual_blocks: Vec = + shards.iter().flat_map(|(_, list)| list.iter()).collect(); + assert_eq!( + actual_blocks, remaining_blocks, + "RocksDB shard should only contain blocks > {}", + to_block + ); + } + } + } + } } diff --git a/crates/stages/stages/src/stages/prune.rs b/crates/stages/stages/src/stages/prune.rs index 98c9a578c6..8ae849d156 100644 --- a/crates/stages/stages/src/stages/prune.rs +++ b/crates/stages/stages/src/stages/prune.rs @@ -2,7 +2,7 @@ use reth_db_api::{table::Value, transaction::DbTxMut}; use reth_primitives_traits::NodePrimitives; use reth_provider::{ BlockReader, ChainStateBlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter, - StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache, + RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderFactory, }; use reth_prune::{ PruneMode, PruneModes, PruneSegment, PrunerBuilder, SegmentOutput, SegmentOutputCheckpoint, @@ -10,7 +10,7 @@ use reth_prune::{ use reth_stages_api::{ ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput, }; -use reth_storage_api::{ChangeSetReader, StorageChangeSetReader}; +use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache}; use tracing::info; /// The prune stage that runs the pruner with the provided prune modes. 
@@ -49,7 +49,8 @@ where Primitives: NodePrimitives, > + StorageSettingsCache + ChangeSetReader - + StorageChangeSetReader, + + StorageChangeSetReader + + RocksDBProviderFactory, { fn id(&self) -> StageId { StageId::Prune @@ -156,7 +157,8 @@ where Primitives: NodePrimitives, > + StorageSettingsCache + ChangeSetReader - + StorageChangeSetReader, + + StorageChangeSetReader + + RocksDBProviderFactory, { fn id(&self) -> StageId { StageId::PruneSenderRecovery diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index bfab44cb2a..f6a24925e1 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -21,8 +21,8 @@ pub mod providers; pub use providers::{ DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider, HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory, - SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx, - StaticFileWriter, + PruneShardOutcome, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, + StaticFileWriteCtx, StaticFileWriter, }; pub mod changeset_walker; diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 67647228fe..bf32d02e3c 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -39,8 +39,8 @@ pub use consistent::ConsistentProvider; pub(crate) mod rocksdb; pub use rocksdb::{ - RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter, RocksDBStats, - RocksDBTableStats, RocksTx, + PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter, + RocksDBStats, RocksDBTableStats, RocksTx, }; /// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy diff --git a/crates/storage/provider/src/providers/rocksdb/mod.rs b/crates/storage/provider/src/providers/rocksdb/mod.rs index e5ba0175e7..4820cd0742 100644 --- a/crates/storage/provider/src/providers/rocksdb/mod.rs +++ b/crates/storage/provider/src/providers/rocksdb/mod.rs @@ -6,6 +6,6 @@ mod provider; pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx}; pub use provider::{ - RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter, RocksDBStats, - RocksDBTableStats, RocksTx, + PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, RocksDBRawIter, + RocksDBStats, RocksDBTableStats, RocksTx, }; diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index a0e10cf47f..dbb5331cd2 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -1278,6 +1278,17 @@ impl RocksDBProvider { } } +/// Outcome of pruning a history shard in `RocksDB`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PruneShardOutcome { + /// Shard was deleted entirely. + Deleted, + /// Shard was updated with filtered block numbers. + Updated, + /// Shard was unchanged (no blocks <= `to_block`). + Unchanged, +} + /// Handle for building a batch of operations atomically. /// /// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead. @@ -1617,6 +1628,116 @@ impl<'a> RocksDBBatch<'a> { Ok(()) } + /// Prunes history shards, removing blocks <= `to_block`. + /// + /// Generic implementation for both account and storage history pruning. + /// Mirrors MDBX `prune_shard` semantics. 
After pruning, the last remaining shard + /// (if any) will have the sentinel key (`u64::MAX`). + #[allow(clippy::too_many_arguments)] + fn prune_history_shards_inner( + &mut self, + shards: Vec<(K, BlockNumberList)>, + to_block: BlockNumber, + get_highest: impl Fn(&K) -> u64, + is_sentinel: impl Fn(&K) -> bool, + delete_shard: impl Fn(&mut Self, K) -> ProviderResult<()>, + put_shard: impl Fn(&mut Self, K, &BlockNumberList) -> ProviderResult<()>, + create_sentinel: impl Fn() -> K, + ) -> ProviderResult + where + K: Clone, + { + if shards.is_empty() { + return Ok(PruneShardOutcome::Unchanged); + } + + let mut deleted = false; + let mut updated = false; + let mut last_remaining: Option<(K, BlockNumberList)> = None; + + for (key, block_list) in shards { + if !is_sentinel(&key) && get_highest(&key) <= to_block { + delete_shard(self, key)?; + deleted = true; + } else { + let original_len = block_list.len(); + let filtered = + BlockNumberList::new_pre_sorted(block_list.iter().filter(|&b| b > to_block)); + + if filtered.is_empty() { + delete_shard(self, key)?; + deleted = true; + } else if filtered.len() < original_len { + put_shard(self, key.clone(), &filtered)?; + last_remaining = Some((key, filtered)); + updated = true; + } else { + last_remaining = Some((key, block_list)); + } + } + } + + if let Some((last_key, last_value)) = last_remaining && + !is_sentinel(&last_key) + { + delete_shard(self, last_key)?; + put_shard(self, create_sentinel(), &last_value)?; + updated = true; + } + + if deleted { + Ok(PruneShardOutcome::Deleted) + } else if updated { + Ok(PruneShardOutcome::Updated) + } else { + Ok(PruneShardOutcome::Unchanged) + } + } + + /// Prunes account history for the given address, removing blocks <= `to_block`. + /// + /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard + /// (if any) will have the sentinel key (`u64::MAX`). + pub fn prune_account_history_to( + &mut self, + address: Address, + to_block: BlockNumber, + ) -> ProviderResult { + let shards = self.provider.account_history_shards(address)?; + self.prune_history_shards_inner( + shards, + to_block, + |key| key.highest_block_number, + |key| key.highest_block_number == u64::MAX, + |batch, key| batch.delete::(key), + |batch, key, value| batch.put::(key, value), + || ShardedKey::new(address, u64::MAX), + ) + } + + /// Prunes storage history for the given address and storage key, removing blocks <= + /// `to_block`. + /// + /// Mirrors MDBX `prune_shard` semantics. After pruning, the last remaining shard + /// (if any) will have the sentinel key (`u64::MAX`). + pub fn prune_storage_history_to( + &mut self, + address: Address, + storage_key: B256, + to_block: BlockNumber, + ) -> ProviderResult { + let shards = self.provider.storage_history_shards(address, storage_key)?; + self.prune_history_shards_inner( + shards, + to_block, + |key| key.sharded_key.highest_block_number, + |key| key.sharded_key.highest_block_number == u64::MAX, + |batch, key| batch.delete::(key), + |batch, key, value| batch.put::(key, value), + || StorageShardedKey::last(address, storage_key), + ) + } + /// Unwinds storage history to keep only blocks `<= keep_to`. 
/// /// Handles multi-shard scenarios by: @@ -2995,4 +3116,467 @@ mod tests { assert_eq!(provider.get::(i).unwrap(), Some(value)); } } + + // ==================== PARAMETERIZED PRUNE TESTS ==================== + + /// Test case for account history pruning + struct AccountPruneCase { + name: &'static str, + initial_shards: &'static [(u64, &'static [u64])], + prune_to: u64, + expected_outcome: PruneShardOutcome, + expected_shards: &'static [(u64, &'static [u64])], + } + + /// Test case for storage history pruning + struct StoragePruneCase { + name: &'static str, + initial_shards: &'static [(u64, &'static [u64])], + prune_to: u64, + expected_outcome: PruneShardOutcome, + expected_shards: &'static [(u64, &'static [u64])], + } + + #[test] + fn test_prune_account_history_cases() { + const MAX: u64 = u64::MAX; + const CASES: &[AccountPruneCase] = &[ + AccountPruneCase { + name: "single_shard_truncate", + initial_shards: &[(MAX, &[10, 20, 30, 40])], + prune_to: 25, + expected_outcome: PruneShardOutcome::Updated, + expected_shards: &[(MAX, &[30, 40])], + }, + AccountPruneCase { + name: "single_shard_delete_all", + initial_shards: &[(MAX, &[10, 20])], + prune_to: 20, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[], + }, + AccountPruneCase { + name: "single_shard_noop", + initial_shards: &[(MAX, &[10, 20])], + prune_to: 5, + expected_outcome: PruneShardOutcome::Unchanged, + expected_shards: &[(MAX, &[10, 20])], + }, + AccountPruneCase { + name: "no_shards", + initial_shards: &[], + prune_to: 100, + expected_outcome: PruneShardOutcome::Unchanged, + expected_shards: &[], + }, + AccountPruneCase { + name: "multi_shard_truncate_first", + initial_shards: &[(30, &[10, 20, 30]), (MAX, &[40, 50, 60])], + prune_to: 25, + expected_outcome: PruneShardOutcome::Updated, + expected_shards: &[(30, &[30]), (MAX, &[40, 50, 60])], + }, + AccountPruneCase { + name: "delete_first_shard_sentinel_unchanged", + initial_shards: &[(20, &[10, 20]), (MAX, &[30, 40])], + prune_to: 20, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[(MAX, &[30, 40])], + }, + AccountPruneCase { + name: "multi_shard_delete_all_but_last", + initial_shards: &[(10, &[5, 10]), (20, &[15, 20]), (MAX, &[25, 30])], + prune_to: 22, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[(MAX, &[25, 30])], + }, + AccountPruneCase { + name: "mid_shard_preserves_key", + initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])], + prune_to: 25, + expected_outcome: PruneShardOutcome::Updated, + expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])], + }, + // Equivalence tests + AccountPruneCase { + name: "equiv_delete_early_shards_keep_sentinel", + initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])], + prune_to: 55, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[(MAX, &[60, 70])], + }, + AccountPruneCase { + name: "equiv_sentinel_becomes_empty_with_prev", + initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])], + prune_to: 40, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[(MAX, &[50])], + }, + AccountPruneCase { + name: "equiv_all_shards_become_empty", + initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])], + prune_to: 51, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[], + }, + AccountPruneCase { + name: "equiv_non_sentinel_last_shard_promoted", + initial_shards: &[(100, &[50, 75, 100])], + prune_to: 60, + expected_outcome: PruneShardOutcome::Updated, + expected_shards: &[(MAX, &[75, 100])], + }, + 
AccountPruneCase { + name: "equiv_filter_within_shard", + initial_shards: &[(MAX, &[10, 20, 30, 40])], + prune_to: 25, + expected_outcome: PruneShardOutcome::Updated, + expected_shards: &[(MAX, &[30, 40])], + }, + AccountPruneCase { + name: "equiv_multi_shard_partial_delete", + initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])], + prune_to: 35, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])], + }, + ]; + + let address = Address::from([0x42; 20]); + + for case in CASES { + let temp_dir = TempDir::new().unwrap(); + let provider = + RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + // Setup initial shards + let mut batch = provider.batch(); + for (highest, blocks) in case.initial_shards { + let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied()); + batch + .put::(ShardedKey::new(address, *highest), &shard) + .unwrap(); + } + batch.commit().unwrap(); + + // Prune + let mut batch = provider.batch(); + let outcome = batch.prune_account_history_to(address, case.prune_to).unwrap(); + batch.commit().unwrap(); + + // Assert outcome + assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name); + + // Assert final shards + let shards = provider.account_history_shards(address).unwrap(); + assert_eq!( + shards.len(), + case.expected_shards.len(), + "case '{}': wrong shard count", + case.name + ); + for (i, ((key, blocks), (exp_key, exp_blocks))) in + shards.iter().zip(case.expected_shards.iter()).enumerate() + { + assert_eq!( + key.highest_block_number, *exp_key, + "case '{}': shard {} wrong key", + case.name, i + ); + assert_eq!( + blocks.iter().collect::>(), + *exp_blocks, + "case '{}': shard {} wrong blocks", + case.name, + i + ); + } + } + } + + #[test] + fn test_prune_storage_history_cases() { + const MAX: u64 = u64::MAX; + const CASES: &[StoragePruneCase] = &[ + StoragePruneCase { + name: "single_shard_truncate", + initial_shards: &[(MAX, &[10, 20, 30, 40])], + prune_to: 25, + expected_outcome: PruneShardOutcome::Updated, + expected_shards: &[(MAX, &[30, 40])], + }, + StoragePruneCase { + name: "single_shard_delete_all", + initial_shards: &[(MAX, &[10, 20])], + prune_to: 20, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[], + }, + StoragePruneCase { + name: "noop", + initial_shards: &[(MAX, &[10, 20])], + prune_to: 5, + expected_outcome: PruneShardOutcome::Unchanged, + expected_shards: &[(MAX, &[10, 20])], + }, + StoragePruneCase { + name: "no_shards", + initial_shards: &[], + prune_to: 100, + expected_outcome: PruneShardOutcome::Unchanged, + expected_shards: &[], + }, + StoragePruneCase { + name: "mid_shard_preserves_key", + initial_shards: &[(50, &[10, 20, 30, 40, 50]), (MAX, &[60, 70])], + prune_to: 25, + expected_outcome: PruneShardOutcome::Updated, + expected_shards: &[(50, &[30, 40, 50]), (MAX, &[60, 70])], + }, + // Equivalence tests + StoragePruneCase { + name: "equiv_sentinel_promotion", + initial_shards: &[(100, &[50, 75, 100])], + prune_to: 60, + expected_outcome: PruneShardOutcome::Updated, + expected_shards: &[(MAX, &[75, 100])], + }, + StoragePruneCase { + name: "equiv_delete_early_shards_keep_sentinel", + initial_shards: &[(20, &[10, 15, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])], + prune_to: 55, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[(MAX, &[60, 70])], + }, + StoragePruneCase { + name: "equiv_sentinel_becomes_empty_with_prev", + initial_shards: &[(50, &[30, 40, 50]), (MAX, &[35])], + 
prune_to: 40, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[(MAX, &[50])], + }, + StoragePruneCase { + name: "equiv_all_shards_become_empty", + initial_shards: &[(50, &[30, 40, 50]), (MAX, &[51])], + prune_to: 51, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[], + }, + StoragePruneCase { + name: "equiv_filter_within_shard", + initial_shards: &[(MAX, &[10, 20, 30, 40])], + prune_to: 25, + expected_outcome: PruneShardOutcome::Updated, + expected_shards: &[(MAX, &[30, 40])], + }, + StoragePruneCase { + name: "equiv_multi_shard_partial_delete", + initial_shards: &[(20, &[10, 20]), (50, &[30, 40, 50]), (MAX, &[60, 70])], + prune_to: 35, + expected_outcome: PruneShardOutcome::Deleted, + expected_shards: &[(50, &[40, 50]), (MAX, &[60, 70])], + }, + ]; + + let address = Address::from([0x42; 20]); + let storage_key = B256::from([0x01; 32]); + + for case in CASES { + let temp_dir = TempDir::new().unwrap(); + let provider = + RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + // Setup initial shards + let mut batch = provider.batch(); + for (highest, blocks) in case.initial_shards { + let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied()); + let key = if *highest == MAX { + StorageShardedKey::last(address, storage_key) + } else { + StorageShardedKey::new(address, storage_key, *highest) + }; + batch.put::(key, &shard).unwrap(); + } + batch.commit().unwrap(); + + // Prune + let mut batch = provider.batch(); + let outcome = + batch.prune_storage_history_to(address, storage_key, case.prune_to).unwrap(); + batch.commit().unwrap(); + + // Assert outcome + assert_eq!(outcome, case.expected_outcome, "case '{}': wrong outcome", case.name); + + // Assert final shards + let shards = provider.storage_history_shards(address, storage_key).unwrap(); + assert_eq!( + shards.len(), + case.expected_shards.len(), + "case '{}': wrong shard count", + case.name + ); + for (i, ((key, blocks), (exp_key, exp_blocks))) in + shards.iter().zip(case.expected_shards.iter()).enumerate() + { + assert_eq!( + key.sharded_key.highest_block_number, *exp_key, + "case '{}': shard {} wrong key", + case.name, i + ); + assert_eq!( + blocks.iter().collect::>(), + *exp_blocks, + "case '{}': shard {} wrong blocks", + case.name, + i + ); + } + } + } + + #[test] + fn test_prune_storage_history_does_not_affect_other_slots() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + let slot1 = B256::from([0x01; 32]); + let slot2 = B256::from([0x02; 32]); + + // Two different storage slots + let mut batch = provider.batch(); + batch + .put::( + StorageShardedKey::last(address, slot1), + &BlockNumberList::new_pre_sorted([10u64, 20]), + ) + .unwrap(); + batch + .put::( + StorageShardedKey::last(address, slot2), + &BlockNumberList::new_pre_sorted([30u64, 40]), + ) + .unwrap(); + batch.commit().unwrap(); + + // Prune slot1 to block 20 (deletes all) + let mut batch = provider.batch(); + let outcome = batch.prune_storage_history_to(address, slot1, 20).unwrap(); + batch.commit().unwrap(); + + assert_eq!(outcome, PruneShardOutcome::Deleted); + + // slot1 should be empty + let shards1 = provider.storage_history_shards(address, slot1).unwrap(); + assert!(shards1.is_empty()); + + // slot2 should be unchanged + let shards2 = provider.storage_history_shards(address, slot2).unwrap(); + assert_eq!(shards2.len(), 1); + 
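+        // slot2 keeps its single sentinel shard with the original block list: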
assert_eq!(shards2[0].1.iter().collect::>(), vec![30, 40]); + } + + #[test] + fn test_prune_invariants() { + // Test invariants: no empty shards, sentinel is always last + let address = Address::from([0x42; 20]); + let storage_key = B256::from([0x01; 32]); + + // Test cases that exercise invariants + #[allow(clippy::type_complexity)] + let invariant_cases: &[(&[(u64, &[u64])], u64)] = &[ + // Account: shards where middle becomes empty + (&[(10, &[5, 10]), (20, &[15, 20]), (u64::MAX, &[25, 30])], 20), + // Account: non-sentinel shard only, partial prune -> must become sentinel + (&[(100, &[50, 100])], 60), + ]; + + for (initial_shards, prune_to) in invariant_cases { + // Test account history invariants + { + let temp_dir = TempDir::new().unwrap(); + let provider = + RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let mut batch = provider.batch(); + for (highest, blocks) in *initial_shards { + let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied()); + batch + .put::(ShardedKey::new(address, *highest), &shard) + .unwrap(); + } + batch.commit().unwrap(); + + let mut batch = provider.batch(); + batch.prune_account_history_to(address, *prune_to).unwrap(); + batch.commit().unwrap(); + + let shards = provider.account_history_shards(address).unwrap(); + + // Invariant 1: no empty shards + for (key, blocks) in &shards { + assert!( + !blocks.is_empty(), + "Account: empty shard at key {}", + key.highest_block_number + ); + } + + // Invariant 2: last shard is sentinel + if !shards.is_empty() { + let last = shards.last().unwrap(); + assert_eq!( + last.0.highest_block_number, + u64::MAX, + "Account: last shard must be sentinel" + ); + } + } + + // Test storage history invariants + { + let temp_dir = TempDir::new().unwrap(); + let provider = + RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let mut batch = provider.batch(); + for (highest, blocks) in *initial_shards { + let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied()); + let key = if *highest == u64::MAX { + StorageShardedKey::last(address, storage_key) + } else { + StorageShardedKey::new(address, storage_key, *highest) + }; + batch.put::(key, &shard).unwrap(); + } + batch.commit().unwrap(); + + let mut batch = provider.batch(); + batch.prune_storage_history_to(address, storage_key, *prune_to).unwrap(); + batch.commit().unwrap(); + + let shards = provider.storage_history_shards(address, storage_key).unwrap(); + + // Invariant 1: no empty shards + for (key, blocks) in &shards { + assert!( + !blocks.is_empty(), + "Storage: empty shard at key {}", + key.sharded_key.highest_block_number + ); + } + + // Invariant 2: last shard is sentinel + if !shards.is_empty() { + let last = shards.last().unwrap(); + assert_eq!( + last.0.sharded_key.highest_block_number, + u64::MAX, + "Storage: last shard must be sentinel" + ); + } + } + } + } } diff --git a/crates/storage/provider/src/providers/rocksdb_stub.rs b/crates/storage/provider/src/providers/rocksdb_stub.rs index 28e0badfac..93e79ed320 100644 --- a/crates/storage/provider/src/providers/rocksdb_stub.rs +++ b/crates/storage/provider/src/providers/rocksdb_stub.rs @@ -208,3 +208,14 @@ impl Iterator for RocksDBIter { None } } + +/// Outcome of pruning a history shard in `RocksDB` (stub). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PruneShardOutcome { + /// Shard was deleted entirely. + Deleted, + /// Shard was updated with filtered block numbers. + Updated, + /// Shard was unchanged (no blocks <= `to_block`). 
+ Unchanged, +}
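
For reviewers who want to exercise the new path locally: the branch into `prune_rocksdb` is driven entirely by the cached storage settings checked at the top of `AccountHistory::prune` / `StorageHistory::prune`. A minimal sketch of that setup, mirroring the tests above and assuming a provider factory that implements `StorageSettingsCache` and `RocksDBProviderFactory` (illustrative only, not part of the patch; requires the `rocksdb` feature on unix):

    // Illustrative sketch, not part of the patch: route history indices to RocksDB and
    // changesets to static files, as the tests above do on their provider factories.
    factory.set_storage_settings_cache(
        StorageSettings::default()
            .with_account_changesets_in_static_files(true)
            .with_account_history_in_rocksdb(true)
            .with_storage_changesets_in_static_files(true)
            .with_storages_history_in_rocksdb(true),
    );
    // With these settings cached, AccountHistory::prune and StorageHistory::prune take the
    // prune_rocksdb path: changesets are read from static files and the matching RocksDB
    // history shards are pruned via prune_account_history_to / prune_storage_history_to.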