Compare commits

...

8 Commits

Author SHA1 Message Date
yongkangc
eb570d658c fix: start static file tests from block 0, remove unused imports
Amp-Thread-ID: https://ampcode.com/threads/T-019be7bf-0e67-770d-9c38-45c6b5b286ae
2026-01-22 22:23:58 +00:00
yongkangc
8b9a644853 feat(prune): support pruning history from static files 2026-01-22 22:20:37 +00:00
yongkangc
165b362aaf feat(prune): add tests for RocksDB history pruners and invariants healing
- Add comprehensive test suite for AccountsHistoryPruner (4 tests)
- Add comprehensive test suite for StoragesHistoryPruner (4 tests)
- Add changeset-based healing tests for invariants.rs (3 tests)
- Refactor StoragesHistoryPruner to use StorageChangeSetReader trait
  abstraction instead of explicit static file/MDBX logic
- Fix clippy warnings in storage history pruner

The pruners now use trait abstractions (ChangeSetReader for accounts,
StorageChangeSetReader for storage) which handle static file vs MDBX
routing internally, making the pruner code simpler and more consistent.

Closes #20417

Amp-Thread-ID: https://ampcode.com/threads/T-019be7b1-7664-759f-9590-46716d320658
2026-01-22 22:06:04 +00:00
yongkangc
2171624e28 fix: correct checkpoint logic in RocksDB history pruners
- Remove saturating_sub(1) that caused keys in last scanned block to be skipped
- Move limiter check after block processing to capture completed work
- Remove redundant limiter check at loop start in storage pruner
- Trim verbose module docs

Amp-Thread-ID: https://ampcode.com/threads/T-019be770-1374-70a9-8066-f108a1dd15df
2026-01-22 21:17:31 +00:00
yongkangc
502d18a0b8 chore(prune): clean up verbose comments in RocksDB history pruners 2026-01-22 21:17:31 +00:00
yongkangc
93160ff97f refactor(prune): use ChangeSetReader for RocksDB history pruners
Update AccountsHistoryPruner and StoragesHistoryPruner to read changesets
via ChangeSetReader/StorageChangeSetReader traits instead of directly
iterating MDBX tables. This allows the pruners to work regardless of
whether changesets are stored in MDBX or static files.

Key changes:
- Replace DBProvider<Tx: DbTxMut> bounds with ChangeSetReader/StorageChangeSetReader
- Use account_block_changeset()/storage_block_changeset() to iterate changesets
- Remove MDBX changeset deletion (handled by regular history pruner segments)
- Keep existing checkpoint and limiter logic

Closes #20417
2026-01-22 21:17:31 +00:00
yongkangc
e913be192c review: address feedback on RocksDB history pruner
- Add atomicity limitation documentation to segment module docs
- Deduplicate prune_*_history_to with shared helper function
- Add 8 more storage history equivalence tests (parity with account)
- Include updated_shards in pruned count
- Inline to_block variable
- Remove redundant inner cfg attributes
2026-01-22 21:17:31 +00:00
yongkangc
299de4a5f9 feat(prune): add RocksDB history indices pruner segments
Add AccountsHistoryPruner and StoragesHistoryPruner segments for pruning
history indices stored in RocksDB. These segments use changeset-driven
pruning: they prune MDBX changesets first to determine which keys need
RocksDB history pruning, then prune RocksDB shards for those keys.

Key changes:
- Add prune_account_history_to and prune_storage_history_to to RocksDBBatch
- Add PruneShardOutcome enum (Deleted, Updated, Unchanged)
- Add AccountsHistoryPruner and StoragesHistoryPruner segments
- Feature-gated with #[cfg(all(unix, feature = "rocksdb"))]
- Add 21 tests covering edge cases and MDBX equivalence

The implementation maintains the sentinel shard invariant (last shard
has u64::MAX key) and produces logically equivalent results to MDBX
prune_shard.

Closes #21292

Amp-Thread-ID: https://ampcode.com/threads/T-019be66d-e360-712b-8f84-af0142f8a44b
2026-01-22 21:17:31 +00:00
13 changed files with 2736 additions and 62 deletions

1
Cargo.lock generated
View File

@@ -10281,6 +10281,7 @@ dependencies = [
"reth-stages",
"reth-stages-types",
"reth-static-file-types",
"reth-storage-api",
"reth-testing-utils",
"reth-tokio-util",
"reth-tracing",

View File

@@ -41,11 +41,15 @@ rayon.workspace = true
tokio.workspace = true
rustc-hash.workspace = true
[features]
rocksdb = ["reth-provider/rocksdb"]
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
reth-storage-api.workspace = true
reth-testing-utils.workspace = true
reth-tracing.workspace = true

View File

@@ -20,6 +20,8 @@ pub use user::{
AccountHistory, Bodies, Receipts as UserReceipts, ReceiptsByLogs, SenderRecovery,
StorageHistory, TransactionLookup,
};
#[cfg(all(unix, feature = "rocksdb"))]
pub use user::{AccountsHistoryPruner, StoragesHistoryPruner};
/// Prunes data from static files for a given segment.
///

View File

@@ -3,6 +3,10 @@ mod bodies;
mod history;
mod receipts;
mod receipts_by_logs;
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_account_history;
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_storage_history;
mod sender_recovery;
mod storage_history;
mod transaction_lookup;
@@ -11,6 +15,10 @@ pub use account_history::AccountHistory;
pub use bodies::Bodies;
pub use receipts::Receipts;
pub use receipts_by_logs::ReceiptsByLogs;
#[cfg(all(unix, feature = "rocksdb"))]
pub use rocksdb_account_history::AccountsHistoryPruner;
#[cfg(all(unix, feature = "rocksdb"))]
pub use rocksdb_storage_history::StoragesHistoryPruner;
pub use sender_recovery::SenderRecovery;
pub use storage_history::StorageHistory;
pub use transaction_lookup::TransactionLookup;

View File

@@ -0,0 +1,557 @@
//! `RocksDB` account history index pruner.
use crate::{
segments::{PruneInput, Segment},
PrunerError,
};
use reth_provider::{
ChangeSetReader, DBProvider, EitherWriter, PruneShardOutcome, RocksDBProviderFactory,
StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use rustc_hash::FxHashMap;
use std::ops::RangeInclusive;
use tracing::{instrument, trace};
#[derive(Debug)]
pub struct AccountsHistoryPruner {
mode: PruneMode,
}
impl AccountsHistoryPruner {
pub const fn new(mode: PruneMode) -> Self {
Self { mode }
}
}
impl AccountsHistoryPruner {
fn prune_static_files<Provider>(
&self,
provider: &Provider,
range: RangeInclusive<u64>,
range_end: u64,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError>
where
Provider: ChangeSetReader + RocksDBProviderFactory + StaticFileProviderFactory,
{
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut scanned_changesets = 0usize;
let mut done = true;
for block in range {
let changes = provider.account_block_changeset(block)?;
let changes_count = changes.len();
for change in changes {
highest_deleted_accounts.insert(change.address, block);
}
scanned_changesets += changes_count;
limiter.increment_deleted_entries_count_by(changes_count);
last_changeset_pruned_block = Some(block);
if limiter.is_limit_reached() {
done = false;
break;
}
}
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned account changesets");
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let mut keys_deleted = 0usize;
let mut keys_updated = 0usize;
provider.with_rocksdb_batch(|mut batch| {
for (address, highest_block) in &highest_deleted_accounts {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_account_history_to(*address, prune_to)? {
PruneShardOutcome::Deleted => keys_deleted += 1,
PruneShardOutcome::Updated => keys_updated += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned account history (RocksDB indices)");
if done {
provider.static_file_provider().delete_segment_below_block(
StaticFileSegment::AccountChangeSets,
last_changeset_pruned_block + 1,
)?;
}
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: scanned_changesets + keys_deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
fn prune_database<Provider>(
&self,
provider: &Provider,
range: RangeInclusive<u64>,
range_end: u64,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError>
where
Provider: ChangeSetReader + RocksDBProviderFactory,
{
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut scanned_changesets = 0usize;
let mut done = true;
for block in range {
let changes = provider.account_block_changeset(block)?;
let changes_count = changes.len();
for change in changes {
highest_deleted_accounts.insert(change.address, block);
}
scanned_changesets += changes_count;
limiter.increment_deleted_entries_count_by(changes_count);
last_changeset_pruned_block = Some(block);
if limiter.is_limit_reached() {
done = false;
break;
}
}
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned account changesets");
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let mut keys_deleted = 0usize;
let mut keys_updated = 0usize;
provider.with_rocksdb_batch(|mut batch| {
for (address, highest_block) in &highest_deleted_accounts {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_account_history_to(*address, prune_to)? {
PruneShardOutcome::Deleted => keys_deleted += 1,
PruneShardOutcome::Updated => keys_updated += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned account history (RocksDB indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: scanned_changesets + keys_deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
impl<Provider> Segment<Provider> for AccountsHistoryPruner
where
Provider: ChangeSetReader
+ RocksDBProviderFactory
+ StaticFileProviderFactory
+ StorageSettingsCache
+ DBProvider,
{
fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory
}
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
fn purpose(&self) -> PrunePurpose {
PrunePurpose::User
}
#[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let range = match input.get_next_block_range() {
Some(range) => range,
None => {
trace!(target: "pruner", "No account history to prune");
return Ok(SegmentOutput::done())
}
};
let range_end = *range.end();
if EitherWriter::account_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, range, range_end, input)
} else {
self.prune_database(provider, range, range_end, input)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::segments::{PruneInput, PruneLimiter, Segment};
use alloy_primitives::{Address, B256};
use reth_db_api::{
models::{AccountBeforeTx, ShardedKey},
tables,
transaction::DbTxMut,
BlockNumberList,
};
use reth_provider::{
DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
StaticFileWriter,
};
use reth_prune_types::{PruneMode, PruneProgress};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{DBProvider, StorageSettings, StorageSettingsCache};
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::collections::BTreeMap;
fn setup_rocksdb_test_db() -> TestStageDB {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
db
}
fn generate_test_changeset(_block: u64, addresses: &[Address]) -> Vec<AccountBeforeTx> {
addresses.iter().map(|&address| AccountBeforeTx { address, info: None }).collect()
}
/// Helper to write account history to RocksDB.
fn write_account_history_to_rocksdb(db: &TestStageDB, history: &[(Address, Vec<u64>)]) {
let provider = db.factory.database_provider_rw().unwrap();
provider
.with_rocksdb_batch(|mut batch| {
for (address, blocks) in history {
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
batch
.put::<tables::AccountsHistory>(ShardedKey::last(*address), &shard)
.unwrap();
}
Ok(((), Some(batch.into_inner())))
})
.unwrap();
provider.commit().unwrap();
}
#[test]
fn test_prune_account_history_static_files_full_range() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addresses: Vec<Address> = (0..3)
.map(|i| {
let mut addr = Address::ZERO;
addr.0[0] = i as u8;
addr
})
.collect();
let static_file_provider = db.factory.static_file_provider();
{
let mut writer = static_file_provider
.latest_writer(StaticFileSegment::AccountChangeSets)
.expect("get writer");
for block_num in 0..=10 {
let changeset = generate_test_changeset(block_num, &addresses);
writer.append_account_changeset(changeset, block_num).expect("append changeset");
}
writer.commit().expect("commit static file");
}
let highest = static_file_provider
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
assert_eq!(highest, Some(10), "Static file should cover blocks 0-10");
// Write account history to RocksDB (not MDBX)
let history: Vec<_> = addresses.iter().map(|&addr| (addr, (0..=10).collect())).collect();
write_account_history_to_rocksdb(&db, &history);
let prune_mode = PruneMode::Before(5);
let input =
PruneInput { previous_checkpoint: None, to_block: 5, limiter: PruneLimiter::default() };
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
provider.commit().expect("commit after prune");
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0, "Should have pruned some entries");
let checkpoint = result.checkpoint.expect("should have checkpoint");
assert_eq!(checkpoint.block_number, Some(5));
}
#[test]
fn test_prune_account_history_mdbx_fallback() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets(changesets.clone(), None).expect("insert changesets");
db.insert_history(changesets, None).expect("insert history");
let mdbx_count_before = db.table::<tables::AccountChangeSets>().unwrap().len();
assert!(mdbx_count_before > 0, "Should have MDBX changesets");
let static_file_provider = db.factory.static_file_provider();
let highest = static_file_provider
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
assert!(highest.is_none(), "No static file coverage expected");
let prune_mode = PruneMode::Before(5);
let input =
PruneInput { previous_checkpoint: None, to_block: 5, limiter: PruneLimiter::default() };
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
provider.commit().expect("commit after prune");
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0, "Should have pruned some entries");
}
#[test]
fn test_prune_account_history_limiter_block_boundary() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=20,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addresses: Vec<Address> = (0..10)
.map(|i| {
let mut addr = Address::ZERO;
addr.0[0] = i as u8;
addr
})
.collect();
let static_file_provider = db.factory.static_file_provider();
{
let mut writer = static_file_provider
.latest_writer(StaticFileSegment::AccountChangeSets)
.expect("get writer");
for block_num in 0..=20 {
let changeset = generate_test_changeset(block_num, &addresses);
writer.append_account_changeset(changeset, block_num).expect("append changeset");
}
writer.commit().expect("commit static file");
}
let provider = db.factory.database_provider_rw().unwrap();
for block_num in 0..=20u64 {
for addr in &addresses {
provider
.tx_ref()
.put::<tables::AccountsHistory>(
reth_db_api::models::ShardedKey::new(*addr, block_num),
reth_db_api::BlockNumberList::new([block_num]).unwrap(),
)
.expect("insert history");
}
}
provider.commit().expect("commit provider");
let prune_mode = PruneMode::Before(15);
let limiter = PruneLimiter::default().set_deleted_entries_limit(25);
let input = PruneInput { previous_checkpoint: None, to_block: 15, limiter };
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
provider.commit().expect("commit after prune");
assert_eq!(
result.progress,
PruneProgress::HasMoreData(
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached
),
"Should stop due to limit"
);
let checkpoint = result.checkpoint.expect("should have checkpoint");
let pruned_block = checkpoint.block_number.expect("should have block number");
assert!(pruned_block < 15, "Should have stopped before target block");
assert!(pruned_block >= 0, "Should have pruned at least one block");
}
#[test]
fn test_prune_account_history_empty_range() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=5,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let prune_mode = PruneMode::Before(5);
let input = PruneInput {
previous_checkpoint: Some(reth_prune_types::PruneCheckpoint {
block_number: Some(5),
tx_number: None,
prune_mode,
}),
to_block: 5,
limiter: PruneLimiter::default(),
};
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
assert_eq!(result.progress, PruneProgress::Finished);
assert_eq!(result.pruned, 0, "Nothing to prune");
}
#[test]
fn test_prune_account_history_mixed_static_and_mdbx() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=20,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addresses: Vec<Address> = (0..3)
.map(|i| {
let mut addr = Address::ZERO;
addr.0[0] = i as u8;
addr
})
.collect();
let static_file_provider = db.factory.static_file_provider();
{
let mut writer = static_file_provider
.latest_writer(StaticFileSegment::AccountChangeSets)
.expect("get writer");
for block_num in 0..=10 {
let changeset = generate_test_changeset(block_num, &addresses);
writer.append_account_changeset(changeset, block_num).expect("append changeset");
}
writer.commit().expect("commit static file");
}
db.commit(|tx| {
for block_num in 11..=20u64 {
for addr in &addresses {
tx.put::<tables::AccountChangeSets>(
block_num,
AccountBeforeTx { address: *addr, info: None },
)?;
}
}
Ok(())
})
.expect("insert MDBX changesets");
let provider = db.factory.database_provider_rw().unwrap();
for block_num in 0..=20u64 {
for addr in &addresses {
provider
.tx_ref()
.put::<tables::AccountsHistory>(
reth_db_api::models::ShardedKey::new(*addr, block_num),
reth_db_api::BlockNumberList::new([block_num]).unwrap(),
)
.expect("insert history");
}
}
provider.commit().expect("commit provider");
let prune_mode = PruneMode::Before(15);
let input = PruneInput {
previous_checkpoint: None,
to_block: 15,
limiter: PruneLimiter::default(),
};
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
provider.commit().expect("commit after prune");
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0, "Should have pruned entries from both static and MDBX");
let checkpoint = result.checkpoint.expect("should have checkpoint");
assert_eq!(checkpoint.block_number, Some(15));
}
}

View File

@@ -0,0 +1,561 @@
//! `RocksDB` storage history index pruner.
use crate::{
segments::{PruneInput, Segment},
PrunerError,
};
use alloy_primitives::BlockNumber;
use reth_provider::{
DBProvider, EitherWriter, PruneShardOutcome, RocksDBProviderFactory, StaticFileProviderFactory,
StorageChangeSetReader, StorageSettingsCache,
};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use rustc_hash::FxHashMap;
use std::ops::RangeInclusive;
use tracing::{instrument, trace};
#[derive(Debug)]
pub struct StoragesHistoryPruner {
mode: PruneMode,
}
impl StoragesHistoryPruner {
pub const fn new(mode: PruneMode) -> Self {
Self { mode }
}
/// Prune when storage changesets are in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
range: RangeInclusive<BlockNumber>,
range_end: BlockNumber,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError>
where
Provider: StorageChangeSetReader + RocksDBProviderFactory + StaticFileProviderFactory,
{
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_storages = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut scanned_changesets = 0usize;
let mut done = true;
for block in range {
let changes = provider.storage_block_changeset(block)?;
let changes_count = changes.len();
for change in changes {
highest_deleted_storages.insert((change.address, change.key), block);
}
scanned_changesets += changes_count;
limiter.increment_deleted_entries_count_by(changes_count);
last_changeset_pruned_block = Some(block);
if limiter.is_limit_reached() {
done = false;
break;
}
}
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned storage changesets");
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let mut keys_deleted = 0usize;
let mut keys_updated = 0usize;
provider.with_rocksdb_batch(|mut batch| {
for ((address, storage_key), highest_block) in &highest_deleted_storages {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
PruneShardOutcome::Deleted => keys_deleted += 1,
PruneShardOutcome::Updated => keys_updated += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned storage history (RocksDB indices)");
// Delete static file jars below the pruned block
provider.static_file_provider().delete_segment_below_block(
StaticFileSegment::StorageChangeSets,
last_changeset_pruned_block + 1,
)?;
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: scanned_changesets + keys_deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
/// Prune when storage changesets are in the database.
fn prune_database<Provider>(
&self,
provider: &Provider,
range: RangeInclusive<BlockNumber>,
range_end: BlockNumber,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError>
where
Provider: StorageChangeSetReader + RocksDBProviderFactory,
{
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_storages = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut scanned_changesets = 0usize;
let mut done = true;
for block in range {
let changes = provider.storage_block_changeset(block)?;
let changes_count = changes.len();
for change in changes {
highest_deleted_storages.insert((change.address, change.key), block);
}
scanned_changesets += changes_count;
limiter.increment_deleted_entries_count_by(changes_count);
last_changeset_pruned_block = Some(block);
if limiter.is_limit_reached() {
done = false;
break;
}
}
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned storage changesets");
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let mut keys_deleted = 0usize;
let mut keys_updated = 0usize;
provider.with_rocksdb_batch(|mut batch| {
for ((address, storage_key), highest_block) in &highest_deleted_storages {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
PruneShardOutcome::Deleted => keys_deleted += 1,
PruneShardOutcome::Updated => keys_updated += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned storage history (RocksDB indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: scanned_changesets + keys_deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
impl<Provider> Segment<Provider> for StoragesHistoryPruner
where
Provider: DBProvider
+ StorageChangeSetReader
+ RocksDBProviderFactory
+ StaticFileProviderFactory
+ StorageSettingsCache,
{
fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory
}
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
fn purpose(&self) -> PrunePurpose {
PrunePurpose::User
}
#[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let range = match input.get_next_block_range() {
Some(range) => range,
None => {
trace!(target: "pruner", "No storage history to prune");
return Ok(SegmentOutput::done())
}
};
let range_end = *range.end();
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, range, range_end, input)
} else {
self.prune_database(provider, range, range_end, input)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::segments::{PruneInput, PruneLimiter, Segment};
use alloy_primitives::{Address, B256, U256};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, StorageBeforeTx},
tables,
transaction::DbTxMut,
BlockNumberList,
};
use reth_primitives_traits::StorageEntry;
use reth_provider::{
DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
StaticFileWriter,
};
use reth_prune_types::{PruneMode, PruneProgress};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{DBProvider, StorageSettings, StorageSettingsCache};
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
fn setup_rocksdb_test_db() -> TestStageDB {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
db
}
/// Helper to write storage changesets to static files for testing.
fn write_storage_changesets_to_static_files(
db: &TestStageDB,
changesets: &[(u64, Address, B256, U256)],
) {
let static_file_provider = db.factory.static_file_provider();
let mut writer =
static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
let mut current_block: Option<u64> = None;
let mut block_entries = Vec::new();
for (block_number, address, key, value) in changesets {
if current_block != Some(*block_number) {
if !block_entries.is_empty() {
writer
.append_storage_changeset(
std::mem::take(&mut block_entries),
current_block.unwrap(),
)
.unwrap();
}
current_block = Some(*block_number);
}
block_entries.push(StorageBeforeTx { address: *address, key: *key, value: *value });
}
if !block_entries.is_empty() {
writer.append_storage_changeset(block_entries, current_block.unwrap()).unwrap();
}
writer.commit().unwrap();
static_file_provider.initialize_index().unwrap();
}
/// Helper to write storage history to `RocksDB`.
fn write_storage_history_to_rocksdb(db: &TestStageDB, history: &[(Address, B256, Vec<u64>)]) {
let provider = db.factory.database_provider_rw().unwrap();
provider
.with_rocksdb_batch(|mut batch| {
for (address, key, blocks) in history {
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
batch
.put::<tables::StoragesHistory>(
StorageShardedKey::last(*address, *key),
&shard,
)
.unwrap();
}
Ok(((), Some(batch.into_inner())))
})
.unwrap();
provider.commit().unwrap();
}
/// Helper to read storage history from `RocksDB`.
fn read_storage_history_from_rocksdb(
db: &TestStageDB,
address: Address,
key: B256,
) -> Vec<u64> {
let provider = db.factory.database_provider_rw().unwrap();
let rocksdb = provider.rocksdb_provider();
let shards = rocksdb.storage_history_shards(address, key).unwrap();
shards.into_iter().flat_map(|(_, blocks)| blocks.iter().collect::<Vec<_>>()).collect()
}
#[test]
fn test_prune_storage_history_static_files_full_range() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addr1 = Address::from([0x11; 20]);
let addr2 = Address::from([0x22; 20]);
let key1 = B256::from([0x01; 32]);
let key2 = B256::from([0x02; 32]);
let changesets = vec![
(0, addr1, key1, U256::from(50)),
(1, addr1, key1, U256::from(100)),
(1, addr1, key2, U256::from(200)),
(2, addr1, key1, U256::from(110)),
(2, addr2, key1, U256::from(300)),
(3, addr1, key1, U256::from(120)),
(3, addr2, key2, U256::from(400)),
];
write_storage_changesets_to_static_files(&db, &changesets);
write_storage_history_to_rocksdb(
&db,
&[
(addr1, key1, vec![0, 1, 2, 3, 10, 20]),
(addr1, key2, vec![1, 15]),
(addr2, key1, vec![2, 25]),
(addr2, key2, vec![3, 30]),
],
);
let segment = StoragesHistoryPruner::new(PruneMode::Before(3));
let provider = db.factory.database_provider_rw().unwrap();
let input =
PruneInput { previous_checkpoint: None, to_block: 3, limiter: PruneLimiter::default() };
let result = segment.prune(&provider, input).unwrap();
provider.commit().unwrap();
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0);
let history1 = read_storage_history_from_rocksdb(&db, addr1, key1);
assert!(!history1.contains(&0));
assert!(!history1.contains(&1));
assert!(!history1.contains(&2));
assert!(!history1.contains(&3));
assert!(history1.contains(&10) || history1.contains(&20));
let history2 = read_storage_history_from_rocksdb(&db, addr1, key2);
assert!(!history2.contains(&1));
let history3 = read_storage_history_from_rocksdb(&db, addr2, key1);
assert!(!history3.contains(&2));
let history4 = read_storage_history_from_rocksdb(&db, addr2, key2);
assert!(!history4.contains(&3));
}
#[test]
fn test_prune_storage_history_mdbx_fallback() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addr1 = Address::from([0x11; 20]);
let key1 = B256::from([0x01; 32]);
db.commit(|tx| {
for block in 1..=5u64 {
tx.put::<tables::StorageChangeSets>(
BlockNumberAddress((block, addr1)),
StorageEntry { key: key1, value: U256::from(block * 100) },
)?;
}
Ok(())
})
.unwrap();
write_storage_history_to_rocksdb(&db, &[(addr1, key1, vec![1, 2, 3, 4, 5])]);
let segment = StoragesHistoryPruner::new(PruneMode::Before(3));
let provider = db.factory.database_provider_rw().unwrap();
let input =
PruneInput { previous_checkpoint: None, to_block: 3, limiter: PruneLimiter::default() };
let result = segment.prune(&provider, input).unwrap();
provider.commit().unwrap();
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0);
}
#[test]
fn test_prune_storage_history_limiter_block_boundary() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=20,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addr1 = Address::from([0x11; 20]);
let key1 = B256::from([0x01; 32]);
let mut changesets = Vec::new();
for block in 0..=10u64 {
for i in 0..10 {
changesets.push((block, addr1, B256::from([i as u8; 32]), U256::from(block * 100)));
}
}
write_storage_changesets_to_static_files(&db, &changesets);
let all_blocks: Vec<u64> = (0..=10).collect();
write_storage_history_to_rocksdb(&db, &[(addr1, key1, all_blocks)]);
let segment = StoragesHistoryPruner::new(PruneMode::Before(10));
let provider = db.factory.database_provider_rw().unwrap();
let limiter = PruneLimiter::default().set_deleted_entries_limit(15);
let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
let result = segment.prune(&provider, input).unwrap();
provider.commit().unwrap();
assert_eq!(
result.progress,
PruneProgress::HasMoreData(
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached
),
"Should stop due to limit"
);
let checkpoint = result.checkpoint.expect("should have checkpoint");
let pruned_block = checkpoint.block_number.expect("should have block number");
assert!(pruned_block < 10, "Should have stopped before target block");
assert!(pruned_block > 0, "Should have pruned at least some blocks");
}
#[test]
fn test_prune_storage_history_empty_range() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=5,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let segment = StoragesHistoryPruner::new(PruneMode::Before(5));
let provider = db.factory.database_provider_rw().unwrap();
let input = PruneInput {
previous_checkpoint: Some(reth_prune_types::PruneCheckpoint {
block_number: Some(5),
tx_number: None,
prune_mode: PruneMode::Before(5),
}),
to_block: 5,
limiter: PruneLimiter::default(),
};
let result = segment.prune(&provider, input).unwrap();
assert_eq!(result.progress, PruneProgress::Finished);
assert_eq!(result.pruned, 0, "Nothing to prune");
}
#[test]
fn test_prune_storage_history_mixed_static_and_mdbx() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=20,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addr1 = Address::from([0x11; 20]);
let key1 = B256::from([0x01; 32]);
let mut changesets = Vec::new();
for block in 0..=10u64 {
changesets.push((block, addr1, key1, U256::from(block * 100)));
}
write_storage_changesets_to_static_files(&db, &changesets);
db.commit(|tx| {
for block in 11..=20u64 {
tx.put::<tables::StorageChangeSets>(
BlockNumberAddress((block, addr1)),
StorageEntry { key: key1, value: U256::from(block * 100) },
)?;
}
Ok(())
})
.unwrap();
write_storage_history_to_rocksdb(&db, &[(addr1, key1, (0..=20).collect())]);
let segment = StoragesHistoryPruner::new(PruneMode::Before(15));
let provider = db.factory.database_provider_rw().unwrap();
let input = PruneInput {
previous_checkpoint: None,
to_block: 15,
limiter: PruneLimiter::default(),
};
let result = segment.prune(&provider, input).unwrap();
provider.commit().unwrap();
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0, "Should have pruned entries from both static and MDBX");
let checkpoint = result.checkpoint.expect("should have checkpoint");
assert_eq!(checkpoint.block_number, Some(15));
}
}

View File

@@ -158,38 +158,33 @@ where
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
// Auto-commits on threshold; consistency check heals any crash.
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch_with_auto_commit();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
let hash_iter = hash_collector.iter()?;
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
let (hash_bytes, number_bytes) = hash_to_number?;
if index > 0 && index.is_multiple_of(interval) {
info!(
target: "sync::stages::transaction_lookup",
?append_only,
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
"Inserting hashes"
);
provider.with_rocksdb_batch(|rocksdb_batch| {
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
for (index, hash_to_number) in hash_iter.enumerate() {
let (hash_bytes, number_bytes) =
hash_to_number.map_err(ProviderError::other)?;
if index > 0 && index.is_multiple_of(interval) {
info!(
target: "sync::stages::transaction_lookup",
?append_only,
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
"Inserting hashes"
);
}
// Decode from raw ETL bytes
let hash = TxHash::decode(&hash_bytes)?;
let tx_num = TxNumber::decompress(&number_bytes)?;
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}
// Decode from raw ETL bytes
let hash = TxHash::decode(&hash_bytes)?;
let tx_num = TxNumber::decompress(&number_bytes)?;
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
trace!(target: "sync::stages::transaction_lookup",
total_hashes,
@@ -215,14 +210,6 @@ where
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
let static_file_provider = provider.static_file_provider();
let rev_walker = provider
.block_body_indices_range(range.clone())?
@@ -230,24 +217,25 @@ where
.zip(range.collect::<Vec<_>>())
.rev();
for (body, number) in rev_walker {
if number <= unwind_to {
break;
}
provider.with_rocksdb_batch(|rocksdb_batch| {
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
// Delete all transactions that belong to this block
for tx_id in body.tx_num_range() {
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
writer.delete_transaction_hash_number(transaction.trie_hash())?;
for (body, number) in rev_walker {
if number <= unwind_to {
break;
}
// Delete all transactions that belong to this block
for tx_id in body.tx_num_range() {
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
writer.delete_transaction_hash_number(transaction.trie_hash())?;
}
}
}
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)

View File

@@ -21,8 +21,8 @@ pub mod providers;
pub use providers::{
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx,
StaticFileWriter,
PruneShardOutcome, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder,
StaticFileWriteCtx, StaticFileWriter,
};
pub mod changeset_walker;

View File

@@ -39,7 +39,8 @@ pub use consistent::ConsistentProvider;
pub(crate) mod rocksdb;
pub use rocksdb::{
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter,
RocksDBTableStats, RocksTx,
};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy

View File

@@ -5,7 +5,10 @@
//! inconsistencies between `RocksDB` data and MDBX checkpoints.
use super::RocksDBProvider;
use crate::StaticFileProviderFactory;
use crate::{
changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
StaticFileProviderFactory,
};
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::BlockNumber;
use rayon::prelude::*;
@@ -17,6 +20,7 @@ use reth_storage_api::{
DBProvider, StageCheckpointReader, StorageSettingsCache, TransactionsProvider,
};
use reth_storage_errors::provider::ProviderResult;
use std::collections::HashSet;
impl RocksDBProvider {
/// Checks consistency of `RocksDB` tables against MDBX stage checkpoints.
@@ -230,7 +234,7 @@ impl RocksDBProvider {
provider: &Provider,
) -> ProviderResult<Option<BlockNumber>>
where
Provider: DBProvider + StageCheckpointReader,
Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory,
{
// Get the IndexStorageHistory stage checkpoint
let checkpoint = provider
@@ -249,7 +253,7 @@ impl RocksDBProvider {
target: "reth::providers::rocksdb",
"StoragesHistory has data but checkpoint is 0, clearing all"
);
self.prune_storages_history_above(0)?;
self.prune_storages_history_in_range(provider, 0..=u64::MAX)?;
return Ok(None);
}
@@ -283,7 +287,10 @@ impl RocksDBProvider {
checkpoint,
"StoragesHistory ahead of checkpoint, pruning excess data"
);
self.prune_storages_history_above(checkpoint)?;
self.prune_storages_history_in_range(
provider,
(checkpoint + 1)..=max_highest_block,
)?;
} else if max_highest_block < checkpoint {
// RocksDB is behind checkpoint, return highest block to signal unwind needed
tracing::warn!(
@@ -351,7 +358,7 @@ impl RocksDBProvider {
provider: &Provider,
) -> ProviderResult<Option<BlockNumber>>
where
Provider: DBProvider + StageCheckpointReader,
Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory,
{
// Get the IndexAccountHistory stage checkpoint
let checkpoint = provider
@@ -370,7 +377,7 @@ impl RocksDBProvider {
target: "reth::providers::rocksdb",
"AccountsHistory has data but checkpoint is 0, clearing all"
);
self.prune_accounts_history_above(0)?;
self.prune_accounts_history_in_range(provider, 0..=u64::MAX)?;
return Ok(None);
}
@@ -404,7 +411,10 @@ impl RocksDBProvider {
checkpoint,
"AccountsHistory ahead of checkpoint, pruning excess data"
);
self.prune_accounts_history_above(checkpoint)?;
self.prune_accounts_history_in_range(
provider,
(checkpoint + 1)..=max_highest_block,
)?;
return Ok(None);
}
@@ -467,6 +477,181 @@ impl RocksDBProvider {
Ok(())
}
/// Prunes `AccountsHistory` entries for accounts changed in the given block range.
///
/// Uses static file changesets to identify which addresses were changed, then unwinds
/// the corresponding `RocksDB` history entries.
/// Falls back to table iteration if static files don't cover the range.
pub fn prune_accounts_history_in_range<Provider>(
&self,
provider: &Provider,
block_range: std::ops::RangeInclusive<BlockNumber>,
) -> ProviderResult<()>
where
Provider: StaticFileProviderFactory,
{
if block_range.is_empty() {
return Ok(());
}
let static_file_provider = provider.static_file_provider();
// Check if static files cover the range we need
let highest_static_block = static_file_provider
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
match highest_static_block {
Some(highest) if highest >= *block_range.start() => {
// Static files cover at least part of the range - use changeset-based approach
let effective_end = (*block_range.end()).min(highest);
let static_range = *block_range.start()..=effective_end;
tracing::info!(
target: "reth::providers::rocksdb",
?static_range,
"Using static file changesets for AccountsHistory healing"
);
// Collect unique addresses from static file changesets
let mut addresses_to_unwind = HashSet::new();
let walker: StaticFileAccountChangesetWalker<_> =
static_file_provider.walk_account_changeset_range(static_range);
for result in walker {
let (_block_number, account) = result?;
addresses_to_unwind.insert(account.address);
}
if !addresses_to_unwind.is_empty() {
tracing::info!(
target: "reth::providers::rocksdb",
addresses_count = addresses_to_unwind.len(),
"Unwinding AccountsHistory for changed accounts"
);
// Unwind history for each address to keep only blocks <= (start - 1)
let keep_to = block_range.start().saturating_sub(1);
let mut batch = self.batch();
for address in addresses_to_unwind {
batch.unwind_account_history_to(address, keep_to)?;
}
batch.commit()?;
}
// If static files didn't cover the entire range, fall back for remainder
if effective_end < *block_range.end() {
tracing::info!(
target: "reth::providers::rocksdb",
from_block = effective_end + 1,
to_block = *block_range.end(),
"Static files don't cover full range, using table scan for remainder"
);
self.prune_accounts_history_above(effective_end)?;
}
Ok(())
}
_ => {
// Static files don't cover the range - fall back to table iteration
tracing::info!(
target: "reth::providers::rocksdb",
?highest_static_block,
?block_range,
"Static files don't cover range, falling back to table scan"
);
self.prune_accounts_history_above(block_range.start().saturating_sub(1))
}
}
}
/// Prunes `StoragesHistory` entries for storage slots changed in the given block range.
///
/// Uses static file changesets to identify which `(address, storage_key)` pairs
/// were changed, then unwinds the corresponding `RocksDB` history entries.
/// Falls back to table iteration if static files don't cover the range.
pub fn prune_storages_history_in_range<Provider>(
&self,
provider: &Provider,
block_range: std::ops::RangeInclusive<BlockNumber>,
) -> ProviderResult<()>
where
Provider: StaticFileProviderFactory,
{
if block_range.is_empty() {
return Ok(());
}
let static_file_provider = provider.static_file_provider();
// Check if static files cover the range we need
let highest_static_block = static_file_provider
.get_highest_static_file_block(StaticFileSegment::StorageChangeSets);
match highest_static_block {
Some(highest) if highest >= *block_range.start() => {
// Static files cover at least part of the range - use changeset-based approach
let effective_end = (*block_range.end()).min(highest);
let static_range = *block_range.start()..=effective_end;
tracing::info!(
target: "reth::providers::rocksdb",
?static_range,
"Using static file changesets for StoragesHistory healing"
);
// Collect unique (address, storage_key) pairs from static file changesets
let mut keys_to_unwind = HashSet::new();
let walker: StaticFileStorageChangesetWalker<_> =
static_file_provider.walk_storage_changeset_range(static_range);
for result in walker {
let (block_number_address, entry) = result?;
keys_to_unwind.insert((block_number_address.address(), entry.key));
}
if !keys_to_unwind.is_empty() {
tracing::info!(
target: "reth::providers::rocksdb",
keys_count = keys_to_unwind.len(),
"Unwinding StoragesHistory for changed storage slots"
);
// Unwind history for each (address, storage_key) pair to keep only
// blocks <= (start - 1)
let keep_to = block_range.start().saturating_sub(1);
let mut batch = self.batch();
for (address, storage_key) in keys_to_unwind {
batch.unwind_storage_history_to(address, storage_key, keep_to)?;
}
batch.commit()?;
}
// If static files didn't cover the entire range, fall back for remainder
if effective_end < *block_range.end() {
tracing::info!(
target: "reth::providers::rocksdb",
from_block = effective_end + 1,
to_block = *block_range.end(),
"Static files don't cover full range, using table scan for remainder"
);
self.prune_storages_history_above(effective_end)?;
}
Ok(())
}
_ => {
// Static files don't cover the range - fall back to table iteration
tracing::info!(
target: "reth::providers::rocksdb",
?highest_static_block,
?block_range,
"Static files don't cover range, falling back to table scan"
);
self.prune_storages_history_above(block_range.start().saturating_sub(1))
}
}
}
}
#[cfg(test)]
@@ -1402,4 +1587,291 @@ mod tests {
"Should require unwind to block 50 to rebuild AccountsHistory"
);
}
/// Test that healing for `AccountsHistory` uses static file changesets when available.
///
/// This test verifies that:
/// 1. When static files contain changesets, healing uses them to identify which addresses need
/// their history unwound
/// 2. Addresses NOT in the changeset range remain untouched
/// 3. The unwind re-keys shards appropriately (to `u64::MAX` sentinel)
#[test]
fn test_accounts_history_healing_uses_changesets() {
use crate::providers::static_file::StaticFileWriter;
use reth_db_api::models::ShardedKey;
use reth_primitives_traits::Account;
use reth_static_file_types::StaticFileSegment;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
.build()
.unwrap();
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
let addr1 = Address::repeat_byte(0x11);
let addr2 = Address::repeat_byte(0x22);
let addr3 = Address::repeat_byte(0x33);
// Write changesets for addr1 and addr2 (not addr3)
{
let static_file_provider = factory.static_file_provider();
let mut writer =
static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
for block_num in 0..=10 {
let changeset = vec![
reth_db_api::models::AccountBeforeTx {
address: addr1,
info: Some(Account { nonce: block_num, ..Default::default() }),
},
reth_db_api::models::AccountBeforeTx {
address: addr2,
info: Some(Account { nonce: block_num + 100, ..Default::default() }),
},
];
writer.append_account_changeset(changeset, block_num).unwrap();
}
writer.commit().unwrap();
}
// Insert history shards into RocksDB
let key_addr1_block5 = ShardedKey::new(addr1, 5);
let key_addr1_block10 = ShardedKey::new(addr1, 10);
let key_addr2_block10 = ShardedKey::new(addr2, 10);
let key_addr3_block10 = ShardedKey::new(addr3, 10);
let block_list_5 = BlockNumberList::new_pre_sorted([1, 2, 3, 4, 5]);
let block_list_10 = BlockNumberList::new_pre_sorted([6, 7, 8, 9, 10]);
rocksdb.put::<tables::AccountsHistory>(key_addr1_block5.clone(), &block_list_5).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_addr1_block10.clone(), &block_list_10).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_addr2_block10.clone(), &block_list_10).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_addr3_block10.clone(), &block_list_10).unwrap();
let highest = factory
.static_file_provider()
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
assert_eq!(highest, Some(10), "Static files should cover blocks 0-10");
// Prune blocks 6-10 - should use changesets
rocksdb.prune_accounts_history_in_range(&factory, 6..=10).unwrap();
// After healing, addr1's block5 shard is re-keyed to sentinel
let key_addr1_sentinel = ShardedKey::new(addr1, u64::MAX);
assert!(
rocksdb.get::<tables::AccountsHistory>(key_addr1_block5).unwrap().is_none(),
"addr1 block 5 shard should be re-keyed to sentinel"
);
assert!(
rocksdb.get::<tables::AccountsHistory>(key_addr1_sentinel).unwrap().is_some(),
"addr1 should have sentinel shard after healing"
);
assert!(
rocksdb.get::<tables::AccountsHistory>(key_addr1_block10).unwrap().is_none(),
"addr1 block 10 shard should be pruned"
);
assert!(
rocksdb.get::<tables::AccountsHistory>(key_addr2_block10).unwrap().is_none(),
"addr2 block 10 shard should be pruned (all blocks > 5)"
);
assert!(
rocksdb.get::<tables::AccountsHistory>(key_addr3_block10).unwrap().is_some(),
"addr3 block 10 shard should remain (not in static file changesets)"
);
}
/// Test that healing for `StoragesHistory` uses static file changesets when available.
#[test]
fn test_storages_history_healing_uses_changesets() {
use crate::providers::static_file::StaticFileWriter;
use reth_static_file_types::StaticFileSegment;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
let addr1 = Address::repeat_byte(0x11);
let storage_key1 = B256::repeat_byte(0xAA);
let storage_key2 = B256::repeat_byte(0xBB);
let addr2 = Address::repeat_byte(0x22);
let addr_not_in_changeset = Address::repeat_byte(0x99);
// Write storage changesets
{
let static_file_provider = factory.static_file_provider();
let mut writer =
static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
for block_num in 0..=10 {
let changeset = vec![
reth_db_api::models::StorageBeforeTx {
address: addr1,
key: storage_key1,
value: alloy_primitives::U256::from(block_num),
},
reth_db_api::models::StorageBeforeTx {
address: addr1,
key: storage_key2,
value: alloy_primitives::U256::from(block_num + 100),
},
reth_db_api::models::StorageBeforeTx {
address: addr2,
key: storage_key1,
value: alloy_primitives::U256::from(block_num + 200),
},
];
writer.append_storage_changeset(changeset, block_num).unwrap();
}
writer.commit().unwrap();
}
// Insert history shards
let key_addr1_key1_block5 = StorageShardedKey::new(addr1, storage_key1, 5);
let key_addr1_key1_block10 = StorageShardedKey::new(addr1, storage_key1, 10);
let key_addr1_key2_block10 = StorageShardedKey::new(addr1, storage_key2, 10);
let key_addr2_key1_block10 = StorageShardedKey::new(addr2, storage_key1, 10);
let key_not_in_changeset = StorageShardedKey::new(addr_not_in_changeset, storage_key1, 10);
let block_list_5 = BlockNumberList::new_pre_sorted([1, 2, 3, 4, 5]);
let block_list_10 = BlockNumberList::new_pre_sorted([6, 7, 8, 9, 10]);
rocksdb
.put::<tables::StoragesHistory>(key_addr1_key1_block5.clone(), &block_list_5)
.unwrap();
rocksdb
.put::<tables::StoragesHistory>(key_addr1_key1_block10.clone(), &block_list_10)
.unwrap();
rocksdb
.put::<tables::StoragesHistory>(key_addr1_key2_block10.clone(), &block_list_10)
.unwrap();
rocksdb
.put::<tables::StoragesHistory>(key_addr2_key1_block10.clone(), &block_list_10)
.unwrap();
rocksdb
.put::<tables::StoragesHistory>(key_not_in_changeset.clone(), &block_list_10)
.unwrap();
let highest = factory
.static_file_provider()
.get_highest_static_file_block(StaticFileSegment::StorageChangeSets);
assert_eq!(highest, Some(10), "Static files should cover blocks 0-10");
rocksdb.prune_storages_history_in_range(&factory, 6..=10).unwrap();
let key_addr1_key1_sentinel = StorageShardedKey::new(addr1, storage_key1, u64::MAX);
assert!(
rocksdb.get::<tables::StoragesHistory>(key_addr1_key1_block5).unwrap().is_none(),
"addr1/key1 block 5 shard should be re-keyed to sentinel"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(key_addr1_key1_sentinel).unwrap().is_some(),
"addr1/key1 should have sentinel shard"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(key_addr1_key1_block10).unwrap().is_none(),
"addr1/key1 block 10 shard should be pruned"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(key_addr1_key2_block10).unwrap().is_none(),
"addr1/key2 block 10 shard should be pruned"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(key_addr2_key1_block10).unwrap().is_none(),
"addr2/key1 block 10 shard should be pruned"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(key_not_in_changeset).unwrap().is_some(),
"addr not in changeset should remain"
);
}
/// Test fallback to table scan when static files don't cover the range.
#[test]
fn test_history_healing_fallback_when_no_static_files() {
use reth_db_api::models::ShardedKey;
use reth_static_file_types::StaticFileSegment;
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy()
.with_account_history_in_rocksdb(true)
.with_storages_history_in_rocksdb(true),
);
let addr1 = Address::repeat_byte(0x11);
let addr2 = Address::repeat_byte(0x22);
// Insert shards - no static file changesets written
let key_addr1_block50 = ShardedKey::new(addr1, 50);
let key_addr2_block100 = ShardedKey::new(addr2, 100);
let key_addr1_sentinel = ShardedKey::new(addr1, u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_addr1_block50.clone(), &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_addr2_block100.clone(), &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_addr1_sentinel.clone(), &block_list).unwrap();
let storage_key = B256::repeat_byte(0xAA);
let storage_key_block50 = StorageShardedKey::new(addr1, storage_key, 50);
let storage_key_block100 = StorageShardedKey::new(addr1, storage_key, 100);
let storage_key_sentinel = StorageShardedKey::new(addr1, storage_key, u64::MAX);
rocksdb.put::<tables::StoragesHistory>(storage_key_block50.clone(), &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(storage_key_block100.clone(), &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(storage_key_sentinel.clone(), &block_list).unwrap();
// No static files exist
let highest = factory
.static_file_provider()
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
assert!(highest.is_none(), "No static files should exist");
// Without changesets, fallback deletes all shards with highest_block > 50
rocksdb.prune_accounts_history_in_range(&factory, 51..=100).unwrap();
assert!(
rocksdb.get::<tables::AccountsHistory>(key_addr1_block50).unwrap().is_some(),
"addr1 block 50 should remain (at boundary)"
);
assert!(
rocksdb.get::<tables::AccountsHistory>(key_addr2_block100).unwrap().is_none(),
"addr2 block 100 should be pruned (fallback deletes > 50)"
);
assert!(
rocksdb.get::<tables::AccountsHistory>(key_addr1_sentinel).unwrap().is_some(),
"sentinel entry should remain"
);
rocksdb.prune_storages_history_in_range(&factory, 51..=100).unwrap();
assert!(
rocksdb.get::<tables::StoragesHistory>(storage_key_block50).unwrap().is_some(),
"storage block 50 should remain"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(storage_key_block100).unwrap().is_none(),
"storage block 100 should be pruned"
);
assert!(
rocksdb.get::<tables::StoragesHistory>(storage_key_sentinel).unwrap().is_some(),
"storage sentinel should remain"
);
}
}

View File

@@ -6,5 +6,6 @@ mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter,
RocksDBTableStats, RocksTx,
};

File diff suppressed because it is too large Load Diff

View File

@@ -158,3 +158,14 @@ pub struct RocksTx;
/// A stub raw iterator for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBRawIter;
/// Outcome of pruning a history shard (stub).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
/// Shard was deleted entirely.
Deleted,
/// Shard was updated with filtered block numbers.
Updated,
/// Shard was unchanged (no blocks <= `to_block`).
Unchanged,
}