mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb570d658c | ||
|
|
8b9a644853 | ||
|
|
165b362aaf | ||
|
|
2171624e28 | ||
|
|
502d18a0b8 | ||
|
|
93160ff97f | ||
|
|
e913be192c | ||
|
|
299de4a5f9 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -10281,6 +10281,7 @@ dependencies = [
|
||||
"reth-stages",
|
||||
"reth-stages-types",
|
||||
"reth-static-file-types",
|
||||
"reth-storage-api",
|
||||
"reth-testing-utils",
|
||||
"reth-tokio-util",
|
||||
"reth-tracing",
|
||||
|
||||
@@ -41,11 +41,15 @@ rayon.workspace = true
|
||||
tokio.workspace = true
|
||||
rustc-hash.workspace = true
|
||||
|
||||
[features]
|
||||
rocksdb = ["reth-provider/rocksdb"]
|
||||
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
reth-db = { workspace = true, features = ["test-utils"] }
|
||||
reth-stages = { workspace = true, features = ["test-utils"] }
|
||||
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
|
||||
reth-storage-api.workspace = true
|
||||
reth-testing-utils.workspace = true
|
||||
reth-tracing.workspace = true
|
||||
|
||||
|
||||
@@ -20,6 +20,8 @@ pub use user::{
|
||||
AccountHistory, Bodies, Receipts as UserReceipts, ReceiptsByLogs, SenderRecovery,
|
||||
StorageHistory, TransactionLookup,
|
||||
};
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub use user::{AccountsHistoryPruner, StoragesHistoryPruner};
|
||||
|
||||
/// Prunes data from static files for a given segment.
|
||||
///
|
||||
|
||||
@@ -3,6 +3,10 @@ mod bodies;
|
||||
mod history;
|
||||
mod receipts;
|
||||
mod receipts_by_logs;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb_account_history;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb_storage_history;
|
||||
mod sender_recovery;
|
||||
mod storage_history;
|
||||
mod transaction_lookup;
|
||||
@@ -11,6 +15,10 @@ pub use account_history::AccountHistory;
|
||||
pub use bodies::Bodies;
|
||||
pub use receipts::Receipts;
|
||||
pub use receipts_by_logs::ReceiptsByLogs;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub use rocksdb_account_history::AccountsHistoryPruner;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub use rocksdb_storage_history::StoragesHistoryPruner;
|
||||
pub use sender_recovery::SenderRecovery;
|
||||
pub use storage_history::StorageHistory;
|
||||
pub use transaction_lookup::TransactionLookup;
|
||||
|
||||
557
crates/prune/prune/src/segments/user/rocksdb_account_history.rs
Normal file
557
crates/prune/prune/src/segments/user/rocksdb_account_history.rs
Normal file
@@ -0,0 +1,557 @@
|
||||
//! `RocksDB` account history index pruner.
|
||||
|
||||
use crate::{
|
||||
segments::{PruneInput, Segment},
|
||||
PrunerError,
|
||||
};
|
||||
use reth_provider::{
|
||||
ChangeSetReader, DBProvider, EitherWriter, PruneShardOutcome, RocksDBProviderFactory,
|
||||
StaticFileProviderFactory, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune_types::{
|
||||
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
|
||||
};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use rustc_hash::FxHashMap;
|
||||
use std::ops::RangeInclusive;
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AccountsHistoryPruner {
|
||||
mode: PruneMode,
|
||||
}
|
||||
|
||||
impl AccountsHistoryPruner {
|
||||
pub const fn new(mode: PruneMode) -> Self {
|
||||
Self { mode }
|
||||
}
|
||||
}
|
||||
|
||||
impl AccountsHistoryPruner {
|
||||
fn prune_static_files<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
range: RangeInclusive<u64>,
|
||||
range_end: u64,
|
||||
input: PruneInput,
|
||||
) -> Result<SegmentOutput, PrunerError>
|
||||
where
|
||||
Provider: ChangeSetReader + RocksDBProviderFactory + StaticFileProviderFactory,
|
||||
{
|
||||
let mut limiter = input.limiter;
|
||||
if limiter.is_limit_reached() {
|
||||
return Ok(SegmentOutput::not_done(
|
||||
limiter.interrupt_reason(),
|
||||
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
|
||||
))
|
||||
}
|
||||
|
||||
let mut highest_deleted_accounts = FxHashMap::default();
|
||||
let mut last_changeset_pruned_block = None;
|
||||
let mut scanned_changesets = 0usize;
|
||||
let mut done = true;
|
||||
|
||||
for block in range {
|
||||
let changes = provider.account_block_changeset(block)?;
|
||||
let changes_count = changes.len();
|
||||
|
||||
for change in changes {
|
||||
highest_deleted_accounts.insert(change.address, block);
|
||||
}
|
||||
|
||||
scanned_changesets += changes_count;
|
||||
limiter.increment_deleted_entries_count_by(changes_count);
|
||||
last_changeset_pruned_block = Some(block);
|
||||
|
||||
if limiter.is_limit_reached() {
|
||||
done = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned account changesets");
|
||||
|
||||
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
|
||||
|
||||
let mut keys_deleted = 0usize;
|
||||
let mut keys_updated = 0usize;
|
||||
|
||||
provider.with_rocksdb_batch(|mut batch| {
|
||||
for (address, highest_block) in &highest_deleted_accounts {
|
||||
let prune_to = (*highest_block).min(last_changeset_pruned_block);
|
||||
match batch.prune_account_history_to(*address, prune_to)? {
|
||||
PruneShardOutcome::Deleted => keys_deleted += 1,
|
||||
PruneShardOutcome::Updated => keys_updated += 1,
|
||||
PruneShardOutcome::Unchanged => {}
|
||||
}
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
|
||||
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned account history (RocksDB indices)");
|
||||
|
||||
if done {
|
||||
provider.static_file_provider().delete_segment_below_block(
|
||||
StaticFileSegment::AccountChangeSets,
|
||||
last_changeset_pruned_block + 1,
|
||||
)?;
|
||||
}
|
||||
|
||||
let progress = limiter.progress(done);
|
||||
|
||||
Ok(SegmentOutput {
|
||||
progress,
|
||||
pruned: scanned_changesets + keys_deleted,
|
||||
checkpoint: Some(SegmentOutputCheckpoint {
|
||||
block_number: Some(last_changeset_pruned_block),
|
||||
tx_number: None,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
fn prune_database<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
range: RangeInclusive<u64>,
|
||||
range_end: u64,
|
||||
input: PruneInput,
|
||||
) -> Result<SegmentOutput, PrunerError>
|
||||
where
|
||||
Provider: ChangeSetReader + RocksDBProviderFactory,
|
||||
{
|
||||
let mut limiter = input.limiter;
|
||||
if limiter.is_limit_reached() {
|
||||
return Ok(SegmentOutput::not_done(
|
||||
limiter.interrupt_reason(),
|
||||
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
|
||||
))
|
||||
}
|
||||
|
||||
let mut highest_deleted_accounts = FxHashMap::default();
|
||||
let mut last_changeset_pruned_block = None;
|
||||
let mut scanned_changesets = 0usize;
|
||||
let mut done = true;
|
||||
|
||||
for block in range {
|
||||
let changes = provider.account_block_changeset(block)?;
|
||||
let changes_count = changes.len();
|
||||
|
||||
for change in changes {
|
||||
highest_deleted_accounts.insert(change.address, block);
|
||||
}
|
||||
|
||||
scanned_changesets += changes_count;
|
||||
limiter.increment_deleted_entries_count_by(changes_count);
|
||||
last_changeset_pruned_block = Some(block);
|
||||
|
||||
if limiter.is_limit_reached() {
|
||||
done = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned account changesets");
|
||||
|
||||
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
|
||||
|
||||
let mut keys_deleted = 0usize;
|
||||
let mut keys_updated = 0usize;
|
||||
|
||||
provider.with_rocksdb_batch(|mut batch| {
|
||||
for (address, highest_block) in &highest_deleted_accounts {
|
||||
let prune_to = (*highest_block).min(last_changeset_pruned_block);
|
||||
match batch.prune_account_history_to(*address, prune_to)? {
|
||||
PruneShardOutcome::Deleted => keys_deleted += 1,
|
||||
PruneShardOutcome::Updated => keys_updated += 1,
|
||||
PruneShardOutcome::Unchanged => {}
|
||||
}
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
|
||||
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned account history (RocksDB indices)");
|
||||
|
||||
let progress = limiter.progress(done);
|
||||
|
||||
Ok(SegmentOutput {
|
||||
progress,
|
||||
pruned: scanned_changesets + keys_deleted,
|
||||
checkpoint: Some(SegmentOutputCheckpoint {
|
||||
block_number: Some(last_changeset_pruned_block),
|
||||
tx_number: None,
|
||||
}),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider> Segment<Provider> for AccountsHistoryPruner
|
||||
where
|
||||
Provider: ChangeSetReader
|
||||
+ RocksDBProviderFactory
|
||||
+ StaticFileProviderFactory
|
||||
+ StorageSettingsCache
|
||||
+ DBProvider,
|
||||
{
|
||||
fn segment(&self) -> PruneSegment {
|
||||
PruneSegment::AccountHistory
|
||||
}
|
||||
|
||||
fn mode(&self) -> Option<PruneMode> {
|
||||
Some(self.mode)
|
||||
}
|
||||
|
||||
fn purpose(&self) -> PrunePurpose {
|
||||
PrunePurpose::User
|
||||
}
|
||||
|
||||
#[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
|
||||
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
|
||||
let range = match input.get_next_block_range() {
|
||||
Some(range) => range,
|
||||
None => {
|
||||
trace!(target: "pruner", "No account history to prune");
|
||||
return Ok(SegmentOutput::done())
|
||||
}
|
||||
};
|
||||
let range_end = *range.end();
|
||||
|
||||
if EitherWriter::account_changesets_destination(provider).is_static_file() {
|
||||
self.prune_static_files(provider, range, range_end, input)
|
||||
} else {
|
||||
self.prune_database(provider, range, range_end, input)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::segments::{PruneInput, PruneLimiter, Segment};
|
||||
use alloy_primitives::{Address, B256};
|
||||
use reth_db_api::{
|
||||
models::{AccountBeforeTx, ShardedKey},
|
||||
tables,
|
||||
transaction::DbTxMut,
|
||||
BlockNumberList,
|
||||
};
|
||||
use reth_provider::{
|
||||
DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
|
||||
StaticFileWriter,
|
||||
};
|
||||
use reth_prune_types::{PruneMode, PruneProgress};
|
||||
use reth_stages::test_utils::{StorageKind, TestStageDB};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use reth_storage_api::{DBProvider, StorageSettings, StorageSettingsCache};
|
||||
use reth_testing_utils::generators::{
|
||||
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
|
||||
};
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
fn setup_rocksdb_test_db() -> TestStageDB {
|
||||
let db = TestStageDB::default();
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
db
|
||||
}
|
||||
|
||||
fn generate_test_changeset(_block: u64, addresses: &[Address]) -> Vec<AccountBeforeTx> {
|
||||
addresses.iter().map(|&address| AccountBeforeTx { address, info: None }).collect()
|
||||
}
|
||||
|
||||
/// Helper to write account history to RocksDB.
|
||||
fn write_account_history_to_rocksdb(db: &TestStageDB, history: &[(Address, Vec<u64>)]) {
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
provider
|
||||
.with_rocksdb_batch(|mut batch| {
|
||||
for (address, blocks) in history {
|
||||
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
|
||||
batch
|
||||
.put::<tables::AccountsHistory>(ShardedKey::last(*address), &shard)
|
||||
.unwrap();
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})
|
||||
.unwrap();
|
||||
provider.commit().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_account_history_static_files_full_range() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=10,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let addresses: Vec<Address> = (0..3)
|
||||
.map(|i| {
|
||||
let mut addr = Address::ZERO;
|
||||
addr.0[0] = i as u8;
|
||||
addr
|
||||
})
|
||||
.collect();
|
||||
|
||||
let static_file_provider = db.factory.static_file_provider();
|
||||
{
|
||||
let mut writer = static_file_provider
|
||||
.latest_writer(StaticFileSegment::AccountChangeSets)
|
||||
.expect("get writer");
|
||||
|
||||
for block_num in 0..=10 {
|
||||
let changeset = generate_test_changeset(block_num, &addresses);
|
||||
writer.append_account_changeset(changeset, block_num).expect("append changeset");
|
||||
}
|
||||
writer.commit().expect("commit static file");
|
||||
}
|
||||
|
||||
let highest = static_file_provider
|
||||
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
|
||||
assert_eq!(highest, Some(10), "Static file should cover blocks 0-10");
|
||||
|
||||
// Write account history to RocksDB (not MDBX)
|
||||
let history: Vec<_> = addresses.iter().map(|&addr| (addr, (0..=10).collect())).collect();
|
||||
write_account_history_to_rocksdb(&db, &history);
|
||||
|
||||
let prune_mode = PruneMode::Before(5);
|
||||
let input =
|
||||
PruneInput { previous_checkpoint: None, to_block: 5, limiter: PruneLimiter::default() };
|
||||
let segment = AccountsHistoryPruner::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let result = segment.prune(&provider, input).expect("prune should succeed");
|
||||
provider.commit().expect("commit after prune");
|
||||
|
||||
assert_eq!(result.progress, PruneProgress::Finished);
|
||||
assert!(result.pruned > 0, "Should have pruned some entries");
|
||||
|
||||
let checkpoint = result.checkpoint.expect("should have checkpoint");
|
||||
assert_eq!(checkpoint.block_number, Some(5));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_account_history_mdbx_fallback() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
1..=10,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
|
||||
let (changesets, _) = random_changeset_range(
|
||||
&mut rng,
|
||||
blocks.iter(),
|
||||
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||
0..0,
|
||||
0..0,
|
||||
);
|
||||
db.insert_changesets(changesets.clone(), None).expect("insert changesets");
|
||||
db.insert_history(changesets, None).expect("insert history");
|
||||
|
||||
let mdbx_count_before = db.table::<tables::AccountChangeSets>().unwrap().len();
|
||||
assert!(mdbx_count_before > 0, "Should have MDBX changesets");
|
||||
|
||||
let static_file_provider = db.factory.static_file_provider();
|
||||
let highest = static_file_provider
|
||||
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
|
||||
assert!(highest.is_none(), "No static file coverage expected");
|
||||
|
||||
let prune_mode = PruneMode::Before(5);
|
||||
let input =
|
||||
PruneInput { previous_checkpoint: None, to_block: 5, limiter: PruneLimiter::default() };
|
||||
let segment = AccountsHistoryPruner::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let result = segment.prune(&provider, input).expect("prune should succeed");
|
||||
provider.commit().expect("commit after prune");
|
||||
|
||||
assert_eq!(result.progress, PruneProgress::Finished);
|
||||
assert!(result.pruned > 0, "Should have pruned some entries");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_account_history_limiter_block_boundary() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=20,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let addresses: Vec<Address> = (0..10)
|
||||
.map(|i| {
|
||||
let mut addr = Address::ZERO;
|
||||
addr.0[0] = i as u8;
|
||||
addr
|
||||
})
|
||||
.collect();
|
||||
|
||||
let static_file_provider = db.factory.static_file_provider();
|
||||
{
|
||||
let mut writer = static_file_provider
|
||||
.latest_writer(StaticFileSegment::AccountChangeSets)
|
||||
.expect("get writer");
|
||||
|
||||
for block_num in 0..=20 {
|
||||
let changeset = generate_test_changeset(block_num, &addresses);
|
||||
writer.append_account_changeset(changeset, block_num).expect("append changeset");
|
||||
}
|
||||
writer.commit().expect("commit static file");
|
||||
}
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
for block_num in 0..=20u64 {
|
||||
for addr in &addresses {
|
||||
provider
|
||||
.tx_ref()
|
||||
.put::<tables::AccountsHistory>(
|
||||
reth_db_api::models::ShardedKey::new(*addr, block_num),
|
||||
reth_db_api::BlockNumberList::new([block_num]).unwrap(),
|
||||
)
|
||||
.expect("insert history");
|
||||
}
|
||||
}
|
||||
provider.commit().expect("commit provider");
|
||||
|
||||
let prune_mode = PruneMode::Before(15);
|
||||
let limiter = PruneLimiter::default().set_deleted_entries_limit(25);
|
||||
let input = PruneInput { previous_checkpoint: None, to_block: 15, limiter };
|
||||
let segment = AccountsHistoryPruner::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let result = segment.prune(&provider, input).expect("prune should succeed");
|
||||
provider.commit().expect("commit after prune");
|
||||
|
||||
assert_eq!(
|
||||
result.progress,
|
||||
PruneProgress::HasMoreData(
|
||||
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached
|
||||
),
|
||||
"Should stop due to limit"
|
||||
);
|
||||
|
||||
let checkpoint = result.checkpoint.expect("should have checkpoint");
|
||||
let pruned_block = checkpoint.block_number.expect("should have block number");
|
||||
assert!(pruned_block < 15, "Should have stopped before target block");
|
||||
assert!(pruned_block >= 0, "Should have pruned at least one block");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_account_history_empty_range() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
1..=5,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let prune_mode = PruneMode::Before(5);
|
||||
let input = PruneInput {
|
||||
previous_checkpoint: Some(reth_prune_types::PruneCheckpoint {
|
||||
block_number: Some(5),
|
||||
tx_number: None,
|
||||
prune_mode,
|
||||
}),
|
||||
to_block: 5,
|
||||
limiter: PruneLimiter::default(),
|
||||
};
|
||||
let segment = AccountsHistoryPruner::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let result = segment.prune(&provider, input).expect("prune should succeed");
|
||||
|
||||
assert_eq!(result.progress, PruneProgress::Finished);
|
||||
assert_eq!(result.pruned, 0, "Nothing to prune");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_account_history_mixed_static_and_mdbx() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=20,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let addresses: Vec<Address> = (0..3)
|
||||
.map(|i| {
|
||||
let mut addr = Address::ZERO;
|
||||
addr.0[0] = i as u8;
|
||||
addr
|
||||
})
|
||||
.collect();
|
||||
|
||||
let static_file_provider = db.factory.static_file_provider();
|
||||
{
|
||||
let mut writer = static_file_provider
|
||||
.latest_writer(StaticFileSegment::AccountChangeSets)
|
||||
.expect("get writer");
|
||||
|
||||
for block_num in 0..=10 {
|
||||
let changeset = generate_test_changeset(block_num, &addresses);
|
||||
writer.append_account_changeset(changeset, block_num).expect("append changeset");
|
||||
}
|
||||
writer.commit().expect("commit static file");
|
||||
}
|
||||
|
||||
db.commit(|tx| {
|
||||
for block_num in 11..=20u64 {
|
||||
for addr in &addresses {
|
||||
tx.put::<tables::AccountChangeSets>(
|
||||
block_num,
|
||||
AccountBeforeTx { address: *addr, info: None },
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.expect("insert MDBX changesets");
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
for block_num in 0..=20u64 {
|
||||
for addr in &addresses {
|
||||
provider
|
||||
.tx_ref()
|
||||
.put::<tables::AccountsHistory>(
|
||||
reth_db_api::models::ShardedKey::new(*addr, block_num),
|
||||
reth_db_api::BlockNumberList::new([block_num]).unwrap(),
|
||||
)
|
||||
.expect("insert history");
|
||||
}
|
||||
}
|
||||
provider.commit().expect("commit provider");
|
||||
|
||||
let prune_mode = PruneMode::Before(15);
|
||||
let input = PruneInput {
|
||||
previous_checkpoint: None,
|
||||
to_block: 15,
|
||||
limiter: PruneLimiter::default(),
|
||||
};
|
||||
let segment = AccountsHistoryPruner::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let result = segment.prune(&provider, input).expect("prune should succeed");
|
||||
provider.commit().expect("commit after prune");
|
||||
|
||||
assert_eq!(result.progress, PruneProgress::Finished);
|
||||
assert!(result.pruned > 0, "Should have pruned entries from both static and MDBX");
|
||||
|
||||
let checkpoint = result.checkpoint.expect("should have checkpoint");
|
||||
assert_eq!(checkpoint.block_number, Some(15));
|
||||
}
|
||||
}
|
||||
561
crates/prune/prune/src/segments/user/rocksdb_storage_history.rs
Normal file
561
crates/prune/prune/src/segments/user/rocksdb_storage_history.rs
Normal file
@@ -0,0 +1,561 @@
|
||||
//! `RocksDB` storage history index pruner.
|
||||
|
||||
use crate::{
|
||||
segments::{PruneInput, Segment},
|
||||
PrunerError,
|
||||
};
|
||||
use alloy_primitives::BlockNumber;
|
||||
use reth_provider::{
|
||||
DBProvider, EitherWriter, PruneShardOutcome, RocksDBProviderFactory, StaticFileProviderFactory,
|
||||
StorageChangeSetReader, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune_types::{
|
||||
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
|
||||
};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use rustc_hash::FxHashMap;
|
||||
use std::ops::RangeInclusive;
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StoragesHistoryPruner {
|
||||
mode: PruneMode,
|
||||
}
|
||||
|
||||
impl StoragesHistoryPruner {
|
||||
pub const fn new(mode: PruneMode) -> Self {
|
||||
Self { mode }
|
||||
}
|
||||
|
||||
/// Prune when storage changesets are in static files.
|
||||
fn prune_static_files<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
range_end: BlockNumber,
|
||||
input: PruneInput,
|
||||
) -> Result<SegmentOutput, PrunerError>
|
||||
where
|
||||
Provider: StorageChangeSetReader + RocksDBProviderFactory + StaticFileProviderFactory,
|
||||
{
|
||||
let mut limiter = input.limiter;
|
||||
if limiter.is_limit_reached() {
|
||||
return Ok(SegmentOutput::not_done(
|
||||
limiter.interrupt_reason(),
|
||||
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
|
||||
))
|
||||
}
|
||||
|
||||
let mut highest_deleted_storages = FxHashMap::default();
|
||||
let mut last_changeset_pruned_block = None;
|
||||
let mut scanned_changesets = 0usize;
|
||||
let mut done = true;
|
||||
|
||||
for block in range {
|
||||
let changes = provider.storage_block_changeset(block)?;
|
||||
let changes_count = changes.len();
|
||||
|
||||
for change in changes {
|
||||
highest_deleted_storages.insert((change.address, change.key), block);
|
||||
}
|
||||
|
||||
scanned_changesets += changes_count;
|
||||
limiter.increment_deleted_entries_count_by(changes_count);
|
||||
last_changeset_pruned_block = Some(block);
|
||||
|
||||
if limiter.is_limit_reached() {
|
||||
done = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned storage changesets");
|
||||
|
||||
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
|
||||
|
||||
let mut keys_deleted = 0usize;
|
||||
let mut keys_updated = 0usize;
|
||||
|
||||
provider.with_rocksdb_batch(|mut batch| {
|
||||
for ((address, storage_key), highest_block) in &highest_deleted_storages {
|
||||
let prune_to = (*highest_block).min(last_changeset_pruned_block);
|
||||
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
|
||||
PruneShardOutcome::Deleted => keys_deleted += 1,
|
||||
PruneShardOutcome::Updated => keys_updated += 1,
|
||||
PruneShardOutcome::Unchanged => {}
|
||||
}
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
|
||||
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned storage history (RocksDB indices)");
|
||||
|
||||
// Delete static file jars below the pruned block
|
||||
provider.static_file_provider().delete_segment_below_block(
|
||||
StaticFileSegment::StorageChangeSets,
|
||||
last_changeset_pruned_block + 1,
|
||||
)?;
|
||||
|
||||
let progress = limiter.progress(done);
|
||||
|
||||
Ok(SegmentOutput {
|
||||
progress,
|
||||
pruned: scanned_changesets + keys_deleted,
|
||||
checkpoint: Some(SegmentOutputCheckpoint {
|
||||
block_number: Some(last_changeset_pruned_block),
|
||||
tx_number: None,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
/// Prune when storage changesets are in the database.
|
||||
fn prune_database<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
range_end: BlockNumber,
|
||||
input: PruneInput,
|
||||
) -> Result<SegmentOutput, PrunerError>
|
||||
where
|
||||
Provider: StorageChangeSetReader + RocksDBProviderFactory,
|
||||
{
|
||||
let mut limiter = input.limiter;
|
||||
if limiter.is_limit_reached() {
|
||||
return Ok(SegmentOutput::not_done(
|
||||
limiter.interrupt_reason(),
|
||||
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
|
||||
))
|
||||
}
|
||||
|
||||
let mut highest_deleted_storages = FxHashMap::default();
|
||||
let mut last_changeset_pruned_block = None;
|
||||
let mut scanned_changesets = 0usize;
|
||||
let mut done = true;
|
||||
|
||||
for block in range {
|
||||
let changes = provider.storage_block_changeset(block)?;
|
||||
let changes_count = changes.len();
|
||||
|
||||
for change in changes {
|
||||
highest_deleted_storages.insert((change.address, change.key), block);
|
||||
}
|
||||
|
||||
scanned_changesets += changes_count;
|
||||
limiter.increment_deleted_entries_count_by(changes_count);
|
||||
last_changeset_pruned_block = Some(block);
|
||||
|
||||
if limiter.is_limit_reached() {
|
||||
done = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned storage changesets");
|
||||
|
||||
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
|
||||
|
||||
let mut keys_deleted = 0usize;
|
||||
let mut keys_updated = 0usize;
|
||||
|
||||
provider.with_rocksdb_batch(|mut batch| {
|
||||
for ((address, storage_key), highest_block) in &highest_deleted_storages {
|
||||
let prune_to = (*highest_block).min(last_changeset_pruned_block);
|
||||
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
|
||||
PruneShardOutcome::Deleted => keys_deleted += 1,
|
||||
PruneShardOutcome::Updated => keys_updated += 1,
|
||||
PruneShardOutcome::Unchanged => {}
|
||||
}
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
|
||||
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned storage history (RocksDB indices)");
|
||||
|
||||
let progress = limiter.progress(done);
|
||||
|
||||
Ok(SegmentOutput {
|
||||
progress,
|
||||
pruned: scanned_changesets + keys_deleted,
|
||||
checkpoint: Some(SegmentOutputCheckpoint {
|
||||
block_number: Some(last_changeset_pruned_block),
|
||||
tx_number: None,
|
||||
}),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider> Segment<Provider> for StoragesHistoryPruner
|
||||
where
|
||||
Provider: DBProvider
|
||||
+ StorageChangeSetReader
|
||||
+ RocksDBProviderFactory
|
||||
+ StaticFileProviderFactory
|
||||
+ StorageSettingsCache,
|
||||
{
|
||||
fn segment(&self) -> PruneSegment {
|
||||
PruneSegment::StorageHistory
|
||||
}
|
||||
|
||||
fn mode(&self) -> Option<PruneMode> {
|
||||
Some(self.mode)
|
||||
}
|
||||
|
||||
fn purpose(&self) -> PrunePurpose {
|
||||
PrunePurpose::User
|
||||
}
|
||||
|
||||
#[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
|
||||
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
|
||||
let range = match input.get_next_block_range() {
|
||||
Some(range) => range,
|
||||
None => {
|
||||
trace!(target: "pruner", "No storage history to prune");
|
||||
return Ok(SegmentOutput::done())
|
||||
}
|
||||
};
|
||||
let range_end = *range.end();
|
||||
|
||||
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
|
||||
self.prune_static_files(provider, range, range_end, input)
|
||||
} else {
|
||||
self.prune_database(provider, range, range_end, input)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::segments::{PruneInput, PruneLimiter, Segment};
|
||||
use alloy_primitives::{Address, B256, U256};
|
||||
use reth_db_api::{
|
||||
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, StorageBeforeTx},
|
||||
tables,
|
||||
transaction::DbTxMut,
|
||||
BlockNumberList,
|
||||
};
|
||||
use reth_primitives_traits::StorageEntry;
|
||||
use reth_provider::{
|
||||
DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
|
||||
StaticFileWriter,
|
||||
};
|
||||
use reth_prune_types::{PruneMode, PruneProgress};
|
||||
use reth_stages::test_utils::{StorageKind, TestStageDB};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use reth_storage_api::{DBProvider, StorageSettings, StorageSettingsCache};
|
||||
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
|
||||
|
||||
fn setup_rocksdb_test_db() -> TestStageDB {
|
||||
let db = TestStageDB::default();
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
db
|
||||
}
|
||||
|
||||
/// Helper to write storage changesets to static files for testing.
|
||||
fn write_storage_changesets_to_static_files(
|
||||
db: &TestStageDB,
|
||||
changesets: &[(u64, Address, B256, U256)],
|
||||
) {
|
||||
let static_file_provider = db.factory.static_file_provider();
|
||||
let mut writer =
|
||||
static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
|
||||
|
||||
let mut current_block: Option<u64> = None;
|
||||
let mut block_entries = Vec::new();
|
||||
|
||||
for (block_number, address, key, value) in changesets {
|
||||
if current_block != Some(*block_number) {
|
||||
if !block_entries.is_empty() {
|
||||
writer
|
||||
.append_storage_changeset(
|
||||
std::mem::take(&mut block_entries),
|
||||
current_block.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
current_block = Some(*block_number);
|
||||
}
|
||||
block_entries.push(StorageBeforeTx { address: *address, key: *key, value: *value });
|
||||
}
|
||||
|
||||
if !block_entries.is_empty() {
|
||||
writer.append_storage_changeset(block_entries, current_block.unwrap()).unwrap();
|
||||
}
|
||||
|
||||
writer.commit().unwrap();
|
||||
static_file_provider.initialize_index().unwrap();
|
||||
}
|
||||
|
||||
/// Helper to write storage history to `RocksDB`.
|
||||
fn write_storage_history_to_rocksdb(db: &TestStageDB, history: &[(Address, B256, Vec<u64>)]) {
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
provider
|
||||
.with_rocksdb_batch(|mut batch| {
|
||||
for (address, key, blocks) in history {
|
||||
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
|
||||
batch
|
||||
.put::<tables::StoragesHistory>(
|
||||
StorageShardedKey::last(*address, *key),
|
||||
&shard,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})
|
||||
.unwrap();
|
||||
provider.commit().unwrap();
|
||||
}
|
||||
|
||||
/// Helper to read storage history from `RocksDB`.
|
||||
fn read_storage_history_from_rocksdb(
|
||||
db: &TestStageDB,
|
||||
address: Address,
|
||||
key: B256,
|
||||
) -> Vec<u64> {
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
let shards = rocksdb.storage_history_shards(address, key).unwrap();
|
||||
shards.into_iter().flat_map(|(_, blocks)| blocks.iter().collect::<Vec<_>>()).collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_storage_history_static_files_full_range() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=10,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let addr1 = Address::from([0x11; 20]);
|
||||
let addr2 = Address::from([0x22; 20]);
|
||||
let key1 = B256::from([0x01; 32]);
|
||||
let key2 = B256::from([0x02; 32]);
|
||||
|
||||
let changesets = vec![
|
||||
(0, addr1, key1, U256::from(50)),
|
||||
(1, addr1, key1, U256::from(100)),
|
||||
(1, addr1, key2, U256::from(200)),
|
||||
(2, addr1, key1, U256::from(110)),
|
||||
(2, addr2, key1, U256::from(300)),
|
||||
(3, addr1, key1, U256::from(120)),
|
||||
(3, addr2, key2, U256::from(400)),
|
||||
];
|
||||
write_storage_changesets_to_static_files(&db, &changesets);
|
||||
|
||||
write_storage_history_to_rocksdb(
|
||||
&db,
|
||||
&[
|
||||
(addr1, key1, vec![0, 1, 2, 3, 10, 20]),
|
||||
(addr1, key2, vec![1, 15]),
|
||||
(addr2, key1, vec![2, 25]),
|
||||
(addr2, key2, vec![3, 30]),
|
||||
],
|
||||
);
|
||||
|
||||
let segment = StoragesHistoryPruner::new(PruneMode::Before(3));
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
|
||||
let input =
|
||||
PruneInput { previous_checkpoint: None, to_block: 3, limiter: PruneLimiter::default() };
|
||||
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
provider.commit().unwrap();
|
||||
|
||||
assert_eq!(result.progress, PruneProgress::Finished);
|
||||
assert!(result.pruned > 0);
|
||||
|
||||
let history1 = read_storage_history_from_rocksdb(&db, addr1, key1);
|
||||
assert!(!history1.contains(&0));
|
||||
assert!(!history1.contains(&1));
|
||||
assert!(!history1.contains(&2));
|
||||
assert!(!history1.contains(&3));
|
||||
assert!(history1.contains(&10) || history1.contains(&20));
|
||||
|
||||
let history2 = read_storage_history_from_rocksdb(&db, addr1, key2);
|
||||
assert!(!history2.contains(&1));
|
||||
|
||||
let history3 = read_storage_history_from_rocksdb(&db, addr2, key1);
|
||||
assert!(!history3.contains(&2));
|
||||
|
||||
let history4 = read_storage_history_from_rocksdb(&db, addr2, key2);
|
||||
assert!(!history4.contains(&3));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_storage_history_mdbx_fallback() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
1..=10,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let addr1 = Address::from([0x11; 20]);
|
||||
let key1 = B256::from([0x01; 32]);
|
||||
|
||||
db.commit(|tx| {
|
||||
for block in 1..=5u64 {
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
BlockNumberAddress((block, addr1)),
|
||||
StorageEntry { key: key1, value: U256::from(block * 100) },
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
write_storage_history_to_rocksdb(&db, &[(addr1, key1, vec![1, 2, 3, 4, 5])]);
|
||||
|
||||
let segment = StoragesHistoryPruner::new(PruneMode::Before(3));
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
|
||||
let input =
|
||||
PruneInput { previous_checkpoint: None, to_block: 3, limiter: PruneLimiter::default() };
|
||||
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
provider.commit().unwrap();
|
||||
|
||||
assert_eq!(result.progress, PruneProgress::Finished);
|
||||
assert!(result.pruned > 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_storage_history_limiter_block_boundary() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=20,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let addr1 = Address::from([0x11; 20]);
|
||||
let key1 = B256::from([0x01; 32]);
|
||||
|
||||
let mut changesets = Vec::new();
|
||||
for block in 0..=10u64 {
|
||||
for i in 0..10 {
|
||||
changesets.push((block, addr1, B256::from([i as u8; 32]), U256::from(block * 100)));
|
||||
}
|
||||
}
|
||||
write_storage_changesets_to_static_files(&db, &changesets);
|
||||
|
||||
let all_blocks: Vec<u64> = (0..=10).collect();
|
||||
write_storage_history_to_rocksdb(&db, &[(addr1, key1, all_blocks)]);
|
||||
|
||||
let segment = StoragesHistoryPruner::new(PruneMode::Before(10));
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
|
||||
let limiter = PruneLimiter::default().set_deleted_entries_limit(15);
|
||||
let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
|
||||
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
provider.commit().unwrap();
|
||||
|
||||
assert_eq!(
|
||||
result.progress,
|
||||
PruneProgress::HasMoreData(
|
||||
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached
|
||||
),
|
||||
"Should stop due to limit"
|
||||
);
|
||||
|
||||
let checkpoint = result.checkpoint.expect("should have checkpoint");
|
||||
let pruned_block = checkpoint.block_number.expect("should have block number");
|
||||
assert!(pruned_block < 10, "Should have stopped before target block");
|
||||
assert!(pruned_block > 0, "Should have pruned at least some blocks");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_storage_history_empty_range() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
1..=5,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let segment = StoragesHistoryPruner::new(PruneMode::Before(5));
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
|
||||
let input = PruneInput {
|
||||
previous_checkpoint: Some(reth_prune_types::PruneCheckpoint {
|
||||
block_number: Some(5),
|
||||
tx_number: None,
|
||||
prune_mode: PruneMode::Before(5),
|
||||
}),
|
||||
to_block: 5,
|
||||
limiter: PruneLimiter::default(),
|
||||
};
|
||||
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
|
||||
assert_eq!(result.progress, PruneProgress::Finished);
|
||||
assert_eq!(result.pruned, 0, "Nothing to prune");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_storage_history_mixed_static_and_mdbx() {
|
||||
let db = setup_rocksdb_test_db();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=20,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let addr1 = Address::from([0x11; 20]);
|
||||
let key1 = B256::from([0x01; 32]);
|
||||
|
||||
let mut changesets = Vec::new();
|
||||
for block in 0..=10u64 {
|
||||
changesets.push((block, addr1, key1, U256::from(block * 100)));
|
||||
}
|
||||
write_storage_changesets_to_static_files(&db, &changesets);
|
||||
|
||||
db.commit(|tx| {
|
||||
for block in 11..=20u64 {
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
BlockNumberAddress((block, addr1)),
|
||||
StorageEntry { key: key1, value: U256::from(block * 100) },
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
write_storage_history_to_rocksdb(&db, &[(addr1, key1, (0..=20).collect())]);
|
||||
|
||||
let segment = StoragesHistoryPruner::new(PruneMode::Before(15));
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
|
||||
let input = PruneInput {
|
||||
previous_checkpoint: None,
|
||||
to_block: 15,
|
||||
limiter: PruneLimiter::default(),
|
||||
};
|
||||
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
provider.commit().unwrap();
|
||||
|
||||
assert_eq!(result.progress, PruneProgress::Finished);
|
||||
assert!(result.pruned > 0, "Should have pruned entries from both static and MDBX");
|
||||
|
||||
let checkpoint = result.checkpoint.expect("should have checkpoint");
|
||||
assert_eq!(checkpoint.block_number, Some(15));
|
||||
}
|
||||
}
|
||||
@@ -158,38 +158,33 @@ where
|
||||
let append_only =
|
||||
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
|
||||
|
||||
// Auto-commits on threshold; consistency check heals any crash.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_batch = rocksdb.batch_with_auto_commit();
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let rocksdb_batch = ();
|
||||
let mut writer =
|
||||
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
|
||||
let hash_iter = hash_collector.iter()?;
|
||||
|
||||
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
|
||||
let (hash_bytes, number_bytes) = hash_to_number?;
|
||||
if index > 0 && index.is_multiple_of(interval) {
|
||||
info!(
|
||||
target: "sync::stages::transaction_lookup",
|
||||
?append_only,
|
||||
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
|
||||
"Inserting hashes"
|
||||
);
|
||||
provider.with_rocksdb_batch(|rocksdb_batch| {
|
||||
// Create writer that routes to either MDBX or RocksDB based on settings
|
||||
let mut writer =
|
||||
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
|
||||
|
||||
for (index, hash_to_number) in hash_iter.enumerate() {
|
||||
let (hash_bytes, number_bytes) =
|
||||
hash_to_number.map_err(ProviderError::other)?;
|
||||
if index > 0 && index.is_multiple_of(interval) {
|
||||
info!(
|
||||
target: "sync::stages::transaction_lookup",
|
||||
?append_only,
|
||||
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
|
||||
"Inserting hashes"
|
||||
);
|
||||
}
|
||||
|
||||
// Decode from raw ETL bytes
|
||||
let hash = TxHash::decode(&hash_bytes)?;
|
||||
let tx_num = TxNumber::decompress(&number_bytes)?;
|
||||
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
|
||||
}
|
||||
|
||||
// Decode from raw ETL bytes
|
||||
let hash = TxHash::decode(&hash_bytes)?;
|
||||
let tx_num = TxNumber::decompress(&number_bytes)?;
|
||||
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
|
||||
}
|
||||
|
||||
// Extract and register RocksDB batch for commit at provider level
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
Ok(((), writer.into_raw_rocksdb_batch()))
|
||||
})?;
|
||||
|
||||
trace!(target: "sync::stages::transaction_lookup",
|
||||
total_hashes,
|
||||
@@ -215,14 +210,6 @@ where
|
||||
) -> Result<UnwindOutput, StageError> {
|
||||
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_batch = rocksdb.batch();
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let rocksdb_batch = ();
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
|
||||
|
||||
let static_file_provider = provider.static_file_provider();
|
||||
let rev_walker = provider
|
||||
.block_body_indices_range(range.clone())?
|
||||
@@ -230,24 +217,25 @@ where
|
||||
.zip(range.collect::<Vec<_>>())
|
||||
.rev();
|
||||
|
||||
for (body, number) in rev_walker {
|
||||
if number <= unwind_to {
|
||||
break;
|
||||
}
|
||||
provider.with_rocksdb_batch(|rocksdb_batch| {
|
||||
// Create writer that routes to either MDBX or RocksDB based on settings
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
|
||||
|
||||
// Delete all transactions that belong to this block
|
||||
for tx_id in body.tx_num_range() {
|
||||
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
|
||||
writer.delete_transaction_hash_number(transaction.trie_hash())?;
|
||||
for (body, number) in rev_walker {
|
||||
if number <= unwind_to {
|
||||
break;
|
||||
}
|
||||
|
||||
// Delete all transactions that belong to this block
|
||||
for tx_id in body.tx_num_range() {
|
||||
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
|
||||
writer.delete_transaction_hash_number(transaction.trie_hash())?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Extract and register RocksDB batch for commit at provider level
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
Ok(((), writer.into_raw_rocksdb_batch()))
|
||||
})?;
|
||||
|
||||
Ok(UnwindOutput {
|
||||
checkpoint: StageCheckpoint::new(unwind_to)
|
||||
|
||||
@@ -21,8 +21,8 @@ pub mod providers;
|
||||
pub use providers::{
|
||||
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
|
||||
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
|
||||
SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx,
|
||||
StaticFileWriter,
|
||||
PruneShardOutcome, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder,
|
||||
StaticFileWriteCtx, StaticFileWriter,
|
||||
};
|
||||
|
||||
pub mod changeset_walker;
|
||||
|
||||
@@ -39,7 +39,8 @@ pub use consistent::ConsistentProvider;
|
||||
pub(crate) mod rocksdb;
|
||||
|
||||
pub use rocksdb::{
|
||||
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
|
||||
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter,
|
||||
RocksDBTableStats, RocksTx,
|
||||
};
|
||||
|
||||
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
|
||||
|
||||
@@ -5,7 +5,10 @@
|
||||
//! inconsistencies between `RocksDB` data and MDBX checkpoints.
|
||||
|
||||
use super::RocksDBProvider;
|
||||
use crate::StaticFileProviderFactory;
|
||||
use crate::{
|
||||
changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
|
||||
StaticFileProviderFactory,
|
||||
};
|
||||
use alloy_eips::eip2718::Encodable2718;
|
||||
use alloy_primitives::BlockNumber;
|
||||
use rayon::prelude::*;
|
||||
@@ -17,6 +20,7 @@ use reth_storage_api::{
|
||||
DBProvider, StageCheckpointReader, StorageSettingsCache, TransactionsProvider,
|
||||
};
|
||||
use reth_storage_errors::provider::ProviderResult;
|
||||
use std::collections::HashSet;
|
||||
|
||||
impl RocksDBProvider {
|
||||
/// Checks consistency of `RocksDB` tables against MDBX stage checkpoints.
|
||||
@@ -230,7 +234,7 @@ impl RocksDBProvider {
|
||||
provider: &Provider,
|
||||
) -> ProviderResult<Option<BlockNumber>>
|
||||
where
|
||||
Provider: DBProvider + StageCheckpointReader,
|
||||
Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory,
|
||||
{
|
||||
// Get the IndexStorageHistory stage checkpoint
|
||||
let checkpoint = provider
|
||||
@@ -249,7 +253,7 @@ impl RocksDBProvider {
|
||||
target: "reth::providers::rocksdb",
|
||||
"StoragesHistory has data but checkpoint is 0, clearing all"
|
||||
);
|
||||
self.prune_storages_history_above(0)?;
|
||||
self.prune_storages_history_in_range(provider, 0..=u64::MAX)?;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -283,7 +287,10 @@ impl RocksDBProvider {
|
||||
checkpoint,
|
||||
"StoragesHistory ahead of checkpoint, pruning excess data"
|
||||
);
|
||||
self.prune_storages_history_above(checkpoint)?;
|
||||
self.prune_storages_history_in_range(
|
||||
provider,
|
||||
(checkpoint + 1)..=max_highest_block,
|
||||
)?;
|
||||
} else if max_highest_block < checkpoint {
|
||||
// RocksDB is behind checkpoint, return highest block to signal unwind needed
|
||||
tracing::warn!(
|
||||
@@ -351,7 +358,7 @@ impl RocksDBProvider {
|
||||
provider: &Provider,
|
||||
) -> ProviderResult<Option<BlockNumber>>
|
||||
where
|
||||
Provider: DBProvider + StageCheckpointReader,
|
||||
Provider: DBProvider + StageCheckpointReader + StaticFileProviderFactory,
|
||||
{
|
||||
// Get the IndexAccountHistory stage checkpoint
|
||||
let checkpoint = provider
|
||||
@@ -370,7 +377,7 @@ impl RocksDBProvider {
|
||||
target: "reth::providers::rocksdb",
|
||||
"AccountsHistory has data but checkpoint is 0, clearing all"
|
||||
);
|
||||
self.prune_accounts_history_above(0)?;
|
||||
self.prune_accounts_history_in_range(provider, 0..=u64::MAX)?;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -404,7 +411,10 @@ impl RocksDBProvider {
|
||||
checkpoint,
|
||||
"AccountsHistory ahead of checkpoint, pruning excess data"
|
||||
);
|
||||
self.prune_accounts_history_above(checkpoint)?;
|
||||
self.prune_accounts_history_in_range(
|
||||
provider,
|
||||
(checkpoint + 1)..=max_highest_block,
|
||||
)?;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -467,6 +477,181 @@ impl RocksDBProvider {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prunes `AccountsHistory` entries for accounts changed in the given block range.
|
||||
///
|
||||
/// Uses static file changesets to identify which addresses were changed, then unwinds
|
||||
/// the corresponding `RocksDB` history entries.
|
||||
/// Falls back to table iteration if static files don't cover the range.
|
||||
pub fn prune_accounts_history_in_range<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
block_range: std::ops::RangeInclusive<BlockNumber>,
|
||||
) -> ProviderResult<()>
|
||||
where
|
||||
Provider: StaticFileProviderFactory,
|
||||
{
|
||||
if block_range.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let static_file_provider = provider.static_file_provider();
|
||||
|
||||
// Check if static files cover the range we need
|
||||
let highest_static_block = static_file_provider
|
||||
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
|
||||
|
||||
match highest_static_block {
|
||||
Some(highest) if highest >= *block_range.start() => {
|
||||
// Static files cover at least part of the range - use changeset-based approach
|
||||
let effective_end = (*block_range.end()).min(highest);
|
||||
let static_range = *block_range.start()..=effective_end;
|
||||
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
?static_range,
|
||||
"Using static file changesets for AccountsHistory healing"
|
||||
);
|
||||
|
||||
// Collect unique addresses from static file changesets
|
||||
let mut addresses_to_unwind = HashSet::new();
|
||||
let walker: StaticFileAccountChangesetWalker<_> =
|
||||
static_file_provider.walk_account_changeset_range(static_range);
|
||||
|
||||
for result in walker {
|
||||
let (_block_number, account) = result?;
|
||||
addresses_to_unwind.insert(account.address);
|
||||
}
|
||||
|
||||
if !addresses_to_unwind.is_empty() {
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
addresses_count = addresses_to_unwind.len(),
|
||||
"Unwinding AccountsHistory for changed accounts"
|
||||
);
|
||||
|
||||
// Unwind history for each address to keep only blocks <= (start - 1)
|
||||
let keep_to = block_range.start().saturating_sub(1);
|
||||
let mut batch = self.batch();
|
||||
for address in addresses_to_unwind {
|
||||
batch.unwind_account_history_to(address, keep_to)?;
|
||||
}
|
||||
batch.commit()?;
|
||||
}
|
||||
|
||||
// If static files didn't cover the entire range, fall back for remainder
|
||||
if effective_end < *block_range.end() {
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
from_block = effective_end + 1,
|
||||
to_block = *block_range.end(),
|
||||
"Static files don't cover full range, using table scan for remainder"
|
||||
);
|
||||
self.prune_accounts_history_above(effective_end)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
_ => {
|
||||
// Static files don't cover the range - fall back to table iteration
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
?highest_static_block,
|
||||
?block_range,
|
||||
"Static files don't cover range, falling back to table scan"
|
||||
);
|
||||
self.prune_accounts_history_above(block_range.start().saturating_sub(1))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Prunes `StoragesHistory` entries for storage slots changed in the given block range.
|
||||
///
|
||||
/// Uses static file changesets to identify which `(address, storage_key)` pairs
|
||||
/// were changed, then unwinds the corresponding `RocksDB` history entries.
|
||||
/// Falls back to table iteration if static files don't cover the range.
|
||||
pub fn prune_storages_history_in_range<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
block_range: std::ops::RangeInclusive<BlockNumber>,
|
||||
) -> ProviderResult<()>
|
||||
where
|
||||
Provider: StaticFileProviderFactory,
|
||||
{
|
||||
if block_range.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let static_file_provider = provider.static_file_provider();
|
||||
|
||||
// Check if static files cover the range we need
|
||||
let highest_static_block = static_file_provider
|
||||
.get_highest_static_file_block(StaticFileSegment::StorageChangeSets);
|
||||
|
||||
match highest_static_block {
|
||||
Some(highest) if highest >= *block_range.start() => {
|
||||
// Static files cover at least part of the range - use changeset-based approach
|
||||
let effective_end = (*block_range.end()).min(highest);
|
||||
let static_range = *block_range.start()..=effective_end;
|
||||
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
?static_range,
|
||||
"Using static file changesets for StoragesHistory healing"
|
||||
);
|
||||
|
||||
// Collect unique (address, storage_key) pairs from static file changesets
|
||||
let mut keys_to_unwind = HashSet::new();
|
||||
let walker: StaticFileStorageChangesetWalker<_> =
|
||||
static_file_provider.walk_storage_changeset_range(static_range);
|
||||
|
||||
for result in walker {
|
||||
let (block_number_address, entry) = result?;
|
||||
keys_to_unwind.insert((block_number_address.address(), entry.key));
|
||||
}
|
||||
|
||||
if !keys_to_unwind.is_empty() {
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
keys_count = keys_to_unwind.len(),
|
||||
"Unwinding StoragesHistory for changed storage slots"
|
||||
);
|
||||
|
||||
// Unwind history for each (address, storage_key) pair to keep only
|
||||
// blocks <= (start - 1)
|
||||
let keep_to = block_range.start().saturating_sub(1);
|
||||
let mut batch = self.batch();
|
||||
for (address, storage_key) in keys_to_unwind {
|
||||
batch.unwind_storage_history_to(address, storage_key, keep_to)?;
|
||||
}
|
||||
batch.commit()?;
|
||||
}
|
||||
|
||||
// If static files didn't cover the entire range, fall back for remainder
|
||||
if effective_end < *block_range.end() {
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
from_block = effective_end + 1,
|
||||
to_block = *block_range.end(),
|
||||
"Static files don't cover full range, using table scan for remainder"
|
||||
);
|
||||
self.prune_storages_history_above(effective_end)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
_ => {
|
||||
// Static files don't cover the range - fall back to table iteration
|
||||
tracing::info!(
|
||||
target: "reth::providers::rocksdb",
|
||||
?highest_static_block,
|
||||
?block_range,
|
||||
"Static files don't cover range, falling back to table scan"
|
||||
);
|
||||
self.prune_storages_history_above(block_range.start().saturating_sub(1))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -1402,4 +1587,291 @@ mod tests {
|
||||
"Should require unwind to block 50 to rebuild AccountsHistory"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test that healing for `AccountsHistory` uses static file changesets when available.
|
||||
///
|
||||
/// This test verifies that:
|
||||
/// 1. When static files contain changesets, healing uses them to identify which addresses need
|
||||
/// their history unwound
|
||||
/// 2. Addresses NOT in the changeset range remain untouched
|
||||
/// 3. The unwind re-keys shards appropriately (to `u64::MAX` sentinel)
|
||||
#[test]
|
||||
fn test_accounts_history_healing_uses_changesets() {
|
||||
use crate::providers::static_file::StaticFileWriter;
|
||||
use reth_db_api::models::ShardedKey;
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let rocksdb = RocksDBBuilder::new(temp_dir.path())
|
||||
.with_table::<tables::AccountsHistory>()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let factory = create_test_provider_factory();
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let addr1 = Address::repeat_byte(0x11);
|
||||
let addr2 = Address::repeat_byte(0x22);
|
||||
let addr3 = Address::repeat_byte(0x33);
|
||||
|
||||
// Write changesets for addr1 and addr2 (not addr3)
|
||||
{
|
||||
let static_file_provider = factory.static_file_provider();
|
||||
let mut writer =
|
||||
static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
|
||||
for block_num in 0..=10 {
|
||||
let changeset = vec![
|
||||
reth_db_api::models::AccountBeforeTx {
|
||||
address: addr1,
|
||||
info: Some(Account { nonce: block_num, ..Default::default() }),
|
||||
},
|
||||
reth_db_api::models::AccountBeforeTx {
|
||||
address: addr2,
|
||||
info: Some(Account { nonce: block_num + 100, ..Default::default() }),
|
||||
},
|
||||
];
|
||||
writer.append_account_changeset(changeset, block_num).unwrap();
|
||||
}
|
||||
writer.commit().unwrap();
|
||||
}
|
||||
|
||||
// Insert history shards into RocksDB
|
||||
let key_addr1_block5 = ShardedKey::new(addr1, 5);
|
||||
let key_addr1_block10 = ShardedKey::new(addr1, 10);
|
||||
let key_addr2_block10 = ShardedKey::new(addr2, 10);
|
||||
let key_addr3_block10 = ShardedKey::new(addr3, 10);
|
||||
|
||||
let block_list_5 = BlockNumberList::new_pre_sorted([1, 2, 3, 4, 5]);
|
||||
let block_list_10 = BlockNumberList::new_pre_sorted([6, 7, 8, 9, 10]);
|
||||
rocksdb.put::<tables::AccountsHistory>(key_addr1_block5.clone(), &block_list_5).unwrap();
|
||||
rocksdb.put::<tables::AccountsHistory>(key_addr1_block10.clone(), &block_list_10).unwrap();
|
||||
rocksdb.put::<tables::AccountsHistory>(key_addr2_block10.clone(), &block_list_10).unwrap();
|
||||
rocksdb.put::<tables::AccountsHistory>(key_addr3_block10.clone(), &block_list_10).unwrap();
|
||||
|
||||
let highest = factory
|
||||
.static_file_provider()
|
||||
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
|
||||
assert_eq!(highest, Some(10), "Static files should cover blocks 0-10");
|
||||
|
||||
// Prune blocks 6-10 - should use changesets
|
||||
rocksdb.prune_accounts_history_in_range(&factory, 6..=10).unwrap();
|
||||
|
||||
// After healing, addr1's block5 shard is re-keyed to sentinel
|
||||
let key_addr1_sentinel = ShardedKey::new(addr1, u64::MAX);
|
||||
|
||||
assert!(
|
||||
rocksdb.get::<tables::AccountsHistory>(key_addr1_block5).unwrap().is_none(),
|
||||
"addr1 block 5 shard should be re-keyed to sentinel"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::AccountsHistory>(key_addr1_sentinel).unwrap().is_some(),
|
||||
"addr1 should have sentinel shard after healing"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::AccountsHistory>(key_addr1_block10).unwrap().is_none(),
|
||||
"addr1 block 10 shard should be pruned"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::AccountsHistory>(key_addr2_block10).unwrap().is_none(),
|
||||
"addr2 block 10 shard should be pruned (all blocks > 5)"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::AccountsHistory>(key_addr3_block10).unwrap().is_some(),
|
||||
"addr3 block 10 shard should remain (not in static file changesets)"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test that healing for `StoragesHistory` uses static file changesets when available.
|
||||
#[test]
|
||||
fn test_storages_history_healing_uses_changesets() {
|
||||
use crate::providers::static_file::StaticFileWriter;
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let rocksdb = RocksDBBuilder::new(temp_dir.path())
|
||||
.with_table::<tables::StoragesHistory>()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let factory = create_test_provider_factory();
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let addr1 = Address::repeat_byte(0x11);
|
||||
let storage_key1 = B256::repeat_byte(0xAA);
|
||||
let storage_key2 = B256::repeat_byte(0xBB);
|
||||
let addr2 = Address::repeat_byte(0x22);
|
||||
let addr_not_in_changeset = Address::repeat_byte(0x99);
|
||||
|
||||
// Write storage changesets
|
||||
{
|
||||
let static_file_provider = factory.static_file_provider();
|
||||
let mut writer =
|
||||
static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
|
||||
for block_num in 0..=10 {
|
||||
let changeset = vec![
|
||||
reth_db_api::models::StorageBeforeTx {
|
||||
address: addr1,
|
||||
key: storage_key1,
|
||||
value: alloy_primitives::U256::from(block_num),
|
||||
},
|
||||
reth_db_api::models::StorageBeforeTx {
|
||||
address: addr1,
|
||||
key: storage_key2,
|
||||
value: alloy_primitives::U256::from(block_num + 100),
|
||||
},
|
||||
reth_db_api::models::StorageBeforeTx {
|
||||
address: addr2,
|
||||
key: storage_key1,
|
||||
value: alloy_primitives::U256::from(block_num + 200),
|
||||
},
|
||||
];
|
||||
writer.append_storage_changeset(changeset, block_num).unwrap();
|
||||
}
|
||||
writer.commit().unwrap();
|
||||
}
|
||||
|
||||
// Insert history shards
|
||||
let key_addr1_key1_block5 = StorageShardedKey::new(addr1, storage_key1, 5);
|
||||
let key_addr1_key1_block10 = StorageShardedKey::new(addr1, storage_key1, 10);
|
||||
let key_addr1_key2_block10 = StorageShardedKey::new(addr1, storage_key2, 10);
|
||||
let key_addr2_key1_block10 = StorageShardedKey::new(addr2, storage_key1, 10);
|
||||
let key_not_in_changeset = StorageShardedKey::new(addr_not_in_changeset, storage_key1, 10);
|
||||
|
||||
let block_list_5 = BlockNumberList::new_pre_sorted([1, 2, 3, 4, 5]);
|
||||
let block_list_10 = BlockNumberList::new_pre_sorted([6, 7, 8, 9, 10]);
|
||||
rocksdb
|
||||
.put::<tables::StoragesHistory>(key_addr1_key1_block5.clone(), &block_list_5)
|
||||
.unwrap();
|
||||
rocksdb
|
||||
.put::<tables::StoragesHistory>(key_addr1_key1_block10.clone(), &block_list_10)
|
||||
.unwrap();
|
||||
rocksdb
|
||||
.put::<tables::StoragesHistory>(key_addr1_key2_block10.clone(), &block_list_10)
|
||||
.unwrap();
|
||||
rocksdb
|
||||
.put::<tables::StoragesHistory>(key_addr2_key1_block10.clone(), &block_list_10)
|
||||
.unwrap();
|
||||
rocksdb
|
||||
.put::<tables::StoragesHistory>(key_not_in_changeset.clone(), &block_list_10)
|
||||
.unwrap();
|
||||
|
||||
let highest = factory
|
||||
.static_file_provider()
|
||||
.get_highest_static_file_block(StaticFileSegment::StorageChangeSets);
|
||||
assert_eq!(highest, Some(10), "Static files should cover blocks 0-10");
|
||||
|
||||
rocksdb.prune_storages_history_in_range(&factory, 6..=10).unwrap();
|
||||
|
||||
let key_addr1_key1_sentinel = StorageShardedKey::new(addr1, storage_key1, u64::MAX);
|
||||
|
||||
assert!(
|
||||
rocksdb.get::<tables::StoragesHistory>(key_addr1_key1_block5).unwrap().is_none(),
|
||||
"addr1/key1 block 5 shard should be re-keyed to sentinel"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::StoragesHistory>(key_addr1_key1_sentinel).unwrap().is_some(),
|
||||
"addr1/key1 should have sentinel shard"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::StoragesHistory>(key_addr1_key1_block10).unwrap().is_none(),
|
||||
"addr1/key1 block 10 shard should be pruned"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::StoragesHistory>(key_addr1_key2_block10).unwrap().is_none(),
|
||||
"addr1/key2 block 10 shard should be pruned"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::StoragesHistory>(key_addr2_key1_block10).unwrap().is_none(),
|
||||
"addr2/key1 block 10 shard should be pruned"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::StoragesHistory>(key_not_in_changeset).unwrap().is_some(),
|
||||
"addr not in changeset should remain"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test fallback to table scan when static files don't cover the range.
|
||||
#[test]
|
||||
fn test_history_healing_fallback_when_no_static_files() {
|
||||
use reth_db_api::models::ShardedKey;
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let rocksdb = RocksDBBuilder::new(temp_dir.path())
|
||||
.with_table::<tables::AccountsHistory>()
|
||||
.with_table::<tables::StoragesHistory>()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let factory = create_test_provider_factory();
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy()
|
||||
.with_account_history_in_rocksdb(true)
|
||||
.with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let addr1 = Address::repeat_byte(0x11);
|
||||
let addr2 = Address::repeat_byte(0x22);
|
||||
|
||||
// Insert shards - no static file changesets written
|
||||
let key_addr1_block50 = ShardedKey::new(addr1, 50);
|
||||
let key_addr2_block100 = ShardedKey::new(addr2, 100);
|
||||
let key_addr1_sentinel = ShardedKey::new(addr1, u64::MAX);
|
||||
|
||||
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
|
||||
rocksdb.put::<tables::AccountsHistory>(key_addr1_block50.clone(), &block_list).unwrap();
|
||||
rocksdb.put::<tables::AccountsHistory>(key_addr2_block100.clone(), &block_list).unwrap();
|
||||
rocksdb.put::<tables::AccountsHistory>(key_addr1_sentinel.clone(), &block_list).unwrap();
|
||||
|
||||
let storage_key = B256::repeat_byte(0xAA);
|
||||
let storage_key_block50 = StorageShardedKey::new(addr1, storage_key, 50);
|
||||
let storage_key_block100 = StorageShardedKey::new(addr1, storage_key, 100);
|
||||
let storage_key_sentinel = StorageShardedKey::new(addr1, storage_key, u64::MAX);
|
||||
|
||||
rocksdb.put::<tables::StoragesHistory>(storage_key_block50.clone(), &block_list).unwrap();
|
||||
rocksdb.put::<tables::StoragesHistory>(storage_key_block100.clone(), &block_list).unwrap();
|
||||
rocksdb.put::<tables::StoragesHistory>(storage_key_sentinel.clone(), &block_list).unwrap();
|
||||
|
||||
// No static files exist
|
||||
let highest = factory
|
||||
.static_file_provider()
|
||||
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
|
||||
assert!(highest.is_none(), "No static files should exist");
|
||||
|
||||
// Without changesets, fallback deletes all shards with highest_block > 50
|
||||
rocksdb.prune_accounts_history_in_range(&factory, 51..=100).unwrap();
|
||||
|
||||
assert!(
|
||||
rocksdb.get::<tables::AccountsHistory>(key_addr1_block50).unwrap().is_some(),
|
||||
"addr1 block 50 should remain (at boundary)"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::AccountsHistory>(key_addr2_block100).unwrap().is_none(),
|
||||
"addr2 block 100 should be pruned (fallback deletes > 50)"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::AccountsHistory>(key_addr1_sentinel).unwrap().is_some(),
|
||||
"sentinel entry should remain"
|
||||
);
|
||||
|
||||
rocksdb.prune_storages_history_in_range(&factory, 51..=100).unwrap();
|
||||
|
||||
assert!(
|
||||
rocksdb.get::<tables::StoragesHistory>(storage_key_block50).unwrap().is_some(),
|
||||
"storage block 50 should remain"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::StoragesHistory>(storage_key_block100).unwrap().is_none(),
|
||||
"storage block 100 should be pruned"
|
||||
);
|
||||
assert!(
|
||||
rocksdb.get::<tables::StoragesHistory>(storage_key_sentinel).unwrap().is_some(),
|
||||
"storage sentinel should remain"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,5 +6,6 @@ mod provider;
|
||||
|
||||
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
|
||||
pub use provider::{
|
||||
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBTableStats, RocksTx,
|
||||
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter,
|
||||
RocksDBTableStats, RocksTx,
|
||||
};
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -158,3 +158,14 @@ pub struct RocksTx;
|
||||
/// A stub raw iterator for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBRawIter;
|
||||
|
||||
/// Outcome of pruning a history shard (stub).
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PruneShardOutcome {
|
||||
/// Shard was deleted entirely.
|
||||
Deleted,
|
||||
/// Shard was updated with filtered block numbers.
|
||||
Updated,
|
||||
/// Shard was unchanged (no blocks <= `to_block`).
|
||||
Unchanged,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user