Compare commits

...

16 Commits

Author SHA1 Message Date
joshieDo
163cc01f5c Merge branch 'main' into feat/rocksdb-history-pruner 2026-01-30 08:32:30 +00:00
joshieDo
56188cd48f Merge branch 'main' into feat/rocksdb-history-pruner 2026-01-29 19:04:45 +00:00
yongkangc
055de28e48 fix: remove trailing semicolon for consistency
Amp-Thread-ID: https://ampcode.com/threads/T-019bfd8a-df09-727b-a483-ae8f344341ef
2026-01-27 04:14:52 +00:00
yongkangc
47c005c47b fix: align crash-safety comments between account and storage history pruners
Amp-Thread-ID: https://ampcode.com/threads/T-019bfd8a-df09-727b-a483-ae8f344341ef
2026-01-27 03:43:45 +00:00
yongkangc
f1b8be7480 fix: address yongkangc review comments
- Add comments explaining what the walker sections do in prune_rocksdb methods
- Add back 'Delete static file jars below the pruned block' comment in storage_history

Amp-Thread-ID: https://ampcode.com/threads/T-019bfd76-35a3-72a6-ad2f-63bdb3fb9b9c
2026-01-27 03:40:43 +00:00
yongkangc
f40fcb00c8 fix: address review issues after rebase
- Only delete static file jars when done=true in AccountHistory::prune_static_files (matches StorageHistory)
- Include updated_shards in pruned count for StorageHistory::prune_rocksdb (matches AccountHistory)
- Fix clippy doc_markdown warning for PruneShardOutcome
- Apply rustfmt formatting fixes

Amp-Thread-ID: https://ampcode.com/threads/T-019bfd76-35a3-72a6-ad2f-63bdb3fb9b9c
2026-01-27 03:27:54 +00:00
yongkangc
393dc0c3e0 fix: address review comments for RocksDB history pruner
- Add `if done` guard to static file deletion in prune_static_files()
  to prevent crash consistency issues when interrupted mid-block
- Fix misleading comment about RocksDB commit order (RocksDB commits
  before MDBX checkpoint, not atomically with it)
- Fix pruned count to exclude updated_shards for consistency with
  other code paths

Amp-Thread-ID: https://ampcode.com/threads/T-019bfd6a-fbd6-72d0-8211-3622cb777deb
2026-01-27 03:13:53 +00:00
yongkangc
15f6587e84 refactor: inline PruneShardOutcome and simplify RocksDB commit
- Move PruneShardOutcome enum from rocksdb_types.rs into both the real
  RocksDB provider and the stub, eliminating the shared types module
- Simplify RocksDB batch commit error handling by using the ? operator
  instead of verbose warn! + return pattern
- Add clarifying comments for early limiter exit and changeset tracking

Amp-Thread-ID: https://ampcode.com/threads/T-019beac9-7e8c-7348-a7e4-0a002144ba1f
2026-01-27 03:12:15 +00:00
yongkangc
3f3f808ab0 fix: address review comments - fix typos and remove redundant comments
- Fix typo 'it's' -> 'is' in account_history.rs comments (lines 109, 200)
- Remove redundant comments that restate code in both account_history.rs and storage_history.rs
2026-01-27 03:11:40 +00:00
yongkangc
21ecafdb06 fix(pruner): improve RocksDB history pruner correctness and performance
- Gate static file deletion on 'done' flag in StorageHistory RocksDB path
  to prevent data mismatch when interrupted (was deleting before checkpoint rollback)
- Cap prune target with last_changeset_pruned_block in both AccountHistory
  and StorageHistory RocksDB paths (matches MDBX behavior)
- Add changesets_processed to StorageHistory pruned count for consistency
- Sort storage keys before RocksDB iteration for better cache locality
2026-01-27 03:11:36 +00:00
yongkangc
9ecb970e26 refactor: extract PruneShardOutcome to shared module and remove broken test
- Extract PruneShardOutcome enum to rocksdb_types.rs to eliminate duplication
  between rocksdb/provider.rs and rocksdb_stub.rs
- Remove test_batch_auto_commit_on_threshold which references non-existent
  auto_commit_threshold field on RocksDBBatch
2026-01-27 03:11:36 +00:00
yongkangc
ecb32978ee changes 2026-01-27 03:11:16 +00:00
yongkangc
9f735b9164 feat(prune): add RocksDB history indices pruner segments
Integrate RocksDB history pruning into existing AccountHistory and StorageHistory
segments rather than having separate structs. The segments now detect when RocksDB
is available (feature-gated with #[cfg(all(unix, feature = "rocksdb"))]) and prune
RocksDB indices alongside static file/MDBX pruning.

When account_history_in_rocksdb or storages_history_in_rocksdb is enabled:
- Read changesets from static files via existing walkers
- Collect affected addresses/storage keys with highest block numbers
- Prune RocksDB history shards via with_rocksdb_batch()
- Delete static file jars below the pruned block

This eliminates the need for separate AccountsHistoryPruner and StoragesHistoryPruner
structs, sharing the changeset iteration logic with the MDBX path.
2026-01-27 03:11:15 +00:00
Dan Cline
0346f03bfd chore(pruner): remove redundant changeset segments 2026-01-27 03:10:40 +00:00
Dan Cline
28e0b1b8d8 chore(pruner): add static file changeset tests 2026-01-27 03:07:06 +00:00
Dan Cline
1b79a6d0b0 fix(pruner): account and storage changeset pruning for static files 2026-01-27 03:07:06 +00:00
11 changed files with 1941 additions and 61 deletions

View File

@@ -42,11 +42,15 @@ rayon.workspace = true
tokio.workspace = true
rustc-hash.workspace = true
[features]
rocksdb = ["reth-provider/rocksdb"]
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils", "rocksdb"] }
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
reth-storage-api.workspace = true
reth-testing-utils.workspace = true
reth-tracing.workspace = true

View File

@@ -7,10 +7,10 @@ use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider,
DatabaseProviderFactory, NodePrimitivesProvider, PruneCheckpointReader, PruneCheckpointWriter,
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache};
use std::time::Duration;
use tokio::sync::watch;
@@ -85,6 +85,7 @@ impl PrunerBuilder {
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ RocksDBProviderFactory
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
>,
@@ -121,7 +122,8 @@ impl PrunerBuilder {
+ StorageSettingsCache
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ RocksDBProviderFactory,
{
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);

View File

@@ -7,10 +7,11 @@ use reth_db_api::{table::Value, transaction::DbTxMut};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider,
PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
PruneCheckpointReader, PruneCheckpointWriter, RocksDBProviderFactory,
StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache};
/// Collection of [`Segment`]. Thread-safe, allocated on the heap.
#[derive(Debug)]
@@ -55,7 +56,8 @@ where
+ ChainStateBlockReader
+ StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ RocksDBProviderFactory,
{
/// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and
/// [`PruneModes`].

View File

@@ -10,20 +10,20 @@ use alloy_primitives::BlockNumber;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
StaticFileProviderFactory, StorageSettingsCache,
RocksDBProviderFactory, StaticFileProviderFactory,
};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::ChangeSetReader;
use reth_storage_api::{ChangeSetReader, StorageSettingsCache};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
/// Number of account history tables to prune in one step.
///
/// Account History consists of two tables: [`tables::AccountChangeSets`] and
/// [`tables::AccountsHistory`]. We want to prune them to the same block number.
/// Account History consists of two tables: [`tables::AccountChangeSets`] (either in database or
/// static files) and [`tables::AccountsHistory`]. We want to prune them to the same block number.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
#[derive(Debug)]
@@ -42,7 +42,8 @@ where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageSettingsCache
+ ChangeSetReader,
+ ChangeSetReader
+ RocksDBProviderFactory,
{
fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory
@@ -67,7 +68,13 @@ where
};
let range_end = *range.end();
// Check where account changesets are stored
// Check where account history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().account_history_in_rocksdb {
return self.prune_rocksdb(provider, input, range, range_end);
}
// Check where account changesets are stored (MDBX path)
if EitherWriter::account_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
@@ -94,6 +101,8 @@ impl AccountHistory {
input.limiter
};
// The limiter may already be exhausted from a previous segment in the same prune run.
// Early exit avoids unnecessary iteration when no budget remains.
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -101,11 +110,14 @@ impl AccountHistory {
))
}
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
// Deleted account changeset keys (account addresses) with the highest block number deleted
// for that key.
//
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total
// size should be up to ~0.25MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
@@ -124,8 +136,8 @@ impl AccountHistory {
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
// Delete static file jars only when fully processed
if done && let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
@@ -210,6 +222,106 @@ impl AccountHistory {
)
.map_err(Into::into)
}
/// Prunes account history when indices are stored in `RocksDB`.
///
/// Reads account changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory,
{
use reth_provider::PruneShardOutcome;
// Unlike MDBX path, we don't divide the limit by 2 because RocksDB path only prunes
// history shards (no separate changeset table to delete from). The changesets are in
// static files which are deleted separately.
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut changesets_processed = 0usize;
let mut done = true;
// Walk account changesets from static files using a streaming iterator.
// For each changeset, track the highest block number seen for each address
// to determine which history shard entries need pruning.
let walker = StaticFileAccountChangesetWalker::new(provider, range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_number, changeset) = result?;
highest_deleted_accounts.insert(changeset.address, block_number);
last_changeset_pruned_block = Some(block_number);
changesets_processed += 1;
limiter.increment_deleted_entries_count();
}
trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files");
let last_changeset_pruned_block = last_changeset_pruned_block
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Prune RocksDB history shards for affected accounts
let mut deleted_shards = 0usize;
let mut updated_shards = 0usize;
// Sort by address for better RocksDB cache locality
let mut sorted_accounts: Vec<_> = highest_deleted_accounts.into_iter().collect();
sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr);
provider.with_rocksdb_batch(|mut batch| {
for (address, highest_block) in &sorted_accounts {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_account_history_to(*address, prune_to)? {
PruneShardOutcome::Deleted => deleted_shards += 1,
PruneShardOutcome::Updated => updated_shards += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)");
// Delete static file jars only when fully processed. During provider.commit(), RocksDB
// batch is committed before the MDBX checkpoint. If crash occurs after RocksDB commit
// but before MDBX commit, on restart the pruner checkpoint indicates data needs
// re-pruning, but the RocksDB shards are already pruned - this is safe because pruning
// is idempotent (re-pruning already-pruned shards is a no-op).
if done {
provider.static_file_provider().delete_segment_below_block(
StaticFileSegment::AccountChangeSets,
last_changeset_pruned_block + 1,
)?;
}
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: changesets_processed + deleted_shards + updated_shards,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
#[cfg(test)]
@@ -539,4 +651,272 @@ mod tests {
test_prune(998, 2, (PruneProgress::Finished, 1000));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb_path() {
use reth_db_api::models::ShardedKey;
use reth_provider::{RocksDBProviderFactory, StaticFileProviderFactory};
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=100,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
let mut account_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::new();
for (block, changeset) in changesets.iter().enumerate() {
for (address, _, _) in changeset {
account_blocks.entry(*address).or_default().push(block as u64);
}
}
let rocksdb = db.factory.rocksdb_provider();
let mut batch = rocksdb.batch();
for (address, block_numbers) in &account_blocks {
let shard = BlockNumberList::new_pre_sorted(block_numbers.iter().copied());
batch
.put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), &shard)
.unwrap();
}
batch.commit().unwrap();
for (address, expected_blocks) in &account_blocks {
let shards = rocksdb.account_history_shards(*address).unwrap();
assert_eq!(shards.len(), 1);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), *expected_blocks);
}
let to_block: BlockNumber = 50;
let prune_mode = PruneMode::Before(to_block);
let input =
PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
let segment = AccountHistory::new(prune_mode);
db.factory.set_storage_settings_cache(
StorageSettings::default()
.with_account_changesets_in_static_files(true)
.with_account_history_in_rocksdb(true),
);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
provider.commit().expect("commit");
assert_matches!(
result,
SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
if pruned > 0
);
for (address, original_blocks) in &account_blocks {
let shards = rocksdb.account_history_shards(*address).unwrap();
let expected_blocks: Vec<u64> =
original_blocks.iter().copied().filter(|b| *b > to_block).collect();
if expected_blocks.is_empty() {
assert!(
shards.is_empty(),
"Expected no shards for address {address:?} after pruning"
);
} else {
assert_eq!(shards.len(), 1, "Expected 1 shard for address {address:?}");
assert_eq!(
shards[0].1.iter().collect::<Vec<_>>(),
expected_blocks,
"Shard blocks mismatch for address {address:?}"
);
}
}
let static_file_provider = db.factory.static_file_provider();
let highest_block = static_file_provider.get_highest_static_file_block(
reth_static_file_types::StaticFileSegment::AccountChangeSets,
);
if let Some(block) = highest_block {
assert!(
block > to_block,
"Static files should only contain blocks above to_block ({to_block}), got {block}"
);
}
}
/// Tests that when a limiter stops mid-block (with multiple changes for the same block),
/// the checkpoint is set to `block_number - 1` to avoid dangling index entries.
#[test]
fn prune_partial_progress_mid_block() {
use alloy_primitives::{Address, U256};
use reth_primitives_traits::Account;
use reth_testing_utils::generators::ChangeSet;
let db = TestStageDB::default();
let mut rng = generators::rng();
// Create blocks 0..=10
let blocks = random_block_range(
&mut rng,
0..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Create specific changesets where block 5 has 4 account changes
let addr1 = Address::with_last_byte(1);
let addr2 = Address::with_last_byte(2);
let addr3 = Address::with_last_byte(3);
let addr4 = Address::with_last_byte(4);
let addr5 = Address::with_last_byte(5);
let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };
// Build changesets: blocks 0-4 have 1 change each, block 5 has 4 changes, block 6 has 1
let changesets: Vec<ChangeSet> = vec![
vec![(addr1, account, vec![])], // block 0
vec![(addr1, account, vec![])], // block 1
vec![(addr1, account, vec![])], // block 2
vec![(addr1, account, vec![])], // block 3
vec![(addr1, account, vec![])], // block 4
// block 5: 4 different account changes (sorted by address for consistency)
vec![
(addr1, account, vec![]),
(addr2, account, vec![]),
(addr3, account, vec![]),
(addr4, account, vec![]),
],
vec![(addr5, account, vec![])], // block 6
];
db.insert_changesets(changesets.clone(), None).expect("insert changesets");
db.insert_history(changesets.clone(), None).expect("insert history");
// Total changesets: 5 (blocks 0-4) + 4 (block 5) + 1 (block 6) = 10
assert_eq!(
db.table::<tables::AccountChangeSets>().unwrap().len(),
changesets.iter().flatten().count()
);
let prune_mode = PruneMode::Before(10);
// Set limiter to stop after 7 entries (mid-block 5: 5 from blocks 0-4, then 2 of 4 from
// block 5). Due to ACCOUNT_HISTORY_TABLES_TO_PRUNE=2, actual limit is 7/2=3
// changesets. So we'll process blocks 0, 1, 2 (3 changesets), stopping before block
// 3. Actually, let's use a higher limit to reach block 5. With limit=14, we get 7
// changeset slots. Blocks 0-4 use 5 slots, leaving 2 for block 5 (which has 4), so
// we stop mid-block 5.
let deleted_entries_limit = 14; // 14/2 = 7 changeset entries before limit
let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
// Should report that there's more data
assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");
// Save checkpoint and commit
segment
.save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
.unwrap();
provider.commit().expect("commit");
// Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete
let checkpoint = db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap()
.expect("checkpoint should exist");
assert_eq!(
checkpoint.block_number,
Some(4),
"Checkpoint should be block 4 (block before incomplete block 5)"
);
// Verify remaining changesets (block 5 and 6 should still have entries)
let remaining_changesets = db.table::<tables::AccountChangeSets>().unwrap();
// After pruning blocks 0-4, remaining should be block 5 (4 entries) + block 6 (1 entry) = 5
// But since we stopped mid-block 5, some of block 5 might be pruned
// However, checkpoint is 4, so on re-run we should re-process from block 5
assert!(
!remaining_changesets.is_empty(),
"Should have remaining changesets for blocks 5-6"
);
// Verify no dangling history indices for blocks that weren't fully pruned
// The indices for block 5 should still reference blocks <= 5 appropriately
let history = db.table::<tables::AccountsHistory>().unwrap();
for (key, _blocks) in &history {
// All blocks in the history should be > checkpoint block number
// OR the shard's highest_block_number should be > checkpoint
assert!(
key.highest_block_number > 4,
"Found stale history shard with highest_block_number {} <= checkpoint 4",
key.highest_block_number
);
}
// Run prune again to complete - should finish processing block 5 and 6
let input2 = PruneInput {
previous_checkpoint: Some(checkpoint),
to_block: 10,
limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
};
let provider2 = db.factory.database_provider_rw().unwrap();
provider2.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
let result2 = segment.prune(&provider2, input2).unwrap();
assert!(result2.progress.is_finished(), "Second run should complete");
segment
.save_checkpoint(
&provider2,
result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider2.commit().expect("commit");
// Verify final checkpoint
let final_checkpoint = db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap()
.expect("checkpoint should exist");
// Should now be at block 6 (the last block with changesets)
assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");
// All changesets should be pruned
let final_changesets = db.table::<tables::AccountChangeSets>().unwrap();
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
}
}

View File

@@ -12,7 +12,7 @@ use reth_db_api::{
tables,
transaction::DbTxMut,
};
use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory};
use reth_provider::{DBProvider, EitherWriter, RocksDBProviderFactory, StaticFileProviderFactory};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
@@ -43,7 +43,8 @@ where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageChangeSetReader
+ StorageSettingsCache,
+ StorageSettingsCache
+ RocksDBProviderFactory,
{
fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory
@@ -68,6 +69,13 @@ where
};
let range_end = *range.end();
// Check where storage history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
return self.prune_rocksdb(provider, input, range, range_end);
}
// Check where storage changesets are stored (MDBX path)
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
@@ -94,6 +102,8 @@ impl StorageHistory {
input.limiter
};
// The limiter may already be exhausted from a previous segment in the same prune run.
// Early exit avoids unnecessary iteration when no budget remains.
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -126,8 +136,8 @@ impl StorageHistory {
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
// Delete static file jars only when fully processed
if done && let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
@@ -216,6 +226,107 @@ impl StorageHistory {
)
.map_err(Into::into)
}
/// Prunes storage history when indices are stored in `RocksDB`.
///
/// Reads storage changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory,
{
use reth_provider::PruneShardOutcome;
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_storages: FxHashMap<_, _> = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut changesets_processed = 0usize;
let mut done = true;
// Walk storage changesets from static files using a streaming iterator.
// For each changeset, track the highest block number seen for each (address, storage_key)
// pair to determine which history shard entries need pruning.
let walker = provider.static_file_provider().walk_storage_changeset_range(range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
changesets_processed += 1;
limiter.increment_deleted_entries_count();
}
trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files");
let last_changeset_pruned_block = last_changeset_pruned_block
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Prune RocksDB history shards for affected storage slots
let mut deleted_shards = 0usize;
let mut updated_shards = 0usize;
// Sort by (address, storage_key) for better RocksDB cache locality
let mut sorted_storages: Vec<_> = highest_deleted_storages.into_iter().collect();
sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key));
provider.with_rocksdb_batch(|mut batch| {
for ((address, storage_key), highest_block) in &sorted_storages {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
PruneShardOutcome::Deleted => deleted_shards += 1,
PruneShardOutcome::Updated => updated_shards += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned storage history (RocksDB indices)");
// Delete static file jars only when fully processed. During provider.commit(), RocksDB
// batch is committed before the MDBX checkpoint. If crash occurs after RocksDB commit
// but before MDBX commit, on restart the pruner checkpoint indicates data needs
// re-pruning, but the RocksDB shards are already pruned - this is safe because pruning
// is idempotent (re-pruning already-pruned shards is a no-op).
if done {
provider.static_file_provider().delete_segment_below_block(
StaticFileSegment::StorageChangeSets,
last_changeset_pruned_block + 1,
)?;
}
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: changesets_processed + deleted_shards + updated_shards,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
#[cfg(test)]
@@ -553,4 +664,270 @@ mod tests {
test_prune(998, 2, (PruneProgress::Finished, 500));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
/// Tests that when a limiter stops mid-block (with multiple storage changes for the same
/// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries.
#[test]
fn prune_partial_progress_mid_block() {
use alloy_primitives::{Address, U256};
use reth_primitives_traits::Account;
use reth_testing_utils::generators::ChangeSet;
let db = TestStageDB::default();
let mut rng = generators::rng();
// Create blocks 0..=10
let blocks = random_block_range(
&mut rng,
0..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Create specific changesets where block 5 has 4 storage changes
let addr1 = Address::with_last_byte(1);
let addr2 = Address::with_last_byte(2);
let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };
// Create storage entries
let storage_entry = |key: u8| reth_primitives_traits::StorageEntry {
key: B256::with_last_byte(key),
value: U256::from(100),
};
// Build changesets: blocks 0-4 have 1 storage change each, block 5 has 4 changes, block 6
// has 1. Entries within each account must be sorted by key.
let changesets: Vec<ChangeSet> = vec![
vec![(addr1, account, vec![storage_entry(1)])], // block 0
vec![(addr1, account, vec![storage_entry(1)])], // block 1
vec![(addr1, account, vec![storage_entry(1)])], // block 2
vec![(addr1, account, vec![storage_entry(1)])], // block 3
vec![(addr1, account, vec![storage_entry(1)])], // block 4
// block 5: 4 different storage changes (2 addresses, each with 2 storage slots)
// Sorted by address, then by storage key within each address
vec![
(addr1, account, vec![storage_entry(1), storage_entry(2)]),
(addr2, account, vec![storage_entry(1), storage_entry(2)]),
],
vec![(addr1, account, vec![storage_entry(3)])], // block 6
];
db.insert_changesets(changesets.clone(), None).expect("insert changesets");
db.insert_history(changesets.clone(), None).expect("insert history");
// Total storage changesets
let total_storage_entries: usize =
changesets.iter().flat_map(|c| c.iter()).map(|(_, _, entries)| entries.len()).sum();
assert_eq!(db.table::<tables::StorageChangeSets>().unwrap().len(), total_storage_entries);
let prune_mode = PruneMode::Before(10);
// Set limiter to stop mid-block 5
// With STORAGE_HISTORY_TABLES_TO_PRUNE=2, limit=14 gives us 7 storage entries before limit
// Blocks 0-4 use 5 slots, leaving 2 for block 5 (which has 4), so we stop mid-block 5
let deleted_entries_limit = 14; // 14/2 = 7 storage entries before limit
let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
// Should report that there's more data
assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");
// Save checkpoint and commit
segment
.save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
.unwrap();
provider.commit().expect("commit");
// Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete
let checkpoint = db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap()
.expect("checkpoint should exist");
assert_eq!(
checkpoint.block_number,
Some(4),
"Checkpoint should be block 4 (block before incomplete block 5)"
);
// Verify remaining changesets
let remaining_changesets = db.table::<tables::StorageChangeSets>().unwrap();
assert!(
!remaining_changesets.is_empty(),
"Should have remaining changesets for blocks 5-6"
);
// Verify no dangling history indices for blocks that weren't fully pruned
let history = db.table::<tables::StoragesHistory>().unwrap();
for (key, _blocks) in &history {
assert!(
key.sharded_key.highest_block_number > 4,
"Found stale history shard with highest_block_number {} <= checkpoint 4",
key.sharded_key.highest_block_number
);
}
// Run prune again to complete - should finish processing block 5 and 6
let input2 = PruneInput {
previous_checkpoint: Some(checkpoint),
to_block: 10,
limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
};
let provider2 = db.factory.database_provider_rw().unwrap();
provider2.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
let result2 = segment.prune(&provider2, input2).unwrap();
assert!(result2.progress.is_finished(), "Second run should complete");
segment
.save_checkpoint(
&provider2,
result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider2.commit().expect("commit");
// Verify final checkpoint
let final_checkpoint = db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap()
.expect("checkpoint should exist");
// Should now be at block 6 (the last block with changesets)
assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");
// All changesets should be pruned
let final_changesets = db.table::<tables::StorageChangeSets>().unwrap();
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=100,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
1..2,
1..2,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
let mut storage_indices: BTreeMap<(alloy_primitives::Address, B256), Vec<u64>> =
BTreeMap::new();
for (block, changeset) in changesets.iter().enumerate() {
for (address, _, storage_entries) in changeset {
for entry in storage_entries {
storage_indices.entry((*address, entry.key)).or_default().push(block as u64);
}
}
}
{
let rocksdb = db.factory.rocksdb_provider();
let mut batch = rocksdb.batch();
for ((address, storage_key), block_numbers) in &storage_indices {
let shard = BlockNumberList::new_pre_sorted(block_numbers.clone());
batch
.put::<tables::StoragesHistory>(
StorageShardedKey::last(*address, *storage_key),
&shard,
)
.expect("insert storage history shard");
}
batch.commit().expect("commit rocksdb batch");
}
{
let rocksdb = db.factory.rocksdb_provider();
for (address, storage_key) in storage_indices.keys() {
let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
assert!(!shards.is_empty(), "RocksDB should contain storage history before prune");
}
}
let to_block = 50u64;
let prune_mode = PruneMode::Before(to_block);
let input =
PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default()
.with_storage_changesets_in_static_files(true)
.with_storages_history_in_rocksdb(true),
);
let result = segment.prune(&provider, input).unwrap();
provider.commit().expect("commit");
assert_matches!(
result,
SegmentOutput { progress: PruneProgress::Finished, checkpoint: Some(_), .. }
);
{
let rocksdb = db.factory.rocksdb_provider();
for ((address, storage_key), block_numbers) in &storage_indices {
let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
let remaining_blocks: Vec<u64> =
block_numbers.iter().copied().filter(|&b| b > to_block).collect();
if remaining_blocks.is_empty() {
assert!(
shards.is_empty(),
"Shard for {:?}/{:?} should be deleted when all blocks pruned",
address,
storage_key
);
} else {
assert!(!shards.is_empty(), "Shard should exist with remaining blocks");
let actual_blocks: Vec<u64> =
shards.iter().flat_map(|(_, list)| list.iter()).collect();
assert_eq!(
actual_blocks, remaining_blocks,
"RocksDB shard should only contain blocks > {}",
to_block
);
}
}
}
}
}

View File

@@ -2,7 +2,7 @@ use reth_db_api::{table::Value, transaction::DbTxMut};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
BlockReader, ChainStateBlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
};
use reth_prune::{
PruneMode, PruneModes, PruneSegment, PrunerBuilder, SegmentOutput, SegmentOutputCheckpoint,
@@ -10,7 +10,7 @@ use reth_prune::{
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache};
use tracing::info;
/// The prune stage that runs the pruner with the provided prune modes.
@@ -49,7 +49,8 @@ where
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ RocksDBProviderFactory,
{
fn id(&self) -> StageId {
StageId::Prune
@@ -156,7 +157,8 @@ where
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageChangeSetReader
+ RocksDBProviderFactory,
{
fn id(&self) -> StageId {
StageId::PruneSenderRecovery

View File

@@ -21,8 +21,8 @@ pub mod providers;
pub use providers::{
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx,
StaticFileWriter,
PruneShardOutcome, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder,
StaticFileWriteCtx, StaticFileWriter,
};
pub mod changeset_walker;

View File

@@ -39,8 +39,8 @@ pub use consistent::ConsistentProvider;
pub(crate) mod rocksdb;
pub use rocksdb::{
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBStats, RocksDBTableStats,
RocksTx,
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBStats,
RocksDBTableStats, RocksTx,
};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy

View File

@@ -6,6 +6,6 @@ mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBStats, RocksDBTableStats,
RocksTx,
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBStats,
RocksDBTableStats, RocksTx,
};

File diff suppressed because it is too large Load Diff

View File

@@ -189,3 +189,14 @@ pub struct RocksTx;
/// A stub raw iterator for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBRawIter;
/// Outcome of pruning a history shard in RocksDB (stub).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
/// Shard was deleted entirely.
Deleted,
/// Shard was updated with filtered block numbers.
Updated,
/// Shard was unchanged (no blocks <= `to_block`).
Unchanged,
}