fix(pruner): prune account and storage changeset static files (#21346)

This commit is contained in:
Dan Cline
2026-01-26 19:28:18 +00:00
committed by GitHub
parent 7fe60017cf
commit 94235d64a8
10 changed files with 727 additions and 131 deletions
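Account and storage history pruning now checks where changesets are stored before pruning: when changesets live in static files, the pruner walks the static-file changesets and deletes the jars below the pruned block; otherwise it falls back to pruning the MDBX changeset tables. A rough sketch of the dispatch, using the names from the account-history diff below (the storage-history segment mirrors it):

// Inside AccountHistory::prune (sketch only; see the full diff for the real code).
if EitherWriter::account_changesets_destination(provider).is_static_file() {
    // New path: walk the AccountChangeSets static files, record the highest deleted
    // block per address, then drop whole jars below the pruned block.
    self.prune_static_files(provider, input, range, range_end)
} else {
    // Existing path: prune the AccountChangeSets MDBX table over the range.
    self.prune_database(provider, input, range, range_end)
}

Both paths then feed a HistoryPruneResult into the shared finalize_history_prune helper, which sorts the deleted keys into sharded keys, prunes the history index shards, and builds the SegmentOutput.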

Cargo.lock generated
View File

@@ -10217,6 +10217,7 @@ dependencies = [
"reth-stages",
"reth-stages-types",
"reth-static-file-types",
"reth-storage-api",
"reth-testing-utils",
"reth-tokio-util",
"reth-tracing",

View File

@@ -17,6 +17,7 @@ reth-exex-types.workspace = true
reth-db-api.workspace = true
reth-errors.workspace = true
reth-provider.workspace = true
reth-storage-api.workspace = true
reth-tokio-util.workspace = true
reth-config.workspace = true
reth-prune-types.workspace = true

View File

@@ -10,6 +10,7 @@ use reth_provider::{
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use std::time::Duration;
use tokio::sync::watch;
@@ -82,6 +83,8 @@ impl PrunerBuilder {
+ ChainStateBlockReader
+ StorageSettingsCache
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
>,
@@ -116,7 +119,9 @@ impl PrunerBuilder {
+ PruneCheckpointWriter
+ PruneCheckpointReader
+ StorageSettingsCache
+ StageCheckpointReader,
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader,
{
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);

View File

@@ -10,6 +10,7 @@ use reth_provider::{
PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
/// Collection of [`Segment`]. Thread-safe, allocated on the heap.
#[derive(Debug)]
@@ -52,7 +53,9 @@ where
+ PruneCheckpointReader
+ BlockReader<Transaction: Encodable2718>
+ ChainStateBlockReader
+ StorageSettingsCache,
+ StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
{
/// Creates a [`SegmentSet`] from existing components, such as [`StaticFileProvider`] and
/// [`PruneModes`].

View File

@@ -1,14 +1,22 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{user::history::prune_history_indices, PruneInput, Segment},
segments::{
user::history::{finalize_history_prune, HistoryPruneResult},
PruneInput, Segment,
},
PrunerError,
};
use itertools::Itertools;
use alloy_primitives::BlockNumber;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::DBProvider;
use reth_provider::{
changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::ChangeSetReader;
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
@@ -31,7 +39,10 @@ impl AccountHistory {
impl<Provider> Segment<Provider> for AccountHistory
where
Provider: DBProvider<Tx: DbTxMut>,
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageSettingsCache
+ ChangeSetReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory
@@ -56,11 +67,33 @@ where
};
let range_end = *range.end();
// Check where account changesets are stored
if EitherWriter::account_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
self.prune_database(provider, input, range, range_end)
}
}
}
impl AccountHistory {
/// Prunes account history when changesets are stored in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + ChangeSetReader,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -68,15 +101,86 @@ where
))
}
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
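// Concretely: 160 + 64 = 224 bits = 28 bytes per entry, and 8750 * 28 bytes ≈ 245 KB.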
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
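// Walk the account changesets stored in static files over the prune range.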
let walker = StaticFileAccountChangesetWalker::new(provider, range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_number, changeset) = result?;
highest_deleted_accounts.insert(changeset.address, block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
provider,
result,
range_end,
&limiter,
ShardedKey::new,
|a, b| a.key == b.key,
)
.map_err(Into::into)
}
fn prune_database<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut>,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
// Deleted account changeset keys (account addresses) with the highest block number deleted
// for that key.
//
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
let mut last_changeset_pruned_block = None;
let mut highest_deleted_accounts = FxHashMap::default();
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
@@ -88,69 +192,52 @@ where
last_changeset_pruned_block = Some(block_number);
},
)?;
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)");
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more account changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its account changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers by account address and turn them into sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_accounts
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|(address, block_number)| {
ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
});
let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
provider,
highest_sharded_keys,
result,
range_end,
&limiter,
ShardedKey::new,
|a, b| a.key == b.key,
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
)
.map_err(Into::into)
}
}
#[cfg(test)]
mod tests {
use crate::segments::{
user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
PruneLimiter, Segment, SegmentOutput,
};
use super::ACCOUNT_HISTORY_TABLES_TO_PRUNE;
use crate::segments::{AccountHistory, PruneInput, PruneLimiter, Segment, SegmentOutput};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{tables, BlockNumberList};
use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_storage_api::StorageSettingsCache;
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::{collections::BTreeMap, ops::AddAssign};
#[test]
fn prune() {
fn prune_legacy() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=5000,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
@@ -202,6 +289,9 @@ mod tests {
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -239,20 +329,18 @@ mod tests {
.map(|(i, _)| i)
.unwrap_or_default();
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block
// number further down
.skip(pruned.saturating_sub(1));
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
.next()
.map(|(block_number, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
let pruned_changesets = pruned_changesets.fold(
BTreeMap::<_, Vec<_>>::new(),
@@ -303,4 +391,152 @@ mod tests {
test_prune(998, 2, (PruneProgress::Finished, 998));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
#[test]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry(key.key).or_default().add_assign(1);
map
},
);
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::AccountsHistory>().unwrap();
let test_prune =
|to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 2000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(true),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
);
test_prune(998, 2, (PruneProgress::Finished, 1000));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
}

View File

@@ -1,4 +1,6 @@
use crate::PruneLimiter;
use alloy_primitives::BlockNumber;
use itertools::Itertools;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::ShardedKey,
@@ -7,6 +9,8 @@ use reth_db_api::{
BlockNumberList, DatabaseError, RawKey, RawTable, RawValue,
};
use reth_provider::DBProvider;
use reth_prune_types::{SegmentOutput, SegmentOutputCheckpoint};
use rustc_hash::FxHashMap;
enum PruneShardOutcome {
Deleted,
@@ -21,6 +25,65 @@ pub(crate) struct PrunedIndices {
pub(crate) unchanged: usize,
}
/// Result of pruning history changesets, used to build the final output.
pub(crate) struct HistoryPruneResult<K> {
/// Map of the highest deleted changeset keys to their block numbers.
pub(crate) highest_deleted: FxHashMap<K, BlockNumber>,
/// The last block number that had changesets pruned.
pub(crate) last_pruned_block: Option<BlockNumber>,
/// Number of changesets pruned.
pub(crate) pruned_count: usize,
/// Whether pruning is complete.
pub(crate) done: bool,
}
/// Finalizes history pruning by sorting sharded keys, pruning history indices, and building output.
///
/// This is shared between static file and database pruning for both account and storage history.
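///
/// `to_sharded_key` builds the history-table key from a deleted changeset key and a block
/// number; `key_matches` compares two history-table keys while ignoring their highest block
/// number.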
pub(crate) fn finalize_history_prune<Provider, T, K, SK>(
provider: &Provider,
result: HistoryPruneResult<K>,
range_end: BlockNumber,
limiter: &PruneLimiter,
to_sharded_key: impl Fn(K, BlockNumber) -> T::Key,
key_matches: impl Fn(&T::Key, &T::Key) -> bool,
) -> Result<SegmentOutput, DatabaseError>
where
Provider: DBProvider<Tx: DbTxMut>,
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<SK>>,
K: Ord,
{
let HistoryPruneResult { highest_deleted, last_pruned_block, pruned_count, done } = result;
// If there are more changesets to prune, set the checkpoint block number to the previous
// block, so we can finish pruning that block's changesets on the next run.
let last_changeset_pruned_block = last_pruned_block
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers and turn them into sharded keys.
// We use `sorted_unstable` because no equal keys exist in the map.
let highest_sharded_keys =
highest_deleted.into_iter().sorted_unstable().map(|(key, block_number)| {
to_sharded_key(key, block_number.min(last_changeset_pruned_block))
});
let outcomes =
prune_history_indices::<Provider, T, _>(provider, highest_sharded_keys, key_matches)?;
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_count + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
/// Prune history indices according to the provided list of highest sharded keys.
///
/// Returns total number of deleted, updated and unchanged entities.

View File

@@ -1,20 +1,27 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
segments::{
user::history::{finalize_history_prune, HistoryPruneResult},
PruneInput, Segment,
},
PrunerError,
};
use itertools::Itertools;
use alloy_primitives::{Address, BlockNumber, B256};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
tables,
transaction::DbTxMut,
};
use reth_provider::DBProvider;
use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
/// Number of storage history tables to prune in one step
/// Number of storage history tables to prune in one step.
///
/// Storage History consists of two tables: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`]. We want to prune them to the same block number.
@@ -33,7 +40,10 @@ impl StorageHistory {
impl<Provider> Segment<Provider> for StorageHistory
where
Provider: DBProvider<Tx: DbTxMut>,
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageChangeSetReader
+ StorageSettingsCache,
{
fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory
@@ -58,11 +68,32 @@ where
};
let range_end = *range.end();
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
self.prune_database(provider, input, range, range_end)
}
}
}
impl StorageHistory {
/// Prunes storage history when changesets are stored in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -70,15 +101,90 @@ where
))
}
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
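// Concretely: 160 + 256 + 64 = 480 bits = 60 bytes per entry, and 8750 * 60 bytes ≈ 525 KB.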
let mut highest_deleted_storages = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
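// Walk the storage changesets stored in static files over the prune range.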
let walker = provider.static_file_provider().walk_storage_changeset_range(range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
provider,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)
.map_err(Into::into)
}
fn prune_database<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut>,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
// Deleted storage changeset keys (account addresses and storage slots) with the highest
// block number deleted for that key.
//
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut last_changeset_pruned_block = None;
let mut highest_deleted_storages = FxHashMap::default();
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
@@ -92,64 +198,46 @@ where
)?;
trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more storage changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its storage changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers by account address and storage key and turn them into
// sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_storages
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|((address, storage_key), block_number)| {
StorageShardedKey::new(
address,
storage_key,
block_number.min(last_changeset_pruned_block),
)
});
let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
provider,
highest_sharded_keys,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
)
.map_err(Into::into)
}
}
#[cfg(test)]
mod tests {
use crate::segments::{
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
SegmentOutput, StorageHistory,
};
use super::STORAGE_HISTORY_TABLES_TO_PRUNE;
use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, StorageHistory};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{tables, BlockNumberList};
use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_storage_api::StorageSettingsCache;
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::{collections::BTreeMap, ops::AddAssign};
#[test]
fn prune() {
fn prune_legacy() {
let db = TestStageDB::default();
let mut rng = generators::rng();
@@ -208,6 +296,9 @@ mod tests {
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -247,19 +338,19 @@ mod tests {
.map(|(i, _)| i)
.unwrap_or_default();
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
.skip(pruned.saturating_sub(1));
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let pruned_changesets = pruned_changesets.fold(
@@ -306,14 +397,160 @@ mod tests {
test_prune(
998,
1,
(
PruneProgress::HasMoreData(
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
),
500,
),
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
);
test_prune(998, 2, (PruneProgress::Finished, 499));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
#[test]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
1..2,
1..2,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
map
},
);
assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::StoragesHistory>().unwrap();
let test_prune = |to_block: BlockNumber,
run: usize,
expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 1000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(true),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().flat_map(move |(address, _, entries)| {
entries.iter().map(move |entry| (block_number, address, entry))
})
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _, _))| {
*i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
);
test_prune(998, 2, (PruneProgress::Finished, 500));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
}

View File

@@ -24,9 +24,9 @@ pub enum PruneSegment {
Receipts,
/// Prune segment responsible for some rows in `Receipts` table filtered by logs.
ContractLogs,
/// Prune segment responsible for the `AccountChangeSets` and `AccountsHistory` tables.
/// Prunes account changesets (static files/MDBX) and `AccountsHistory`.
AccountHistory,
/// Prune segment responsible for the `StorageChangeSets` and `StoragesHistory` tables.
/// Prunes storage changesets (static files/MDBX) and `StoragesHistory`.
StorageHistory,
#[deprecated = "Variant indexes cannot be changed"]
#[strum(disabled)]

View File

@@ -10,6 +10,7 @@ use reth_prune::{
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use tracing::info;
/// The prune stage that runs the pruner with the provided prune modes.
@@ -46,7 +47,9 @@ where
+ StageCheckpointReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
{
fn id(&self) -> StageId {
StageId::Prune
@@ -151,7 +154,9 @@ where
+ StageCheckpointReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
{
fn id(&self) -> StageId {
StageId::PruneSenderRecovery

View File

@@ -11,7 +11,7 @@ use reth_db_api::{
common::KeyValue,
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
models::{AccountBeforeTx, StoredBlockBodyIndices},
models::{AccountBeforeTx, StorageBeforeTx, StoredBlockBodyIndices},
table::Table,
tables,
transaction::{DbTx, DbTxMut},
@@ -473,6 +473,51 @@ impl TestStageDB {
})
}
/// Inserts a collection of [`ChangeSet`]s into static files (account and storage changesets).
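///
/// `block_offset` shifts the block number the first changeset is written at; `None` starts at
/// block 0.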
pub fn insert_changesets_to_static_files<I>(
&self,
changesets: I,
block_offset: Option<u64>,
) -> ProviderResult<()>
where
I: IntoIterator<Item = ChangeSet>,
{
let offset = block_offset.unwrap_or_default();
let static_file_provider = self.factory.static_file_provider();
let mut account_changeset_writer =
static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
let mut storage_changeset_writer =
static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
for (block, changeset) in changesets.into_iter().enumerate() {
let block_number = offset + block as u64;
let mut account_changesets = Vec::new();
let mut storage_changesets = Vec::new();
for (address, old_account, old_storage) in changeset {
account_changesets.push(AccountBeforeTx { address, info: Some(old_account) });
for entry in old_storage {
storage_changesets.push(StorageBeforeTx {
address,
key: entry.key,
value: entry.value,
});
}
}
account_changeset_writer.append_account_changeset(account_changesets, block_number)?;
storage_changeset_writer.append_storage_changeset(storage_changesets, block_number)?;
}
account_changeset_writer.commit()?;
storage_changeset_writer.commit()?;
Ok(())
}
pub fn insert_history<I>(&self, changesets: I, _block_offset: Option<u64>) -> ProviderResult<()>
where
I: IntoIterator<Item = ChangeSet>,