mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
16 Commits
push
...
feat/rocks
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
163cc01f5c | ||
|
|
56188cd48f | ||
|
|
055de28e48 | ||
|
|
47c005c47b | ||
|
|
f1b8be7480 | ||
|
|
f40fcb00c8 | ||
|
|
393dc0c3e0 | ||
|
|
15f6587e84 | ||
|
|
3f3f808ab0 | ||
|
|
21ecafdb06 | ||
|
|
9ecb970e26 | ||
|
|
ecb32978ee | ||
|
|
9f735b9164 | ||
|
|
0346f03bfd | ||
|
|
28e0b1b8d8 | ||
|
|
1b79a6d0b0 |
@@ -42,11 +42,15 @@ rayon.workspace = true
|
||||
tokio.workspace = true
|
||||
rustc-hash.workspace = true
|
||||
|
||||
[features]
|
||||
rocksdb = ["reth-provider/rocksdb"]
|
||||
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
reth-db = { workspace = true, features = ["test-utils"] }
|
||||
reth-stages = { workspace = true, features = ["test-utils"] }
|
||||
reth-stages = { workspace = true, features = ["test-utils", "rocksdb"] }
|
||||
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
|
||||
reth-storage-api.workspace = true
|
||||
reth-testing-utils.workspace = true
|
||||
reth-tracing.workspace = true
|
||||
|
||||
|
||||
@@ -7,10 +7,10 @@ use reth_primitives_traits::NodePrimitives;
|
||||
use reth_provider::{
|
||||
providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider,
|
||||
DatabaseProviderFactory, NodePrimitivesProvider, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
|
||||
RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_prune_types::PruneModes;
|
||||
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
|
||||
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::watch;
|
||||
|
||||
@@ -85,6 +85,7 @@ impl PrunerBuilder {
|
||||
+ StageCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader
|
||||
+ RocksDBProviderFactory
|
||||
+ StaticFileProviderFactory<
|
||||
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
|
||||
>,
|
||||
@@ -121,7 +122,8 @@ impl PrunerBuilder {
|
||||
+ StorageSettingsCache
|
||||
+ StageCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader,
|
||||
+ StorageChangeSetReader
|
||||
+ RocksDBProviderFactory,
|
||||
{
|
||||
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);
|
||||
|
||||
|
||||
@@ -7,10 +7,11 @@ use reth_db_api::{table::Value, transaction::DbTxMut};
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
use reth_provider::{
|
||||
providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider,
|
||||
PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
|
||||
PruneCheckpointReader, PruneCheckpointWriter, RocksDBProviderFactory,
|
||||
StaticFileProviderFactory,
|
||||
};
|
||||
use reth_prune_types::PruneModes;
|
||||
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
|
||||
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache};
|
||||
|
||||
/// Collection of [`Segment`]. Thread-safe, allocated on the heap.
|
||||
#[derive(Debug)]
|
||||
@@ -55,7 +56,8 @@ where
|
||||
+ ChainStateBlockReader
|
||||
+ StorageSettingsCache
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader,
|
||||
+ StorageChangeSetReader
|
||||
+ RocksDBProviderFactory,
|
||||
{
|
||||
/// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and
|
||||
/// [`PruneModes`].
|
||||
|
||||
@@ -10,20 +10,20 @@ use alloy_primitives::BlockNumber;
|
||||
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
|
||||
use reth_provider::{
|
||||
changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
|
||||
StaticFileProviderFactory, StorageSettingsCache,
|
||||
RocksDBProviderFactory, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_prune_types::{
|
||||
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
|
||||
};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use reth_storage_api::ChangeSetReader;
|
||||
use reth_storage_api::{ChangeSetReader, StorageSettingsCache};
|
||||
use rustc_hash::FxHashMap;
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
/// Number of account history tables to prune in one step.
|
||||
///
|
||||
/// Account History consists of two tables: [`tables::AccountChangeSets`] and
|
||||
/// [`tables::AccountsHistory`]. We want to prune them to the same block number.
|
||||
/// Account History consists of two tables: [`tables::AccountChangeSets`] (either in database or
|
||||
/// static files) and [`tables::AccountsHistory`]. We want to prune them to the same block number.
|
||||
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -42,7 +42,8 @@ where
|
||||
Provider: DBProvider<Tx: DbTxMut>
|
||||
+ StaticFileProviderFactory
|
||||
+ StorageSettingsCache
|
||||
+ ChangeSetReader,
|
||||
+ ChangeSetReader
|
||||
+ RocksDBProviderFactory,
|
||||
{
|
||||
fn segment(&self) -> PruneSegment {
|
||||
PruneSegment::AccountHistory
|
||||
@@ -67,7 +68,13 @@ where
|
||||
};
|
||||
let range_end = *range.end();
|
||||
|
||||
// Check where account changesets are stored
|
||||
// Check where account history indices are stored
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().account_history_in_rocksdb {
|
||||
return self.prune_rocksdb(provider, input, range, range_end);
|
||||
}
|
||||
|
||||
// Check where account changesets are stored (MDBX path)
|
||||
if EitherWriter::account_changesets_destination(provider).is_static_file() {
|
||||
self.prune_static_files(provider, input, range, range_end)
|
||||
} else {
|
||||
@@ -94,6 +101,8 @@ impl AccountHistory {
|
||||
input.limiter
|
||||
};
|
||||
|
||||
// The limiter may already be exhausted from a previous segment in the same prune run.
|
||||
// Early exit avoids unnecessary iteration when no budget remains.
|
||||
if limiter.is_limit_reached() {
|
||||
return Ok(SegmentOutput::not_done(
|
||||
limiter.interrupt_reason(),
|
||||
@@ -101,11 +110,14 @@ impl AccountHistory {
|
||||
))
|
||||
}
|
||||
|
||||
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
|
||||
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
|
||||
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
|
||||
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
|
||||
// `max_reorg_depth`, so no OOM is expected here.
|
||||
// Deleted account changeset keys (account addresses) with the highest block number deleted
|
||||
// for that key.
|
||||
//
|
||||
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
|
||||
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
|
||||
// / 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total
|
||||
// size should be up to ~0.25MB + some hashmap overhead. `blocks_since_last_run` is
|
||||
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
|
||||
let mut highest_deleted_accounts = FxHashMap::default();
|
||||
let mut last_changeset_pruned_block = None;
|
||||
let mut pruned_changesets = 0;
|
||||
@@ -124,8 +136,8 @@ impl AccountHistory {
|
||||
limiter.increment_deleted_entries_count();
|
||||
}
|
||||
|
||||
// Delete static file jars below the pruned block
|
||||
if let Some(last_block) = last_changeset_pruned_block {
|
||||
// Delete static file jars only when fully processed
|
||||
if done && let Some(last_block) = last_changeset_pruned_block {
|
||||
provider
|
||||
.static_file_provider()
|
||||
.delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
|
||||
@@ -210,6 +222,106 @@ impl AccountHistory {
|
||||
)
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Prunes account history when indices are stored in `RocksDB`.
|
||||
///
|
||||
/// Reads account changesets from static files and prunes the corresponding
|
||||
/// `RocksDB` history shards.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn prune_rocksdb<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
input: PruneInput,
|
||||
range: std::ops::RangeInclusive<BlockNumber>,
|
||||
range_end: BlockNumber,
|
||||
) -> Result<SegmentOutput, PrunerError>
|
||||
where
|
||||
Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory,
|
||||
{
|
||||
use reth_provider::PruneShardOutcome;
|
||||
|
||||
// Unlike MDBX path, we don't divide the limit by 2 because RocksDB path only prunes
|
||||
// history shards (no separate changeset table to delete from). The changesets are in
|
||||
// static files which are deleted separately.
|
||||
let mut limiter = input.limiter;
|
||||
|
||||
if limiter.is_limit_reached() {
|
||||
return Ok(SegmentOutput::not_done(
|
||||
limiter.interrupt_reason(),
|
||||
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
|
||||
))
|
||||
}
|
||||
|
||||
let mut highest_deleted_accounts = FxHashMap::default();
|
||||
let mut last_changeset_pruned_block = None;
|
||||
let mut changesets_processed = 0usize;
|
||||
let mut done = true;
|
||||
|
||||
// Walk account changesets from static files using a streaming iterator.
|
||||
// For each changeset, track the highest block number seen for each address
|
||||
// to determine which history shard entries need pruning.
|
||||
let walker = StaticFileAccountChangesetWalker::new(provider, range);
|
||||
for result in walker {
|
||||
if limiter.is_limit_reached() {
|
||||
done = false;
|
||||
break;
|
||||
}
|
||||
let (block_number, changeset) = result?;
|
||||
highest_deleted_accounts.insert(changeset.address, block_number);
|
||||
last_changeset_pruned_block = Some(block_number);
|
||||
changesets_processed += 1;
|
||||
limiter.increment_deleted_entries_count();
|
||||
}
|
||||
trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files");
|
||||
|
||||
let last_changeset_pruned_block = last_changeset_pruned_block
|
||||
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
|
||||
.unwrap_or(range_end);
|
||||
|
||||
// Prune RocksDB history shards for affected accounts
|
||||
let mut deleted_shards = 0usize;
|
||||
let mut updated_shards = 0usize;
|
||||
|
||||
// Sort by address for better RocksDB cache locality
|
||||
let mut sorted_accounts: Vec<_> = highest_deleted_accounts.into_iter().collect();
|
||||
sorted_accounts.sort_unstable_by_key(|(addr, _)| *addr);
|
||||
|
||||
provider.with_rocksdb_batch(|mut batch| {
|
||||
for (address, highest_block) in &sorted_accounts {
|
||||
let prune_to = (*highest_block).min(last_changeset_pruned_block);
|
||||
match batch.prune_account_history_to(*address, prune_to)? {
|
||||
PruneShardOutcome::Deleted => deleted_shards += 1,
|
||||
PruneShardOutcome::Updated => updated_shards += 1,
|
||||
PruneShardOutcome::Unchanged => {}
|
||||
}
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned account history (RocksDB indices)");
|
||||
|
||||
// Delete static file jars only when fully processed. During provider.commit(), RocksDB
|
||||
// batch is committed before the MDBX checkpoint. If crash occurs after RocksDB commit
|
||||
// but before MDBX commit, on restart the pruner checkpoint indicates data needs
|
||||
// re-pruning, but the RocksDB shards are already pruned - this is safe because pruning
|
||||
// is idempotent (re-pruning already-pruned shards is a no-op).
|
||||
if done {
|
||||
provider.static_file_provider().delete_segment_below_block(
|
||||
StaticFileSegment::AccountChangeSets,
|
||||
last_changeset_pruned_block + 1,
|
||||
)?;
|
||||
}
|
||||
|
||||
let progress = limiter.progress(done);
|
||||
|
||||
Ok(SegmentOutput {
|
||||
progress,
|
||||
pruned: changesets_processed + deleted_shards + updated_shards,
|
||||
checkpoint: Some(SegmentOutputCheckpoint {
|
||||
block_number: Some(last_changeset_pruned_block),
|
||||
tx_number: None,
|
||||
}),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -539,4 +651,272 @@ mod tests {
|
||||
test_prune(998, 2, (PruneProgress::Finished, 1000));
|
||||
test_prune(1400, 3, (PruneProgress::Finished, 804));
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
#[test]
|
||||
fn prune_rocksdb_path() {
|
||||
use reth_db_api::models::ShardedKey;
|
||||
use reth_provider::{RocksDBProviderFactory, StaticFileProviderFactory};
|
||||
|
||||
let db = TestStageDB::default();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=100,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
|
||||
|
||||
let (changesets, _) = random_changeset_range(
|
||||
&mut rng,
|
||||
blocks.iter(),
|
||||
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||
0..0,
|
||||
0..0,
|
||||
);
|
||||
|
||||
db.insert_changesets_to_static_files(changesets.clone(), None)
|
||||
.expect("insert changesets to static files");
|
||||
|
||||
let mut account_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::new();
|
||||
for (block, changeset) in changesets.iter().enumerate() {
|
||||
for (address, _, _) in changeset {
|
||||
account_blocks.entry(*address).or_default().push(block as u64);
|
||||
}
|
||||
}
|
||||
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let mut batch = rocksdb.batch();
|
||||
for (address, block_numbers) in &account_blocks {
|
||||
let shard = BlockNumberList::new_pre_sorted(block_numbers.iter().copied());
|
||||
batch
|
||||
.put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), &shard)
|
||||
.unwrap();
|
||||
}
|
||||
batch.commit().unwrap();
|
||||
|
||||
for (address, expected_blocks) in &account_blocks {
|
||||
let shards = rocksdb.account_history_shards(*address).unwrap();
|
||||
assert_eq!(shards.len(), 1);
|
||||
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), *expected_blocks);
|
||||
}
|
||||
|
||||
let to_block: BlockNumber = 50;
|
||||
let prune_mode = PruneMode::Before(to_block);
|
||||
let input =
|
||||
PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
|
||||
let segment = AccountHistory::new(prune_mode);
|
||||
|
||||
db.factory.set_storage_settings_cache(
|
||||
StorageSettings::default()
|
||||
.with_account_changesets_in_static_files(true)
|
||||
.with_account_history_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
provider.commit().expect("commit");
|
||||
|
||||
assert_matches!(
|
||||
result,
|
||||
SegmentOutput { progress: PruneProgress::Finished, pruned, checkpoint: Some(_) }
|
||||
if pruned > 0
|
||||
);
|
||||
|
||||
for (address, original_blocks) in &account_blocks {
|
||||
let shards = rocksdb.account_history_shards(*address).unwrap();
|
||||
|
||||
let expected_blocks: Vec<u64> =
|
||||
original_blocks.iter().copied().filter(|b| *b > to_block).collect();
|
||||
|
||||
if expected_blocks.is_empty() {
|
||||
assert!(
|
||||
shards.is_empty(),
|
||||
"Expected no shards for address {address:?} after pruning"
|
||||
);
|
||||
} else {
|
||||
assert_eq!(shards.len(), 1, "Expected 1 shard for address {address:?}");
|
||||
assert_eq!(
|
||||
shards[0].1.iter().collect::<Vec<_>>(),
|
||||
expected_blocks,
|
||||
"Shard blocks mismatch for address {address:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let static_file_provider = db.factory.static_file_provider();
|
||||
let highest_block = static_file_provider.get_highest_static_file_block(
|
||||
reth_static_file_types::StaticFileSegment::AccountChangeSets,
|
||||
);
|
||||
if let Some(block) = highest_block {
|
||||
assert!(
|
||||
block > to_block,
|
||||
"Static files should only contain blocks above to_block ({to_block}), got {block}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests that when a limiter stops mid-block (with multiple changes for the same block),
|
||||
/// the checkpoint is set to `block_number - 1` to avoid dangling index entries.
|
||||
#[test]
|
||||
fn prune_partial_progress_mid_block() {
|
||||
use alloy_primitives::{Address, U256};
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_testing_utils::generators::ChangeSet;
|
||||
|
||||
let db = TestStageDB::default();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
// Create blocks 0..=10
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=10,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
// Create specific changesets where block 5 has 4 account changes
|
||||
let addr1 = Address::with_last_byte(1);
|
||||
let addr2 = Address::with_last_byte(2);
|
||||
let addr3 = Address::with_last_byte(3);
|
||||
let addr4 = Address::with_last_byte(4);
|
||||
let addr5 = Address::with_last_byte(5);
|
||||
|
||||
let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };
|
||||
|
||||
// Build changesets: blocks 0-4 have 1 change each, block 5 has 4 changes, block 6 has 1
|
||||
let changesets: Vec<ChangeSet> = vec![
|
||||
vec![(addr1, account, vec![])], // block 0
|
||||
vec![(addr1, account, vec![])], // block 1
|
||||
vec![(addr1, account, vec![])], // block 2
|
||||
vec![(addr1, account, vec![])], // block 3
|
||||
vec![(addr1, account, vec![])], // block 4
|
||||
// block 5: 4 different account changes (sorted by address for consistency)
|
||||
vec![
|
||||
(addr1, account, vec![]),
|
||||
(addr2, account, vec![]),
|
||||
(addr3, account, vec![]),
|
||||
(addr4, account, vec![]),
|
||||
],
|
||||
vec![(addr5, account, vec![])], // block 6
|
||||
];
|
||||
|
||||
db.insert_changesets(changesets.clone(), None).expect("insert changesets");
|
||||
db.insert_history(changesets.clone(), None).expect("insert history");
|
||||
|
||||
// Total changesets: 5 (blocks 0-4) + 4 (block 5) + 1 (block 6) = 10
|
||||
assert_eq!(
|
||||
db.table::<tables::AccountChangeSets>().unwrap().len(),
|
||||
changesets.iter().flatten().count()
|
||||
);
|
||||
|
||||
let prune_mode = PruneMode::Before(10);
|
||||
|
||||
// Set limiter to stop after 7 entries (mid-block 5: 5 from blocks 0-4, then 2 of 4 from
|
||||
// block 5). Due to ACCOUNT_HISTORY_TABLES_TO_PRUNE=2, actual limit is 7/2=3
|
||||
// changesets. So we'll process blocks 0, 1, 2 (3 changesets), stopping before block
|
||||
// 3. Actually, let's use a higher limit to reach block 5. With limit=14, we get 7
|
||||
// changeset slots. Blocks 0-4 use 5 slots, leaving 2 for block 5 (which has 4), so
|
||||
// we stop mid-block 5.
|
||||
let deleted_entries_limit = 14; // 14/2 = 7 changeset entries before limit
|
||||
let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
|
||||
|
||||
let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
|
||||
let segment = AccountHistory::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
provider.set_storage_settings_cache(
|
||||
StorageSettings::default().with_account_changesets_in_static_files(false),
|
||||
);
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
|
||||
// Should report that there's more data
|
||||
assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");
|
||||
|
||||
// Save checkpoint and commit
|
||||
segment
|
||||
.save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
|
||||
.unwrap();
|
||||
provider.commit().expect("commit");
|
||||
|
||||
// Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete
|
||||
let checkpoint = db
|
||||
.factory
|
||||
.provider()
|
||||
.unwrap()
|
||||
.get_prune_checkpoint(PruneSegment::AccountHistory)
|
||||
.unwrap()
|
||||
.expect("checkpoint should exist");
|
||||
|
||||
assert_eq!(
|
||||
checkpoint.block_number,
|
||||
Some(4),
|
||||
"Checkpoint should be block 4 (block before incomplete block 5)"
|
||||
);
|
||||
|
||||
// Verify remaining changesets (block 5 and 6 should still have entries)
|
||||
let remaining_changesets = db.table::<tables::AccountChangeSets>().unwrap();
|
||||
// After pruning blocks 0-4, remaining should be block 5 (4 entries) + block 6 (1 entry) = 5
|
||||
// But since we stopped mid-block 5, some of block 5 might be pruned
|
||||
// However, checkpoint is 4, so on re-run we should re-process from block 5
|
||||
assert!(
|
||||
!remaining_changesets.is_empty(),
|
||||
"Should have remaining changesets for blocks 5-6"
|
||||
);
|
||||
|
||||
// Verify no dangling history indices for blocks that weren't fully pruned
|
||||
// The indices for block 5 should still reference blocks <= 5 appropriately
|
||||
let history = db.table::<tables::AccountsHistory>().unwrap();
|
||||
for (key, _blocks) in &history {
|
||||
// All blocks in the history should be > checkpoint block number
|
||||
// OR the shard's highest_block_number should be > checkpoint
|
||||
assert!(
|
||||
key.highest_block_number > 4,
|
||||
"Found stale history shard with highest_block_number {} <= checkpoint 4",
|
||||
key.highest_block_number
|
||||
);
|
||||
}
|
||||
|
||||
// Run prune again to complete - should finish processing block 5 and 6
|
||||
let input2 = PruneInput {
|
||||
previous_checkpoint: Some(checkpoint),
|
||||
to_block: 10,
|
||||
limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
|
||||
};
|
||||
|
||||
let provider2 = db.factory.database_provider_rw().unwrap();
|
||||
provider2.set_storage_settings_cache(
|
||||
StorageSettings::default().with_account_changesets_in_static_files(false),
|
||||
);
|
||||
let result2 = segment.prune(&provider2, input2).unwrap();
|
||||
|
||||
assert!(result2.progress.is_finished(), "Second run should complete");
|
||||
|
||||
segment
|
||||
.save_checkpoint(
|
||||
&provider2,
|
||||
result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
|
||||
)
|
||||
.unwrap();
|
||||
provider2.commit().expect("commit");
|
||||
|
||||
// Verify final checkpoint
|
||||
let final_checkpoint = db
|
||||
.factory
|
||||
.provider()
|
||||
.unwrap()
|
||||
.get_prune_checkpoint(PruneSegment::AccountHistory)
|
||||
.unwrap()
|
||||
.expect("checkpoint should exist");
|
||||
|
||||
// Should now be at block 6 (the last block with changesets)
|
||||
assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");
|
||||
|
||||
// All changesets should be pruned
|
||||
let final_changesets = db.table::<tables::AccountChangeSets>().unwrap();
|
||||
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use reth_db_api::{
|
||||
tables,
|
||||
transaction::DbTxMut,
|
||||
};
|
||||
use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory};
|
||||
use reth_provider::{DBProvider, EitherWriter, RocksDBProviderFactory, StaticFileProviderFactory};
|
||||
use reth_prune_types::{
|
||||
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
|
||||
};
|
||||
@@ -43,7 +43,8 @@ where
|
||||
Provider: DBProvider<Tx: DbTxMut>
|
||||
+ StaticFileProviderFactory
|
||||
+ StorageChangeSetReader
|
||||
+ StorageSettingsCache,
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory,
|
||||
{
|
||||
fn segment(&self) -> PruneSegment {
|
||||
PruneSegment::StorageHistory
|
||||
@@ -68,6 +69,13 @@ where
|
||||
};
|
||||
let range_end = *range.end();
|
||||
|
||||
// Check where storage history indices are stored
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storages_history_in_rocksdb {
|
||||
return self.prune_rocksdb(provider, input, range, range_end);
|
||||
}
|
||||
|
||||
// Check where storage changesets are stored (MDBX path)
|
||||
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
|
||||
self.prune_static_files(provider, input, range, range_end)
|
||||
} else {
|
||||
@@ -94,6 +102,8 @@ impl StorageHistory {
|
||||
input.limiter
|
||||
};
|
||||
|
||||
// The limiter may already be exhausted from a previous segment in the same prune run.
|
||||
// Early exit avoids unnecessary iteration when no budget remains.
|
||||
if limiter.is_limit_reached() {
|
||||
return Ok(SegmentOutput::not_done(
|
||||
limiter.interrupt_reason(),
|
||||
@@ -126,8 +136,8 @@ impl StorageHistory {
|
||||
limiter.increment_deleted_entries_count();
|
||||
}
|
||||
|
||||
// Delete static file jars below the pruned block
|
||||
if let Some(last_block) = last_changeset_pruned_block {
|
||||
// Delete static file jars only when fully processed
|
||||
if done && let Some(last_block) = last_changeset_pruned_block {
|
||||
provider
|
||||
.static_file_provider()
|
||||
.delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
|
||||
@@ -216,6 +226,107 @@ impl StorageHistory {
|
||||
)
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Prunes storage history when indices are stored in `RocksDB`.
|
||||
///
|
||||
/// Reads storage changesets from static files and prunes the corresponding
|
||||
/// `RocksDB` history shards.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn prune_rocksdb<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
input: PruneInput,
|
||||
range: std::ops::RangeInclusive<BlockNumber>,
|
||||
range_end: BlockNumber,
|
||||
) -> Result<SegmentOutput, PrunerError>
|
||||
where
|
||||
Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory,
|
||||
{
|
||||
use reth_provider::PruneShardOutcome;
|
||||
|
||||
let mut limiter = input.limiter;
|
||||
|
||||
if limiter.is_limit_reached() {
|
||||
return Ok(SegmentOutput::not_done(
|
||||
limiter.interrupt_reason(),
|
||||
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
|
||||
))
|
||||
}
|
||||
|
||||
let mut highest_deleted_storages: FxHashMap<_, _> = FxHashMap::default();
|
||||
let mut last_changeset_pruned_block = None;
|
||||
let mut changesets_processed = 0usize;
|
||||
let mut done = true;
|
||||
|
||||
// Walk storage changesets from static files using a streaming iterator.
|
||||
// For each changeset, track the highest block number seen for each (address, storage_key)
|
||||
// pair to determine which history shard entries need pruning.
|
||||
let walker = provider.static_file_provider().walk_storage_changeset_range(range);
|
||||
for result in walker {
|
||||
if limiter.is_limit_reached() {
|
||||
done = false;
|
||||
break;
|
||||
}
|
||||
let (block_address, entry) = result?;
|
||||
let block_number = block_address.block_number();
|
||||
let address = block_address.address();
|
||||
highest_deleted_storages.insert((address, entry.key), block_number);
|
||||
last_changeset_pruned_block = Some(block_number);
|
||||
changesets_processed += 1;
|
||||
limiter.increment_deleted_entries_count();
|
||||
}
|
||||
|
||||
trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files");
|
||||
|
||||
let last_changeset_pruned_block = last_changeset_pruned_block
|
||||
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
|
||||
.unwrap_or(range_end);
|
||||
|
||||
// Prune RocksDB history shards for affected storage slots
|
||||
let mut deleted_shards = 0usize;
|
||||
let mut updated_shards = 0usize;
|
||||
|
||||
// Sort by (address, storage_key) for better RocksDB cache locality
|
||||
let mut sorted_storages: Vec<_> = highest_deleted_storages.into_iter().collect();
|
||||
sorted_storages.sort_unstable_by_key(|((addr, key), _)| (*addr, *key));
|
||||
|
||||
provider.with_rocksdb_batch(|mut batch| {
|
||||
for ((address, storage_key), highest_block) in &sorted_storages {
|
||||
let prune_to = (*highest_block).min(last_changeset_pruned_block);
|
||||
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
|
||||
PruneShardOutcome::Deleted => deleted_shards += 1,
|
||||
PruneShardOutcome::Updated => updated_shards += 1,
|
||||
PruneShardOutcome::Unchanged => {}
|
||||
}
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
|
||||
trace!(target: "pruner", deleted = deleted_shards, updated = updated_shards, %done, "Pruned storage history (RocksDB indices)");
|
||||
|
||||
// Delete static file jars only when fully processed. During provider.commit(), RocksDB
|
||||
// batch is committed before the MDBX checkpoint. If crash occurs after RocksDB commit
|
||||
// but before MDBX commit, on restart the pruner checkpoint indicates data needs
|
||||
// re-pruning, but the RocksDB shards are already pruned - this is safe because pruning
|
||||
// is idempotent (re-pruning already-pruned shards is a no-op).
|
||||
if done {
|
||||
provider.static_file_provider().delete_segment_below_block(
|
||||
StaticFileSegment::StorageChangeSets,
|
||||
last_changeset_pruned_block + 1,
|
||||
)?;
|
||||
}
|
||||
|
||||
let progress = limiter.progress(done);
|
||||
|
||||
Ok(SegmentOutput {
|
||||
progress,
|
||||
pruned: changesets_processed + deleted_shards + updated_shards,
|
||||
checkpoint: Some(SegmentOutputCheckpoint {
|
||||
block_number: Some(last_changeset_pruned_block),
|
||||
tx_number: None,
|
||||
}),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -553,4 +664,270 @@ mod tests {
|
||||
test_prune(998, 2, (PruneProgress::Finished, 500));
|
||||
test_prune(1200, 3, (PruneProgress::Finished, 202));
|
||||
}
|
||||
|
||||
/// Tests that when a limiter stops mid-block (with multiple storage changes for the same
|
||||
/// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries.
|
||||
#[test]
|
||||
fn prune_partial_progress_mid_block() {
|
||||
use alloy_primitives::{Address, U256};
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_testing_utils::generators::ChangeSet;
|
||||
|
||||
let db = TestStageDB::default();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
// Create blocks 0..=10
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=10,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
// Create specific changesets where block 5 has 4 storage changes
|
||||
let addr1 = Address::with_last_byte(1);
|
||||
let addr2 = Address::with_last_byte(2);
|
||||
|
||||
let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };
|
||||
|
||||
// Create storage entries
|
||||
let storage_entry = |key: u8| reth_primitives_traits::StorageEntry {
|
||||
key: B256::with_last_byte(key),
|
||||
value: U256::from(100),
|
||||
};
|
||||
|
||||
// Build changesets: blocks 0-4 have 1 storage change each, block 5 has 4 changes, block 6
|
||||
// has 1. Entries within each account must be sorted by key.
|
||||
let changesets: Vec<ChangeSet> = vec![
|
||||
vec![(addr1, account, vec![storage_entry(1)])], // block 0
|
||||
vec![(addr1, account, vec![storage_entry(1)])], // block 1
|
||||
vec![(addr1, account, vec![storage_entry(1)])], // block 2
|
||||
vec![(addr1, account, vec![storage_entry(1)])], // block 3
|
||||
vec![(addr1, account, vec![storage_entry(1)])], // block 4
|
||||
// block 5: 4 different storage changes (2 addresses, each with 2 storage slots)
|
||||
// Sorted by address, then by storage key within each address
|
||||
vec![
|
||||
(addr1, account, vec![storage_entry(1), storage_entry(2)]),
|
||||
(addr2, account, vec![storage_entry(1), storage_entry(2)]),
|
||||
],
|
||||
vec![(addr1, account, vec![storage_entry(3)])], // block 6
|
||||
];
|
||||
|
||||
db.insert_changesets(changesets.clone(), None).expect("insert changesets");
|
||||
db.insert_history(changesets.clone(), None).expect("insert history");
|
||||
|
||||
// Total storage changesets
|
||||
let total_storage_entries: usize =
|
||||
changesets.iter().flat_map(|c| c.iter()).map(|(_, _, entries)| entries.len()).sum();
|
||||
assert_eq!(db.table::<tables::StorageChangeSets>().unwrap().len(), total_storage_entries);
|
||||
|
||||
let prune_mode = PruneMode::Before(10);
|
||||
|
||||
// Set limiter to stop mid-block 5
|
||||
// With STORAGE_HISTORY_TABLES_TO_PRUNE=2, limit=14 gives us 7 storage entries before limit
|
||||
// Blocks 0-4 use 5 slots, leaving 2 for block 5 (which has 4), so we stop mid-block 5
|
||||
let deleted_entries_limit = 14; // 14/2 = 7 storage entries before limit
|
||||
let limiter = PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
|
||||
|
||||
let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
|
||||
let segment = StorageHistory::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
provider.set_storage_settings_cache(
|
||||
StorageSettings::default().with_storage_changesets_in_static_files(false),
|
||||
);
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
|
||||
// Should report that there's more data
|
||||
assert!(!result.progress.is_finished(), "Expected HasMoreData since we stopped mid-block");
|
||||
|
||||
// Save checkpoint and commit
|
||||
segment
|
||||
.save_checkpoint(&provider, result.checkpoint.unwrap().as_prune_checkpoint(prune_mode))
|
||||
.unwrap();
|
||||
provider.commit().expect("commit");
|
||||
|
||||
// Verify checkpoint is set to block 4 (not 5), since block 5 is incomplete
|
||||
let checkpoint = db
|
||||
.factory
|
||||
.provider()
|
||||
.unwrap()
|
||||
.get_prune_checkpoint(PruneSegment::StorageHistory)
|
||||
.unwrap()
|
||||
.expect("checkpoint should exist");
|
||||
|
||||
assert_eq!(
|
||||
checkpoint.block_number,
|
||||
Some(4),
|
||||
"Checkpoint should be block 4 (block before incomplete block 5)"
|
||||
);
|
||||
|
||||
// Verify remaining changesets
|
||||
let remaining_changesets = db.table::<tables::StorageChangeSets>().unwrap();
|
||||
assert!(
|
||||
!remaining_changesets.is_empty(),
|
||||
"Should have remaining changesets for blocks 5-6"
|
||||
);
|
||||
|
||||
// Verify no dangling history indices for blocks that weren't fully pruned
|
||||
let history = db.table::<tables::StoragesHistory>().unwrap();
|
||||
for (key, _blocks) in &history {
|
||||
assert!(
|
||||
key.sharded_key.highest_block_number > 4,
|
||||
"Found stale history shard with highest_block_number {} <= checkpoint 4",
|
||||
key.sharded_key.highest_block_number
|
||||
);
|
||||
}
|
||||
|
||||
// Run prune again to complete - should finish processing block 5 and 6
|
||||
let input2 = PruneInput {
|
||||
previous_checkpoint: Some(checkpoint),
|
||||
to_block: 10,
|
||||
limiter: PruneLimiter::default().set_deleted_entries_limit(100), // high limit
|
||||
};
|
||||
|
||||
let provider2 = db.factory.database_provider_rw().unwrap();
|
||||
provider2.set_storage_settings_cache(
|
||||
StorageSettings::default().with_storage_changesets_in_static_files(false),
|
||||
);
|
||||
let result2 = segment.prune(&provider2, input2).unwrap();
|
||||
|
||||
assert!(result2.progress.is_finished(), "Second run should complete");
|
||||
|
||||
segment
|
||||
.save_checkpoint(
|
||||
&provider2,
|
||||
result2.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
|
||||
)
|
||||
.unwrap();
|
||||
provider2.commit().expect("commit");
|
||||
|
||||
// Verify final checkpoint
|
||||
let final_checkpoint = db
|
||||
.factory
|
||||
.provider()
|
||||
.unwrap()
|
||||
.get_prune_checkpoint(PruneSegment::StorageHistory)
|
||||
.unwrap()
|
||||
.expect("checkpoint should exist");
|
||||
|
||||
// Should now be at block 6 (the last block with changesets)
|
||||
assert_eq!(final_checkpoint.block_number, Some(6), "Final checkpoint should be at block 6");
|
||||
|
||||
// All changesets should be pruned
|
||||
let final_changesets = db.table::<tables::StorageChangeSets>().unwrap();
|
||||
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
#[test]
|
||||
fn prune_rocksdb() {
|
||||
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
use reth_storage_api::StorageSettings;
|
||||
|
||||
let db = TestStageDB::default();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=100,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
|
||||
|
||||
let (changesets, _) = random_changeset_range(
|
||||
&mut rng,
|
||||
blocks.iter(),
|
||||
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||
1..2,
|
||||
1..2,
|
||||
);
|
||||
|
||||
db.insert_changesets_to_static_files(changesets.clone(), None)
|
||||
.expect("insert changesets to static files");
|
||||
|
||||
let mut storage_indices: BTreeMap<(alloy_primitives::Address, B256), Vec<u64>> =
|
||||
BTreeMap::new();
|
||||
for (block, changeset) in changesets.iter().enumerate() {
|
||||
for (address, _, storage_entries) in changeset {
|
||||
for entry in storage_entries {
|
||||
storage_indices.entry((*address, entry.key)).or_default().push(block as u64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
let mut batch = rocksdb.batch();
|
||||
for ((address, storage_key), block_numbers) in &storage_indices {
|
||||
let shard = BlockNumberList::new_pre_sorted(block_numbers.clone());
|
||||
batch
|
||||
.put::<tables::StoragesHistory>(
|
||||
StorageShardedKey::last(*address, *storage_key),
|
||||
&shard,
|
||||
)
|
||||
.expect("insert storage history shard");
|
||||
}
|
||||
batch.commit().expect("commit rocksdb batch");
|
||||
}
|
||||
|
||||
{
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
for (address, storage_key) in storage_indices.keys() {
|
||||
let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
|
||||
assert!(!shards.is_empty(), "RocksDB should contain storage history before prune");
|
||||
}
|
||||
}
|
||||
|
||||
let to_block = 50u64;
|
||||
let prune_mode = PruneMode::Before(to_block);
|
||||
let input =
|
||||
PruneInput { previous_checkpoint: None, to_block, limiter: PruneLimiter::default() };
|
||||
let segment = StorageHistory::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
provider.set_storage_settings_cache(
|
||||
StorageSettings::default()
|
||||
.with_storage_changesets_in_static_files(true)
|
||||
.with_storages_history_in_rocksdb(true),
|
||||
);
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
provider.commit().expect("commit");
|
||||
|
||||
assert_matches!(
|
||||
result,
|
||||
SegmentOutput { progress: PruneProgress::Finished, checkpoint: Some(_), .. }
|
||||
);
|
||||
|
||||
{
|
||||
let rocksdb = db.factory.rocksdb_provider();
|
||||
for ((address, storage_key), block_numbers) in &storage_indices {
|
||||
let shards = rocksdb.storage_history_shards(*address, *storage_key).unwrap();
|
||||
|
||||
let remaining_blocks: Vec<u64> =
|
||||
block_numbers.iter().copied().filter(|&b| b > to_block).collect();
|
||||
|
||||
if remaining_blocks.is_empty() {
|
||||
assert!(
|
||||
shards.is_empty(),
|
||||
"Shard for {:?}/{:?} should be deleted when all blocks pruned",
|
||||
address,
|
||||
storage_key
|
||||
);
|
||||
} else {
|
||||
assert!(!shards.is_empty(), "Shard should exist with remaining blocks");
|
||||
let actual_blocks: Vec<u64> =
|
||||
shards.iter().flat_map(|(_, list)| list.iter()).collect();
|
||||
assert_eq!(
|
||||
actual_blocks, remaining_blocks,
|
||||
"RocksDB shard should only contain blocks > {}",
|
||||
to_block
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use reth_db_api::{table::Value, transaction::DbTxMut};
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
use reth_provider::{
|
||||
BlockReader, ChainStateBlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
|
||||
RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_prune::{
|
||||
PruneMode, PruneModes, PruneSegment, PrunerBuilder, SegmentOutput, SegmentOutputCheckpoint,
|
||||
@@ -10,7 +10,7 @@ use reth_prune::{
|
||||
use reth_stages_api::{
|
||||
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
|
||||
};
|
||||
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
|
||||
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettingsCache};
|
||||
use tracing::info;
|
||||
|
||||
/// The prune stage that runs the pruner with the provided prune modes.
|
||||
@@ -49,7 +49,8 @@ where
|
||||
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
|
||||
> + StorageSettingsCache
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader,
|
||||
+ StorageChangeSetReader
|
||||
+ RocksDBProviderFactory,
|
||||
{
|
||||
fn id(&self) -> StageId {
|
||||
StageId::Prune
|
||||
@@ -156,7 +157,8 @@ where
|
||||
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
|
||||
> + StorageSettingsCache
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader,
|
||||
+ StorageChangeSetReader
|
||||
+ RocksDBProviderFactory,
|
||||
{
|
||||
fn id(&self) -> StageId {
|
||||
StageId::PruneSenderRecovery
|
||||
|
||||
@@ -21,8 +21,8 @@ pub mod providers;
|
||||
pub use providers::{
|
||||
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
|
||||
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
|
||||
SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx,
|
||||
StaticFileWriter,
|
||||
PruneShardOutcome, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder,
|
||||
StaticFileWriteCtx, StaticFileWriter,
|
||||
};
|
||||
|
||||
pub mod changeset_walker;
|
||||
|
||||
@@ -39,8 +39,8 @@ pub use consistent::ConsistentProvider;
|
||||
pub(crate) mod rocksdb;
|
||||
|
||||
pub use rocksdb::{
|
||||
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBStats, RocksDBTableStats,
|
||||
RocksTx,
|
||||
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBStats,
|
||||
RocksDBTableStats, RocksTx,
|
||||
};
|
||||
|
||||
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
|
||||
|
||||
@@ -6,6 +6,6 @@ mod provider;
|
||||
|
||||
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
|
||||
pub use provider::{
|
||||
RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBStats, RocksDBTableStats,
|
||||
RocksTx,
|
||||
PruneShardOutcome, RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksDBRawIter, RocksDBStats,
|
||||
RocksDBTableStats, RocksTx,
|
||||
};
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -189,3 +189,14 @@ pub struct RocksTx;
|
||||
/// A stub raw iterator for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBRawIter;
|
||||
|
||||
/// Outcome of pruning a history shard in RocksDB (stub).
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PruneShardOutcome {
|
||||
/// Shard was deleted entirely.
|
||||
Deleted,
|
||||
/// Shard was updated with filtered block numbers.
|
||||
Updated,
|
||||
/// Shard was unchanged (no blocks <= `to_block`).
|
||||
Unchanged,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user