From 94235d64a83cc62e62fde458699b912ff6a0facb Mon Sep 17 00:00:00 2001
From: Dan Cline <6798349+Rjected@users.noreply.github.com>
Date: Mon, 26 Jan 2026 19:28:18 +0000
Subject: [PATCH] fix(pruner): prune account and storage changeset static
 files (#21346)

---
 Cargo.lock                                      |   1 +
 crates/prune/prune/Cargo.toml                   |   1 +
 crates/prune/prune/src/builder.rs               |   7 +-
 crates/prune/prune/src/segments/set.rs          |   5 +-
 .../src/segments/user/account_history.rs        | 352 ++++++++++++++---
 crates/prune/prune/src/segments/user/history.rs |  63 +++
 .../src/segments/user/storage_history.rs        | 369 ++++++++++++++----
 crates/prune/types/src/segment.rs               |   4 +-
 crates/stages/stages/src/stages/prune.rs        |   9 +-
 .../stages/stages/src/test_utils/test_db.rs     |  47 ++-
 10 files changed, 727 insertions(+), 131 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ea860db7ca..f6427d8b86 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10217,6 +10217,7 @@ dependencies = [
  "reth-stages",
  "reth-stages-types",
  "reth-static-file-types",
+ "reth-storage-api",
  "reth-testing-utils",
  "reth-tokio-util",
  "reth-tracing",
diff --git a/crates/prune/prune/Cargo.toml b/crates/prune/prune/Cargo.toml
index ff4d47054e..d86a35eaf1 100644
--- a/crates/prune/prune/Cargo.toml
+++ b/crates/prune/prune/Cargo.toml
@@ -17,6 +17,7 @@ reth-exex-types.workspace = true
 reth-db-api.workspace = true
 reth-errors.workspace = true
 reth-provider.workspace = true
+reth-storage-api.workspace = true
 reth-tokio-util.workspace = true
 reth-config.workspace = true
 reth-prune-types.workspace = true
diff --git a/crates/prune/prune/src/builder.rs b/crates/prune/prune/src/builder.rs
index 41a496bd92..52b175c66a 100644
--- a/crates/prune/prune/src/builder.rs
+++ b/crates/prune/prune/src/builder.rs
@@ -10,6 +10,7 @@ use reth_provider::{
     StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
 };
 use reth_prune_types::PruneModes;
+use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
 use std::time::Duration;
 use tokio::sync::watch;
 
@@ -82,6 +83,8 @@ impl PrunerBuilder {
             + ChainStateBlockReader
             + StorageSettingsCache
             + StageCheckpointReader
+            + ChangeSetReader
+            + StorageChangeSetReader
             + StaticFileProviderFactory<
                 Primitives: NodePrimitives,
             >,
@@ -116,7 +119,9 @@ impl PrunerBuilder {
             + PruneCheckpointWriter
             + PruneCheckpointReader
             + StorageSettingsCache
-            + StageCheckpointReader,
+            + StageCheckpointReader
+            + ChangeSetReader
+            + StorageChangeSetReader,
     {
         let segments =
             SegmentSet::<Provider>::from_components(static_file_provider, self.segments);
diff --git a/crates/prune/prune/src/segments/set.rs b/crates/prune/prune/src/segments/set.rs
index f5ceae6325..3e56664f26 100644
--- a/crates/prune/prune/src/segments/set.rs
+++ b/crates/prune/prune/src/segments/set.rs
@@ -10,6 +10,7 @@ use reth_provider::{
     PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
 };
 use reth_prune_types::PruneModes;
+use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
 
 /// Collection of [`Segment`]. Thread-safe, allocated on the heap.
 #[derive(Debug)]
@@ -52,7 +53,9 @@ where
         + PruneCheckpointReader
         + BlockReader
         + ChainStateBlockReader
-        + StorageSettingsCache,
+        + StorageSettingsCache
+        + ChangeSetReader
+        + StorageChangeSetReader,
 {
     /// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and
     /// [`PruneModes`].
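Note on the builder/segment-set changes above: they only thread the two new trait bounds (`ChangeSetReader`, `StorageChangeSetReader`) through to the segments. The behavioral change lives in the history segments below, which first ask where changesets are stored and then route to a static-file or a database pruning path. A minimal, self-contained sketch of that dispatch shape — the `Destination` enum and the free function here are illustrative stand-ins, not reth's actual types:

```rust
/// Hypothetical stand-in for the value returned by reth's
/// `EitherWriter::account_changesets_destination` / `storage_changesets_destination`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Destination {
    StaticFile,
    Database,
}

impl Destination {
    /// Mirrors the `is_static_file()` check the segments in this patch rely on.
    fn is_static_file(self) -> bool {
        matches!(self, Self::StaticFile)
    }
}

/// One entry point, two backends: the same shape `Segment::prune` takes below.
fn prune(dest: Destination) -> &'static str {
    if dest.is_static_file() {
        "prune_static_files"
    } else {
        "prune_database"
    }
}

fn main() {
    assert_eq!(prune(Destination::StaticFile), "prune_static_files");
    assert_eq!(prune(Destination::Database), "prune_database");
}
```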
diff --git a/crates/prune/prune/src/segments/user/account_history.rs b/crates/prune/prune/src/segments/user/account_history.rs
index 317337f050..9bdd26d111 100644
--- a/crates/prune/prune/src/segments/user/account_history.rs
+++ b/crates/prune/prune/src/segments/user/account_history.rs
@@ -1,14 +1,22 @@
 use crate::{
     db_ext::DbTxPruneExt,
-    segments::{user::history::prune_history_indices, PruneInput, Segment},
+    segments::{
+        user::history::{finalize_history_prune, HistoryPruneResult},
+        PruneInput, Segment,
+    },
     PrunerError,
 };
-use itertools::Itertools;
+use alloy_primitives::BlockNumber;
 use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
-use reth_provider::DBProvider;
+use reth_provider::{
+    changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
+    StaticFileProviderFactory, StorageSettingsCache,
+};
 use reth_prune_types::{
     PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
 };
+use reth_static_file_types::StaticFileSegment;
+use reth_storage_api::ChangeSetReader;
 use rustc_hash::FxHashMap;
 use tracing::{instrument, trace};
 
@@ -31,7 +39,10 @@ impl AccountHistory {
 
 impl<Provider> Segment<Provider> for AccountHistory
 where
-    Provider: DBProvider<Tx: DbTxMut>,
+    Provider: DBProvider<Tx: DbTxMut>
+        + StaticFileProviderFactory
+        + StorageSettingsCache
+        + ChangeSetReader,
 {
     fn segment(&self) -> PruneSegment {
         PruneSegment::AccountHistory
@@ -56,11 +67,33 @@ where
         };
         let range_end = *range.end();
 
+        // Check where account changesets are stored
+        if EitherWriter::account_changesets_destination(provider).is_static_file() {
+            self.prune_static_files(provider, input, range, range_end)
+        } else {
+            self.prune_database(provider, input, range, range_end)
+        }
+    }
+}
+
+impl AccountHistory {
+    /// Prunes account history when changesets are stored in static files.
+    fn prune_static_files<Provider>(
+        &self,
+        provider: &Provider,
+        input: PruneInput,
+        range: std::ops::RangeInclusive<BlockNumber>,
+        range_end: BlockNumber,
+    ) -> Result<SegmentOutput, PrunerError>
+    where
+        Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + ChangeSetReader,
+    {
         let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
             input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
         } else {
             input.limiter
         };
+
         if limiter.is_limit_reached() {
             return Ok(SegmentOutput::not_done(
                 limiter.interrupt_reason(),
@@ -68,15 +101,86 @@ where
             ))
         }
 
+        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
+        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
+        // 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
+        // ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
+        // `max_reorg_depth`, so no OOM is expected here.
+        let mut highest_deleted_accounts = FxHashMap::default();
         let mut last_changeset_pruned_block = None;
+        let mut pruned_changesets = 0;
+        let mut done = true;
+
+        let walker = StaticFileAccountChangesetWalker::new(provider, range);
+        for result in walker {
+            if limiter.is_limit_reached() {
+                done = false;
+                break;
+            }
+            let (block_number, changeset) = result?;
+            highest_deleted_accounts.insert(changeset.address, block_number);
+            last_changeset_pruned_block = Some(block_number);
+            pruned_changesets += 1;
+            limiter.increment_deleted_entries_count();
+        }
+
+        // Delete static file jars below the pruned block
+        if let Some(last_block) = last_changeset_pruned_block {
+            provider
+                .static_file_provider()
+                .delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
+        }
+        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");
+
+        let result = HistoryPruneResult {
+            highest_deleted: highest_deleted_accounts,
+            last_pruned_block: last_changeset_pruned_block,
+            pruned_count: pruned_changesets,
+            done,
+        };
+        finalize_history_prune::<_, tables::AccountsHistory, _, _>(
+            provider,
+            result,
+            range_end,
+            &limiter,
+            ShardedKey::new,
+            |a, b| a.key == b.key,
+        )
+        .map_err(Into::into)
+    }
+
+    fn prune_database<Provider>(
+        &self,
+        provider: &Provider,
+        input: PruneInput,
+        range: std::ops::RangeInclusive<BlockNumber>,
+        range_end: BlockNumber,
+    ) -> Result<SegmentOutput, PrunerError>
+    where
+        Provider: DBProvider<Tx: DbTxMut>,
+    {
+        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
+            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
+        } else {
+            input.limiter
+        };
+
+        if limiter.is_limit_reached() {
+            return Ok(SegmentOutput::not_done(
+                limiter.interrupt_reason(),
+                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
+            ))
+        }
+
         // Deleted account changeset keys (account addresses) with the highest block number deleted
         // for that key.
         //
-        // The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
-        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
-        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
-        // size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
-        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
+        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
+        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
+        // 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
+        // ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
+        // `max_reorg_depth`, so no OOM is expected here.
+        let mut last_changeset_pruned_block = None;
         let mut highest_deleted_accounts = FxHashMap::default();
         let (pruned_changesets, done) =
             provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
                 range,
                 &mut limiter,
                 |_| false,
                 |row| {
                     highest_deleted_accounts.insert(row.1.address, row.0);
                     last_changeset_pruned_block = Some(row.0);
                 },
             )?;
-        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
+        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)");
 
-        let last_changeset_pruned_block = last_changeset_pruned_block
-            // If there's more account changesets to prune, set the checkpoint block number to
-            // previous, so we could finish pruning its account changesets on the next run.
-            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
-            .unwrap_or(range_end);
-
-        // Sort highest deleted block numbers by account address and turn them into sharded keys.
-        // We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
-        let highest_sharded_keys = highest_deleted_accounts
-            .into_iter()
-            .sorted_unstable() // Unstable is fine because no equal keys exist in the map
-            .map(|(address, block_number)| {
-                ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
-            });
-        let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
+        let result = HistoryPruneResult {
+            highest_deleted: highest_deleted_accounts,
+            last_pruned_block: last_changeset_pruned_block,
+            pruned_count: pruned_changesets,
+            done,
+        };
+        finalize_history_prune::<_, tables::AccountsHistory, _, _>(
             provider,
-            highest_sharded_keys,
+            result,
+            range_end,
+            &limiter,
+            ShardedKey::new,
             |a, b| a.key == b.key,
-        )?;
-        trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
-
-        let progress = limiter.progress(done);
-
-        Ok(SegmentOutput {
-            progress,
-            pruned: pruned_changesets + outcomes.deleted,
-            checkpoint: Some(SegmentOutputCheckpoint {
-                block_number: Some(last_changeset_pruned_block),
-                tx_number: None,
-            }),
-        })
+        )
+        .map_err(Into::into)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::segments::{
-        user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
-        PruneLimiter, Segment, SegmentOutput,
-    };
+    use super::ACCOUNT_HISTORY_TABLES_TO_PRUNE;
+    use crate::segments::{AccountHistory, PruneInput, PruneLimiter, Segment, SegmentOutput};
     use alloy_primitives::{BlockNumber, B256};
     use assert_matches::assert_matches;
-    use reth_db_api::{tables, BlockNumberList};
+    use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
     use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
     use reth_prune_types::{
         PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
     };
     use reth_stages::test_utils::{StorageKind, TestStageDB};
+    use reth_storage_api::StorageSettingsCache;
     use reth_testing_utils::generators::{
         self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
     };
     use std::{collections::BTreeMap, ops::AddAssign};
 
     #[test]
-    fn prune() {
+    fn prune_legacy() {
         let db = TestStageDB::default();
         let mut rng = generators::rng();
 
         let blocks = random_block_range(
             &mut rng,
-            1..=5000,
+            0..=5000,
             BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
         );
         db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
@@ -202,6 +289,9 @@ mod tests {
             let segment = AccountHistory::new(prune_mode);
 
             let provider = db.factory.database_provider_rw().unwrap();
+            provider.set_storage_settings_cache(
+                StorageSettings::default().with_account_changesets_in_static_files(false),
+            );
             let result = segment.prune(&provider, input).unwrap();
             limiter.increment_deleted_entries_count_by(result.pruned);
 
@@ -239,20 +329,18 @@ mod tests {
                 .map(|(i, _)| i)
                 .unwrap_or_default();
 
-            let mut pruned_changesets = changesets
-                .iter()
-                // Skip what we've pruned so far, subtracting one to get last pruned block
-                // number further down
-                .skip(pruned.saturating_sub(1));
+            // Skip what we've pruned so far, subtracting one to get last pruned block number
+            // further down
+            let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
 
             let last_pruned_block_number = pruned_changesets
-                .next()
-                .map(|(block_number, _)| if result.progress.is_finished() {
-                    *block_number
-                } else {
-                    block_number.saturating_sub(1)
-                } as BlockNumber)
-                .unwrap_or(to_block);
+                .next()
+                .map(|(block_number, _)| if result.progress.is_finished() {
+                    *block_number
+                } else {
+                    block_number.saturating_sub(1)
+                } as BlockNumber)
+                .unwrap_or(to_block);
 
             let pruned_changesets = pruned_changesets.fold(
                 BTreeMap::<_, Vec<_>>::new(),
@@ -303,4 +391,152 @@ mod tests {
         test_prune(998, 2, (PruneProgress::Finished, 998));
         test_prune(1400, 3, (PruneProgress::Finished, 804));
     }
+
+    #[test]
+    fn prune_static_file() {
+        let db = TestStageDB::default();
+        let mut rng = generators::rng();
+
+        let blocks = random_block_range(
+            &mut rng,
+            0..=5000,
+            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
+        );
+        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
+
+        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
+
+        let (changesets, _) = random_changeset_range(
+            &mut rng,
+            blocks.iter(),
+            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
+            0..0,
+            0..0,
+        );
+
+        db.insert_changesets_to_static_files(changesets.clone(), None)
+            .expect("insert changesets to static files");
+        db.insert_history(changesets.clone(), None).expect("insert history");
+
+        let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
+            BTreeMap::<_, usize>::new(),
+            |mut map, (key, _)| {
+                map.entry(key.key).or_default().add_assign(1);
+                map
+            },
+        );
+        assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
+
+        let original_shards = db.table::<tables::AccountsHistory>().unwrap();
+
+        let test_prune =
+            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
+                let prune_mode = PruneMode::Before(to_block);
+                let deleted_entries_limit = 2000;
+                let mut limiter =
+                    PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
+                let input = PruneInput {
+                    previous_checkpoint: db
+                        .factory
+                        .provider()
+                        .unwrap()
+                        .get_prune_checkpoint(PruneSegment::AccountHistory)
+                        .unwrap(),
+                    to_block,
+                    limiter: limiter.clone(),
+                };
+                let segment = AccountHistory::new(prune_mode);
+
+                let provider = db.factory.database_provider_rw().unwrap();
+                provider.set_storage_settings_cache(
+                    StorageSettings::default().with_account_changesets_in_static_files(true),
+                );
+                let result = segment.prune(&provider, input).unwrap();
+                limiter.increment_deleted_entries_count_by(result.pruned);
+
+                assert_matches!(
+                    result,
+                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
+                        if (progress, pruned) == expected_result
+                );
+
+                segment
+                    .save_checkpoint(
+                        &provider,
+                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
+                    )
+                    .unwrap();
+                provider.commit().expect("commit");
+
+                let changesets = changesets
+                    .iter()
+                    .enumerate()
+                    .flat_map(|(block_number, changeset)| {
+                        changeset.iter().map(move |change| (block_number, change))
+                    })
+                    .collect::<Vec<_>>();
+
+                #[expect(clippy::skip_while_next)]
+                let pruned = changesets
+                    .iter()
+                    .enumerate()
+                    .skip_while(|(i, (block_number, _))| {
+                        *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
+                            *block_number <= to_block as usize
+                    })
+                    .next()
+                    .map(|(i, _)| i)
+                    .unwrap_or_default();
+
+                // Skip what we've pruned so far, subtracting one to get last pruned block number
+                // further down
+                let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
+
+                let last_pruned_block_number = pruned_changesets
+                    .next()
+                    .map(|(block_number, _)| {
+                        (if result.progress.is_finished() {
+                            *block_number
+                        } else {
+                            block_number.saturating_sub(1)
+                        }) as BlockNumber
+                    })
+                    .unwrap_or(to_block);
+
+                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
+
+                let expected_shards = original_shards
+                    .iter()
+                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
+                    .map(|(key, blocks)| {
+                        let new_blocks =
+                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
+                        (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
+                    })
+                    .collect::<BTreeMap<_, _>>();
+
+                assert_eq!(actual_shards, expected_shards);
+
+                assert_eq!(
+                    db.factory
+                        .provider()
+                        .unwrap()
+                        .get_prune_checkpoint(PruneSegment::AccountHistory)
+                        .unwrap(),
+                    Some(PruneCheckpoint {
+                        block_number: Some(last_pruned_block_number),
+                        tx_number: None,
+                        prune_mode
+                    })
+                );
+            };
+
+        test_prune(
+            998,
+            1,
+            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
+        );
+        test_prune(998, 2, (PruneProgress::Finished, 1000));
+        test_prune(1400, 3, (PruneProgress::Finished, 804));
+    }
 }
diff --git a/crates/prune/prune/src/segments/user/history.rs b/crates/prune/prune/src/segments/user/history.rs
index 9d95b2fd3b..d4e6ddcf78 100644
--- a/crates/prune/prune/src/segments/user/history.rs
+++ b/crates/prune/prune/src/segments/user/history.rs
@@ -1,4 +1,6 @@
+use crate::PruneLimiter;
 use alloy_primitives::BlockNumber;
+use itertools::Itertools;
 use reth_db_api::{
     cursor::{DbCursorRO, DbCursorRW},
     models::ShardedKey,
@@ -7,6 +9,8 @@ use reth_db_api::{
     BlockNumberList, DatabaseError, RawKey, RawTable, RawValue,
 };
 use reth_provider::DBProvider;
+use reth_prune_types::{SegmentOutput, SegmentOutputCheckpoint};
+use rustc_hash::FxHashMap;
 
 enum PruneShardOutcome {
     Deleted,
@@ -21,6 +25,65 @@ pub(crate) struct PrunedIndices {
     pub(crate) unchanged: usize,
 }
 
+/// Result of pruning history changesets, used to build the final output.
+pub(crate) struct HistoryPruneResult<K> {
+    /// Map of the highest deleted changeset keys to their block numbers.
+    pub(crate) highest_deleted: FxHashMap<K, BlockNumber>,
+    /// The last block number that had changesets pruned.
+    pub(crate) last_pruned_block: Option<BlockNumber>,
+    /// Number of changesets pruned.
+    pub(crate) pruned_count: usize,
+    /// Whether pruning is complete.
+    pub(crate) done: bool,
+}
+
+/// Finalizes history pruning by sorting sharded keys, pruning history indices, and building output.
+///
+/// This is shared between static file and database pruning for both account and storage history.
+pub(crate) fn finalize_history_prune<Provider, T, K, SK>(
+    provider: &Provider,
+    result: HistoryPruneResult<K>,
+    range_end: BlockNumber,
+    limiter: &PruneLimiter,
+    to_sharded_key: impl Fn(K, BlockNumber) -> T::Key,
+    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
+) -> Result<SegmentOutput, DatabaseError>
+where
+    Provider: DBProvider<Tx: DbTxMut>,
+    T: Table<Value = BlockNumberList>,
+    T::Key: AsRef<ShardedKey<SK>>,
+    K: Ord,
+{
+    let HistoryPruneResult { highest_deleted, last_pruned_block, pruned_count, done } = result;
+
+    // If there's more changesets to prune, set the checkpoint block number to previous,
+    // so we could finish pruning its changesets on the next run.
+    let last_changeset_pruned_block = last_pruned_block
+        .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
+        .unwrap_or(range_end);
+
+    // Sort highest deleted block numbers and turn them into sharded keys.
+    // We use `sorted_unstable` because no equal keys exist in the map.
+    let highest_sharded_keys =
+        highest_deleted.into_iter().sorted_unstable().map(|(key, block_number)| {
+            to_sharded_key(key, block_number.min(last_changeset_pruned_block))
+        });
+
+    let outcomes =
+        prune_history_indices::<Provider, T, SK>(provider, highest_sharded_keys, key_matches)?;
+
+    let progress = limiter.progress(done);
+
+    Ok(SegmentOutput {
+        progress,
+        pruned: pruned_count + outcomes.deleted,
+        checkpoint: Some(SegmentOutputCheckpoint {
+            block_number: Some(last_changeset_pruned_block),
+            tx_number: None,
+        }),
+    })
+}
+
 /// Prune history indices according to the provided list of highest sharded keys.
 ///
 /// Returns total number of deleted, updated and unchanged entities.
diff --git a/crates/prune/prune/src/segments/user/storage_history.rs b/crates/prune/prune/src/segments/user/storage_history.rs
index 556babb9a7..7abe709e11 100644
--- a/crates/prune/prune/src/segments/user/storage_history.rs
+++ b/crates/prune/prune/src/segments/user/storage_history.rs
@@ -1,20 +1,27 @@
 use crate::{
     db_ext::DbTxPruneExt,
-    segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
+    segments::{
+        user::history::{finalize_history_prune, HistoryPruneResult},
+        PruneInput, Segment,
+    },
     PrunerError,
 };
-use itertools::Itertools;
+use alloy_primitives::{Address, BlockNumber, B256};
 use reth_db_api::{
     models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
     tables,
     transaction::DbTxMut,
 };
-use reth_provider::DBProvider;
-use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
+use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory};
+use reth_prune_types::{
+    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
+};
+use reth_static_file_types::StaticFileSegment;
+use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
 use rustc_hash::FxHashMap;
 use tracing::{instrument, trace};
 
-/// Number of storage history tables to prune in one step
+/// Number of storage history tables to prune in one step.
 ///
 /// Storage History consists of two tables: [`tables::StorageChangeSets`] and
 /// [`tables::StoragesHistory`]. We want to prune them to the same block number.
@@ -33,7 +40,10 @@ impl StorageHistory {
 
 impl<Provider> Segment<Provider> for StorageHistory
 where
-    Provider: DBProvider<Tx: DbTxMut>,
+    Provider: DBProvider<Tx: DbTxMut>
+        + StaticFileProviderFactory
+        + StorageChangeSetReader
+        + StorageSettingsCache,
 {
     fn segment(&self) -> PruneSegment {
         PruneSegment::StorageHistory
@@ -58,11 +68,32 @@ where
         };
         let range_end = *range.end();
 
+        if EitherWriter::storage_changesets_destination(provider).is_static_file() {
+            self.prune_static_files(provider, input, range, range_end)
+        } else {
+            self.prune_database(provider, input, range, range_end)
+        }
+    }
+}
+
+impl StorageHistory {
+    /// Prunes storage history when changesets are stored in static files.
+    fn prune_static_files<Provider>(
+        &self,
+        provider: &Provider,
+        input: PruneInput,
+        range: std::ops::RangeInclusive<BlockNumber>,
+        range_end: BlockNumber,
+    ) -> Result<SegmentOutput, PrunerError>
+    where
+        Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
+    {
         let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
             input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
         } else {
             input.limiter
         };
+
         if limiter.is_limit_reached() {
             return Ok(SegmentOutput::not_done(
                 limiter.interrupt_reason(),
@@ -70,15 +101,90 @@ where
             ))
         }
 
+        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
+        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
+        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
+        // size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
+        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
+        let mut highest_deleted_storages = FxHashMap::default();
         let mut last_changeset_pruned_block = None;
+        let mut pruned_changesets = 0;
+        let mut done = true;
+
+        let walker = provider.static_file_provider().walk_storage_changeset_range(range);
+        for result in walker {
+            if limiter.is_limit_reached() {
+                done = false;
+                break;
+            }
+            let (block_address, entry) = result?;
+            let block_number = block_address.block_number();
+            let address = block_address.address();
+            highest_deleted_storages.insert((address, entry.key), block_number);
+            last_changeset_pruned_block = Some(block_number);
+            pruned_changesets += 1;
+            limiter.increment_deleted_entries_count();
+        }
+
+        // Delete static file jars below the pruned block
+        if let Some(last_block) = last_changeset_pruned_block {
+            provider
+                .static_file_provider()
+                .delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
+        }
+        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");
+
+        let result = HistoryPruneResult {
+            highest_deleted: highest_deleted_storages,
+            last_pruned_block: last_changeset_pruned_block,
+            pruned_count: pruned_changesets,
+            done,
+        };
+        finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
+            provider,
+            result,
+            range_end,
+            &limiter,
+            |(address, storage_key), block_number| {
+                StorageShardedKey::new(address, storage_key, block_number)
+            },
+            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
+        )
+        .map_err(Into::into)
+    }
+
+    fn prune_database<Provider>(
+        &self,
+        provider: &Provider,
+        input: PruneInput,
+        range: std::ops::RangeInclusive<BlockNumber>,
+        range_end: BlockNumber,
+    ) -> Result<SegmentOutput, PrunerError>
+    where
+        Provider: DBProvider<Tx: DbTxMut>,
+    {
+        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
+            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
+        } else {
+            input.limiter
+        };
+
+        if limiter.is_limit_reached() {
+            return Ok(SegmentOutput::not_done(
+                limiter.interrupt_reason(),
+                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
+            ))
+        }
+
         // Deleted storage changeset keys (account addresses and storage slots) with the highest
         // block number deleted for that key.
         //
-        // The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
-        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
+        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
+        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
         // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
-        // size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
+        // size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
         // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
+        let mut last_changeset_pruned_block = None;
         let mut highest_deleted_storages = FxHashMap::default();
         let (pruned_changesets, done) =
             provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
                 range,
                 &mut limiter,
                 |_| false,
                 |row| {
                     highest_deleted_storages
                         .insert((row.0.address(), row.1.key), row.0.block_number());
                     last_changeset_pruned_block = Some(row.0.block_number());
                 },
             )?;
         trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");
 
-        let last_changeset_pruned_block = last_changeset_pruned_block
-            // If there's more storage changesets to prune, set the checkpoint block number to
-            // previous, so we could finish pruning its storage changesets on the next run.
-            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
-            .unwrap_or(range_end);
-
-        // Sort highest deleted block numbers by account address and storage key and turn them into
-        // sharded keys.
-        // We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
-        let highest_sharded_keys = highest_deleted_storages
-            .into_iter()
-            .sorted_unstable() // Unstable is fine because no equal keys exist in the map
-            .map(|((address, storage_key), block_number)| {
-                StorageShardedKey::new(
-                    address,
-                    storage_key,
-                    block_number.min(last_changeset_pruned_block),
-                )
-            });
-        let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
+        let result = HistoryPruneResult {
+            highest_deleted: highest_deleted_storages,
+            last_pruned_block: last_changeset_pruned_block,
+            pruned_count: pruned_changesets,
+            done,
+        };
+        finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
             provider,
-            highest_sharded_keys,
+            result,
+            range_end,
+            &limiter,
+            |(address, storage_key), block_number| {
+                StorageShardedKey::new(address, storage_key, block_number)
+            },
             |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
-        )?;
-        trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");
-
-        let progress = limiter.progress(done);
-
-        Ok(SegmentOutput {
-            progress,
-            pruned: pruned_changesets + outcomes.deleted,
-            checkpoint: Some(SegmentOutputCheckpoint {
-                block_number: Some(last_changeset_pruned_block),
-                tx_number: None,
-            }),
-        })
+        )
+        .map_err(Into::into)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::segments::{
-        user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
-        SegmentOutput, StorageHistory,
-    };
+    use super::STORAGE_HISTORY_TABLES_TO_PRUNE;
+    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, StorageHistory};
     use alloy_primitives::{BlockNumber, B256};
     use assert_matches::assert_matches;
-    use reth_db_api::{tables, BlockNumberList};
+    use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
     use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
-    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
+    use reth_prune_types::{
+        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
+    };
     use reth_stages::test_utils::{StorageKind, TestStageDB};
+    use reth_storage_api::StorageSettingsCache;
     use reth_testing_utils::generators::{
         self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
     };
     use std::{collections::BTreeMap, ops::AddAssign};
 
     #[test]
-    fn prune() {
+    fn prune_legacy() {
         let db = TestStageDB::default();
         let mut rng = generators::rng();
 
@@ -208,6 +296,9 @@ mod tests {
             let segment = StorageHistory::new(prune_mode);
 
             let provider = db.factory.database_provider_rw().unwrap();
+            provider.set_storage_settings_cache(
+                StorageSettings::default().with_storage_changesets_in_static_files(false),
+            );
             let result = segment.prune(&provider, input).unwrap();
             limiter.increment_deleted_entries_count_by(result.pruned);
 
@@ -247,19 +338,19 @@ mod tests {
                 .map(|(i, _)| i)
                 .unwrap_or_default();
 
-            let mut pruned_changesets = changesets
-                .iter()
-                // Skip what we've pruned so far, subtracting one to get last pruned block number
-                // further down
-                .skip(pruned.saturating_sub(1));
+            // Skip what we've pruned so far, subtracting one to get last pruned block number
+            // further down
+            let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
 
             let last_pruned_block_number = pruned_changesets
                 .next()
-                .map(|(block_number, _, _)| if result.progress.is_finished() {
-                    *block_number
-                } else {
-                    block_number.saturating_sub(1)
-                } as BlockNumber)
+                .map(|(block_number, _, _)| {
+                    (if result.progress.is_finished() {
+                        *block_number
+                    } else {
+                        block_number.saturating_sub(1)
+                    }) as BlockNumber
+                })
                 .unwrap_or(to_block);
 
             let pruned_changesets = pruned_changesets.fold(
@@ -306,14 +397,160 @@ mod tests {
         test_prune(
             998,
             1,
-            (
-                PruneProgress::HasMoreData(
-                    reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
-                ),
-                500,
-            ),
+            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
         );
         test_prune(998, 2, (PruneProgress::Finished, 499));
         test_prune(1200, 3, (PruneProgress::Finished, 202));
     }
+
+    #[test]
+    fn prune_static_file() {
+        let db = TestStageDB::default();
+        let mut rng = generators::rng();
+
+        let blocks = random_block_range(
+            &mut rng,
+            0..=5000,
+            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
+        );
+        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
+
+        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
+
+        let (changesets, _) = random_changeset_range(
+            &mut rng,
+            blocks.iter(),
+            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
+            1..2,
+            1..2,
+        );
+
+        db.insert_changesets_to_static_files(changesets.clone(), None)
+            .expect("insert changesets to static files");
+        db.insert_history(changesets.clone(), None).expect("insert history");
+
+        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
+            BTreeMap::<_, usize>::new(),
+            |mut map, (key, _)| {
+                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
+                map
+            },
+        );
+        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
+
+        let original_shards = db.table::<tables::StoragesHistory>().unwrap();
+
+        let test_prune = |to_block: BlockNumber,
+                          run: usize,
+                          expected_result: (PruneProgress, usize)| {
+            let prune_mode = PruneMode::Before(to_block);
+            let deleted_entries_limit = 1000;
+            let mut limiter =
+                PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
+            let input = PruneInput {
+                previous_checkpoint: db
+                    .factory
+                    .provider()
+                    .unwrap()
+                    .get_prune_checkpoint(PruneSegment::StorageHistory)
+                    .unwrap(),
+                to_block,
+                limiter: limiter.clone(),
+            };
+            let segment = StorageHistory::new(prune_mode);
+
+            let provider = db.factory.database_provider_rw().unwrap();
+            provider.set_storage_settings_cache(
+                StorageSettings::default().with_storage_changesets_in_static_files(true),
+            );
+            let result = segment.prune(&provider, input).unwrap();
+            limiter.increment_deleted_entries_count_by(result.pruned);
+
+            assert_matches!(
+                result,
+                SegmentOutput {progress, pruned, checkpoint: Some(_)}
+                    if (progress, pruned) == expected_result
+            );
+
+            segment
+                .save_checkpoint(
+                    &provider,
+                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
+                )
+                .unwrap();
+            provider.commit().expect("commit");
+
+            let changesets = changesets
+                .iter()
+                .enumerate()
+                .flat_map(|(block_number, changeset)| {
+                    changeset.iter().flat_map(move |(address, _, entries)| {
+                        entries.iter().map(move |entry| (block_number, address, entry))
+                    })
+                })
+                .collect::<Vec<_>>();
+
+            #[expect(clippy::skip_while_next)]
+            let pruned = changesets
+                .iter()
+                .enumerate()
+                .skip_while(|(i, (block_number, _, _))| {
+                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
+                        *block_number <= to_block as usize
+                })
+                .next()
+                .map(|(i, _)| i)
+                .unwrap_or_default();
+
+            // Skip what we've pruned so far, subtracting one to get last pruned block number
+            // further down
+            let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
+
+            let last_pruned_block_number = pruned_changesets
+                .next()
+                .map(|(block_number, _, _)| {
+                    (if result.progress.is_finished() {
+                        *block_number
+                    } else {
+                        block_number.saturating_sub(1)
+                    }) as BlockNumber
+                })
+                .unwrap_or(to_block);
+
+            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
+
+            let expected_shards = original_shards
+                .iter()
+                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
+                .map(|(key, blocks)| {
+                    let new_blocks =
+                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
+                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
+                })
+                .collect::<BTreeMap<_, _>>();
+
+            assert_eq!(actual_shards, expected_shards);
+
+            assert_eq!(
+                db.factory
+                    .provider()
+                    .unwrap()
+                    .get_prune_checkpoint(PruneSegment::StorageHistory)
+                    .unwrap(),
+                Some(PruneCheckpoint {
+                    block_number: Some(last_pruned_block_number),
+                    tx_number: None,
+                    prune_mode
+                })
+            );
+        };
+
+        test_prune(
+            998,
+            1,
+            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
+        );
+        test_prune(998, 2, (PruneProgress::Finished, 500));
+        test_prune(1200, 3, (PruneProgress::Finished, 202));
+    }
 }
diff --git a/crates/prune/types/src/segment.rs b/crates/prune/types/src/segment.rs
index d3643b2ee8..0e3f4e1edc 100644
--- a/crates/prune/types/src/segment.rs
+++ b/crates/prune/types/src/segment.rs
@@ -24,9 +24,9 @@ pub enum PruneSegment {
     Receipts,
     /// Prune segment responsible for some rows in `Receipts` table filtered by logs.
     ContractLogs,
-    /// Prune segment responsible for the `AccountChangeSets` and `AccountsHistory` tables.
+    /// Prunes account changesets (static files/MDBX) and `AccountsHistory`.
     AccountHistory,
-    /// Prune segment responsible for the `StorageChangeSets` and `StoragesHistory` tables.
+    /// Prunes storage changesets (static files/MDBX) and `StoragesHistory`.
     StorageHistory,
     #[deprecated = "Variant indexes cannot be changed"]
     #[strum(disabled)]
diff --git a/crates/stages/stages/src/stages/prune.rs b/crates/stages/stages/src/stages/prune.rs
index efb17e8e95..98c9a578c6 100644
--- a/crates/stages/stages/src/stages/prune.rs
+++ b/crates/stages/stages/src/stages/prune.rs
@@ -10,6 +10,7 @@ use reth_prune::{
 use reth_stages_api::{
     ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
 };
+use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
 use tracing::info;
 
 /// The prune stage that runs the pruner with the provided prune modes.
@@ -46,7 +47,9 @@ where
         + StageCheckpointReader
         + StaticFileProviderFactory<
             Primitives: NodePrimitives,
-        > + StorageSettingsCache,
+        > + StorageSettingsCache
+        + ChangeSetReader
+        + StorageChangeSetReader,
 {
     fn id(&self) -> StageId {
         StageId::Prune
@@ -151,7 +154,9 @@ where
         + StageCheckpointReader
         + StaticFileProviderFactory<
             Primitives: NodePrimitives,
-        > + StorageSettingsCache,
+        > + StorageSettingsCache
+        + ChangeSetReader
+        + StorageChangeSetReader,
 {
     fn id(&self) -> StageId {
         StageId::PruneSenderRecovery
diff --git a/crates/stages/stages/src/test_utils/test_db.rs b/crates/stages/stages/src/test_utils/test_db.rs
index fd9a456cc2..00cd834f70 100644
--- a/crates/stages/stages/src/test_utils/test_db.rs
+++ b/crates/stages/stages/src/test_utils/test_db.rs
@@ -11,7 +11,7 @@ use reth_db_api::{
     common::KeyValue,
     cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
     database::Database,
-    models::{AccountBeforeTx, StoredBlockBodyIndices},
+    models::{AccountBeforeTx, StorageBeforeTx, StoredBlockBodyIndices},
     table::Table,
     tables,
     transaction::{DbTx, DbTxMut},
@@ -473,6 +473,51 @@ impl TestStageDB {
         })
     }
 
+    /// Insert collection of [`ChangeSet`] into static files (account and storage changesets).
+    pub fn insert_changesets_to_static_files<I>(
+        &self,
+        changesets: I,
+        block_offset: Option<u64>,
+    ) -> ProviderResult<()>
+    where
+        I: IntoIterator<Item = ChangeSet>,
+    {
+        let offset = block_offset.unwrap_or_default();
+        let static_file_provider = self.factory.static_file_provider();
+
+        let mut account_changeset_writer =
+            static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
+        let mut storage_changeset_writer =
+            static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
+
+        for (block, changeset) in changesets.into_iter().enumerate() {
+            let block_number = offset + block as u64;
+
+            let mut account_changesets = Vec::new();
+            let mut storage_changesets = Vec::new();
+
+            for (address, old_account, old_storage) in changeset {
+                account_changesets.push(AccountBeforeTx { address, info: Some(old_account) });
+
+                for entry in old_storage {
+                    storage_changesets.push(StorageBeforeTx {
+                        address,
+                        key: entry.key,
+                        value: entry.value,
+                    });
+                }
+            }
+
+            account_changeset_writer.append_account_changeset(account_changesets, block_number)?;
+            storage_changeset_writer.append_storage_changeset(storage_changesets, block_number)?;
+        }
+
+        account_changeset_writer.commit()?;
+        storage_changeset_writer.commit()?;
+
+        Ok(())
+    }
+
     pub fn insert_history<I>(&self, changesets: I, _block_offset: Option<u64>) -> ProviderResult<()>
     where
         I: IntoIterator<Item = ChangeSet>,
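A note on the checkpoint arithmetic that the new shared `finalize_history_prune` helper centralizes: when a run is interrupted by the delete limit (`done == false`), the checkpoint steps back one block so the partially pruned block is revisited on the next run, and every sharded key is capped at that checkpoint. A minimal, runnable sketch of just that rule — `checkpoint_block` is a hypothetical stand-in name, not a function in the patch:

```rust
/// Sketch of the checkpoint rule used by `finalize_history_prune` (illustrative only).
/// If pruning finished, the checkpoint is the last pruned block; if it was interrupted,
/// step back one block so the partially pruned block is finished on the next run; if
/// nothing was pruned, the checkpoint jumps to the end of the requested range.
fn checkpoint_block(last_pruned: Option<u64>, range_end: u64, done: bool) -> u64 {
    last_pruned
        .map(|block| if done { block } else { block.saturating_sub(1) })
        .unwrap_or(range_end)
}

fn main() {
    // Finished run: checkpoint is the last pruned block itself.
    assert_eq!(checkpoint_block(Some(998), 1_400, true), 998);
    // Interrupted run: step back one block to revisit it next time.
    assert_eq!(checkpoint_block(Some(998), 1_400, false), 997);
    // Nothing left to prune in the range: checkpoint moves to the range end.
    assert_eq!(checkpoint_block(None, 1_400, true), 1_400);
}
```

This is also why the tests above call `test_prune` repeatedly with the same `to_block`: the first run hits `DeletedEntriesLimitReached`, and the second resumes from the stepped-back checkpoint and finishes the remainder.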